]> git.basschouten.com Git - openhab-addons.git/blob
42d3a58ef62da70dae43ecba091e1c8f9a900120
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.lutron.internal.grxprg;
14
15 import java.io.BufferedReader;
16 import java.io.IOException;
17 import java.io.InputStreamReader;
18 import java.io.PrintStream;
19 import java.net.Socket;
20 import java.net.SocketTimeoutException;
21 import java.util.concurrent.ArrayBlockingQueue;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import java.util.concurrent.atomic.AtomicReference;
26 import java.util.concurrent.locks.Condition;
27 import java.util.concurrent.locks.Lock;
28 import java.util.concurrent.locks.ReentrantLock;
29
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 /**
34  * Represents a restartable socket connection to the underlying telnet session with a GRX-PRG/GRX-CI-PRG. Commands can
35  * be sent via {@link #sendCommand(String)} and responses will be received on the {@link SocketSessionCallback}
36  *
37  * @author Tim Roberts - Initial contribution
38  */
39 public class SocketSession {
40     private final Logger logger = LoggerFactory.getLogger(SocketSession.class);
41
42     /**
43      * The uid of the calling thing
44      */
45     private final String uid;
46     /**
47      * The host/ip address to connect to
48      */
49     private final String host;
50
51     /**
52      * The port to connect to
53      */
54     private final int port;
55
56     /**
57      * The actual socket being used. Will be null if not connected
58      */
59     private Socket client;
60
61     /**
62      * The writer to the {@link #client}. Will be null if not connected
63      */
64     private PrintStream writer;
65
66     /**
67      * The reader from the {@link #client}. Will be null if not connected
68      */
69     private BufferedReader reader;
70
71     /**
72      * The {@link ResponseReader} that will be used to read from {@link #reader}
73      */
74     private final ResponseReader responseReader = new ResponseReader();
75
76     /**
77      * The responses read from the {@link #responseReader}
78      */
79     private final BlockingQueue<Object> responsesQueue = new ArrayBlockingQueue<>(50);
80
81     /**
82      * The dispatcher of responses from {@link #responsesQueue}
83      */
84     private final Dispatcher dispatcher = new Dispatcher();
85
86     /**
87      * The {@link SocketSessionCallback} that the {@link #dispatcher} will call
88      */
89     private AtomicReference<SocketSessionCallback> callback = new AtomicReference<>(null);
90
91     /**
92      * Creates the socket session from the given host and port
93      *
94      * @param uid the thing uid of the calling thing
95      * @param host a non-null, non-empty host/ip address
96      * @param port the port number between 1 and 65535
97      */
98     public SocketSession(String uid, String host, int port) {
99         if (host == null || host.trim().length() == 0) {
100             throw new IllegalArgumentException("Host cannot be null or empty");
101         }
102
103         if (port < 1 || port > 65535) {
104             throw new IllegalArgumentException("Port must be between 1 and 65535");
105         }
106         this.uid = uid;
107         this.host = host;
108         this.port = port;
109     }
110
111     /**
112      * Sets the {@link SocketSessionCallback} to use when calling back the
113      * responses that have been received.
114      *
115      * @param callback a non-null {@link SocketSessionCallback} to use
116      */
117     public void setCallback(SocketSessionCallback callback) {
118         if (callback == null) {
119             throw new IllegalArgumentException("callback cannot be null");
120         }
121         this.callback.set(callback);
122     }
123
124     /**
125      * Will attempt to connect to the {@link #host} on port {@link #port}. If we are current connected, will
126      * {@link #disconnect()} first. Once connected, the {@link #writer} and {@link #reader} will be created, the
127      * {@link #dispatcher} and {@link #responseReader} will be started.
128      *
129      * @throws java.io.IOException if an exception occurs during the connection attempt
130      */
131     public void connect() throws IOException {
132         disconnect();
133
134         client = new Socket(host, port);
135         client.setKeepAlive(true);
136         client.setSoTimeout(1000); // allow reader to check to see if it should stop every 1 second
137
138         logger.debug("Connecting to {}:{}", host, port);
139         writer = new PrintStream(client.getOutputStream());
140         reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
141
142         new Thread(responseReader, "OH-binding-" + uid + "-responseReader").start();
143         new Thread(dispatcher, "OH-binding-" + uid + "-dispatcher").start();
144     }
145
146     /**
147      * Disconnects from the {@link #host} if we are {@link #isConnected()}. The {@link #writer}, {@link #reader} and
148      * {@link #client}
149      * will be closed and set to null. The {@link #dispatcher} and {@link #responseReader} will be stopped, the
150      * {@link #callback} will be nulled and the {@link #responsesQueue} will be cleared.
151      *
152      * @throws java.io.IOException if an exception occurs during the disconnect attempt
153      */
154     public void disconnect() throws IOException {
155         if (isConnected()) {
156             logger.debug("Disconnecting from {}:{}", host, port);
157
158             dispatcher.stopRunning();
159             responseReader.stopRunning();
160
161             writer.close();
162             writer = null;
163
164             reader.close();
165             reader = null;
166
167             client.close();
168             client = null;
169
170             callback.set(null);
171             responsesQueue.clear();
172         }
173     }
174
175     /**
176      * Returns true if we are connected ({@link #client} is not null and is connected)
177      *
178      * @return true if connected, false otherwise
179      */
180     public boolean isConnected() {
181         return client != null && client.isConnected();
182     }
183
184     /**
185      * Sends the specified command to the underlying socket
186      *
187      * @param command a non-null, non-empty command
188      * @throws java.io.IOException an exception that occurred while sending
189      */
190     public synchronized void sendCommand(String command) throws IOException {
191         if (command == null) {
192             throw new IllegalArgumentException("command cannot be null");
193         }
194
195         if (!isConnected()) {
196             throw new IOException("Cannot send message - disconnected");
197         }
198
199         logger.debug("Sending Command: '{}'", command);
200         writer.println(command + "\n"); // as pre spec - each command must have a newline
201         writer.flush();
202     }
203
204     /**
205      * This is the runnable that will read from the socket and add messages to the responses queue (to be processed by
206      * the dispatcher)
207      *
208      * @author Tim Roberts
209      *
210      */
211     private class ResponseReader implements Runnable {
212
213         /**
214          * Whether the reader is currently rRunning
215          */
216         private final AtomicBoolean isRunning = new AtomicBoolean(false);
217
218         /**
219          * Locking to allow proper shutdown of the reader
220          */
221         private final Lock rLock = new ReentrantLock();
222         private final Condition rRunning = rLock.newCondition();
223
224         /**
225          * Stops the reader. Will wait 5 seconds for the runnable to stop (should stop within 1 second based on the
226          * setSOTimeout)
227          */
228         public void stopRunning() {
229             rLock.lock();
230             try {
231                 if (isRunning.getAndSet(false)) {
232                     if (!rRunning.await(5, TimeUnit.SECONDS)) {
233                         logger.warn("Waited too long for dispatcher to finish");
234                     }
235                 }
236             } catch (InterruptedException e) {
237                 // shouldn't happen
238             } finally {
239                 rLock.unlock();
240             }
241         }
242
243         /**
244          * Runs the logic to read from the socket until {@link #isRunning} is false. A 'response' is anything that ends
245          * with a carriage-return/newline combo. Additionally, the special "login" prompts are
246          * treated as responses for purposes of logging in.
247          */
248         @Override
249         public void run() {
250             final StringBuilder sb = new StringBuilder(100);
251             int c;
252
253             isRunning.set(true);
254             responsesQueue.clear();
255
256             while (isRunning.get()) {
257                 try {
258                     // if reader is null, sleep and try again
259                     if (reader == null) {
260                         Thread.sleep(250);
261                         continue;
262                     }
263
264                     c = reader.read();
265                     if (c == -1) {
266                         responsesQueue.put(new IOException("server closed connection"));
267                         isRunning.set(false);
268                         break;
269                     }
270                     final char ch = (char) c;
271                     sb.append(ch);
272                     if (ch == '\n' || ch == ' ') {
273                         final String str = sb.toString();
274                         if (str.endsWith("\r\n") || str.endsWith("login: ")) {
275                             sb.setLength(0);
276                             final String response = str.substring(0, str.length() - 2);
277                             logger.debug("Received response: {}", response);
278                             responsesQueue.put(response);
279                         }
280                     }
281                     // logger.debug(">>> reading: " + sb + ":" + (int) ch);
282                 } catch (SocketTimeoutException e) {
283                     // do nothing - we expect this (setSOTimeout) to check the _isReading
284                 } catch (InterruptedException e) {
285                     // Do nothing - probably shutting down
286                 } catch (IOException e) {
287                     try {
288                         isRunning.set(false);
289                         responsesQueue.put(e);
290                     } catch (InterruptedException e1) {
291                         // Do nothing - probably shutting down
292                     }
293                 }
294             }
295
296             rLock.lock();
297             try {
298                 rRunning.signalAll();
299             } finally {
300                 rLock.unlock();
301             }
302         }
303     }
304
305     /**
306      * The dispatcher runnable is responsible for reading the response queue and dispatching it to the current callable.
307      * Since the dispatcher is ONLY started when a callable is set, responses may pile up in the queue and be dispatched
308      * when a callable is set. Unlike the socket reader, this can be assigned to another thread (no state outside of the
309      * class).
310      *
311      * @author Tim Roberts
312      */
313     private class Dispatcher implements Runnable {
314
315         /**
316          * Whether the dispatcher is rRunning or not
317          */
318         private final AtomicBoolean dispatcherRunning = new AtomicBoolean(false);
319
320         /**
321          * Locking to allow proper shutdown of the reader
322          */
323         private final Lock dLock = new ReentrantLock();
324         private final Condition dRunning = dLock.newCondition();
325
326         /**
327          * Stops the reader. Will wait 5 seconds for the runnable to stop (should stop within 1 second based on the poll
328          * timeout below)
329          */
330         public void stopRunning() {
331             dLock.lock();
332             try {
333                 if (dispatcherRunning.getAndSet(false)) {
334                     if (!dRunning.await(5, TimeUnit.SECONDS)) {
335                         logger.warn("Waited too long for dispatcher to finish");
336                     }
337                 }
338             } catch (InterruptedException e) {
339                 // do nothing
340             } finally {
341                 dLock.unlock();
342             }
343         }
344
345         /**
346          * Runs the logic to dispatch any responses to the current callback until {@link #isRunning} is false.
347          */
348         @Override
349         public void run() {
350             dispatcherRunning.set(true);
351             while (dispatcherRunning.get()) {
352                 try {
353                     final SocketSessionCallback ssCallback = callback.get();
354
355                     // if callback is null, we don't want to start dispatching yet.
356                     if (ssCallback == null) {
357                         Thread.sleep(250);
358                         continue;
359                     }
360
361                     final Object response = responsesQueue.poll(1, TimeUnit.SECONDS);
362
363                     if (response != null) {
364                         if (response instanceof String) {
365                             try {
366                                 logger.debug("Dispatching response: {}", response);
367                                 ssCallback.responseReceived((String) response);
368                             } catch (Exception e) {
369                                 logger.warn("Exception occurred processing the response '{}': ", response, e);
370                             }
371                         } else if (response instanceof Exception) {
372                             logger.debug("Dispatching exception: {}", response);
373                             ssCallback.responseException((Exception) response);
374                         } else {
375                             logger.error("Unknown response class: {}", response);
376                         }
377                     }
378                 } catch (InterruptedException e) {
379                     // Do nothing
380                 }
381             }
382
383             dLock.lock();
384             try {
385                 // Signal that we are done
386                 dRunning.signalAll();
387             } finally {
388                 dLock.unlock();
389             }
390         }
391     }
392 }