2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.russound.internal.net;
15 import java.io.IOException;
16 import java.net.InetSocketAddress;
17 import java.nio.ByteBuffer;
18 import java.nio.channels.AsynchronousCloseException;
19 import java.nio.channels.SocketChannel;
20 import java.util.List;
21 import java.util.concurrent.ArrayBlockingQueue;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.CopyOnWriteArrayList;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicReference;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
31 * Represents a restartable socket connection to the underlying telnet session. Commands can be sent via
32 * {@link #sendCommand(String)} and responses will be received on any {@link SocketSessionListener}. This implementation
33 * of {@link SocketSession} communicates using a {@link SocketChannel} connection.
35 * @author Tim Roberts - Initial contribution
37 public class SocketChannelSession implements SocketSession {
38 private final Logger logger = LoggerFactory.getLogger(SocketChannelSession.class);
41 * The host/ip address to connect to
43 private final String host;
46 * The port to connect to
48 private final int port;
51 * The actual socket being used. Will be null if not connected
53 private final AtomicReference<SocketChannel> socketChannel = new AtomicReference<>();
56 * The responses read from the {@link #responseReader}
58 private final BlockingQueue<Object> responses = new ArrayBlockingQueue<>(50);
61 * The {@link SocketSessionListener} that the {@link #dispatcher} will call
63 private List<SocketSessionListener> sessionListeners = new CopyOnWriteArrayList<>();
66 * The thread dispatching responses - will be null if not connected
68 private Thread dispatchingThread = null;
71 * The thread processing responses - will be null if not connected
73 private Thread responseThread = null;
76 * Creates the socket session from the given host and port
78 * @param host a non-null, non-empty host/ip address
79 * @param port the port number between 1 and 65535
81 public SocketChannelSession(String host, int port) {
82 if (host == null || host.trim().length() == 0) {
83 throw new IllegalArgumentException("Host cannot be null or empty");
86 if (port < 1 || port > 65535) {
87 throw new IllegalArgumentException("Port must be between 1 and 65535");
94 public void addListener(SocketSessionListener listener) {
95 if (listener == null) {
96 throw new IllegalArgumentException("listener cannot be null");
98 sessionListeners.add(listener);
102 public void clearListeners() {
103 sessionListeners.clear();
107 public boolean removeListener(SocketSessionListener listener) {
108 return sessionListeners.remove(listener);
112 public void connect() throws IOException {
117 public void connect(int timeout) throws IOException {
120 final SocketChannel channel = SocketChannel.open();
121 channel.configureBlocking(true);
123 logger.debug("Connecting to {}:{}", host, port);
124 channel.socket().connect(new InetSocketAddress(host, port), timeout);
126 socketChannel.set(channel);
130 dispatchingThread = new Thread(new Dispatcher());
131 responseThread = new Thread(new ResponseReader());
133 dispatchingThread.start();
134 responseThread.start();
138 public void disconnect() throws IOException {
140 logger.debug("Disconnecting from {}:{}", host, port);
142 final SocketChannel channel = socketChannel.getAndSet(null);
145 dispatchingThread.interrupt();
146 dispatchingThread = null;
148 responseThread.interrupt();
149 responseThread = null;
156 public boolean isConnected() {
157 final SocketChannel channel = socketChannel.get();
158 return channel != null && channel.isConnected();
162 public synchronized void sendCommand(String command) throws IOException {
163 if (command == null) {
164 throw new IllegalArgumentException("command cannot be null");
167 // if (command.trim().length() == 0) {
168 // throw new IllegalArgumentException("Command cannot be empty");
171 if (!isConnected()) {
172 throw new IOException("Cannot send message - disconnected");
175 ByteBuffer toSend = ByteBuffer.wrap((command + "\r\n").getBytes());
177 final SocketChannel channel = socketChannel.get();
178 if (channel == null) {
179 logger.debug("Cannot send command '{}' - socket channel was closed", command);
181 logger.debug("Sending Command: '{}'", command);
182 channel.write(toSend);
187 * This is the runnable that will read from the socket and add messages to the responses queue (to be processed by
190 * @author Tim Roberts
193 private class ResponseReader implements Runnable {
196 * Runs the logic to read from the socket until {@link #isRunning} is false. A 'response' is anything that ends
197 * with a carriage-return/newline combo. Additionally, the special "Login: " and "Password: " prompts are
198 * treated as responses for purposes of logging in.
202 final StringBuilder sb = new StringBuilder(100);
203 final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
207 while (!Thread.currentThread().isInterrupted()) {
209 // if reader is null, sleep and try again
210 if (readBuffer == null) {
215 final SocketChannel channel = socketChannel.get();
216 if (channel == null) {
218 Thread.currentThread().interrupt();
222 int bytesRead = channel.read(readBuffer);
223 if (bytesRead == -1) {
224 responses.put(new IOException("server closed connection"));
226 } else if (bytesRead == 0) {
232 while (readBuffer.hasRemaining()) {
233 final char ch = (char) readBuffer.get();
235 if (ch == '\n' || ch == ' ') {
236 final String str = sb.toString();
237 if (str.endsWith("\r\n") || str.endsWith("Login: ") || str.endsWith("Password: ")) {
239 final String response = str.substring(0, str.length() - 2);
240 responses.put(response);
246 } catch (InterruptedException e) {
247 // Ending thread execution
248 Thread.currentThread().interrupt();
249 } catch (AsynchronousCloseException e) {
250 // socket was closed by another thread but interrupt our loop anyway
251 Thread.currentThread().interrupt();
252 } catch (IOException e) {
253 // set before pushing the response since we'll likely call back our stop
254 Thread.currentThread().interrupt();
259 } catch (InterruptedException e1) {
260 // Do nothing - probably shutting down
261 // Since we set isRunning to false, will drop out of loop and end the thread
269 * The dispatcher runnable is responsible for reading the response queue and dispatching it to the current callable.
270 * Since the dispatcher is ONLY started when a callable is set, responses may pile up in the queue and be dispatched
271 * when a callable is set. Unlike the socket reader, this can be assigned to another thread (no state outside of the
274 * @author Tim Roberts
276 private class Dispatcher implements Runnable {
278 * Runs the logic to dispatch any responses to the current listeners until {@link #isRunning} is false.
282 while (!Thread.currentThread().isInterrupted()) {
284 final SocketSessionListener[] listeners = sessionListeners.toArray(new SocketSessionListener[0]);
286 // if no listeners, we don't want to start dispatching yet.
287 if (listeners.length == 0) {
292 final Object response = responses.poll(1, TimeUnit.SECONDS);
294 if (response != null) {
295 if (response instanceof String) {
296 logger.debug("Dispatching response: {}", response);
297 for (SocketSessionListener listener : listeners) {
298 listener.responseReceived((String) response);
300 } else if (response instanceof IOException) {
301 logger.debug("Dispatching exception: {}", response);
302 for (SocketSessionListener listener : listeners) {
303 listener.responseException((IOException) response);
306 logger.warn("Unknown response class: {}", response);
309 } catch (InterruptedException e) {
310 // Ending thread execution
311 Thread.currentThread().interrupt();
312 } catch (Exception e) {
313 logger.debug("Uncaught exception {}: ", e.getMessage(), e);
314 Thread.currentThread().interrupt();