2 * Copyright (c) 2010-2022 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.atlona.internal.net;
15 import java.io.IOException;
16 import java.net.InetSocketAddress;
17 import java.nio.ByteBuffer;
18 import java.nio.channels.AsynchronousCloseException;
19 import java.nio.channels.SocketChannel;
20 import java.util.List;
21 import java.util.concurrent.ArrayBlockingQueue;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.CopyOnWriteArrayList;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import java.util.concurrent.atomic.AtomicReference;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
33 * Represents a restartable socket connection to the underlying telnet session. Commands can be sent via
34 * {@link #sendCommand(String)} and responses will be received on any {@link SocketSessionListener}. This implementation
35 * of {@link SocketSession} communicates using a {@link SocketChannel} connection.
37 * @author Tim Roberts - Initial contribution
39 public class SocketChannelSession implements SocketSession {
40 private final Logger logger = LoggerFactory.getLogger(SocketChannelSession.class);
43 * The uid of the calling thing
45 private final String uid;
47 * The host/ip address to connect to
49 private final String host;
52 * The port to connect to
54 private final int port;
57 * The actual socket being used. Will be null if not connected
59 private final AtomicReference<SocketChannel> socketChannel = new AtomicReference<>();
62 * The {@link ResponseReader} that will be used to read from {@link #_readBuffer}
64 private final ResponseReader responseReader = new ResponseReader();
67 * The responses read from the {@link #responseReader}
69 private final BlockingQueue<Object> responses = new ArrayBlockingQueue<>(50);
72 * The dispatcher of responses from {@link #responses}
74 private final Dispatcher dispatcher = new Dispatcher();
77 * The {@link SocketSessionListener} that the {@link #dispatcher} will call
79 private List<SocketSessionListener> listeners = new CopyOnWriteArrayList<>();
82 * Creates the socket session from the given host and port
84 * @param uid the thing uid of the calling thing
85 * @param host a non-null, non-empty host/ip address
86 * @param port the port number between 1 and 65535
88 public SocketChannelSession(String uid, String host, int port) {
89 if (host == null || host.trim().length() == 0) {
90 throw new IllegalArgumentException("Host cannot be null or empty");
93 if (port < 1 || port > 65535) {
94 throw new IllegalArgumentException("Port must be between 1 and 65535");
102 public void addListener(SocketSessionListener listener) {
103 if (listener == null) {
104 throw new IllegalArgumentException("listener cannot be null");
106 listeners.add(listener);
110 public void clearListeners() {
115 public boolean removeListener(SocketSessionListener listener) {
116 return listeners.remove(listener);
120 public void connect() throws IOException {
123 final SocketChannel channel = SocketChannel.open();
124 channel.configureBlocking(true);
126 logger.debug("Connecting to {}:{}", host, port);
127 channel.connect(new InetSocketAddress(host, port));
129 logger.debug("Waiting for connect");
130 while (!channel.finishConnect()) {
133 } catch (InterruptedException e) {
137 socketChannel.set(channel);
138 Thread dispatcherThread = new Thread(dispatcher, "OH-binding-" + uid + "-dispatcher");
139 dispatcherThread.setDaemon(true);
140 dispatcherThread.start();
141 Thread responseReaderThread = new Thread(responseReader, "OH-binding-" + uid + "-responseReader");
142 responseReaderThread.setDaemon(true);
143 responseReaderThread.start();
147 public void disconnect() throws IOException {
149 logger.debug("Disconnecting from {}:{}", host, port);
151 final SocketChannel channel = socketChannel.getAndSet(null);
154 dispatcher.stopRunning();
155 responseReader.stopRunning();
162 public boolean isConnected() {
163 final SocketChannel channel = socketChannel.get();
164 return channel != null && channel.isConnected();
168 public synchronized void sendCommand(String command) throws IOException {
169 if (command == null) {
170 throw new IllegalArgumentException("command cannot be null");
173 if (!isConnected()) {
174 throw new IOException("Cannot send message - disconnected");
177 ByteBuffer toSend = ByteBuffer.wrap((command + "\r\n").getBytes());
179 final SocketChannel channel = socketChannel.get();
180 if (channel == null) {
181 logger.debug("Cannot send command '{}' - socket channel was closed", command);
183 logger.debug("Sending Command: '{}'", command);
184 channel.write(toSend);
189 * This is the runnable that will read from the socket and add messages to the responses queue (to be processed by
192 * @author Tim Roberts
195 private class ResponseReader implements Runnable {
198 * Whether the reader is currently running
200 private final AtomicBoolean isRunning = new AtomicBoolean(false);
203 * Locking to allow proper shutdown of the reader
205 private final CountDownLatch running = new CountDownLatch(1);
208 * Stops the reader. Will wait 5 seconds for the runnable to stop
210 public void stopRunning() {
211 if (isRunning.getAndSet(false)) {
213 if (!running.await(5, TimeUnit.SECONDS)) {
214 logger.warn("Waited too long for response reader to finish");
216 } catch (InterruptedException e) {
223 * Runs the logic to read from the socket until {@link #isRunning} is false. A 'response' is anything that ends
224 * with a carriage-return/newline combo. Additionally, the special "Login: " and "Password: " prompts are
225 * treated as responses for purposes of logging in.
229 final StringBuilder sb = new StringBuilder(100);
230 final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
235 while (isRunning.get()) {
237 // if reader is null, sleep and try again
238 if (readBuffer == null) {
243 final SocketChannel channel = socketChannel.get();
244 if (channel == null) {
246 isRunning.set(false);
250 int bytesRead = channel.read(readBuffer);
251 if (bytesRead == -1) {
252 responses.put(new IOException("server closed connection"));
253 isRunning.set(false);
255 } else if (bytesRead == 0) {
261 while (readBuffer.hasRemaining()) {
262 final char ch = (char) readBuffer.get();
264 if (ch == '\n' || ch == ' ') {
265 final String str = sb.toString();
266 if (str.endsWith("\r\n") || str.endsWith("Login: ") || str.endsWith("Password: ")) {
268 final String response = str.substring(0, str.length() - 2);
269 responses.put(response);
275 } catch (InterruptedException e) {
276 // Do nothing - probably shutting down
277 } catch (AsynchronousCloseException e) {
278 // socket was definitely closed by another thread
279 } catch (IOException e) {
281 isRunning.set(false);
283 } catch (InterruptedException e1) {
284 // Do nothing - probably shutting down
294 * The dispatcher runnable is responsible for reading the response queue and dispatching it to the current callable.
295 * Since the dispatcher is ONLY started when a callable is set, responses may pile up in the queue and be dispatched
296 * when a callable is set. Unlike the socket reader, this can be assigned to another thread (no state outside of the
299 * @author Tim Roberts
301 private class Dispatcher implements Runnable {
304 * Whether the dispatcher is running or not
306 private final AtomicBoolean isRunning = new AtomicBoolean(false);
309 * Locking to allow proper shutdown of the reader
311 private final CountDownLatch running = new CountDownLatch(1);
314 * Whether the dispatcher is currently processing a message
316 private final AtomicReference<Thread> processingThread = new AtomicReference<>();
319 * Stops the reader. Will wait 5 seconds for the runnable to stop (should stop within 1 second based on the poll
322 @SuppressWarnings("PMD.CompareObjectsWithEquals")
323 public void stopRunning() {
324 if (isRunning.getAndSet(false)) {
325 // only wait if stopRunning didn't get called as part of processing a message
326 // (which would happen if we are processing an exception that forced a session close)
327 final Thread processingThread = this.processingThread.get();
328 if (processingThread != null && Thread.currentThread() != processingThread) {
330 if (!running.await(5, TimeUnit.SECONDS)) {
331 logger.warn("Waited too long for dispatcher to finish");
333 } catch (InterruptedException e) {
341 * Runs the logic to dispatch any responses to the current listeners until {@link #isRunning} is false.
345 processingThread.set(Thread.currentThread());
348 while (isRunning.get()) {
350 // if no listeners, we don't want to start dispatching yet.
351 if (listeners.isEmpty()) {
356 final Object response = responses.poll(1, TimeUnit.SECONDS);
358 if (response != null) {
359 if (response instanceof String) {
361 logger.debug("Dispatching response: {}", response);
362 final SocketSessionListener[] listeners = SocketChannelSession.this.listeners
363 .toArray(new SocketSessionListener[0]);
364 for (SocketSessionListener listener : listeners) {
365 listener.responseReceived((String) response);
367 } catch (Exception e) {
368 logger.warn("Exception occurred processing the response '{}': ", response, e);
370 } else if (response instanceof Exception) {
371 logger.debug("Dispatching exception: {}", response);
372 final SocketSessionListener[] listeners = SocketChannelSession.this.listeners
373 .toArray(new SocketSessionListener[0]);
374 for (SocketSessionListener listener : listeners) {
375 listener.responseException((Exception) response);
378 logger.warn("Unknown response class: {}", response);
381 } catch (InterruptedException e) {
383 } catch (Exception e) {
384 logger.debug("Uncaught exception {}", e.getMessage(), e);
388 isRunning.set(false);
389 processingThread.set(null);