/**
 * Copyright (c) 2010-2023 Contributors to the openHAB project
 *
 * See the NOTICE file(s) distributed with this work for additional
 * information.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0
 *
 * SPDX-License-Identifier: EPL-2.0
 */
13 package org.openhab.binding.lutron.internal.grxprg;
15 import java.io.BufferedReader;
16 import java.io.IOException;
17 import java.io.InputStreamReader;
18 import java.io.PrintStream;
19 import java.net.Socket;
20 import java.net.SocketTimeoutException;
21 import java.util.concurrent.ArrayBlockingQueue;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import java.util.concurrent.atomic.AtomicReference;
26 import java.util.concurrent.locks.Condition;
27 import java.util.concurrent.locks.Lock;
28 import java.util.concurrent.locks.ReentrantLock;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
/**
 * Represents a restartable socket connection to the underlying telnet session with a GRX-PRG/GRX-CI-PRG. Commands can
 * be sent via {@link #sendCommand(String)} and responses will be received on the {@link SocketSessionCallback}.
 *
 * @author Tim Roberts - Initial contribution
 */
39 public class SocketSession {
40 private final Logger logger = LoggerFactory.getLogger(SocketSession.class);
43 * The uid of the calling thing
45 private final String uid;
47 * The host/ip address to connect to
49 private final String host;
52 * The port to connect to
54 private final int port;
57 * The actual socket being used. Will be null if not connected
59 private Socket client;
62 * The writer to the {@link #client}. Will be null if not connected
64 private PrintStream writer;
67 * The reader from the {@link #client}. Will be null if not connected
69 private BufferedReader reader;
72 * The {@link ResponseReader} that will be used to read from {@link #reader}
74 private final ResponseReader responseReader = new ResponseReader();
77 * The responses read from the {@link #responseReader}
79 private final BlockingQueue<Object> responsesQueue = new ArrayBlockingQueue<>(50);
82 * The dispatcher of responses from {@link #responsesQueue}
84 private final Dispatcher dispatcher = new Dispatcher();
87 * The {@link SocketSessionCallback} that the {@link #dispatcher} will call
89 private AtomicReference<SocketSessionCallback> callback = new AtomicReference<>(null);
/**
 * Creates the socket session from the given host and port. Only validates and stores the arguments.
 *
 * @param uid the thing uid of the calling thing
 * @param host a non-null, non-empty host/ip address
 * @param port the port number between 1 and 65535
 * @throws IllegalArgumentException if {@code host} is null or blank, or {@code port} is outside 1-65535
 */
public SocketSession(String uid, String host, int port) {
    if (host == null || host.trim().isEmpty()) {
        throw new IllegalArgumentException("Host cannot be null or empty");
    }

    if (port < 1 || port > 65535) {
        throw new IllegalArgumentException("Port must be between 1 and 65535");
    }

    // uid, host and port are declared final, so they have to be initialised here
    this.uid = uid;
    this.host = host;
    this.port = port;
}
112 * Sets the {@link SocketSessionCallback} to use when calling back the
113 * responses that have been received.
115 * @param callback a non-null {@link SocketSessionCallback} to use
117 public void setCallback(SocketSessionCallback callback) {
118 if (callback == null) {
119 throw new IllegalArgumentException("callback cannot be null");
121 this.callback.set(callback);
125 * Will attempt to connect to the {@link #host} on port {@link #port}. If we are current connected, will
126 * {@link #disconnect()} first. Once connected, the {@link #writer} and {@link #reader} will be created, the
127 * {@link #dispatcher} and {@link #responseReader} will be started.
129 * @throws java.io.IOException if an exception occurs during the connection attempt
131 public void connect() throws IOException {
134 client = new Socket(host, port);
135 client.setKeepAlive(true);
136 client.setSoTimeout(1000); // allow reader to check to see if it should stop every 1 second
138 logger.debug("Connecting to {}:{}", host, port);
139 writer = new PrintStream(client.getOutputStream());
140 reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
142 new Thread(responseReader, "OH-binding-" + uid + "-responseReader").start();
143 new Thread(dispatcher, "OH-binding-" + uid + "-dispatcher").start();
147 * Disconnects from the {@link #host} if we are {@link #isConnected()}. The {@link #writer}, {@link #reader} and
149 * will be closed and set to null. The {@link #dispatcher} and {@link #responseReader} will be stopped, the
150 * {@link #callback} will be nulled and the {@link #responsesQueue} will be cleared.
152 * @throws java.io.IOException if an exception occurs during the disconnect attempt
154 public void disconnect() throws IOException {
156 logger.debug("Disconnecting from {}:{}", host, port);
158 dispatcher.stopRunning();
159 responseReader.stopRunning();
171 responsesQueue.clear();
176 * Returns true if we are connected ({@link #client} is not null and is connected)
178 * @return true if connected, false otherwise
180 public boolean isConnected() {
181 return client != null && client.isConnected();
185 * Sends the specified command to the underlying socket
187 * @param command a non-null, non-empty command
188 * @throws java.io.IOException an exception that occurred while sending
190 public synchronized void sendCommand(String command) throws IOException {
191 if (command == null) {
192 throw new IllegalArgumentException("command cannot be null");
195 if (!isConnected()) {
196 throw new IOException("Cannot send message - disconnected");
199 logger.debug("Sending Command: '{}'", command);
200 writer.println(command + "\n"); // as pre spec - each command must have a newline
205 * This is the runnable that will read from the socket and add messages to the responses queue (to be processed by
208 * @author Tim Roberts
211 private class ResponseReader implements Runnable {
214 * Whether the reader is currently rRunning
216 private final AtomicBoolean isRunning = new AtomicBoolean(false);
219 * Locking to allow proper shutdown of the reader
221 private final Lock rLock = new ReentrantLock();
222 private final Condition rRunning = rLock.newCondition();
225 * Stops the reader. Will wait 5 seconds for the runnable to stop (should stop within 1 second based on the
228 public void stopRunning() {
231 if (isRunning.getAndSet(false)) {
232 if (!rRunning.await(5, TimeUnit.SECONDS)) {
233 logger.warn("Waited too long for dispatcher to finish");
236 } catch (InterruptedException e) {
244 * Runs the logic to read from the socket until {@link #isRunning} is false. A 'response' is anything that ends
245 * with a carriage-return/newline combo. Additionally, the special "login" prompts are
246 * treated as responses for purposes of logging in.
250 final StringBuilder sb = new StringBuilder(100);
254 responsesQueue.clear();
256 while (isRunning.get()) {
258 // if reader is null, sleep and try again
259 if (reader == null) {
266 responsesQueue.put(new IOException("server closed connection"));
267 isRunning.set(false);
270 final char ch = (char) c;
272 if (ch == '\n' || ch == ' ') {
273 final String str = sb.toString();
274 if (str.endsWith("\r\n") || str.endsWith("login: ")) {
276 final String response = str.substring(0, str.length() - 2);
277 logger.debug("Received response: {}", response);
278 responsesQueue.put(response);
281 // logger.debug(">>> reading: " + sb + ":" + (int) ch);
282 } catch (SocketTimeoutException e) {
283 // do nothing - we expect this (setSOTimeout) to check the _isReading
284 } catch (InterruptedException e) {
285 // Do nothing - probably shutting down
286 } catch (IOException e) {
288 isRunning.set(false);
289 responsesQueue.put(e);
290 } catch (InterruptedException e1) {
291 // Do nothing - probably shutting down
298 rRunning.signalAll();
306 * The dispatcher runnable is responsible for reading the response queue and dispatching it to the current callable.
307 * Since the dispatcher is ONLY started when a callable is set, responses may pile up in the queue and be dispatched
308 * when a callable is set. Unlike the socket reader, this can be assigned to another thread (no state outside of the
311 * @author Tim Roberts
313 private class Dispatcher implements Runnable {
316 * Whether the dispatcher is rRunning or not
318 private final AtomicBoolean dispatcherRunning = new AtomicBoolean(false);
321 * Locking to allow proper shutdown of the reader
323 private final Lock dLock = new ReentrantLock();
324 private final Condition dRunning = dLock.newCondition();
327 * Stops the reader. Will wait 5 seconds for the runnable to stop (should stop within 1 second based on the poll
330 public void stopRunning() {
333 if (dispatcherRunning.getAndSet(false)) {
334 if (!dRunning.await(5, TimeUnit.SECONDS)) {
335 logger.warn("Waited too long for dispatcher to finish");
338 } catch (InterruptedException e) {
346 * Runs the logic to dispatch any responses to the current callback until {@link #isRunning} is false.
350 dispatcherRunning.set(true);
351 while (dispatcherRunning.get()) {
353 final SocketSessionCallback ssCallback = callback.get();
355 // if callback is null, we don't want to start dispatching yet.
356 if (ssCallback == null) {
361 final Object response = responsesQueue.poll(1, TimeUnit.SECONDS);
363 if (response != null) {
364 if (response instanceof String) {
366 logger.debug("Dispatching response: {}", response);
367 ssCallback.responseReceived((String) response);
368 } catch (Exception e) {
369 logger.warn("Exception occurred processing the response '{}': ", response, e);
371 } else if (response instanceof Exception) {
372 logger.debug("Dispatching exception: {}", response);
373 ssCallback.responseException((Exception) response);
375 logger.error("Unknown response class: {}", response);
378 } catch (InterruptedException e) {
385 // Signal that we are done
386 dRunning.signalAll();