]> git.basschouten.com Git - openhab-addons.git/blob
ca0ebeca261b44757991360d6a9407d580be49e3
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.lifx.internal.util;
14
15 import static org.openhab.binding.lifx.internal.util.LifxNetworkUtil.isRemoteAddress;
16 import static org.openhab.binding.lifx.internal.util.LifxSelectorUtil.CastType.*;
17
18 import java.io.IOException;
19 import java.net.InetSocketAddress;
20 import java.net.StandardProtocolFamily;
21 import java.net.StandardSocketOptions;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.DatagramChannel;
24 import java.nio.channels.SelectableChannel;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.nio.channels.SocketChannel;
28 import java.util.ConcurrentModificationException;
29 import java.util.Iterator;
30 import java.util.function.BiConsumer;
31
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.openhab.binding.lifx.internal.LifxSelectorContext;
35 import org.openhab.binding.lifx.internal.fields.MACAddress;
36 import org.openhab.binding.lifx.internal.protocol.Packet;
37 import org.openhab.binding.lifx.internal.protocol.PacketFactory;
38 import org.openhab.binding.lifx.internal.protocol.PacketHandler;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43  * Utility class for sharing {@link Selector} logic between objects.
44  *
45  * @author Wouter Born - Make selector logic reusable between discovery and handlers
46  */
47 @NonNullByDefault
48 public class LifxSelectorUtil {
49
50     private static final Logger LOGGER = LoggerFactory.getLogger(LifxSelectorUtil.class);
51     private static final int MAX_SEND_SELECT_RETRIES = 10;
52     private static final int SEND_SELECT_TIMEOUT = 200;
53
54     enum CastType {
55         BROADCAST,
56         UNICAST
57     }
58
59     @SuppressWarnings("resource")
60     public static @Nullable SelectionKey openBroadcastChannel(@Nullable Selector selector, String logId,
61             int broadcastPort) throws IOException {
62         if (selector == null) {
63             return null;
64         }
65         DatagramChannel broadcastChannel = DatagramChannel.open(StandardProtocolFamily.INET)
66                 .setOption(StandardSocketOptions.SO_REUSEADDR, true)
67                 .setOption(StandardSocketOptions.SO_BROADCAST, true);
68         broadcastChannel.configureBlocking(false);
69         LOGGER.debug("{} : Binding the broadcast channel on port {}", logId, broadcastPort);
70         broadcastChannel.bind(new InetSocketAddress(broadcastPort));
71         return broadcastChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
72     }
73
74     @SuppressWarnings("resource")
75     public static @Nullable SelectionKey openUnicastChannel(@Nullable Selector selector, String logId,
76             @Nullable InetSocketAddress address) throws IOException {
77         if (selector == null || address == null) {
78             return null;
79         }
80         DatagramChannel unicastChannel = DatagramChannel.open(StandardProtocolFamily.INET)
81                 .setOption(StandardSocketOptions.SO_REUSEADDR, true);
82         unicastChannel.configureBlocking(false);
83         unicastChannel.connect(address);
84         LOGGER.trace("{} : Connected to light via {}", logId, unicastChannel.getLocalAddress().toString());
85         return unicastChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
86     }
87
88     public static void closeSelector(@Nullable Selector selector, String logId) {
89         if (selector == null) {
90             return;
91         }
92
93         try {
94             selector.wakeup();
95
96             boolean done = false;
97             while (!done) {
98                 try {
99                     selector.keys().stream().forEach(key -> cancelKey(key, logId));
100                     done = true; // continue until all keys are cancelled
101                 } catch (ConcurrentModificationException e) {
102                     LOGGER.debug("{} while closing selection keys of the light ({}): {}", e.getClass().getSimpleName(),
103                             logId, e.getMessage());
104                 }
105             }
106
107             selector.close();
108         } catch (IOException e) {
109             LOGGER.warn("{} while closing the selector of the light ({}): {}", e.getClass().getSimpleName(), logId,
110                     e.getMessage());
111         }
112     }
113
114     public static void cancelKey(@Nullable SelectionKey key, String logId) {
115         if (key == null) {
116             return;
117         }
118
119         try {
120             key.channel().close();
121         } catch (IOException e) {
122             LOGGER.error("{} while closing a channel of the light ({}): {}", e.getClass().getSimpleName(), logId,
123                     e.getMessage());
124         }
125         key.cancel();
126     }
127
128     @SuppressWarnings("resource")
129     public static void receiveAndHandlePackets(Selector selector, String logId,
130             BiConsumer<Packet, InetSocketAddress> packetConsumer) {
131         try {
132             selector.selectNow();
133         } catch (IOException e) {
134             LOGGER.error("{} while selecting keys for the light ({}) : {}", e.getClass().getSimpleName(), logId,
135                     e.getMessage());
136         }
137
138         Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
139
140         while (keyIterator.hasNext()) {
141             SelectionKey key;
142
143             try {
144                 key = keyIterator.next();
145             } catch (ConcurrentModificationException e) {
146                 // when a StateServiceResponse packet is handled a new unicastChannel may be registered
147                 // in the selector which causes this exception, recover from it by restarting the iteration
148                 LOGGER.debug("{} : Restarting iteration after ConcurrentModificationException", logId);
149                 keyIterator = selector.selectedKeys().iterator();
150                 continue;
151             }
152
153             if (key.isValid() && key.isReadable()) {
154                 LOGGER.trace("{} : Channel is ready for reading", logId);
155                 SelectableChannel channel = key.channel();
156                 ByteBuffer readBuffer = ByteBuffer.allocate(LifxNetworkUtil.getBufferSize());
157
158                 try {
159                     if (channel instanceof DatagramChannel) {
160                         InetSocketAddress address = (InetSocketAddress) ((DatagramChannel) channel).receive(readBuffer);
161                         if (isRemoteAddress(address.getAddress())) {
162                             supplyParsedPacketToConsumer(readBuffer, address, packetConsumer, logId);
163                         }
164                     } else if (channel instanceof SocketChannel) {
165                         InetSocketAddress address = (InetSocketAddress) ((SocketChannel) channel).getRemoteAddress();
166                         ((SocketChannel) channel).read(readBuffer);
167                         if (isRemoteAddress(address.getAddress())) {
168                             supplyParsedPacketToConsumer(readBuffer, address, packetConsumer, logId);
169                         }
170                     }
171                 } catch (Exception e) {
172                     LOGGER.debug("{} while reading data for the light ({}) : {}", e.getClass().getSimpleName(), logId,
173                             e.getMessage());
174                 }
175             }
176         }
177     }
178
179     private static void supplyParsedPacketToConsumer(ByteBuffer readBuffer, InetSocketAddress address,
180             BiConsumer<Packet, InetSocketAddress> packetConsumer, String logId) {
181         int messageLength = readBuffer.position();
182         readBuffer.rewind();
183
184         ByteBuffer packetSize = readBuffer.slice();
185         packetSize.position(0);
186         packetSize.limit(2);
187         int size = Packet.FIELD_SIZE.value(packetSize);
188
189         if (messageLength == size) {
190             ByteBuffer packetType = readBuffer.slice();
191             packetType.position(32);
192             packetType.limit(34);
193             int type = Packet.FIELD_PACKET_TYPE.value(packetType);
194
195             PacketHandler<?> handler = PacketFactory.createHandler(type);
196
197             if (handler == null) {
198                 LOGGER.trace("{} : Unknown packet type: {} (source: {})", logId, String.format("0x%02X", type),
199                         address.toString());
200             } else {
201                 Packet packet = handler.handle(readBuffer);
202                 packetConsumer.accept(packet, address);
203             }
204         }
205     }
206
207     public static boolean broadcastPacket(@Nullable LifxSelectorContext context, Packet packet) {
208         if (context == null) {
209             return false;
210         }
211
212         packet.setSource(context.getSourceId());
213         packet.setSequence(context.getSequenceNumberSupplier().get());
214
215         boolean success = true;
216         for (InetSocketAddress address : LifxNetworkUtil.getBroadcastAddresses()) {
217             success = success && sendPacket(context, packet, address, BROADCAST);
218         }
219         return success;
220     }
221
222     public static String getLogId(@Nullable MACAddress macAddress, @Nullable InetSocketAddress host) {
223         return (macAddress != null ? macAddress.getHex() : (host != null ? host.getHostString() : "Unknown"));
224     }
225
226     public static boolean sendPacket(@Nullable LifxSelectorContext context, Packet packet) {
227         if (context == null) {
228             return false;
229         }
230
231         InetSocketAddress host = context.getHost();
232         if (host == null) {
233             return false;
234         }
235
236         packet.setSource(context.getSourceId());
237         packet.setTarget(context.getMACAddress());
238         packet.setSequence(context.getSequenceNumberSupplier().get());
239         return sendPacket(context, packet, host, UNICAST);
240     }
241
242     public static boolean resendPacket(@Nullable LifxSelectorContext context, Packet packet) {
243         if (context == null) {
244             return false;
245         }
246
247         InetSocketAddress host = context.getHost();
248         if (host == null) {
249             return false;
250         }
251
252         packet.setSource(context.getSourceId());
253         packet.setTarget(context.getMACAddress());
254         return sendPacket(context, packet, host, UNICAST);
255     }
256
257     @SuppressWarnings("resource")
258     private static boolean sendPacket(@Nullable LifxSelectorContext context, Packet packet, InetSocketAddress address,
259             CastType castType) {
260         if (context == null) {
261             return false;
262         }
263
264         try {
265             if (castType == UNICAST) {
266                 LifxThrottlingUtil.lock(packet.getTarget());
267             } else {
268                 LifxThrottlingUtil.lock();
269             }
270
271             for (int i = 0; i <= MAX_SEND_SELECT_RETRIES; i++) {
272                 context.getSelector().select(SEND_SELECT_TIMEOUT);
273
274                 for (Iterator<SelectionKey> it = context.getSelector().selectedKeys().iterator(); it.hasNext();) {
275                     SelectionKey key = it.next();
276                     SelectionKey castKey = castType == UNICAST ? context.getUnicastKey() : context.getBroadcastKey();
277
278                     if (key.isValid() && key.isWritable() && key.equals(castKey)) {
279                         SelectableChannel channel = key.channel();
280                         if (channel instanceof DatagramChannel) {
281                             if (LOGGER.isTraceEnabled()) {
282                                 LOGGER.trace(
283                                         "{} : Sending packet type '{}' from '{}' to '{}' for '{}' with sequence '{}' and source '{}'",
284                                         new Object[] { context.getLogId(), packet.getClass().getSimpleName(),
285                                                 ((InetSocketAddress) ((DatagramChannel) channel).getLocalAddress())
286                                                         .toString(),
287                                                 address.toString(), packet.getTarget().getHex(), packet.getSequence(),
288                                                 Long.toString(packet.getSource(), 16) });
289                             }
290                             ((DatagramChannel) channel).send(packet.bytes(), address);
291                             return true;
292                         } else if (channel instanceof SocketChannel) {
293                             ((SocketChannel) channel).write(packet.bytes());
294                             return true;
295                         }
296                     }
297                 }
298
299                 if (i == MAX_SEND_SELECT_RETRIES) {
300                     LOGGER.debug("Failed to send packet after {} select retries to the light ({})", i,
301                             context.getLogId());
302                 }
303             }
304         } catch (Exception e) {
305             LOGGER.debug("{} while sending a packet to the light ({}): {}", e.getClass().getSimpleName(),
306                     context.getLogId(), e.getMessage());
307         } finally {
308             if (castType == UNICAST) {
309                 LifxThrottlingUtil.unlock(packet.getTarget());
310             } else {
311                 LifxThrottlingUtil.unlock();
312             }
313         }
314         return false;
315     }
316 }