]> git.basschouten.com Git - openhab-addons.git/blob
4819089dbe81685453dd912593cb9c30eb2d20d6
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2022 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.atlona.internal.net;
14
15 import java.io.IOException;
16 import java.net.InetSocketAddress;
17 import java.nio.ByteBuffer;
18 import java.nio.channels.AsynchronousCloseException;
19 import java.nio.channels.SocketChannel;
20 import java.util.List;
21 import java.util.concurrent.ArrayBlockingQueue;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.CopyOnWriteArrayList;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import java.util.concurrent.atomic.AtomicReference;
28
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * Represents a restartable socket connection to the underlying telnet session. Commands can be sent via
34  * {@link #sendCommand(String)} and responses will be received on any {@link SocketSessionListener}. This implementation
35  * of {@link SocketSession} communicates using a {@link SocketChannel} connection.
36  *
37  * @author Tim Roberts - Initial contribution
38  */
39 public class SocketChannelSession implements SocketSession {
40     private final Logger logger = LoggerFactory.getLogger(SocketChannelSession.class);
41
42     /**
43      * The uid of the calling thing
44      */
45     private final String uid;
46     /**
47      * The host/ip address to connect to
48      */
49     private final String host;
50
51     /**
52      * The port to connect to
53      */
54     private final int port;
55
56     /**
57      * The actual socket being used. Will be null if not connected
58      */
59     private final AtomicReference<SocketChannel> socketChannel = new AtomicReference<>();
60
61     /**
62      * The {@link ResponseReader} that will be used to read from {@link #_readBuffer}
63      */
64     private final ResponseReader responseReader = new ResponseReader();
65
66     /**
67      * The responses read from the {@link #responseReader}
68      */
69     private final BlockingQueue<Object> responses = new ArrayBlockingQueue<>(50);
70
71     /**
72      * The dispatcher of responses from {@link #responses}
73      */
74     private final Dispatcher dispatcher = new Dispatcher();
75
76     /**
77      * The {@link SocketSessionListener} that the {@link #dispatcher} will call
78      */
79     private List<SocketSessionListener> listeners = new CopyOnWriteArrayList<>();
80
81     /**
82      * Creates the socket session from the given host and port
83      *
84      * @param uid the thing uid of the calling thing
85      * @param host a non-null, non-empty host/ip address
86      * @param port the port number between 1 and 65535
87      */
88     public SocketChannelSession(String uid, String host, int port) {
89         if (host == null || host.trim().length() == 0) {
90             throw new IllegalArgumentException("Host cannot be null or empty");
91         }
92
93         if (port < 1 || port > 65535) {
94             throw new IllegalArgumentException("Port must be between 1 and 65535");
95         }
96         this.uid = uid;
97         this.host = host;
98         this.port = port;
99     }
100
101     @Override
102     public void addListener(SocketSessionListener listener) {
103         if (listener == null) {
104             throw new IllegalArgumentException("listener cannot be null");
105         }
106         listeners.add(listener);
107     }
108
109     @Override
110     public void clearListeners() {
111         listeners.clear();
112     }
113
114     @Override
115     public boolean removeListener(SocketSessionListener listener) {
116         return listeners.remove(listener);
117     }
118
119     @Override
120     public void connect() throws IOException {
121         disconnect();
122
123         final SocketChannel channel = SocketChannel.open();
124         channel.configureBlocking(true);
125
126         logger.debug("Connecting to {}:{}", host, port);
127         channel.connect(new InetSocketAddress(host, port));
128
129         logger.debug("Waiting for connect");
130         while (!channel.finishConnect()) {
131             try {
132                 Thread.sleep(250);
133             } catch (InterruptedException e) {
134             }
135         }
136
137         socketChannel.set(channel);
138         Thread dispatcherThread = new Thread(dispatcher, "OH-binding-" + uid + "-dispatcher");
139         dispatcherThread.setDaemon(true);
140         dispatcherThread.start();
141         Thread responseReaderThread = new Thread(responseReader, "OH-binding-" + uid + "-responseReader");
142         responseReaderThread.setDaemon(true);
143         responseReaderThread.start();
144     }
145
146     @Override
147     public void disconnect() throws IOException {
148         if (isConnected()) {
149             logger.debug("Disconnecting from {}:{}", host, port);
150
151             final SocketChannel channel = socketChannel.getAndSet(null);
152             channel.close();
153
154             dispatcher.stopRunning();
155             responseReader.stopRunning();
156
157             responses.clear();
158         }
159     }
160
161     @Override
162     public boolean isConnected() {
163         final SocketChannel channel = socketChannel.get();
164         return channel != null && channel.isConnected();
165     }
166
167     @Override
168     public synchronized void sendCommand(String command) throws IOException {
169         if (command == null) {
170             throw new IllegalArgumentException("command cannot be null");
171         }
172
173         if (!isConnected()) {
174             throw new IOException("Cannot send message - disconnected");
175         }
176
177         ByteBuffer toSend = ByteBuffer.wrap((command + "\r\n").getBytes());
178
179         final SocketChannel channel = socketChannel.get();
180         if (channel == null) {
181             logger.debug("Cannot send command '{}' - socket channel was closed", command);
182         } else {
183             logger.debug("Sending Command: '{}'", command);
184             channel.write(toSend);
185         }
186     }
187
188     /**
189      * This is the runnable that will read from the socket and add messages to the responses queue (to be processed by
190      * the dispatcher)
191      *
192      * @author Tim Roberts
193      *
194      */
195     private class ResponseReader implements Runnable {
196
197         /**
198          * Whether the reader is currently running
199          */
200         private final AtomicBoolean isRunning = new AtomicBoolean(false);
201
202         /**
203          * Locking to allow proper shutdown of the reader
204          */
205         private final CountDownLatch running = new CountDownLatch(1);
206
207         /**
208          * Stops the reader. Will wait 5 seconds for the runnable to stop
209          */
210         public void stopRunning() {
211             if (isRunning.getAndSet(false)) {
212                 try {
213                     if (!running.await(5, TimeUnit.SECONDS)) {
214                         logger.warn("Waited too long for response reader to finish");
215                     }
216                 } catch (InterruptedException e) {
217                     // Do nothing
218                 }
219             }
220         }
221
222         /**
223          * Runs the logic to read from the socket until {@link #isRunning} is false. A 'response' is anything that ends
224          * with a carriage-return/newline combo. Additionally, the special "Login: " and "Password: " prompts are
225          * treated as responses for purposes of logging in.
226          */
227         @Override
228         public void run() {
229             final StringBuilder sb = new StringBuilder(100);
230             final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
231
232             isRunning.set(true);
233             responses.clear();
234
235             while (isRunning.get()) {
236                 try {
237                     // if reader is null, sleep and try again
238                     if (readBuffer == null) {
239                         Thread.sleep(250);
240                         continue;
241                     }
242
243                     final SocketChannel channel = socketChannel.get();
244                     if (channel == null) {
245                         // socket was closed
246                         isRunning.set(false);
247                         break;
248                     }
249
250                     int bytesRead = channel.read(readBuffer);
251                     if (bytesRead == -1) {
252                         responses.put(new IOException("server closed connection"));
253                         isRunning.set(false);
254                         break;
255                     } else if (bytesRead == 0) {
256                         readBuffer.clear();
257                         continue;
258                     }
259
260                     readBuffer.flip();
261                     while (readBuffer.hasRemaining()) {
262                         final char ch = (char) readBuffer.get();
263                         sb.append(ch);
264                         if (ch == '\n' || ch == ' ') {
265                             final String str = sb.toString();
266                             if (str.endsWith("\r\n") || str.endsWith("Login: ") || str.endsWith("Password: ")) {
267                                 sb.setLength(0);
268                                 final String response = str.substring(0, str.length() - 2);
269                                 responses.put(response);
270                             }
271                         }
272                     }
273
274                     readBuffer.flip();
275                 } catch (InterruptedException e) {
276                     // Do nothing - probably shutting down
277                 } catch (AsynchronousCloseException e) {
278                     // socket was definitely closed by another thread
279                 } catch (IOException e) {
280                     try {
281                         isRunning.set(false);
282                         responses.put(e);
283                     } catch (InterruptedException e1) {
284                         // Do nothing - probably shutting down
285                     }
286                 }
287             }
288
289             running.countDown();
290         }
291     }
292
293     /**
294      * The dispatcher runnable is responsible for reading the response queue and dispatching it to the current callable.
295      * Since the dispatcher is ONLY started when a callable is set, responses may pile up in the queue and be dispatched
296      * when a callable is set. Unlike the socket reader, this can be assigned to another thread (no state outside of the
297      * class).
298      *
299      * @author Tim Roberts
300      */
301     private class Dispatcher implements Runnable {
302
303         /**
304          * Whether the dispatcher is running or not
305          */
306         private final AtomicBoolean isRunning = new AtomicBoolean(false);
307
308         /**
309          * Locking to allow proper shutdown of the reader
310          */
311         private final CountDownLatch running = new CountDownLatch(1);
312
313         /**
314          * Whether the dispatcher is currently processing a message
315          */
316         private final AtomicReference<Thread> processingThread = new AtomicReference<>();
317
318         /**
319          * Stops the reader. Will wait 5 seconds for the runnable to stop (should stop within 1 second based on the poll
320          * timeout below)
321          */
322         @SuppressWarnings("PMD.CompareObjectsWithEquals")
323         public void stopRunning() {
324             if (isRunning.getAndSet(false)) {
325                 // only wait if stopRunning didn't get called as part of processing a message
326                 // (which would happen if we are processing an exception that forced a session close)
327                 final Thread processingThread = this.processingThread.get();
328                 if (processingThread != null && Thread.currentThread() != processingThread) {
329                     try {
330                         if (!running.await(5, TimeUnit.SECONDS)) {
331                             logger.warn("Waited too long for dispatcher to finish");
332                         }
333                     } catch (InterruptedException e) {
334                         // do nothing
335                     }
336                 }
337             }
338         }
339
340         /**
341          * Runs the logic to dispatch any responses to the current listeners until {@link #isRunning} is false.
342          */
343         @Override
344         public void run() {
345             processingThread.set(Thread.currentThread());
346
347             isRunning.set(true);
348             while (isRunning.get()) {
349                 try {
350                     // if no listeners, we don't want to start dispatching yet.
351                     if (listeners.isEmpty()) {
352                         Thread.sleep(250);
353                         continue;
354                     }
355
356                     final Object response = responses.poll(1, TimeUnit.SECONDS);
357
358                     if (response != null) {
359                         if (response instanceof String) {
360                             try {
361                                 logger.debug("Dispatching response: {}", response);
362                                 final SocketSessionListener[] listeners = SocketChannelSession.this.listeners
363                                         .toArray(new SocketSessionListener[0]);
364                                 for (SocketSessionListener listener : listeners) {
365                                     listener.responseReceived((String) response);
366                                 }
367                             } catch (Exception e) {
368                                 logger.warn("Exception occurred processing the response '{}': ", response, e);
369                             }
370                         } else if (response instanceof Exception) {
371                             logger.debug("Dispatching exception: {}", response);
372                             final SocketSessionListener[] listeners = SocketChannelSession.this.listeners
373                                     .toArray(new SocketSessionListener[0]);
374                             for (SocketSessionListener listener : listeners) {
375                                 listener.responseException((Exception) response);
376                             }
377                         } else {
378                             logger.warn("Unknown response class: {}", response);
379                         }
380                     }
381                 } catch (InterruptedException e) {
382                     // Do nothing
383                 } catch (Exception e) {
384                     logger.debug("Uncaught exception {}", e.getMessage(), e);
385                     break;
386                 }
387             }
388             isRunning.set(false);
389             processingThread.set(null);
390             running.countDown();
391         }
392     }
393 }