]> git.basschouten.com Git - openhab-addons.git/blob
bf4570248bf9c30b7add98369508deca102f60af
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.atlona.internal.net;
14
15 import java.io.IOException;
16 import java.net.InetSocketAddress;
17 import java.nio.ByteBuffer;
18 import java.nio.channels.AsynchronousCloseException;
19 import java.nio.channels.SocketChannel;
20 import java.util.List;
21 import java.util.concurrent.ArrayBlockingQueue;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.CopyOnWriteArrayList;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import java.util.concurrent.atomic.AtomicReference;
28
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * Represents a restartable socket connection to the underlying telnet session. Commands can be sent via
34  * {@link #sendCommand(String)} and responses will be received on any {@link SocketSessionListener}. This implementation
35  * of {@link SocketSession} communicates using a {@link SocketChannel} connection.
36  *
37  * @author Tim Roberts - Initial contribution
38  */
39 public class SocketChannelSession implements SocketSession {
40     private final Logger logger = LoggerFactory.getLogger(SocketChannelSession.class);
41
42     /**
43      * The host/ip address to connect to
44      */
45     private final String host;
46
47     /**
48      * The port to connect to
49      */
50     private final int port;
51
52     /**
53      * The actual socket being used. Will be null if not connected
54      */
55     private final AtomicReference<SocketChannel> socketChannel = new AtomicReference<>();
56
57     /**
58      * The {@link ResponseReader} that will be used to read from {@link #_readBuffer}
59      */
60     private final ResponseReader responseReader = new ResponseReader();
61
62     /**
63      * The responses read from the {@link #responseReader}
64      */
65     private final BlockingQueue<Object> responses = new ArrayBlockingQueue<>(50);
66
67     /**
68      * The dispatcher of responses from {@link #responses}
69      */
70     private final Dispatcher dispatcher = new Dispatcher();
71
72     /**
73      * The {@link SocketSessionListener} that the {@link #dispatcher} will call
74      */
75     private List<SocketSessionListener> listeners = new CopyOnWriteArrayList<>();
76
77     /**
78      * Creates the socket session from the given host and port
79      *
80      * @param host a non-null, non-empty host/ip address
81      * @param port the port number between 1 and 65535
82      */
83     public SocketChannelSession(String host, int port) {
84         if (host == null || host.trim().length() == 0) {
85             throw new IllegalArgumentException("Host cannot be null or empty");
86         }
87
88         if (port < 1 || port > 65535) {
89             throw new IllegalArgumentException("Port must be between 1 and 65535");
90         }
91         this.host = host;
92         this.port = port;
93     }
94
95     @Override
96     public void addListener(SocketSessionListener listener) {
97         if (listener == null) {
98             throw new IllegalArgumentException("listener cannot be null");
99         }
100         listeners.add(listener);
101     }
102
103     @Override
104     public void clearListeners() {
105         listeners.clear();
106     }
107
108     @Override
109     public boolean removeListener(SocketSessionListener listener) {
110         return listeners.remove(listener);
111     }
112
113     @Override
114     public void connect() throws IOException {
115         disconnect();
116
117         final SocketChannel channel = SocketChannel.open();
118         channel.configureBlocking(true);
119
120         logger.debug("Connecting to {}:{}", host, port);
121         channel.connect(new InetSocketAddress(host, port));
122
123         logger.debug("Waiting for connect");
124         while (!channel.finishConnect()) {
125             try {
126                 Thread.sleep(250);
127             } catch (InterruptedException e) {
128             }
129         }
130
131         socketChannel.set(channel);
132         new Thread(dispatcher).start();
133         new Thread(responseReader).start();
134     }
135
136     @Override
137     public void disconnect() throws IOException {
138         if (isConnected()) {
139             logger.debug("Disconnecting from {}:{}", host, port);
140
141             final SocketChannel channel = socketChannel.getAndSet(null);
142             channel.close();
143
144             dispatcher.stopRunning();
145             responseReader.stopRunning();
146
147             responses.clear();
148         }
149     }
150
151     @Override
152     public boolean isConnected() {
153         final SocketChannel channel = socketChannel.get();
154         return channel != null && channel.isConnected();
155     }
156
157     @Override
158     public synchronized void sendCommand(String command) throws IOException {
159         if (command == null) {
160             throw new IllegalArgumentException("command cannot be null");
161         }
162
163         if (!isConnected()) {
164             throw new IOException("Cannot send message - disconnected");
165         }
166
167         ByteBuffer toSend = ByteBuffer.wrap((command + "\r\n").getBytes());
168
169         final SocketChannel channel = socketChannel.get();
170         if (channel == null) {
171             logger.debug("Cannot send command '{}' - socket channel was closed", command);
172         } else {
173             logger.debug("Sending Command: '{}'", command);
174             channel.write(toSend);
175         }
176     }
177
178     /**
179      * This is the runnable that will read from the socket and add messages to the responses queue (to be processed by
180      * the dispatcher)
181      *
182      * @author Tim Roberts
183      *
184      */
185     private class ResponseReader implements Runnable {
186
187         /**
188          * Whether the reader is currently running
189          */
190         private final AtomicBoolean isRunning = new AtomicBoolean(false);
191
192         /**
193          * Locking to allow proper shutdown of the reader
194          */
195         private final CountDownLatch running = new CountDownLatch(1);
196
197         /**
198          * Stops the reader. Will wait 5 seconds for the runnable to stop
199          */
200         public void stopRunning() {
201             if (isRunning.getAndSet(false)) {
202                 try {
203                     if (!running.await(5, TimeUnit.SECONDS)) {
204                         logger.warn("Waited too long for response reader to finish");
205                     }
206                 } catch (InterruptedException e) {
207                     // Do nothing
208                 }
209             }
210         }
211
212         /**
213          * Runs the logic to read from the socket until {@link #isRunning} is false. A 'response' is anything that ends
214          * with a carriage-return/newline combo. Additionally, the special "Login: " and "Password: " prompts are
215          * treated as responses for purposes of logging in.
216          */
217         @Override
218         public void run() {
219             final StringBuilder sb = new StringBuilder(100);
220             final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
221
222             isRunning.set(true);
223             responses.clear();
224
225             while (isRunning.get()) {
226                 try {
227                     // if reader is null, sleep and try again
228                     if (readBuffer == null) {
229                         Thread.sleep(250);
230                         continue;
231                     }
232
233                     final SocketChannel channel = socketChannel.get();
234                     if (channel == null) {
235                         // socket was closed
236                         isRunning.set(false);
237                         break;
238                     }
239
240                     int bytesRead = channel.read(readBuffer);
241                     if (bytesRead == -1) {
242                         responses.put(new IOException("server closed connection"));
243                         isRunning.set(false);
244                         break;
245                     } else if (bytesRead == 0) {
246                         readBuffer.clear();
247                         continue;
248                     }
249
250                     readBuffer.flip();
251                     while (readBuffer.hasRemaining()) {
252                         final char ch = (char) readBuffer.get();
253                         sb.append(ch);
254                         if (ch == '\n' || ch == ' ') {
255                             final String str = sb.toString();
256                             if (str.endsWith("\r\n") || str.endsWith("Login: ") || str.endsWith("Password: ")) {
257                                 sb.setLength(0);
258                                 final String response = str.substring(0, str.length() - 2);
259                                 responses.put(response);
260                             }
261                         }
262                     }
263
264                     readBuffer.flip();
265                 } catch (InterruptedException e) {
266                     // Do nothing - probably shutting down
267                 } catch (AsynchronousCloseException e) {
268                     // socket was definitely closed by another thread
269                 } catch (IOException e) {
270                     try {
271                         isRunning.set(false);
272                         responses.put(e);
273                     } catch (InterruptedException e1) {
274                         // Do nothing - probably shutting down
275                     }
276                 }
277             }
278
279             running.countDown();
280         }
281     }
282
283     /**
284      * The dispatcher runnable is responsible for reading the response queue and dispatching it to the current callable.
285      * Since the dispatcher is ONLY started when a callable is set, responses may pile up in the queue and be dispatched
286      * when a callable is set. Unlike the socket reader, this can be assigned to another thread (no state outside of the
287      * class).
288      *
289      * @author Tim Roberts
290      */
291     private class Dispatcher implements Runnable {
292
293         /**
294          * Whether the dispatcher is running or not
295          */
296         private final AtomicBoolean isRunning = new AtomicBoolean(false);
297
298         /**
299          * Locking to allow proper shutdown of the reader
300          */
301         private final CountDownLatch running = new CountDownLatch(1);
302
303         /**
304          * Whether the dispatcher is currently processing a message
305          */
306         private final AtomicReference<Thread> processingThread = new AtomicReference<>();
307
308         /**
309          * Stops the reader. Will wait 5 seconds for the runnable to stop (should stop within 1 second based on the poll
310          * timeout below)
311          */
312         public void stopRunning() {
313             if (isRunning.getAndSet(false)) {
314                 // only wait if stopRunning didn't get called as part of processing a message
315                 // (which would happen if we are processing an exception that forced a session close)
316                 final Thread processingThread = this.processingThread.get();
317                 if (processingThread != null && Thread.currentThread() != processingThread) {
318                     try {
319                         if (!running.await(5, TimeUnit.SECONDS)) {
320                             logger.warn("Waited too long for dispatcher to finish");
321                         }
322                     } catch (InterruptedException e) {
323                         // do nothing
324                     }
325                 }
326             }
327         }
328
329         /**
330          * Runs the logic to dispatch any responses to the current listeners until {@link #isRunning} is false.
331          */
332         @Override
333         public void run() {
334             processingThread.set(Thread.currentThread());
335
336             isRunning.set(true);
337             while (isRunning.get()) {
338                 try {
339                     // if no listeners, we don't want to start dispatching yet.
340                     if (listeners.size() == 0) {
341                         Thread.sleep(250);
342                         continue;
343                     }
344
345                     final Object response = responses.poll(1, TimeUnit.SECONDS);
346
347                     if (response != null) {
348                         if (response instanceof String) {
349                             try {
350                                 logger.debug("Dispatching response: {}", response);
351                                 final SocketSessionListener[] listeners = SocketChannelSession.this.listeners
352                                         .toArray(new SocketSessionListener[0]);
353                                 for (SocketSessionListener listener : listeners) {
354                                     listener.responseReceived((String) response);
355                                 }
356                             } catch (Exception e) {
357                                 logger.warn("Exception occurred processing the response '{}': ", response, e);
358                             }
359                         } else if (response instanceof Exception) {
360                             logger.debug("Dispatching exception: {}", response);
361                             final SocketSessionListener[] listeners = SocketChannelSession.this.listeners
362                                     .toArray(new SocketSessionListener[0]);
363                             for (SocketSessionListener listener : listeners) {
364                                 listener.responseException((Exception) response);
365                             }
366                         } else {
367                             logger.warn("Unknown response class: {}", response);
368                         }
369                     }
370                 } catch (InterruptedException e) {
371                     // Do nothing
372                 } catch (Exception e) {
373                     logger.debug("Uncaught exception {}", e.getMessage(), e);
374                     break;
375                 }
376             }
377             isRunning.set(false);
378             processingThread.set(null);
379             running.countDown();
380         }
381     }
382 }