2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.satel.internal.protocol;
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.OutputStream;
19 import java.util.Timer;
20 import java.util.TimerTask;
21 import java.util.concurrent.BlockingQueue;
22 import java.util.concurrent.LinkedBlockingQueue;
24 import org.eclipse.jdt.annotation.NonNullByDefault;
25 import org.eclipse.jdt.annotation.Nullable;
26 import org.openhab.binding.satel.internal.command.IntegraVersionCommand;
27 import org.openhab.binding.satel.internal.command.SatelCommand;
28 import org.openhab.binding.satel.internal.command.SatelCommand.State;
29 import org.openhab.binding.satel.internal.event.ConnectionStatusEvent;
30 import org.openhab.binding.satel.internal.event.EventDispatcher;
31 import org.openhab.binding.satel.internal.event.IntegraVersionEvent;
32 import org.openhab.binding.satel.internal.event.SatelEventListener;
33 import org.openhab.binding.satel.internal.types.IntegraType;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
38 * This class represents an abstract communication module and is responsible for
39 * exchanging data between the binding and the connected physical module.
40 * Communication happens by sending commands and receiving responses from the
41 * module. Each command class must extend {@link SatelCommand}.
43 * @author Krzysztof Goworek - Initial contribution
46 public abstract class SatelModule extends EventDispatcher implements SatelEventListener {
// Logger used by this class and by its inner watchdog.
48 private final Logger logger = LoggerFactory.getLogger(SatelModule.class);
// Frame synchronization marker.
50 private static final byte FRAME_SYNC = (byte) 0xfe;
// Escape byte: writeMessage() emits it for every payload byte equal to FRAME_SYNC, and
// readMessage() turns "one sync byte + escape" back into a single FRAME_SYNC payload byte.
51 private static final byte FRAME_SYNC_ESC = (byte) 0xf0;
// Sequence written at the beginning of every outgoing frame: two sync bytes.
52 private static final byte[] FRAME_START = { FRAME_SYNC, FRAME_SYNC };
// Frame terminator: one sync byte followed by 0x0d; readMessage() tests against its second byte.
53 private static final byte[] FRAME_END = { FRAME_SYNC, (byte) 0x0d };
// Commands waiting to be sent: filled by sendCommand(), consumed by communicationLoop(), drained by disconnect().
55 private final BlockingQueue<SatelCommand> sendQueue = new LinkedBlockingQueue<>();
// Timeout in milliseconds for connect/read/write operations; compared by the watchdog against elapsed time.
57 private final int timeout;
// Type of the connected Integra; UNKNOWN from construction until an IntegraVersionEvent is received.
58 private volatile IntegraType integraType;
// Firmware version of the connected Integra; empty string until an IntegraVersionEvent is received.
59 private volatile String integraVersion;
// Flag returned by hasExtPayloadSupport().
60 private final boolean extPayloadSupport;
// Currently open channel; null means "not connected" (see isConnected()).
// NOTE(review): assigned inside synchronized code but read without synchronization in isConnected(),
// communicationLoop() and the watchdog, and not declared volatile - confirm this is intended.
61 private @Nullable CommunicationChannel channel;
// Watchdog created by open() and detached by close(); null while the module is not opened.
62 private @Nullable CommunicationWatchdog communicationWatchdog;
65 * Helper interface for connecting to and disconnecting from a specific module
66 * type. Each module type should implement these methods to provide input
67 * and output streams and a way to disconnect from the module.
// Returned by connect() and stored in the module's channel field while the connection is open.
69 protected interface CommunicationChannel {
// Stream that readMessage() reads incoming frame bytes from.
71 InputStream getInputStream() throws IOException;
// Stream that writeMessage() writes outgoing frames to.
73 OutputStream getOutputStream() throws IOException;
// The watchdog applies its own send/receive timeout check only to channels for which this returns false.
77 default boolean supportsReceiveTimeout() {
83 * Helper interface to handle communication timeouts.
// Implemented by CommunicationWatchdog; communicationLoop() receives an instance and calls start()
// before each write and before each read.
85 protected interface TimeoutTimer {
93 * Thrown on connection failures.
// Checked exception declared by connect(); communicationLoop() catches it and reports its message
// in a ConnectionStatusEvent.
95 protected static class ConnectionFailureException extends Exception {
97 private static final long serialVersionUID = 2L;
// Creates the exception with a description of the failure.
99 public ConnectionFailureException(String message) {
// Creates the exception with a description of the failure and its underlying cause.
103 public ConnectionFailureException(String message, Throwable cause) {
104 super(message, cause);
109 * Creates new instance of the class.
111 * @param timeout timeout value in milliseconds for connect/read/write operations
112 * @param extPayloadSupport if <code>true</code>, the module supports extended command payload for reading
public SatelModule(int timeout, boolean extPayloadSupport) {
    // configuration supplied by the concrete module
    this.extPayloadSupport = extPayloadSupport;
    this.timeout = timeout;

    // nothing is known about the connected Integra yet
    this.integraVersion = "";
    this.integraType = IntegraType.UNKNOWN;

    // the module listens to the events it dispatches itself
    addEventListener(this);
}
125 * Returns type of Integra connected to the module.
127 * @return Integra type
129 public IntegraType getIntegraType() {
130 return this.integraType;
134 * Returns firmware revision of Integra connected to the module.
136 * @return version of Integra firmware
138 public String getIntegraVersion() {
139 return this.integraVersion;
143 * Returns configured timeout value.
145 * @return timeout value as milliseconds
// The timeout field is final: it is assigned once in the constructor and never changes afterwards.
147 public int getTimeout() {
151 public boolean isConnected() {
152 return this.channel != null;
156 * Returns status of initialization.
158 * @return <code>true</code> if module is properly initialized and ready for sending commands
160 public boolean isInitialized() {
161 return this.integraType != IntegraType.UNKNOWN;
165 * Returns extended payload flag.
167 * @return <code>true</code> if the module supports extended (32-bit) payload for zones/outputs
169 public boolean hasExtPayloadSupport() {
170 return this.extPayloadSupport;
// Opens a connection to the physical module. communicationLoop() calls it while holding the
// module lock whenever no channel is open; a ConnectionFailureException makes the loop report
// the failure and retry after a delay.
173 protected abstract CommunicationChannel connect() throws ConnectionFailureException;
176 * Starts communication.
178 public synchronized void open() {
179 if (this.communicationWatchdog == null) {
180 this.communicationWatchdog = new CommunicationWatchdog();
182 logger.warn("Module is already opened.");
187 * Stops communication by disconnecting from the module and stopping all background tasks.
// Only the detach step below holds the module lock; the method itself is not synchronized.
189 public void close() {
190 // first we clear watchdog field in the object
191 CommunicationWatchdog watchdog = null;
192 synchronized (this) {
193 if (this.communicationWatchdog != null) {
194 watchdog = this.communicationWatchdog;
195 this.communicationWatchdog = null;
// from here on open() sees no watchdog and is free to create a new one
198 // then, if watchdog exists, we close it
199 if (watchdog != null) {
205 * Enqueues specified command in send queue if not already enqueued.
207 * @param cmd command to enqueue
208 * @return <code>true</code> if operation succeeded
210 public boolean sendCommand(SatelCommand cmd) {
211 return this.sendCommand(cmd, false);
215 * Enqueues specified command in send queue.
217 * @param cmd command to enqueue
218 * @param force if <code>true</code> enqueues unconditionally
219 * @return <code>true</code> if operation succeeded
221 public boolean sendCommand(SatelCommand cmd, boolean force) {
223 if (force || !this.sendQueue.contains(cmd)) {
224 this.sendQueue.put(cmd);
225 cmd.setState(State.ENQUEUED);
226 logger.trace("Command enqueued: {}", cmd);
228 logger.debug("Command already in the queue: {}", cmd);
231 } catch (InterruptedException e) {
237 public void incomingEvent(IntegraVersionEvent event) {
238 IntegraVersionEvent versionEvent = event;
239 this.integraType = IntegraType.valueOf(versionEvent.getType() & 0xFF);
240 this.integraVersion = versionEvent.getVersion();
241 logger.info("Connection to {} initialized. INTEGRA version: {}.", this.integraType.getName(),
242 this.integraVersion);
// Reads bytes from the channel's input stream and reassembles one frame, undoing the
// FRAME_SYNC escaping; the collected bytes are parsed with SatelMessage.fromBytes().
// Throws InterruptedException when the thread is interrupted while reading.
245 private @Nullable SatelMessage readMessage() throws InterruptedException {
// work on a local copy so that the null check and the later use refer to the same channel
246 final CommunicationChannel channel = this.channel;
247 if (channel == null) {
248 logger.error("Reading attempt on closed channel.");
253 final InputStream is = channel.getInputStream();
// payload bytes of the frame being assembled
254 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
255 boolean inMessage = false;
259 // if timed out, exit
267 if (b == FRAME_SYNC) {
269 // syncBytes == 0 means special sequence or end of message
270 // otherwise we discard all received bytes
271 if (syncBytes != 0) {
272 logger.warn("Received frame sync bytes, discarding input: {}", baos.size());
273 // clear gathered bytes, we wait for new message
281 if (syncBytes == 0) {
282 // in sync, we have next message byte
// exactly one sync byte preceded this byte: it is either an escaped sync byte,
// the second byte of the frame terminator, or invalid
284 } else if (syncBytes == 1) {
285 if (b == FRAME_SYNC_ESC) {
286 baos.write(FRAME_SYNC);
287 } else if (b == FRAME_END[1]) {
291 logger.warn("Received invalid byte {}, discarding input: {}", String.format("%02X", b),
293 // clear gathered bytes, we have new message
298 logger.error("Sync bytes in message: {}", syncBytes);
300 } else if (syncBytes >= 2) {
301 // synced, we have first message byte
305 // otherwise we ignore all bytes until synced
309 // if meanwhile thread has been interrupted, exit the loop
// Thread.interrupted() clears the flag; the exception carries the interruption to the caller
310 if (Thread.interrupted()) {
311 throw new InterruptedException();
315 // return read message
316 return SatelMessage.fromBytes(baos.toByteArray());
317 } catch (IOException e) {
// an I/O error that coincides with an interruption of this thread is not logged
318 if (!Thread.currentThread().isInterrupted()) {
319 logger.error("Unexpected exception occurred during reading a message", e);
// Writes one frame to the channel's output stream: FRAME_START first, then the message bytes
// with the sync marker escaped.
326 private boolean writeMessage(SatelMessage message) {
// work on a local copy so that the null check and the later use refer to the same channel
327 final CommunicationChannel channel = this.channel;
328 if (channel == null) {
329 logger.error("Writing attempt on closed channel.");
334 final OutputStream os = channel.getOutputStream();
336 os.write(FRAME_START);
337 for (byte b : message.getBytes()) {
// the escape byte is emitted for every payload byte that equals the sync marker
339 if (b == FRAME_SYNC) {
340 os.write(FRAME_SYNC_ESC);
346 } catch (IOException e) {
// an I/O error that coincides with an interruption of this thread is not logged
347 if (!Thread.currentThread().isInterrupted()) {
348 logger.error("Unexpected exception occurred during writing a message", e);
// Marks every command still in the send queue as FAILED, disconnects the channel if one is
// open and dispatches a ConnectionStatusEvent(false, reason). Holds the module lock, like the
// block in communicationLoop() that assigns the channel.
355 private synchronized void disconnect(@Nullable String reason) {
356 // remove all pending commands from the queue
357 // notifying about send failure
// poll() returns null once the queue is empty, which ends the loop
359 while ((cmd = this.sendQueue.poll()) != null) {
360 cmd.setState(State.FAILED);
362 final CommunicationChannel channel = this.channel;
363 if (channel != null) {
364 channel.disconnect();
366 // notify about connection status change
367 this.dispatchEvent(new ConnectionStatusEvent(false, reason));
// Body of the communication thread. Until the thread is interrupted it connects when no
// channel is open, takes the next command from the send queue, writes it, and reads responses
// until one matches the command. On the way out the in-flight command (if any) is marked
// FAILED and disconnect() is called with the recorded reason.
371 private void communicationLoop(TimeoutTimer timeoutTimer) {
// time in milliseconds, measured from the start of a failed connection attempt, before the next one
372 long reconnectionTime = 10 * 1000;
// becomes true after the first response; used to announce the connection only once
373 boolean receivedResponse = false;
374 SatelCommand command = null;
375 String disconnectReason = null;
378 while (!Thread.currentThread().isInterrupted()) {
379 // connect, if not connected yet
380 if (this.channel == null) {
381 long connectStartTime = System.currentTimeMillis();
// the channel field is assigned under the module lock, the same lock disconnect() takes
383 synchronized (this) {
384 this.channel = connect();
386 } catch (ConnectionFailureException e) {
387 logger.debug("Connection failed", e);
388 // notify about connection failure
389 this.dispatchEvent(new ConnectionStatusEvent(false, e.getMessage()));
390 // try to reconnect after a while, if connection hasn't
// sleeps only for the part of reconnectionTime not already spent in connect()
// NOTE(review): if connect() took longer than reconnectionTime the argument is negative and
// Thread.sleep() throws IllegalArgumentException - consider clamping with Math.max(0, ...)
392 Thread.sleep(reconnectionTime - System.currentTimeMillis() + connectStartTime);
397 // get next command and send it
// take() blocks until a command is available
398 command = this.sendQueue.take();
399 logger.debug("Sending message: {}", command.getRequest());
// restart the watchdog's activity timestamp before the write
400 timeoutTimer.start();
401 boolean sent = this.writeMessage(command.getRequest());
406 command.setState(State.SENT);
408 SatelMessage response;
410 // command sent, wait for response
411 logger.trace("Waiting for response");
// restart the watchdog's activity timestamp before every read
412 timeoutTimer.start();
413 response = this.readMessage();
415 if (response == null) {
418 logger.debug("Got response: {}", response);
420 if (!receivedResponse) {
421 receivedResponse = true;
422 // notify about connection success after first
423 // response from the module
424 this.dispatchEvent(new ConnectionStatusEvent(true));
426 if (command.matches(response)) {
// a response that does not belong to the command is logged and reading continues
429 logger.info("Ignoring response, it does not match command {}: {}",
430 String.format("%02X", command.getRequest().getCommand()), response);
431 } while (!Thread.currentThread().isInterrupted());
433 if (response == null) {
// the command itself decides whether the response means success
437 if (command.handleResponse(this, response)) {
438 command.setState(State.SUCCEEDED);
440 command.setState(State.FAILED);
445 } catch (InterruptedException e) {
447 } catch (Exception e) {
448 // unexpected error, log and exit thread
449 logger.info("Unhandled exception occurred in communication loop, disconnecting.", e);
// the textual form of the exception becomes the reason passed to disconnect()
450 disconnectReason = "Unhandled exception: " + e.toString();
452 // stop counting if thread interrupted
456 // either send or receive failed
457 if (command != null) {
458 command.setState(State.FAILED);
461 disconnect(disconnectReason);
465 * Respawns communication thread in case of any error and interrupts it in
466 * case read/write operations take too long.
// Timer that owns the communication thread: it starts the thread, periodically runs
// checkThread() from a TimerTask, and serves as the TimeoutTimer handed to communicationLoop().
468 private class CommunicationWatchdog extends Timer implements TimeoutTimer {
// the communication thread, or null before the first start
// NOTE(review): written in startCommunication() and read in close()/checkThread(); only close()
// is synchronized and the field is not volatile - confirm visibility across threads
469 private @Nullable Thread thread;
// System.currentTimeMillis() of the last start() call; 0 means no operation is being timed
470 private volatile long lastActivity;
472 public CommunicationWatchdog() {
474 this.lastActivity = 0;
// the scheduled task delegates to checkThread()
476 this.schedule(new TimerTask() {
479 CommunicationWatchdog.this.checkThread();
// TimeoutTimer: records the moment a timed operation begins
485 public void start() {
486 this.lastActivity = System.currentTimeMillis();
// resetting to 0 makes checkThread() treat the elapsed time as 0
491 this.lastActivity = 0;
494 public synchronized void close() {
495 // cancel timer first to prevent reconnect
497 // then stop communication thread
498 final Thread thread = this.thread;
499 if (thread != null) {
503 } catch (InterruptedException e) {
// Creates and records a new communication thread unless the previous one is still alive.
509 private void startCommunication() {
510 Thread thread = this.thread;
511 if (thread != null && thread.isAlive()) {
512 logger.error("Start communication canceled: communication thread is still alive");
516 thread = new Thread(new Runnable() {
519 logger.debug("Communication thread started");
// the watchdog passes itself as the TimeoutTimer
520 SatelModule.this.communicationLoop(CommunicationWatchdog.this);
521 logger.debug("Communication thread stopped");
525 this.thread = thread;
526 // if module is not initialized yet, send version command
527 if (!SatelModule.this.isInitialized()) {
528 SatelModule.this.sendCommand(new IntegraVersionCommand());
// Run from the timer task: looks for a send/receive timeout while the thread is alive.
532 private void checkThread() {
533 final Thread thread = this.thread;
534 logger.trace("Checking communication thread: {}, {}", thread != null,
535 Boolean.toString(thread != null && thread.isAlive()));
536 if (thread != null && thread.isAlive()) {
// lastActivity == 0 means nothing is being timed, so the elapsed time counts as 0
537 final long timePassed = (this.lastActivity == 0) ? 0 : System.currentTimeMillis() - this.lastActivity;
538 final CommunicationChannel channel = SatelModule.this.channel;
// the timeout is enforced here only for channels that report no receive timeout of their own
540 if (channel != null && !channel.supportsReceiveTimeout() && timePassed > SatelModule.this.timeout) {
541 logger.error("Send/receive timeout, disconnecting module.");
545 // wait for the thread to terminate
547 } catch (InterruptedException e) {
550 SatelModule.this.disconnect("Send/receive timeout");
553 startCommunication();