]> git.basschouten.com Git - openhab-addons.git/blob
594c51cf03ed95d0e5287d555517549f543d6aa4
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2024 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.atlona.internal.net;
14
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
31
32 /**
33  * Represents a restartable socket connection to the underlying telnet session. Commands can be sent via
34  * {@link #sendCommand(String)} and responses will be received on any {@link SocketSessionListener}. This implementation
35  * of {@link SocketSession} communicates using a {@link SocketChannel} connection.
36  *
37  * @author Tim Roberts - Initial contribution
38  */
39 public class SocketChannelSession implements SocketSession {
40     private final Logger logger = LoggerFactory.getLogger(SocketChannelSession.class);
41
42     /**
43      * The uid of the calling thing
44      */
45     private final String uid;
46     /**
47      * The host/ip address to connect to
48      */
49     private final String host;
50
51     /**
52      * The port to connect to
53      */
54     private final int port;
55
56     /**
57      * The actual socket being used. Will be null if not connected
58      */
59     private final AtomicReference<SocketChannel> socketChannel = new AtomicReference<>();
60
61     /**
62      * The {@link ResponseReader} that will be used to read from {@link #_readBuffer}
63      */
64     private final ResponseReader responseReader = new ResponseReader();
65
66     /**
67      * The responses read from the {@link #responseReader}
68      */
69     private final BlockingQueue<Object> responses = new ArrayBlockingQueue<>(50);
70
71     /**
72      * The dispatcher of responses from {@link #responses}
73      */
74     private final Dispatcher dispatcher = new Dispatcher();
75
76     /**
77      * The {@link SocketSessionListener} that the {@link #dispatcher} will call
78      */
79     private List<SocketSessionListener> listeners = new CopyOnWriteArrayList<>();
80
81     /**
82      * Creates the socket session from the given host and port
83      *
84      * @param uid the thing uid of the calling thing
85      * @param host a non-null, non-empty host/ip address
86      * @param port the port number between 1 and 65535
87      */
88     public SocketChannelSession(String uid, String host, int port) {
89         if (host == null || host.trim().length() == 0) {
90             throw new IllegalArgumentException("Host cannot be null or empty");
91         }
92
93         if (port < 1 || port > 65535) {
94             throw new IllegalArgumentException("Port must be between 1 and 65535");
95         }
96         this.uid = uid;
97         this.host = host;
98         this.port = port;
99     }
100
101     @Override
102     public void addListener(SocketSessionListener listener) {
103         listeners.add(listener);
104     }
105
106     @Override
107     public void clearListeners() {
108         listeners.clear();
109     }
110
111     @Override
112     public boolean removeListener(SocketSessionListener listener) {
113         return listeners.remove(listener);
114     }
115
116     @Override
117     public void connect() throws IOException {
118         disconnect();
119
120         final SocketChannel channel = SocketChannel.open();
121         channel.configureBlocking(true);
122
123         logger.debug("Connecting to {}:{}", host, port);
124         channel.connect(new InetSocketAddress(host, port));
125
126         logger.debug("Waiting for connect");
127         while (!channel.finishConnect()) {
128             try {
129                 Thread.sleep(250);
130             } catch (InterruptedException e) {
131             }
132         }
133
134         socketChannel.set(channel);
135         Thread dispatcherThread = new Thread(dispatcher, "OH-binding-" + uid + "-dispatcher");
136         dispatcherThread.setDaemon(true);
137         dispatcherThread.start();
138         Thread responseReaderThread = new Thread(responseReader, "OH-binding-" + uid + "-responseReader");
139         responseReaderThread.setDaemon(true);
140         responseReaderThread.start();
141     }
142
143     @Override
144     public void disconnect() throws IOException {
145         if (isConnected()) {
146             logger.debug("Disconnecting from {}:{}", host, port);
147
148             final SocketChannel channel = socketChannel.getAndSet(null);
149             channel.close();
150
151             dispatcher.stopRunning();
152             responseReader.stopRunning();
153
154             responses.clear();
155         }
156     }
157
158     @Override
159     public boolean isConnected() {
160         final SocketChannel channel = socketChannel.get();
161         return channel != null && channel.isConnected();
162     }
163
164     @Override
165     public synchronized void sendCommand(String command) throws IOException {
166         if (!isConnected()) {
167             throw new IOException("Cannot send message - disconnected");
168         }
169
170         ByteBuffer toSend = ByteBuffer.wrap((command + "\r\n").getBytes());
171
172         final SocketChannel channel = socketChannel.get();
173         if (channel == null) {
174             logger.debug("Cannot send command '{}' - socket channel was closed", command);
175         } else {
176             logger.debug("Sending Command: '{}'", command);
177             channel.write(toSend);
178         }
179     }
180
181     /**
182      * This is the runnable that will read from the socket and add messages to the responses queue (to be processed by
183      * the dispatcher)
184      *
185      * @author Tim Roberts
186      *
187      */
188     private class ResponseReader implements Runnable {
189
190         /**
191          * Whether the reader is currently running
192          */
193         private final AtomicBoolean isRunning = new AtomicBoolean(false);
194
195         /**
196          * Locking to allow proper shutdown of the reader
197          */
198         private final CountDownLatch running = new CountDownLatch(1);
199
200         /**
201          * Stops the reader. Will wait 5 seconds for the runnable to stop
202          */
203         public void stopRunning() {
204             if (isRunning.getAndSet(false)) {
205                 try {
206                     if (!running.await(5, TimeUnit.SECONDS)) {
207                         logger.warn("Waited too long for response reader to finish");
208                     }
209                 } catch (InterruptedException e) {
210                     // Do nothing
211                 }
212             }
213         }
214
215         /**
216          * Runs the logic to read from the socket until {@link #isRunning} is false. A 'response' is anything that ends
217          * with a carriage-return/newline combo. Additionally, the special "Login: " and "Password: " prompts are
218          * treated as responses for purposes of logging in.
219          */
220         @Override
221         public void run() {
222             final StringBuilder sb = new StringBuilder(100);
223             final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
224
225             isRunning.set(true);
226             responses.clear();
227
228             while (isRunning.get()) {
229                 try {
230                     // if reader is null, sleep and try again
231                     if (readBuffer == null) {
232                         Thread.sleep(250);
233                         continue;
234                     }
235
236                     final SocketChannel channel = socketChannel.get();
237                     if (channel == null) {
238                         // socket was closed
239                         isRunning.set(false);
240                         break;
241                     }
242
243                     int bytesRead = channel.read(readBuffer);
244                     if (bytesRead == -1) {
245                         responses.put(new IOException("server closed connection"));
246                         isRunning.set(false);
247                         break;
248                     } else if (bytesRead == 0) {
249                         readBuffer.clear();
250                         continue;
251                     }
252
253                     readBuffer.flip();
254                     while (readBuffer.hasRemaining()) {
255                         final char ch = (char) readBuffer.get();
256                         sb.append(ch);
257                         if (ch == '\n' || ch == ' ') {
258                             final String str = sb.toString();
259                             if (str.endsWith("\r\n") || str.endsWith("Login: ") || str.endsWith("Password: ")) {
260                                 sb.setLength(0);
261                                 final String response = str.substring(0, str.length() - 2);
262                                 responses.put(response);
263                             }
264                         }
265                     }
266
267                     readBuffer.flip();
268                 } catch (InterruptedException e) {
269                     // Do nothing - probably shutting down
270                 } catch (AsynchronousCloseException e) {
271                     // socket was definitely closed by another thread
272                 } catch (IOException e) {
273                     try {
274                         isRunning.set(false);
275                         responses.put(e);
276                     } catch (InterruptedException e1) {
277                         // Do nothing - probably shutting down
278                     }
279                 }
280             }
281
282             running.countDown();
283         }
284     }
285
286     /**
287      * The dispatcher runnable is responsible for reading the response queue and dispatching it to the current callable.
288      * Since the dispatcher is ONLY started when a callable is set, responses may pile up in the queue and be dispatched
289      * when a callable is set. Unlike the socket reader, this can be assigned to another thread (no state outside of the
290      * class).
291      *
292      * @author Tim Roberts
293      */
294     private class Dispatcher implements Runnable {
295
296         /**
297          * Whether the dispatcher is running or not
298          */
299         private final AtomicBoolean isRunning = new AtomicBoolean(false);
300
301         /**
302          * Locking to allow proper shutdown of the reader
303          */
304         private final CountDownLatch running = new CountDownLatch(1);
305
306         /**
307          * Whether the dispatcher is currently processing a message
308          */
309         private final AtomicReference<Thread> processingThread = new AtomicReference<>();
310
311         /**
312          * Stops the reader. Will wait 5 seconds for the runnable to stop (should stop within 1 second based on the poll
313          * timeout below)
314          */
315         @SuppressWarnings("PMD.CompareObjectsWithEquals")
316         public void stopRunning() {
317             if (isRunning.getAndSet(false)) {
318                 // only wait if stopRunning didn't get called as part of processing a message
319                 // (which would happen if we are processing an exception that forced a session close)
320                 final Thread processingThread = this.processingThread.get();
321                 if (processingThread != null && Thread.currentThread() != processingThread) {
322                     try {
323                         if (!running.await(5, TimeUnit.SECONDS)) {
324                             logger.warn("Waited too long for dispatcher to finish");
325                         }
326                     } catch (InterruptedException e) {
327                         // do nothing
328                     }
329                 }
330             }
331         }
332
333         /**
334          * Runs the logic to dispatch any responses to the current listeners until {@link #isRunning} is false.
335          */
336         @Override
337         public void run() {
338             processingThread.set(Thread.currentThread());
339
340             isRunning.set(true);
341             while (isRunning.get()) {
342                 try {
343                     // if no listeners, we don't want to start dispatching yet.
344                     if (listeners.isEmpty()) {
345                         Thread.sleep(250);
346                         continue;
347                     }
348
349                     final Object response = responses.poll(1, TimeUnit.SECONDS);
350
351                     if (response != null) {
352                         if (response instanceof String stringResponse) {
353                             try {
354                                 logger.debug("Dispatching response: {}", response);
355                                 final SocketSessionListener[] listeners = SocketChannelSession.this.listeners
356                                         .toArray(new SocketSessionListener[0]);
357                                 for (SocketSessionListener listener : listeners) {
358                                     listener.responseReceived(stringResponse);
359                                 }
360                             } catch (Exception e) {
361                                 logger.warn("Exception occurred processing the response '{}': ", response, e);
362                             }
363                         } else if (response instanceof Exception exceptionResponse) {
364                             logger.debug("Dispatching exception: {}", response);
365                             final SocketSessionListener[] listeners = SocketChannelSession.this.listeners
366                                     .toArray(new SocketSessionListener[0]);
367                             for (SocketSessionListener listener : listeners) {
368                                 listener.responseException(exceptionResponse);
369                             }
370                         } else {
371                             logger.warn("Unknown response class: {}", response);
372                         }
373                     }
374                 } catch (InterruptedException e) {
375                     // Do nothing
376                 } catch (Exception e) {
377                     logger.debug("Uncaught exception {}", e.getMessage(), e);
378                     break;
379                 }
380             }
381             isRunning.set(false);
382             processingThread.set(null);
383             running.countDown();
384         }
385     }
386 }