]> git.basschouten.com Git - openhab-addons.git/blob
85fcc5847de3ff331cd51df25756434a5b90993f
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.russound.internal.net;
14
15 import java.io.IOException;
16 import java.net.InetSocketAddress;
17 import java.nio.ByteBuffer;
18 import java.nio.channels.AsynchronousCloseException;
19 import java.nio.channels.SocketChannel;
20 import java.util.List;
21 import java.util.concurrent.ArrayBlockingQueue;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.CopyOnWriteArrayList;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicReference;
26
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 /**
31  * Represents a restartable socket connection to the underlying telnet session. Commands can be sent via
32  * {@link #sendCommand(String)} and responses will be received on any {@link SocketSessionListener}. This implementation
33  * of {@link SocketSession} communicates using a {@link SocketChannel} connection.
34  *
35  * @author Tim Roberts - Initial contribution
36  */
37 public class SocketChannelSession implements SocketSession {
38     private final Logger logger = LoggerFactory.getLogger(SocketChannelSession.class);
39
40     /**
41      * The host/ip address to connect to
42      */
43     private final String host;
44
45     /**
46      * The port to connect to
47      */
48     private final int port;
49
50     /**
51      * The actual socket being used. Will be null if not connected
52      */
53     private final AtomicReference<SocketChannel> socketChannel = new AtomicReference<>();
54
55     /**
56      * The responses read from the {@link #responseReader}
57      */
58     private final BlockingQueue<Object> responses = new ArrayBlockingQueue<>(50);
59
60     /**
61      * The {@link SocketSessionListener} that the {@link #dispatcher} will call
62      */
63     private List<SocketSessionListener> sessionListeners = new CopyOnWriteArrayList<>();
64
65     /**
66      * The thread dispatching responses - will be null if not connected
67      */
68     private Thread dispatchingThread = null;
69
70     /**
71      * The thread processing responses - will be null if not connected
72      */
73     private Thread responseThread = null;
74
75     /**
76      * Creates the socket session from the given host and port
77      *
78      * @param host a non-null, non-empty host/ip address
79      * @param port the port number between 1 and 65535
80      */
81     public SocketChannelSession(String host, int port) {
82         if (host == null || host.trim().length() == 0) {
83             throw new IllegalArgumentException("Host cannot be null or empty");
84         }
85
86         if (port < 1 || port > 65535) {
87             throw new IllegalArgumentException("Port must be between 1 and 65535");
88         }
89         this.host = host;
90         this.port = port;
91     }
92
93     @Override
94     public void addListener(SocketSessionListener listener) {
95         if (listener == null) {
96             throw new IllegalArgumentException("listener cannot be null");
97         }
98         sessionListeners.add(listener);
99     }
100
101     @Override
102     public void clearListeners() {
103         sessionListeners.clear();
104     }
105
106     @Override
107     public boolean removeListener(SocketSessionListener listener) {
108         return sessionListeners.remove(listener);
109     }
110
111     @Override
112     public void connect() throws IOException {
113         connect(2000);
114     }
115
116     @Override
117     public void connect(int timeout) throws IOException {
118         disconnect();
119
120         final SocketChannel channel = SocketChannel.open();
121         channel.configureBlocking(true);
122
123         logger.debug("Connecting to {}:{}", host, port);
124         channel.socket().connect(new InetSocketAddress(host, port), timeout);
125
126         socketChannel.set(channel);
127
128         responses.clear();
129
130         dispatchingThread = new Thread(new Dispatcher());
131         responseThread = new Thread(new ResponseReader());
132
133         dispatchingThread.start();
134         responseThread.start();
135     }
136
137     @Override
138     public void disconnect() throws IOException {
139         if (isConnected()) {
140             logger.debug("Disconnecting from {}:{}", host, port);
141
142             final SocketChannel channel = socketChannel.getAndSet(null);
143             channel.close();
144
145             dispatchingThread.interrupt();
146             dispatchingThread = null;
147
148             responseThread.interrupt();
149             responseThread = null;
150
151             responses.clear();
152         }
153     }
154
155     @Override
156     public boolean isConnected() {
157         final SocketChannel channel = socketChannel.get();
158         return channel != null && channel.isConnected();
159     }
160
161     @Override
162     public synchronized void sendCommand(String command) throws IOException {
163         if (command == null) {
164             throw new IllegalArgumentException("command cannot be null");
165         }
166
167         // if (command.trim().length() == 0) {
168         // throw new IllegalArgumentException("Command cannot be empty");
169         // }
170
171         if (!isConnected()) {
172             throw new IOException("Cannot send message - disconnected");
173         }
174
175         ByteBuffer toSend = ByteBuffer.wrap((command + "\r\n").getBytes());
176
177         final SocketChannel channel = socketChannel.get();
178         if (channel == null) {
179             logger.debug("Cannot send command '{}' - socket channel was closed", command);
180         } else {
181             logger.debug("Sending Command: '{}'", command);
182             channel.write(toSend);
183         }
184     }
185
186     /**
187      * This is the runnable that will read from the socket and add messages to the responses queue (to be processed by
188      * the dispatcher)
189      *
190      * @author Tim Roberts
191      *
192      */
193     private class ResponseReader implements Runnable {
194
195         /**
196          * Runs the logic to read from the socket until {@link #isRunning} is false. A 'response' is anything that ends
197          * with a carriage-return/newline combo. Additionally, the special "Login: " and "Password: " prompts are
198          * treated as responses for purposes of logging in.
199          */
200         @Override
201         public void run() {
202             final StringBuilder sb = new StringBuilder(100);
203             final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
204
205             responses.clear();
206
207             while (!Thread.currentThread().isInterrupted()) {
208                 try {
209                     // if reader is null, sleep and try again
210                     if (readBuffer == null) {
211                         Thread.sleep(250);
212                         continue;
213                     }
214
215                     final SocketChannel channel = socketChannel.get();
216                     if (channel == null) {
217                         // socket was closed
218                         Thread.currentThread().interrupt();
219                         break;
220                     }
221
222                     int bytesRead = channel.read(readBuffer);
223                     if (bytesRead == -1) {
224                         responses.put(new IOException("server closed connection"));
225                         break;
226                     } else if (bytesRead == 0) {
227                         readBuffer.clear();
228                         continue;
229                     }
230
231                     readBuffer.flip();
232                     while (readBuffer.hasRemaining()) {
233                         final char ch = (char) readBuffer.get();
234                         sb.append(ch);
235                         if (ch == '\n' || ch == ' ') {
236                             final String str = sb.toString();
237                             if (str.endsWith("\r\n") || str.endsWith("Login: ") || str.endsWith("Password: ")) {
238                                 sb.setLength(0);
239                                 final String response = str.substring(0, str.length() - 2);
240                                 responses.put(response);
241                             }
242                         }
243                     }
244
245                     readBuffer.flip();
246                 } catch (InterruptedException e) {
247                     // Ending thread execution
248                     Thread.currentThread().interrupt();
249                 } catch (AsynchronousCloseException e) {
250                     // socket was closed by another thread but interrupt our loop anyway
251                     Thread.currentThread().interrupt();
252                 } catch (IOException e) {
253                     // set before pushing the response since we'll likely call back our stop
254                     Thread.currentThread().interrupt();
255
256                     try {
257                         responses.put(e);
258                         break;
259                     } catch (InterruptedException e1) {
260                         // Do nothing - probably shutting down
261                         // Since we set isRunning to false, will drop out of loop and end the thread
262                     }
263                 }
264             }
265         }
266     }
267
268     /**
269      * The dispatcher runnable is responsible for reading the response queue and dispatching it to the current callable.
270      * Since the dispatcher is ONLY started when a callable is set, responses may pile up in the queue and be dispatched
271      * when a callable is set. Unlike the socket reader, this can be assigned to another thread (no state outside of the
272      * class).
273      *
274      * @author Tim Roberts
275      */
276     private class Dispatcher implements Runnable {
277         /**
278          * Runs the logic to dispatch any responses to the current listeners until {@link #isRunning} is false.
279          */
280         @Override
281         public void run() {
282             while (!Thread.currentThread().isInterrupted()) {
283                 try {
284                     final SocketSessionListener[] listeners = sessionListeners.toArray(new SocketSessionListener[0]);
285
286                     // if no listeners, we don't want to start dispatching yet.
287                     if (listeners.length == 0) {
288                         Thread.sleep(250);
289                         continue;
290                     }
291
292                     final Object response = responses.poll(1, TimeUnit.SECONDS);
293
294                     if (response != null) {
295                         if (response instanceof String) {
296                             logger.debug("Dispatching response: {}", response);
297                             for (SocketSessionListener listener : listeners) {
298                                 listener.responseReceived((String) response);
299                             }
300                         } else if (response instanceof IOException) {
301                             logger.debug("Dispatching exception: {}", response);
302                             for (SocketSessionListener listener : listeners) {
303                                 listener.responseException((IOException) response);
304                             }
305                         } else {
306                             logger.warn("Unknown response class: {}", response);
307                         }
308                     }
309                 } catch (InterruptedException e) {
310                     // Ending thread execution
311                     Thread.currentThread().interrupt();
312                 } catch (Exception e) {
313                     logger.debug("Uncaught exception {}: ", e.getMessage(), e);
314                     Thread.currentThread().interrupt();
315                 }
316             }
317         }
318     }
319 }