]> git.basschouten.com Git - openhab-addons.git/blob
4d698ba854b559a1d248ef865c68516f6d965cc9
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.lutron.internal.grxprg;
14
15 import java.io.BufferedReader;
16 import java.io.IOException;
17 import java.io.InputStreamReader;
18 import java.io.PrintStream;
19 import java.net.Socket;
20 import java.net.SocketTimeoutException;
21 import java.util.concurrent.ArrayBlockingQueue;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import java.util.concurrent.atomic.AtomicReference;
26 import java.util.concurrent.locks.Condition;
27 import java.util.concurrent.locks.Lock;
28 import java.util.concurrent.locks.ReentrantLock;
29
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 /**
34  * Represents a restartable socket connection to the underlying telnet session with an GRX-PRG/GRX-CI-PRG. Commands can
35  * be sent via {@link #sendCommand(String)} and responses will be received on the {@link SocketSessionCallback}
36  *
37  * @author Tim Roberts - Initial contribution
38  */
39 public class SocketSession {
40     private final Logger logger = LoggerFactory.getLogger(SocketSession.class);
41
42     /**
43      * The host/ip address to connect to
44      */
45     private final String host;
46
47     /**
48      * The port to connect to
49      */
50     private final int port;
51
52     /**
53      * The actual socket being used. Will be null if not connected
54      */
55     private Socket client;
56
57     /**
58      * The writer to the {@link #client}. Will be null if not connected
59      */
60     private PrintStream writer;
61
62     /**
63      * The reader from the {@link #client}. Will be null if not connected
64      */
65     private BufferedReader reader;
66
67     /**
68      * The {@link ResponseReader} that will be used to read from {@link #reader}
69      */
70     private final ResponseReader responseReader = new ResponseReader();
71
72     /**
73      * The responses read from the {@link #responseReader}
74      */
75     private final BlockingQueue<Object> responsesQueue = new ArrayBlockingQueue<>(50);
76
77     /**
78      * The dispatcher of responses from {@link #responsesQueue}
79      */
80     private final Dispatcher dispatcher = new Dispatcher();
81
82     /**
83      * The {@link SocketSessionCallback} that the {@link #dispatcher} will call
84      */
85     private AtomicReference<SocketSessionCallback> callback = new AtomicReference<>(null);
86
87     /**
88      * Creates the socket session from the given host and port
89      *
90      * @param host a non-null, non-empty host/ip address
91      * @param port the port number between 1 and 65535
92      */
93     public SocketSession(String host, int port) {
94         if (host == null || host.trim().length() == 0) {
95             throw new IllegalArgumentException("Host cannot be null or empty");
96         }
97
98         if (port < 1 || port > 65535) {
99             throw new IllegalArgumentException("Port must be between 1 and 65535");
100         }
101         this.host = host;
102         this.port = port;
103     }
104
105     /**
106      * Sets the {@link SocketSessionCallback} to use when calling back the
107      * responses that have been received.
108      *
109      * @param callback a non-null {@link SocketSessionCallback} to use
110      */
111     public void setCallback(SocketSessionCallback callback) {
112         if (callback == null) {
113             throw new IllegalArgumentException("callback cannot be null");
114         }
115         this.callback.set(callback);
116     }
117
118     /**
119      * Will attempt to connect to the {@link #host} on port {@link #port}. If we are current connected, will
120      * {@link #disconnect()} first. Once connected, the {@link #writer} and {@link #reader} will be created, the
121      * {@link #dispatcher} and {@link #responseReader} will be started.
122      *
123      * @throws java.io.IOException if an exception occurs during the connection attempt
124      */
125     public void connect() throws IOException {
126         disconnect();
127
128         client = new Socket(host, port);
129         client.setKeepAlive(true);
130         client.setSoTimeout(1000); // allow reader to check to see if it should stop every 1 second
131
132         logger.debug("Connecting to {}:{}", host, port);
133         writer = new PrintStream(client.getOutputStream());
134         reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
135
136         new Thread(responseReader).start();
137         new Thread(dispatcher).start();
138     }
139
140     /**
141      * Disconnects from the {@link #host} if we are {@link #isConnected()}. The {@link #writer}, {@link #reader} and
142      * {@link #client}
143      * will be closed and set to null. The {@link #dispatcher} and {@link #responseReader} will be stopped, the
144      * {@link #callback} will be nulled and the {@link #responsesQueue} will be cleared.
145      *
146      * @throws java.io.IOException if an exception occurs during the disconnect attempt
147      */
148     public void disconnect() throws IOException {
149         if (isConnected()) {
150             logger.debug("Disconnecting from {}:{}", host, port);
151
152             dispatcher.stopRunning();
153             responseReader.stopRunning();
154
155             writer.close();
156             writer = null;
157
158             reader.close();
159             reader = null;
160
161             client.close();
162             client = null;
163
164             callback.set(null);
165             responsesQueue.clear();
166         }
167     }
168
169     /**
170      * Returns true if we are connected ({@link #client} is not null and is connected)
171      *
172      * @return true if connected, false otherwise
173      */
174     public boolean isConnected() {
175         return client != null && client.isConnected();
176     }
177
178     /**
179      * Sends the specified command to the underlying socket
180      *
181      * @param command a non-null, non-empty command
182      * @throws java.io.IOException an exception that occurred while sending
183      */
184     public synchronized void sendCommand(String command) throws IOException {
185         if (command == null) {
186             throw new IllegalArgumentException("command cannot be null");
187         }
188
189         if (!isConnected()) {
190             throw new IOException("Cannot send message - disconnected");
191         }
192
193         logger.debug("Sending Command: '{}'", command);
194         writer.println(command + "\n"); // as pre spec - each command must have a newline
195         writer.flush();
196     }
197
198     /**
199      * This is the runnable that will read from the socket and add messages to the responses queue (to be processed by
200      * the dispatcher)
201      *
202      * @author Tim Roberts
203      *
204      */
205     private class ResponseReader implements Runnable {
206
207         /**
208          * Whether the reader is currently rRunning
209          */
210         private final AtomicBoolean isRunning = new AtomicBoolean(false);
211
212         /**
213          * Locking to allow proper shutdown of the reader
214          */
215         private final Lock rLock = new ReentrantLock();
216         private final Condition rRunning = rLock.newCondition();
217
218         /**
219          * Stops the reader. Will wait 5 seconds for the runnable to stop (should stop within 1 second based on the
220          * setSOTimeout)
221          */
222         public void stopRunning() {
223             rLock.lock();
224             try {
225                 if (isRunning.getAndSet(false)) {
226                     if (!rRunning.await(5, TimeUnit.SECONDS)) {
227                         logger.warn("Waited too long for dispatcher to finish");
228                     }
229                 }
230             } catch (InterruptedException e) {
231                 // shouldn't happen
232             } finally {
233                 rLock.unlock();
234             }
235         }
236
237         /**
238          * Runs the logic to read from the socket until {@link #isRunning} is false. A 'response' is anything that ends
239          * with a carriage-return/newline combo. Additionally, the special "login" prompts are
240          * treated as responses for purposes of logging in.
241          */
242         @Override
243         public void run() {
244             final StringBuilder sb = new StringBuilder(100);
245             int c;
246
247             isRunning.set(true);
248             responsesQueue.clear();
249
250             while (isRunning.get()) {
251                 try {
252                     // if reader is null, sleep and try again
253                     if (reader == null) {
254                         Thread.sleep(250);
255                         continue;
256                     }
257
258                     c = reader.read();
259                     if (c == -1) {
260                         responsesQueue.put(new IOException("server closed connection"));
261                         isRunning.set(false);
262                         break;
263                     }
264                     final char ch = (char) c;
265                     sb.append(ch);
266                     if (ch == '\n' || ch == ' ') {
267                         final String str = sb.toString();
268                         if (str.endsWith("\r\n") || str.endsWith("login: ")) {
269                             sb.setLength(0);
270                             final String response = str.substring(0, str.length() - 2);
271                             logger.debug("Received response: {}", response);
272                             responsesQueue.put(response);
273                         }
274                     }
275                     // logger.debug(">>> reading: " + sb + ":" + (int) ch);
276                 } catch (SocketTimeoutException e) {
277                     // do nothing - we expect this (setSOTimeout) to check the _isReading
278                 } catch (InterruptedException e) {
279                     // Do nothing - probably shutting down
280                 } catch (IOException e) {
281                     try {
282                         isRunning.set(false);
283                         responsesQueue.put(e);
284                     } catch (InterruptedException e1) {
285                         // Do nothing - probably shutting down
286                     }
287                 }
288             }
289
290             rLock.lock();
291             try {
292                 rRunning.signalAll();
293             } finally {
294                 rLock.unlock();
295             }
296         }
297     }
298
299     /**
300      * The dispatcher runnable is responsible for reading the response queue and dispatching it to the current callable.
301      * Since the dispatcher is ONLY started when a callable is set, responses may pile up in the queue and be dispatched
302      * when a callable is set. Unlike the socket reader, this can be assigned to another thread (no state outside of the
303      * class).
304      *
305      * @author Tim Roberts
306      */
307     private class Dispatcher implements Runnable {
308
309         /**
310          * Whether the dispatcher is rRunning or not
311          */
312         private final AtomicBoolean dispatcherRunning = new AtomicBoolean(false);
313
314         /**
315          * Locking to allow proper shutdown of the reader
316          */
317         private final Lock dLock = new ReentrantLock();
318         private final Condition dRunning = dLock.newCondition();
319
320         /**
321          * Stops the reader. Will wait 5 seconds for the runnable to stop (should stop within 1 second based on the poll
322          * timeout below)
323          */
324         public void stopRunning() {
325             dLock.lock();
326             try {
327                 if (dispatcherRunning.getAndSet(false)) {
328                     if (!dRunning.await(5, TimeUnit.SECONDS)) {
329                         logger.warn("Waited too long for dispatcher to finish");
330                     }
331                 }
332             } catch (InterruptedException e) {
333                 // do nothing
334             } finally {
335                 dLock.unlock();
336             }
337         }
338
339         /**
340          * Runs the logic to dispatch any responses to the current callback until {@link #isRunning} is false.
341          */
342         @Override
343         public void run() {
344             dispatcherRunning.set(true);
345             while (dispatcherRunning.get()) {
346                 try {
347                     final SocketSessionCallback ssCallback = callback.get();
348
349                     // if callback is null, we don't want to start dispatching yet.
350                     if (ssCallback == null) {
351                         Thread.sleep(250);
352                         continue;
353                     }
354
355                     final Object response = responsesQueue.poll(1, TimeUnit.SECONDS);
356
357                     if (response != null) {
358                         if (response instanceof String) {
359                             try {
360                                 logger.debug("Dispatching response: {}", response);
361                                 ssCallback.responseReceived((String) response);
362                             } catch (Exception e) {
363                                 logger.warn("Exception occurred processing the response '{}': ", response, e);
364                             }
365                         } else if (response instanceof Exception) {
366                             logger.debug("Dispatching exception: {}", response);
367                             ssCallback.responseException((Exception) response);
368                         } else {
369                             logger.error("Unknown response class: {}", response);
370                         }
371                     }
372                 } catch (InterruptedException e) {
373                     // Do nothing
374                 }
375             }
376
377             dLock.lock();
378             try {
379                 // Signal that we are done
380                 dRunning.signalAll();
381             } finally {
382                 dLock.unlock();
383             }
384         }
385     }
386 }