2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.lifx.internal.util;
15 import static org.openhab.binding.lifx.internal.util.LifxNetworkUtil.isRemoteAddress;
16 import static org.openhab.binding.lifx.internal.util.LifxSelectorUtil.CastType.*;
18 import java.io.IOException;
19 import java.net.InetSocketAddress;
20 import java.net.StandardProtocolFamily;
21 import java.net.StandardSocketOptions;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.DatagramChannel;
24 import java.nio.channels.SelectableChannel;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.nio.channels.SocketChannel;
28 import java.util.ConcurrentModificationException;
29 import java.util.Iterator;
30 import java.util.function.BiConsumer;
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.openhab.binding.lifx.internal.LifxSelectorContext;
35 import org.openhab.binding.lifx.internal.dto.Packet;
36 import org.openhab.binding.lifx.internal.dto.PacketFactory;
37 import org.openhab.binding.lifx.internal.dto.PacketHandler;
38 import org.openhab.binding.lifx.internal.fields.MACAddress;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
43 * Utility class for sharing {@link Selector} logic between objects.
45 * @author Wouter Born - Initial contribution
48 public class LifxSelectorUtil {
// Logger shared by all static helpers of this class.
50 private static final Logger LOGGER = LoggerFactory.getLogger(LifxSelectorUtil.class);
// Upper bound of the retry counter in the send loop. The loop condition there is
// "i <= MAX_SEND_SELECT_RETRIES", so the selector may be polled up to MAX_SEND_SELECT_RETRIES + 1 times.
51 private static final int MAX_SEND_SELECT_RETRIES = 10;
// Timeout handed to Selector.select(long) in the send loop, i.e. milliseconds per select attempt.
52 private static final int SEND_SELECT_TIMEOUT = 200;
/**
 * Opens a non-blocking IPv4 {@link DatagramChannel} with {@code SO_REUSEADDR} and {@code SO_BROADCAST} enabled,
 * binds it to the wildcard address on the given port and registers it with the selector for read and write
 * readiness.
 *
 * @param selector the selector the channel is registered with; {@code null} is guarded against
 *            (NOTE(review): the guard body is not visible in this listing, presumably it returns {@code null})
 * @param logId identifier used as prefix of the debug log message
 * @param broadcastPort the local UDP port the channel is bound to
 * @return the selection key produced by registering the channel
 * @throws IOException declared for the channel open, configure, bind and register calls
 */
59 @SuppressWarnings("resource")
60 public static @Nullable SelectionKey openBroadcastChannel(@Nullable Selector selector, String logId,
61 int broadcastPort) throws IOException {
62 if (selector == null) {
// Socket options are set on the freshly opened channel before it is bound.
65 DatagramChannel broadcastChannel = DatagramChannel.open(StandardProtocolFamily.INET)
66 .setOption(StandardSocketOptions.SO_REUSEADDR, true)
67 .setOption(StandardSocketOptions.SO_BROADCAST, true);
// Non-blocking mode is required before a channel can be registered with a selector.
68 broadcastChannel.configureBlocking(false);
69 LOGGER.debug("{} : Binding the broadcast channel on port {}", logId, broadcastPort);
70 broadcastChannel.bind(new InetSocketAddress(broadcastPort));
71 return broadcastChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
/**
 * Opens a non-blocking IPv4 {@link DatagramChannel} with {@code SO_REUSEADDR} enabled, connects it to the given
 * address and registers it with the selector for read and write readiness.
 *
 * @param selector the selector the channel is registered with; {@code null} is guarded against
 * @param logId identifier used as prefix of the trace log message
 * @param address the remote address the channel is connected to; {@code null} is guarded against
 *            (NOTE(review): the guard body is not visible in this listing, presumably it returns {@code null})
 * @return the selection key produced by registering the channel
 * @throws IOException declared for the channel open, configure, connect and register calls
 */
74 @SuppressWarnings("resource")
75 public static @Nullable SelectionKey openUnicastChannel(@Nullable Selector selector, String logId,
76 @Nullable InetSocketAddress address) throws IOException {
77 if (selector == null || address == null) {
80 DatagramChannel unicastChannel = DatagramChannel.open(StandardProtocolFamily.INET)
81 .setOption(StandardSocketOptions.SO_REUSEADDR, true);
// Non-blocking mode is required before a channel can be registered with a selector.
82 unicastChannel.configureBlocking(false);
// Connecting a datagram channel restricts it to sending to and receiving from this one peer.
83 unicastChannel.connect(address);
84 LOGGER.trace("{} : Connected to light via {}", logId, unicastChannel.getLocalAddress().toString());
85 return unicastChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
/**
 * Releases the keys registered with the given selector by passing each of them to
 * {@link #cancelKey(SelectionKey, String)}. Failures are logged and not propagated.
 * <p>
 * NOTE(review): several lines of this method are not visible in this listing (the body of the null guard, the
 * loop around the key cancellation and the statement that can raise the logged {@link IOException}); the
 * comments below describe the visible statements only.
 *
 * @param selector the selector to clean up; {@code null} is guarded against
 * @param logId identifier of the light used in log messages
 */
88 public static void closeSelector(@Nullable Selector selector, String logId) {
89 if (selector == null) {
// Every key currently known to the selector is handed to cancelKey.
99 selector.keys().stream().forEach(key -> cancelKey(key, logId));
100 done = true; // continue until all keys are cancelled
// The key set can be modified while it is being streamed; this is logged at debug level only.
// Presumably the (not visible) enclosing loop then retries because "done" is still false - TODO confirm.
101 } catch (ConcurrentModificationException e) {
102 LOGGER.debug("{} while closing selection keys of the light ({}): {}", e.getClass().getSimpleName(),
103 logId, e.getMessage());
// I/O failures are downgraded to a warning, the method does not declare a checked exception.
108 } catch (IOException e) {
109 LOGGER.warn("{} while closing the selector of the light ({}): {}", e.getClass().getSimpleName(), logId,
/**
 * Closes the channel that belongs to the given selection key. An {@link IOException} raised by the close is
 * logged at error level and not propagated.
 * <p>
 * NOTE(review): the lines before and after the visible statements are missing from this listing; a null guard
 * and an explicit cancellation of the key are likely but cannot be confirmed from here.
 *
 * @param key the selection key whose channel is closed (declared nullable)
 * @param logId identifier of the light used in the log message
 */
114 public static void cancelKey(@Nullable SelectionKey key, String logId) {
120 key.channel().close();
121 } catch (IOException e) {
122 LOGGER.error("{} while closing a channel of the light ({}): {}", e.getClass().getSimpleName(), logId,
/**
 * Polls the selector without blocking and, for every selected key that is valid and readable, reads from the
 * key's channel and forwards the received data to {@code supplyParsedPacketToConsumer}.
 * <p>
 * NOTE(review): some lines of this method are missing from this listing (try openers, closing braces and the
 * statements between obtaining the channel and reading from it); the comments describe visible statements only.
 *
 * @param selector the selector whose selected keys are processed
 * @param logId identifier of the light used in log messages
 * @param packetConsumer receives each parsed packet together with the address of its sender
 */
128 @SuppressWarnings("resource")
129 public static void receiveAndHandlePackets(Selector selector, String logId,
130 BiConsumer<Packet, InetSocketAddress> packetConsumer) {
// selectNow() updates the selected-key set and returns immediately.
132 selector.selectNow();
133 } catch (IOException e) {
134 LOGGER.error("{} while selecting keys for the light ({}) : {}", e.getClass().getSimpleName(), logId,
// One buffer is allocated per invocation and used for all keys handled below.
138 ByteBuffer readBuffer = ByteBuffer.allocate(LifxNetworkUtil.getBufferSize());
139 Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
141 while (keyIterator.hasNext()) {
145 key = keyIterator.next();
146 } catch (ConcurrentModificationException e) {
147 // when a StateServiceResponse packet is handled a new unicastChannel may be registered
148 // in the selector which causes this exception, recover from it by restarting the iteration
149 LOGGER.debug("{} : Restarting iteration after ConcurrentModificationException", logId);
150 keyIterator = selector.selectedKeys().iterator();
// Only valid keys that report read readiness are processed.
154 if (key.isValid() && key.isReadable()) {
155 if (LOGGER.isTraceEnabled()) {
156 LOGGER.trace("{} : Channel is ready for reading", logId);
159 SelectableChannel channel = key.channel();
// Datagram channel: receive() returns the sender address, or null when no datagram is available.
163 if (channel instanceof DatagramChannel) {
164 InetSocketAddress address = (InetSocketAddress) ((DatagramChannel) channel).receive(readBuffer);
165 if (address == null) {
166 if (LOGGER.isTraceEnabled()) {
167 LOGGER.trace("{} : No datagram is available", logId);
// The data is only parsed when isRemoteAddress accepts the sender address.
169 } else if (isRemoteAddress(address.getAddress())) {
170 supplyParsedPacketToConsumer(readBuffer, address, packetConsumer, logId);
// Socket channel: the data is read first, the peer address is looked up afterwards.
172 } else if (channel instanceof SocketChannel) {
173 ((SocketChannel) channel).read(readBuffer);
174 InetSocketAddress address = (InetSocketAddress) ((SocketChannel) channel).getRemoteAddress();
175 if (address == null) {
176 if (LOGGER.isTraceEnabled()) {
177 LOGGER.trace("{} : Channel socket is not connected", logId);
179 } else if (isRemoteAddress(address.getAddress())) {
180 supplyParsedPacketToConsumer(readBuffer, address, packetConsumer, logId);
// Any exception raised while reading or handling a packet is logged at debug level.
184 } catch (Exception e) {
185 LOGGER.debug("{} while reading data for the light ({}) : {}", e.getClass().getSimpleName(), logId,
/**
 * Inspects the size and type fields of the data in the read buffer and, when the declared packet size equals
 * the buffer position on entry, looks up a handler for the packet type and passes the handled packet to the
 * consumer.
 * <p>
 * NOTE(review): lines between the visible statements are missing from this listing (for example any buffer
 * rewind or limit adjustment around the slices, and the branch keyword before the handler call).
 *
 * @param readBuffer buffer holding the received data; its position on entry is taken as the message length
 * @param address address of the sender, handed to the consumer and used in the trace log
 * @param packetConsumer receives the parsed packet together with {@code address}
 * @param logId identifier of the light used in log messages
 */
192 private static void supplyParsedPacketToConsumer(ByteBuffer readBuffer, InetSocketAddress address,
193 BiConsumer<Packet, InetSocketAddress> packetConsumer, String logId) {
// The buffer position at this point is used as the length of the received message.
194 int messageLength = readBuffer.position();
// A separate slice is used for reading the size field so the position of readBuffer itself is not moved.
197 ByteBuffer packetSize = readBuffer.slice();
198 packetSize.position(0);
200 int size = Packet.FIELD_SIZE.value(packetSize);
// Messages whose declared size differs from the received length are not processed in the visible code.
202 if (messageLength == size) {
// The packet type is read from a two byte window (offsets 32 and 33) of a second slice.
203 ByteBuffer packetType = readBuffer.slice();
204 packetType.position(32);
205 packetType.limit(34);
206 int type = Packet.FIELD_PACKET_TYPE.value(packetType);
208 PacketHandler<?> handler = PacketFactory.createHandler(type);
// No handler for this type: the type is only reported at trace level.
210 if (handler == null) {
211 LOGGER.trace("{} : Unknown packet type: {} (source: {})", logId, String.format("0x%02X", type),
// Presumably reached only when a handler exists (the branch keyword is not visible) - TODO confirm.
214 Packet packet = handler.handle(readBuffer);
215 packetConsumer.accept(packet, address);
/**
 * Stamps the packet with the source id and the next sequence number of the context and sends it to every
 * address returned by {@code LifxNetworkUtil.getBroadcastAddresses()} using the broadcast cast type.
 * <p>
 * NOTE(review): the body of the null guard and the final return statement are not visible in this listing.
 *
 * @param context selector context providing source id and sequence numbers; {@code null} is guarded against
 * @param packet the packet to broadcast; its source and sequence fields are overwritten
 * @return presumably the accumulated {@code success} flag - TODO confirm against the full file
 */
220 public static boolean broadcastPacket(@Nullable LifxSelectorContext context, Packet packet) {
221 if (context == null) {
225 packet.setSource(context.getSourceId());
226 packet.setSequence(context.getSequenceNumberSupplier().get());
228 boolean success = true;
229 for (InetSocketAddress address : LifxNetworkUtil.getBroadcastAddresses()) {
// Because && short-circuits, sendPacket is no longer called for the remaining addresses once one send
// has returned false.
230 success = success && sendPacket(context, packet, address, BROADCAST);
/**
 * Builds the identifier that is used for a light in log messages.
 *
 * @param macAddress MAC address of the light, may be {@code null}
 * @param host network address of the light, may be {@code null}
 * @return the hex form of the MAC address when it is available, otherwise the host string of {@code host},
 *         otherwise the literal {@code "Unknown"}
 */
235 public static String getLogId(@Nullable MACAddress macAddress, @Nullable InetSocketAddress host) {
236 return (macAddress != null ? macAddress.getHex() : (host != null ? host.getHostString() : "Unknown"));
/**
 * Sends the packet as unicast to the host of the given context. Before sending, the packet is stamped with the
 * source id, the MAC address and the next sequence number taken from the context.
 * <p>
 * NOTE(review): the body of the null guard and the lines following the host lookup are not visible in this
 * listing.
 *
 * @param context selector context describing the light; {@code null} is guarded against
 * @param packet the packet to send; its source, target and sequence fields are overwritten
 * @return the result of the internal unicast send
 */
239 public static boolean sendPacket(@Nullable LifxSelectorContext context, Packet packet) {
240 if (context == null) {
244 InetSocketAddress host = context.getHost();
249 packet.setSource(context.getSourceId());
250 packet.setTarget(context.getMACAddress());
// A fresh sequence number is drawn for every call.
251 packet.setSequence(context.getSequenceNumberSupplier().get());
252 return sendPacket(context, packet, host, UNICAST);
/**
 * Sends the packet as unicast to the host of the given context without assigning a new sequence number: only
 * the source id and the target MAC address are (re)applied, the sequence already stored in the packet is kept.
 * <p>
 * NOTE(review): the body of the null guard and the lines following the host lookup are not visible in this
 * listing.
 *
 * @param context selector context describing the light; {@code null} is guarded against
 * @param packet the packet to send again; its source and target fields are overwritten
 * @return the result of the internal unicast send
 */
255 public static boolean resendPacket(@Nullable LifxSelectorContext context, Packet packet) {
256 if (context == null) {
260 InetSocketAddress host = context.getHost();
265 packet.setSource(context.getSourceId());
266 packet.setTarget(context.getMACAddress());
267 return sendPacket(context, packet, host, UNICAST);
/**
 * Internal send routine shared by the unicast and broadcast entry points. It takes the throttling lock that
 * matches the cast type, waits for the matching selection key of the context to become writable and writes
 * the packet bytes to the key's channel.
 * <p>
 * NOTE(review): this listing is missing the end of the parameter list, several branch and try keywords and the
 * tail of the method (including its return statements); the comments describe the visible statements only.
 *
 * @param context selector context providing the selector, the keys and the log id; {@code null} is guarded
 *            against
 * @param packet the packet to send; its tagged flag is set according to the cast type
 * @param address destination address used when sending through a datagram channel
 */
270 @SuppressWarnings("resource")
271 private static boolean sendPacket(@Nullable LifxSelectorContext context, Packet packet, InetSocketAddress address,
273 if (context == null) {
// Unicast: packet is untagged and throttling is locked for the target of the packet.
278 if (castType == UNICAST) {
279 packet.setTagged(false);
280 LifxThrottlingUtil.lock(packet.getTarget());
// Visible counterpart for the other cast type: packet is tagged and the argument-less lock is taken.
282 packet.setTagged(true);
283 LifxThrottlingUtil.lock();
// Up to MAX_SEND_SELECT_RETRIES + 1 select attempts, each waiting at most SEND_SELECT_TIMEOUT milliseconds.
286 for (int i = 0; i <= MAX_SEND_SELECT_RETRIES; i++) {
287 context.getSelector().select(SEND_SELECT_TIMEOUT);
289 for (Iterator<SelectionKey> it = context.getSelector().selectedKeys().iterator(); it.hasNext();) {
290 SelectionKey key = it.next();
// The key to write to is chosen by cast type.
291 SelectionKey castKey = castType == UNICAST ? context.getUnicastKey() : context.getBroadcastKey();
// Only the key of the requested cast type is used, and only when it is valid and writable.
293 if (key.isValid() && key.isWritable() && key.equals(castKey)) {
294 SelectableChannel channel = key.channel();
295 if (channel instanceof DatagramChannel) {
296 if (LOGGER.isTraceEnabled()) {
298 "{} : Sending packet type '{}' from '{}' to '{}' for '{}' with sequence '{}' and source '{}'",
299 new Object[] { context.getLogId(), packet.getClass().getSimpleName(),
300 ((InetSocketAddress) ((DatagramChannel) channel).getLocalAddress())
302 address.toString(), packet.getTarget().getHex(), packet.getSequence(),
303 Long.toString(packet.getSource(), 16) });
// Datagram channels send to the explicit destination address.
305 ((DatagramChannel) channel).send(packet.bytes(), address);
// Socket channels write to their connected peer, the address argument is not used here.
307 } else if (channel instanceof SocketChannel) {
308 ((SocketChannel) channel).write(packet.bytes());
// Last iteration reached: the failure is reported at debug level.
314 if (i == MAX_SEND_SELECT_RETRIES) {
315 LOGGER.debug("Failed to send packet after {} select retries to the light ({})", i,
// Any exception raised while selecting or sending is logged at debug level.
319 } catch (Exception e) {
320 LOGGER.debug("{} while sending a packet to the light ({}): {}", e.getClass().getSimpleName(),
321 context.getLogId(), e.getMessage());
// The throttling lock taken at the start is released again, using the variant that matches the cast type.
// NOTE(review): whether this happens inside a finally block is not visible in this listing.
323 if (castType == UNICAST) {
324 LifxThrottlingUtil.unlock(packet.getTarget());
326 LifxThrottlingUtil.unlock();