2 * Copyright (c) 2010-2021 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.lutron.internal.grxprg;
15 import java.io.BufferedReader;
16 import java.io.IOException;
17 import java.io.InputStreamReader;
18 import java.io.PrintStream;
19 import java.net.Socket;
20 import java.net.SocketTimeoutException;
21 import java.util.concurrent.ArrayBlockingQueue;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import java.util.concurrent.atomic.AtomicReference;
26 import java.util.concurrent.locks.Condition;
27 import java.util.concurrent.locks.Lock;
28 import java.util.concurrent.locks.ReentrantLock;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
34 * Represents a restartable socket connection to the underlying telnet session with an GRX-PRG/GRX-CI-PRG. Commands can
35 * be sent via {@link #sendCommand(String)} and responses will be received on the {@link SocketSessionCallback}
37 * @author Tim Roberts - Initial contribution
39 public class SocketSession {
40 private final Logger logger = LoggerFactory.getLogger(SocketSession.class);
43 * The host/ip address to connect to
45 private final String host;
48 * The port to connect to
50 private final int port;
53 * The actual socket being used. Will be null if not connected
55 private Socket client;
58 * The writer to the {@link #client}. Will be null if not connected
60 private PrintStream writer;
63 * The reader from the {@link #client}. Will be null if not connected
65 private BufferedReader reader;
68 * The {@link ResponseReader} that will be used to read from {@link #reader}
70 private final ResponseReader responseReader = new ResponseReader();
73 * The responses read from the {@link #responseReader}
75 private final BlockingQueue<Object> responsesQueue = new ArrayBlockingQueue<>(50);
78 * The dispatcher of responses from {@link #responsesQueue}
80 private final Dispatcher dispatcher = new Dispatcher();
83 * The {@link SocketSessionCallback} that the {@link #dispatcher} will call
85 private AtomicReference<SocketSessionCallback> callback = new AtomicReference<>(null);
88 * Creates the socket session from the given host and port
90 * @param host a non-null, non-empty host/ip address
91 * @param port the port number between 1 and 65535
93 public SocketSession(String host, int port) {
94 if (host == null || host.trim().length() == 0) {
95 throw new IllegalArgumentException("Host cannot be null or empty");
98 if (port < 1 || port > 65535) {
99 throw new IllegalArgumentException("Port must be between 1 and 65535");
106 * Sets the {@link SocketSessionCallback} to use when calling back the
107 * responses that have been received.
109 * @param callback a non-null {@link SocketSessionCallback} to use
111 public void setCallback(SocketSessionCallback callback) {
112 if (callback == null) {
113 throw new IllegalArgumentException("callback cannot be null");
115 this.callback.set(callback);
119 * Will attempt to connect to the {@link #host} on port {@link #port}. If we are current connected, will
120 * {@link #disconnect()} first. Once connected, the {@link #writer} and {@link #reader} will be created, the
121 * {@link #dispatcher} and {@link #responseReader} will be started.
123 * @throws java.io.IOException if an exception occurs during the connection attempt
125 public void connect() throws IOException {
128 client = new Socket(host, port);
129 client.setKeepAlive(true);
130 client.setSoTimeout(1000); // allow reader to check to see if it should stop every 1 second
132 logger.debug("Connecting to {}:{}", host, port);
133 writer = new PrintStream(client.getOutputStream());
134 reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
136 new Thread(responseReader).start();
137 new Thread(dispatcher).start();
141 * Disconnects from the {@link #host} if we are {@link #isConnected()}. The {@link #writer}, {@link #reader} and
143 * will be closed and set to null. The {@link #dispatcher} and {@link #responseReader} will be stopped, the
144 * {@link #callback} will be nulled and the {@link #responsesQueue} will be cleared.
146 * @throws java.io.IOException if an exception occurs during the disconnect attempt
148 public void disconnect() throws IOException {
150 logger.debug("Disconnecting from {}:{}", host, port);
152 dispatcher.stopRunning();
153 responseReader.stopRunning();
165 responsesQueue.clear();
170 * Returns true if we are connected ({@link #client} is not null and is connected)
172 * @return true if connected, false otherwise
174 public boolean isConnected() {
175 return client != null && client.isConnected();
179 * Sends the specified command to the underlying socket
181 * @param command a non-null, non-empty command
182 * @throws java.io.IOException an exception that occurred while sending
184 public synchronized void sendCommand(String command) throws IOException {
185 if (command == null) {
186 throw new IllegalArgumentException("command cannot be null");
189 if (!isConnected()) {
190 throw new IOException("Cannot send message - disconnected");
193 logger.debug("Sending Command: '{}'", command);
194 writer.println(command + "\n"); // as pre spec - each command must have a newline
199 * This is the runnable that will read from the socket and add messages to the responses queue (to be processed by
202 * @author Tim Roberts
205 private class ResponseReader implements Runnable {
208 * Whether the reader is currently rRunning
210 private final AtomicBoolean isRunning = new AtomicBoolean(false);
213 * Locking to allow proper shutdown of the reader
215 private final Lock rLock = new ReentrantLock();
216 private final Condition rRunning = rLock.newCondition();
219 * Stops the reader. Will wait 5 seconds for the runnable to stop (should stop within 1 second based on the
222 public void stopRunning() {
225 if (isRunning.getAndSet(false)) {
226 if (!rRunning.await(5, TimeUnit.SECONDS)) {
227 logger.warn("Waited too long for dispatcher to finish");
230 } catch (InterruptedException e) {
238 * Runs the logic to read from the socket until {@link #isRunning} is false. A 'response' is anything that ends
239 * with a carriage-return/newline combo. Additionally, the special "login" prompts are
240 * treated as responses for purposes of logging in.
244 final StringBuilder sb = new StringBuilder(100);
248 responsesQueue.clear();
250 while (isRunning.get()) {
252 // if reader is null, sleep and try again
253 if (reader == null) {
260 responsesQueue.put(new IOException("server closed connection"));
261 isRunning.set(false);
264 final char ch = (char) c;
266 if (ch == '\n' || ch == ' ') {
267 final String str = sb.toString();
268 if (str.endsWith("\r\n") || str.endsWith("login: ")) {
270 final String response = str.substring(0, str.length() - 2);
271 logger.debug("Received response: {}", response);
272 responsesQueue.put(response);
275 // logger.debug(">>> reading: " + sb + ":" + (int) ch);
276 } catch (SocketTimeoutException e) {
277 // do nothing - we expect this (setSOTimeout) to check the _isReading
278 } catch (InterruptedException e) {
279 // Do nothing - probably shutting down
280 } catch (IOException e) {
282 isRunning.set(false);
283 responsesQueue.put(e);
284 } catch (InterruptedException e1) {
285 // Do nothing - probably shutting down
292 rRunning.signalAll();
300 * The dispatcher runnable is responsible for reading the response queue and dispatching it to the current callable.
301 * Since the dispatcher is ONLY started when a callable is set, responses may pile up in the queue and be dispatched
302 * when a callable is set. Unlike the socket reader, this can be assigned to another thread (no state outside of the
305 * @author Tim Roberts
307 private class Dispatcher implements Runnable {
310 * Whether the dispatcher is rRunning or not
312 private final AtomicBoolean dispatcherRunning = new AtomicBoolean(false);
315 * Locking to allow proper shutdown of the reader
317 private final Lock dLock = new ReentrantLock();
318 private final Condition dRunning = dLock.newCondition();
321 * Stops the reader. Will wait 5 seconds for the runnable to stop (should stop within 1 second based on the poll
324 public void stopRunning() {
327 if (dispatcherRunning.getAndSet(false)) {
328 if (!dRunning.await(5, TimeUnit.SECONDS)) {
329 logger.warn("Waited too long for dispatcher to finish");
332 } catch (InterruptedException e) {
340 * Runs the logic to dispatch any responses to the current callback until {@link #isRunning} is false.
344 dispatcherRunning.set(true);
345 while (dispatcherRunning.get()) {
347 final SocketSessionCallback ssCallback = callback.get();
349 // if callback is null, we don't want to start dispatching yet.
350 if (ssCallback == null) {
355 final Object response = responsesQueue.poll(1, TimeUnit.SECONDS);
357 if (response != null) {
358 if (response instanceof String) {
360 logger.debug("Dispatching response: {}", response);
361 ssCallback.responseReceived((String) response);
362 } catch (Exception e) {
363 logger.warn("Exception occurred processing the response '{}': ", response, e);
365 } else if (response instanceof Exception) {
366 logger.debug("Dispatching exception: {}", response);
367 ssCallback.responseException((Exception) response);
369 logger.error("Unknown response class: {}", response);
372 } catch (InterruptedException e) {
379 // Signal that we are done
380 dRunning.signalAll();