]> git.basschouten.com Git - openhab-addons.git/blob
24c53a286bf60d2329ce159730a0a1ac6869ced8
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.lifx.internal.util;
14
15 import static org.openhab.binding.lifx.internal.util.LifxNetworkUtil.isRemoteAddress;
16 import static org.openhab.binding.lifx.internal.util.LifxSelectorUtil.CastType.*;
17
18 import java.io.IOException;
19 import java.net.InetSocketAddress;
20 import java.net.StandardProtocolFamily;
21 import java.net.StandardSocketOptions;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.DatagramChannel;
24 import java.nio.channels.SelectableChannel;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.nio.channels.SocketChannel;
28 import java.util.ConcurrentModificationException;
29 import java.util.Iterator;
30 import java.util.function.BiConsumer;
31
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.openhab.binding.lifx.internal.LifxSelectorContext;
35 import org.openhab.binding.lifx.internal.dto.Packet;
36 import org.openhab.binding.lifx.internal.dto.PacketFactory;
37 import org.openhab.binding.lifx.internal.dto.PacketHandler;
38 import org.openhab.binding.lifx.internal.fields.MACAddress;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43  * Utility class for sharing {@link Selector} logic between objects.
44  *
45  * @author Wouter Born - Initial contribution
46  */
47 @NonNullByDefault
48 public class LifxSelectorUtil {
49
50     private static final Logger LOGGER = LoggerFactory.getLogger(LifxSelectorUtil.class);
51     private static final int MAX_SEND_SELECT_RETRIES = 10;
52     private static final int SEND_SELECT_TIMEOUT = 200;
53
54     enum CastType {
55         BROADCAST,
56         UNICAST
57     }
58
59     @SuppressWarnings("resource")
60     public static @Nullable SelectionKey openBroadcastChannel(@Nullable Selector selector, String logId,
61             int broadcastPort) throws IOException {
62         if (selector == null) {
63             return null;
64         }
65         DatagramChannel broadcastChannel = DatagramChannel.open(StandardProtocolFamily.INET)
66                 .setOption(StandardSocketOptions.SO_REUSEADDR, true)
67                 .setOption(StandardSocketOptions.SO_BROADCAST, true);
68         broadcastChannel.configureBlocking(false);
69         LOGGER.debug("{} : Binding the broadcast channel on port {}", logId, broadcastPort);
70         broadcastChannel.bind(new InetSocketAddress(broadcastPort));
71         return broadcastChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
72     }
73
74     @SuppressWarnings("resource")
75     public static @Nullable SelectionKey openUnicastChannel(@Nullable Selector selector, String logId,
76             @Nullable InetSocketAddress address) throws IOException {
77         if (selector == null || address == null) {
78             return null;
79         }
80         DatagramChannel unicastChannel = DatagramChannel.open(StandardProtocolFamily.INET)
81                 .setOption(StandardSocketOptions.SO_REUSEADDR, true);
82         unicastChannel.configureBlocking(false);
83         unicastChannel.connect(address);
84         LOGGER.trace("{} : Connected to light via {}", logId, unicastChannel.getLocalAddress().toString());
85         return unicastChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
86     }
87
88     public static void closeSelector(@Nullable Selector selector, String logId) {
89         if (selector == null) {
90             return;
91         }
92
93         try {
94             selector.wakeup();
95
96             boolean done = false;
97             while (!done) {
98                 try {
99                     selector.keys().stream().forEach(key -> cancelKey(key, logId));
100                     done = true; // continue until all keys are cancelled
101                 } catch (ConcurrentModificationException e) {
102                     LOGGER.debug("{} while closing selection keys of the light ({}): {}", e.getClass().getSimpleName(),
103                             logId, e.getMessage());
104                 }
105             }
106
107             selector.close();
108         } catch (IOException e) {
109             LOGGER.warn("{} while closing the selector of the light ({}): {}", e.getClass().getSimpleName(), logId,
110                     e.getMessage());
111         }
112     }
113
114     public static void cancelKey(@Nullable SelectionKey key, String logId) {
115         if (key == null) {
116             return;
117         }
118
119         try {
120             key.channel().close();
121         } catch (IOException e) {
122             LOGGER.error("{} while closing a channel of the light ({}): {}", e.getClass().getSimpleName(), logId,
123                     e.getMessage());
124         }
125         key.cancel();
126     }
127
128     @SuppressWarnings("resource")
129     public static void receiveAndHandlePackets(Selector selector, String logId,
130             BiConsumer<Packet, InetSocketAddress> packetConsumer) {
131         try {
132             selector.selectNow();
133         } catch (IOException e) {
134             LOGGER.error("{} while selecting keys for the light ({}) : {}", e.getClass().getSimpleName(), logId,
135                     e.getMessage());
136         }
137
138         ByteBuffer readBuffer = ByteBuffer.allocate(LifxNetworkUtil.getBufferSize());
139         Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
140
141         while (keyIterator.hasNext()) {
142             SelectionKey key;
143
144             try {
145                 key = keyIterator.next();
146             } catch (ConcurrentModificationException e) {
147                 // when a StateServiceResponse packet is handled a new unicastChannel may be registered
148                 // in the selector which causes this exception, recover from it by restarting the iteration
149                 LOGGER.debug("{} : Restarting iteration after ConcurrentModificationException", logId);
150                 keyIterator = selector.selectedKeys().iterator();
151                 continue;
152             }
153
154             if (key.isValid() && key.isReadable()) {
155                 if (LOGGER.isTraceEnabled()) {
156                     LOGGER.trace("{} : Channel is ready for reading", logId);
157                 }
158
159                 SelectableChannel channel = key.channel();
160                 readBuffer.rewind();
161
162                 try {
163                     if (channel instanceof DatagramChannel) {
164                         InetSocketAddress address = (InetSocketAddress) ((DatagramChannel) channel).receive(readBuffer);
165                         if (address == null) {
166                             if (LOGGER.isTraceEnabled()) {
167                                 LOGGER.trace("{} : No datagram is available", logId);
168                             }
169                         } else if (isRemoteAddress(address.getAddress())) {
170                             supplyParsedPacketToConsumer(readBuffer, address, packetConsumer, logId);
171                         }
172                     } else if (channel instanceof SocketChannel) {
173                         ((SocketChannel) channel).read(readBuffer);
174                         InetSocketAddress address = (InetSocketAddress) ((SocketChannel) channel).getRemoteAddress();
175                         if (address == null) {
176                             if (LOGGER.isTraceEnabled()) {
177                                 LOGGER.trace("{} : Channel socket is not connected", logId);
178                             }
179                         } else if (isRemoteAddress(address.getAddress())) {
180                             supplyParsedPacketToConsumer(readBuffer, address, packetConsumer, logId);
181                         }
182
183                     }
184                 } catch (Exception e) {
185                     LOGGER.debug("{} while reading data for the light ({}) : {}", e.getClass().getSimpleName(), logId,
186                             e.getMessage());
187                 }
188             }
189         }
190     }
191
192     private static void supplyParsedPacketToConsumer(ByteBuffer readBuffer, InetSocketAddress address,
193             BiConsumer<Packet, InetSocketAddress> packetConsumer, String logId) {
194         int messageLength = readBuffer.position();
195         readBuffer.rewind();
196
197         ByteBuffer packetSize = readBuffer.slice();
198         packetSize.position(0);
199         packetSize.limit(2);
200         int size = Packet.FIELD_SIZE.value(packetSize);
201
202         if (messageLength == size) {
203             ByteBuffer packetType = readBuffer.slice();
204             packetType.position(32);
205             packetType.limit(34);
206             int type = Packet.FIELD_PACKET_TYPE.value(packetType);
207
208             PacketHandler<?> handler = PacketFactory.createHandler(type);
209
210             if (handler == null) {
211                 LOGGER.trace("{} : Unknown packet type: {} (source: {})", logId, String.format("0x%02X", type),
212                         address.toString());
213             } else {
214                 Packet packet = handler.handle(readBuffer);
215                 packetConsumer.accept(packet, address);
216             }
217         }
218     }
219
220     public static boolean broadcastPacket(@Nullable LifxSelectorContext context, Packet packet) {
221         if (context == null) {
222             return false;
223         }
224
225         packet.setSource(context.getSourceId());
226         packet.setSequence(context.getSequenceNumberSupplier().get());
227
228         boolean success = true;
229         for (InetSocketAddress address : LifxNetworkUtil.getBroadcastAddresses()) {
230             success = success && sendPacket(context, packet, address, BROADCAST);
231         }
232         return success;
233     }
234
235     public static String getLogId(@Nullable MACAddress macAddress, @Nullable InetSocketAddress host) {
236         return (macAddress != null ? macAddress.getHex() : (host != null ? host.getHostString() : "Unknown"));
237     }
238
239     public static boolean sendPacket(@Nullable LifxSelectorContext context, Packet packet) {
240         if (context == null) {
241             return false;
242         }
243
244         InetSocketAddress host = context.getHost();
245         if (host == null) {
246             return false;
247         }
248
249         packet.setSource(context.getSourceId());
250         packet.setTarget(context.getMACAddress());
251         packet.setSequence(context.getSequenceNumberSupplier().get());
252         return sendPacket(context, packet, host, UNICAST);
253     }
254
255     public static boolean resendPacket(@Nullable LifxSelectorContext context, Packet packet) {
256         if (context == null) {
257             return false;
258         }
259
260         InetSocketAddress host = context.getHost();
261         if (host == null) {
262             return false;
263         }
264
265         packet.setSource(context.getSourceId());
266         packet.setTarget(context.getMACAddress());
267         return sendPacket(context, packet, host, UNICAST);
268     }
269
270     @SuppressWarnings("resource")
271     private static boolean sendPacket(@Nullable LifxSelectorContext context, Packet packet, InetSocketAddress address,
272             CastType castType) {
273         if (context == null) {
274             return false;
275         }
276
277         try {
278             if (castType == UNICAST) {
279                 packet.setTagged(false);
280                 LifxThrottlingUtil.lock(packet.getTarget());
281             } else {
282                 packet.setTagged(true);
283                 LifxThrottlingUtil.lock();
284             }
285
286             for (int i = 0; i <= MAX_SEND_SELECT_RETRIES; i++) {
287                 context.getSelector().select(SEND_SELECT_TIMEOUT);
288
289                 for (Iterator<SelectionKey> it = context.getSelector().selectedKeys().iterator(); it.hasNext();) {
290                     SelectionKey key = it.next();
291                     SelectionKey castKey = castType == UNICAST ? context.getUnicastKey() : context.getBroadcastKey();
292
293                     if (key.isValid() && key.isWritable() && key.equals(castKey)) {
294                         SelectableChannel channel = key.channel();
295                         if (channel instanceof DatagramChannel) {
296                             if (LOGGER.isTraceEnabled()) {
297                                 LOGGER.trace(
298                                         "{} : Sending packet type '{}' from '{}' to '{}' for '{}' with sequence '{}' and source '{}'",
299                                         new Object[] { context.getLogId(), packet.getClass().getSimpleName(),
300                                                 ((InetSocketAddress) ((DatagramChannel) channel).getLocalAddress())
301                                                         .toString(),
302                                                 address.toString(), packet.getTarget().getHex(), packet.getSequence(),
303                                                 Long.toString(packet.getSource(), 16) });
304                             }
305                             ((DatagramChannel) channel).send(packet.bytes(), address);
306                             return true;
307                         } else if (channel instanceof SocketChannel) {
308                             ((SocketChannel) channel).write(packet.bytes());
309                             return true;
310                         }
311                     }
312                 }
313
314                 if (i == MAX_SEND_SELECT_RETRIES) {
315                     LOGGER.debug("Failed to send packet after {} select retries to the light ({})", i,
316                             context.getLogId());
317                 }
318             }
319         } catch (Exception e) {
320             LOGGER.debug("{} while sending a packet to the light ({}): {}", e.getClass().getSimpleName(),
321                     context.getLogId(), e.getMessage());
322         } finally {
323             if (castType == UNICAST) {
324                 LifxThrottlingUtil.unlock(packet.getTarget());
325             } else {
326                 LifxThrottlingUtil.unlock();
327             }
328         }
329         return false;
330     }
331 }