2 * Copyright (c) 2010-2020 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.satel.internal.protocol;
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.OutputStream;
19 import java.util.Timer;
20 import java.util.TimerTask;
21 import java.util.concurrent.BlockingQueue;
22 import java.util.concurrent.LinkedBlockingQueue;
24 import org.eclipse.jdt.annotation.NonNullByDefault;
25 import org.eclipse.jdt.annotation.Nullable;
26 import org.openhab.binding.satel.internal.command.IntegraVersionCommand;
27 import org.openhab.binding.satel.internal.command.SatelCommand;
28 import org.openhab.binding.satel.internal.command.SatelCommand.State;
29 import org.openhab.binding.satel.internal.event.ConnectionStatusEvent;
30 import org.openhab.binding.satel.internal.event.EventDispatcher;
31 import org.openhab.binding.satel.internal.event.IntegraVersionEvent;
32 import org.openhab.binding.satel.internal.event.SatelEventListener;
33 import org.openhab.binding.satel.internal.types.IntegraType;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
38 * This class represents abstract communication module and is responsible for
39 * exchanging data between the binding and connected physical module.
40 * Communication happens by sending commands and receiving response from the
41 * module. Each command class must extend {@link SatelCommand}.
43 * @author Krzysztof Goworek - Initial contribution
46 public abstract class SatelModule extends EventDispatcher implements SatelEventListener {
48 private final Logger logger = LoggerFactory.getLogger(SatelModule.class);
50 private static final byte FRAME_SYNC = (byte) 0xfe;
51 private static final byte FRAME_SYNC_ESC = (byte) 0xf0;
52 private static final byte[] FRAME_START = { FRAME_SYNC, FRAME_SYNC };
53 private static final byte[] FRAME_END = { FRAME_SYNC, (byte) 0x0d };
55 private final BlockingQueue<SatelCommand> sendQueue = new LinkedBlockingQueue<>();
57 private final int timeout;
58 private volatile IntegraType integraType;
59 private volatile String integraVersion;
60 private final boolean extPayloadSupport;
61 private @Nullable CommunicationChannel channel;
62 private @Nullable CommunicationWatchdog communicationWatchdog;
65 * Helper interface for connecting and disconnecting to specific module
66 * type. Each module type should implement these methods to provide input
67 * and output streams and way to disconnect from the module.
69 protected interface CommunicationChannel {
71 InputStream getInputStream() throws IOException;
73 OutputStream getOutputStream() throws IOException;
79 * Helper interface to handle communication timeouts.
81 protected interface TimeoutTimer {
89 * Thrown on connection failures.
91 protected static class ConnectionFailureException extends Exception {
93 private static final long serialVersionUID = 2L;
95 public ConnectionFailureException(String message) {
99 public ConnectionFailureException(String message, Throwable cause) {
100 super(message, cause);
105 * Creates new instance of the class.
107 * @param timeout timeout value in milliseconds for connect/read/write operations
108 * @param extPayloadSupport if <code>true</code>, the module supports extended command payload for reading
111 public SatelModule(int timeout, boolean extPayloadSupport) {
112 this.timeout = timeout;
113 this.integraType = IntegraType.UNKNOWN;
114 this.integraVersion = "";
115 this.extPayloadSupport = extPayloadSupport;
117 addEventListener(this);
121 * Returns type of Integra connected to the module.
123 * @return Integra type
125 public IntegraType getIntegraType() {
126 return this.integraType;
130 * Returns firmware revision of Integra connected to the module.
132 * @return version of Integra firmware
134 public String getIntegraVersion() {
135 return this.integraVersion;
139 * Returns configured timeout value.
141 * @return timeout value as milliseconds
143 public int getTimeout() {
147 public boolean isConnected() {
148 return this.channel != null;
152 * Returns status of initialization.
154 * @return <code>true</code> if module is properly initialized and ready for sending commands
156 public boolean isInitialized() {
157 return this.integraType != IntegraType.UNKNOWN;
161 * Returns extended payload flag.
163 * @return <code>true</code> if the module supports extended (32-bit) payload for zones/outputs
165 public boolean hasExtPayloadSupport() {
166 return this.extPayloadSupport;
169 protected abstract CommunicationChannel connect() throws ConnectionFailureException;
172 * Starts communication.
174 public synchronized void open() {
175 if (this.communicationWatchdog == null) {
176 this.communicationWatchdog = new CommunicationWatchdog();
178 logger.warn("Module is already opened.");
183 * Stops communication by disconnecting from the module and stopping all background tasks.
185 public void close() {
186 // first we clear watchdog field in the object
187 CommunicationWatchdog watchdog = null;
188 synchronized (this) {
189 if (this.communicationWatchdog != null) {
190 watchdog = this.communicationWatchdog;
191 this.communicationWatchdog = null;
194 // then, if watchdog exists, we close it
195 if (watchdog != null) {
201 * Enqueues specified command in send queue if not already enqueued.
203 * @param cmd command to enqueue
204 * @return <code>true</code> if operation succeeded
206 public boolean sendCommand(SatelCommand cmd) {
207 return this.sendCommand(cmd, false);
211 * Enqueues specified command in send queue.
213 * @param cmd command to enqueue
214 * @param force if <code>true</code> enqueues unconditionally
215 * @return <code>true</code> if operation succeeded
217 public boolean sendCommand(SatelCommand cmd, boolean force) {
219 if (force || !this.sendQueue.contains(cmd)) {
220 this.sendQueue.put(cmd);
221 cmd.setState(State.ENQUEUED);
222 logger.trace("Command enqueued: {}", cmd);
224 logger.debug("Command already in the queue: {}", cmd);
227 } catch (InterruptedException e) {
233 public void incomingEvent(IntegraVersionEvent event) {
234 IntegraVersionEvent versionEvent = event;
235 this.integraType = IntegraType.valueOf(versionEvent.getType() & 0xFF);
236 this.integraVersion = versionEvent.getVersion();
237 logger.info("Connection to {} initialized. INTEGRA version: {}.", this.integraType.getName(),
238 this.integraVersion);
241 private @Nullable SatelMessage readMessage() throws InterruptedException {
242 final CommunicationChannel channel = this.channel;
243 if (channel == null) {
244 logger.error("Reading attempt on closed channel.");
249 final InputStream is = channel.getInputStream();
250 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
251 boolean inMessage = false;
255 // if timed out, exit
263 if (b == FRAME_SYNC) {
265 // syncBytes == 0 means special sequence or end of message
266 // otherwise we discard all received bytes
267 if (syncBytes != 0) {
268 logger.warn("Received frame sync bytes, discarding input: {}", baos.size());
269 // clear gathered bytes, we wait for new message
277 if (syncBytes == 0) {
278 // in sync, we have next message byte
280 } else if (syncBytes == 1) {
281 if (b == FRAME_SYNC_ESC) {
282 baos.write(FRAME_SYNC);
283 } else if (b == FRAME_END[1]) {
287 logger.warn("Received invalid byte {}, discarding input: {}", String.format("%02X", b),
289 // clear gathered bytes, we have new message
294 logger.error("Sync bytes in message: {}", syncBytes);
296 } else if (syncBytes >= 2) {
297 // synced, we have first message byte
301 // otherwise we ignore all bytes until synced
305 // if meanwhile thread has been interrupted, exit the loop
306 if (Thread.interrupted()) {
307 throw new InterruptedException();
311 // return read message
312 return SatelMessage.fromBytes(baos.toByteArray());
313 } catch (IOException e) {
314 if (!Thread.currentThread().isInterrupted()) {
315 logger.error("Unexpected exception occurred during reading a message", e);
322 private boolean writeMessage(SatelMessage message) {
323 final CommunicationChannel channel = this.channel;
324 if (channel == null) {
325 logger.error("Writing attempt on closed channel.");
330 final OutputStream os = channel.getOutputStream();
332 os.write(FRAME_START);
333 for (byte b : message.getBytes()) {
335 if (b == FRAME_SYNC) {
336 os.write(FRAME_SYNC_ESC);
342 } catch (IOException e) {
343 if (!Thread.currentThread().isInterrupted()) {
344 logger.error("Unexpected exception occurred during writing a message", e);
351 private synchronized void disconnect(@Nullable String reason) {
352 // remove all pending commands from the queue
353 // notifying about send failure
354 while (!this.sendQueue.isEmpty()) {
355 SatelCommand cmd = this.sendQueue.poll();
356 cmd.setState(State.FAILED);
358 final CommunicationChannel channel = this.channel;
359 if (channel != null) {
360 channel.disconnect();
362 // notify about connection status change
363 this.dispatchEvent(new ConnectionStatusEvent(false, reason));
367 private void communicationLoop(TimeoutTimer timeoutTimer) {
368 long reconnectionTime = 10 * 1000;
369 boolean receivedResponse = false;
370 SatelCommand command = null;
371 String disconnectReason = null;
374 while (!Thread.currentThread().isInterrupted()) {
375 // connect, if not connected yet
376 if (this.channel == null) {
377 long connectStartTime = System.currentTimeMillis();
379 synchronized (this) {
380 this.channel = connect();
382 } catch (ConnectionFailureException e) {
383 logger.debug("Connection failed", e);
384 // notify about connection failure
385 this.dispatchEvent(new ConnectionStatusEvent(false, e.getMessage()));
386 // try to reconnect after a while, if connection hasn't
388 Thread.sleep(reconnectionTime - System.currentTimeMillis() + connectStartTime);
393 // get next command and send it
394 command = this.sendQueue.take();
395 logger.debug("Sending message: {}", command.getRequest());
396 timeoutTimer.start();
397 boolean sent = this.writeMessage(command.getRequest());
402 command.setState(State.SENT);
404 SatelMessage response;
406 // command sent, wait for response
407 logger.trace("Waiting for response");
408 timeoutTimer.start();
409 response = this.readMessage();
411 if (response == null) {
414 logger.debug("Got response: {}", response);
416 if (!receivedResponse) {
417 receivedResponse = true;
418 // notify about connection success after first
419 // response from the module
420 this.dispatchEvent(new ConnectionStatusEvent(true));
422 if (command.matches(response)) {
425 logger.info("Ignoring response, it does not match command {}: {}",
426 String.format("%02X", command.getRequest().getCommand()), response);
427 } while (!Thread.currentThread().isInterrupted());
429 if (response == null) {
433 if (command.handleResponse(this, response)) {
434 command.setState(State.SUCCEEDED);
436 command.setState(State.FAILED);
441 } catch (InterruptedException e) {
443 } catch (Exception e) {
444 // unexpected error, log and exit thread
445 logger.info("Unhandled exception occurred in communication loop, disconnecting.", e);
446 disconnectReason = "Unhandled exception: " + e.toString();
448 // stop counting if thread interrupted
452 // either send or receive failed
453 if (command != null) {
454 command.setState(State.FAILED);
457 disconnect(disconnectReason);
461 * Respawns communication thread in case on any error and interrupts it in
462 * case read/write operations take too long.
464 private class CommunicationWatchdog extends Timer implements TimeoutTimer {
465 private @Nullable Thread thread;
466 private volatile long lastActivity;
468 public CommunicationWatchdog() {
470 this.lastActivity = 0;
472 this.schedule(new TimerTask() {
475 CommunicationWatchdog.this.checkThread();
481 public void start() {
482 this.lastActivity = System.currentTimeMillis();
487 this.lastActivity = 0;
490 public synchronized void close() {
491 // cancel timer first to prevent reconnect
493 // then stop communication thread
494 final Thread thread = this.thread;
495 if (thread != null) {
499 } catch (InterruptedException e) {
505 private void startCommunication() {
506 final Thread thread = this.thread;
507 if (thread != null && thread.isAlive()) {
508 logger.error("Start communication canceled: communication thread is still alive");
512 this.thread = new Thread(new Runnable() {
515 logger.debug("Communication thread started");
516 SatelModule.this.communicationLoop(CommunicationWatchdog.this);
517 logger.debug("Communication thread stopped");
521 // if module is not initialized yet, send version command
522 if (!SatelModule.this.isInitialized()) {
523 SatelModule.this.sendCommand(new IntegraVersionCommand());
527 private void checkThread() {
528 final Thread thread = this.thread;
529 logger.trace("Checking communication thread: {}, {}", thread != null,
530 Boolean.toString(thread != null && thread.isAlive()));
531 if (thread != null && thread.isAlive()) {
532 long timePassed = (this.lastActivity == 0) ? 0 : System.currentTimeMillis() - this.lastActivity;
534 if (timePassed > SatelModule.this.timeout) {
535 logger.error("Send/receive timeout, disconnecting module.");
539 // wait for the thread to terminate
541 } catch (InterruptedException e) {
544 SatelModule.this.disconnect("Send/receive timeout");
547 startCommunication();