]> git.basschouten.com Git - openhab-addons.git/blob
a79aae5a55b5812b71b3483402ce4771af823a27
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.satel.internal.protocol;
14
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.OutputStream;
19 import java.util.Timer;
20 import java.util.TimerTask;
21 import java.util.concurrent.BlockingQueue;
22 import java.util.concurrent.LinkedBlockingQueue;
23
24 import org.eclipse.jdt.annotation.NonNullByDefault;
25 import org.eclipse.jdt.annotation.Nullable;
26 import org.openhab.binding.satel.internal.command.IntegraVersionCommand;
27 import org.openhab.binding.satel.internal.command.SatelCommand;
28 import org.openhab.binding.satel.internal.command.SatelCommand.State;
29 import org.openhab.binding.satel.internal.event.ConnectionStatusEvent;
30 import org.openhab.binding.satel.internal.event.EventDispatcher;
31 import org.openhab.binding.satel.internal.event.IntegraVersionEvent;
32 import org.openhab.binding.satel.internal.event.SatelEventListener;
33 import org.openhab.binding.satel.internal.types.IntegraType;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * This class represents abstract communication module and is responsible for
39  * exchanging data between the binding and connected physical module.
40  * Communication happens by sending commands and receiving response from the
41  * module. Each command class must extend {@link SatelCommand}.
42  *
43  * @author Krzysztof Goworek - Initial contribution
44  */
45 @NonNullByDefault
46 public abstract class SatelModule extends EventDispatcher implements SatelEventListener {
47
48     private final Logger logger = LoggerFactory.getLogger(SatelModule.class);
49
50     private static final byte FRAME_SYNC = (byte) 0xfe;
51     private static final byte FRAME_SYNC_ESC = (byte) 0xf0;
52     private static final byte[] FRAME_START = { FRAME_SYNC, FRAME_SYNC };
53     private static final byte[] FRAME_END = { FRAME_SYNC, (byte) 0x0d };
54
55     private final BlockingQueue<SatelCommand> sendQueue = new LinkedBlockingQueue<>();
56
57     private final int timeout;
58     private volatile IntegraType integraType;
59     private volatile String integraVersion;
60     private final boolean extPayloadSupport;
61     private @Nullable CommunicationChannel channel;
62     private @Nullable CommunicationWatchdog communicationWatchdog;
63
64     /*
65      * Helper interface for connecting and disconnecting to specific module
66      * type. Each module type should implement these methods to provide input
67      * and output streams and way to disconnect from the module.
68      */
69     protected interface CommunicationChannel {
70
71         InputStream getInputStream() throws IOException;
72
73         OutputStream getOutputStream() throws IOException;
74
75         void disconnect();
76
77         default boolean supportsReceiveTimeout() {
78             return false;
79         }
80     }
81
82     /*
83      * Helper interface to handle communication timeouts.
84      */
85     protected interface TimeoutTimer {
86
87         void start();
88
89         void stop();
90     }
91
92     /*
93      * Thrown on connection failures.
94      */
95     protected static class ConnectionFailureException extends Exception {
96
97         private static final long serialVersionUID = 2L;
98
99         public ConnectionFailureException(String message) {
100             super(message);
101         }
102
103         public ConnectionFailureException(String message, Throwable cause) {
104             super(message, cause);
105         }
106     }
107
108     /**
109      * Creates new instance of the class.
110      *
111      * @param timeout timeout value in milliseconds for connect/read/write operations
112      * @param extPayloadSupport if <code>true</code>, the module supports extended command payload for reading
113      *            INTEGRA256 state
114      */
115     public SatelModule(int timeout, boolean extPayloadSupport) {
116         this.timeout = timeout;
117         this.integraType = IntegraType.UNKNOWN;
118         this.integraVersion = "";
119         this.extPayloadSupport = extPayloadSupport;
120
121         addEventListener(this);
122     }
123
124     /**
125      * Returns type of Integra connected to the module.
126      *
127      * @return Integra type
128      */
129     public IntegraType getIntegraType() {
130         return this.integraType;
131     }
132
133     /**
134      * Returns firmware revision of Integra connected to the module.
135      *
136      * @return version of Integra firmware
137      */
138     public String getIntegraVersion() {
139         return this.integraVersion;
140     }
141
142     /**
143      * Returns configured timeout value.
144      *
145      * @return timeout value as milliseconds
146      */
147     public int getTimeout() {
148         return this.timeout;
149     }
150
151     public boolean isConnected() {
152         return this.channel != null;
153     }
154
155     /**
156      * Returns status of initialization.
157      *
158      * @return <code>true</code> if module is properly initialized and ready for sending commands
159      */
160     public boolean isInitialized() {
161         return this.integraType != IntegraType.UNKNOWN;
162     }
163
164     /**
165      * Returns extended payload flag.
166      *
167      * @return <code>true</code> if the module supports extended (32-bit) payload for zones/outputs
168      */
169     public boolean hasExtPayloadSupport() {
170         return this.extPayloadSupport;
171     }
172
173     protected abstract CommunicationChannel connect() throws ConnectionFailureException;
174
175     /**
176      * Starts communication.
177      */
178     public synchronized void open() {
179         if (this.communicationWatchdog == null) {
180             this.communicationWatchdog = new CommunicationWatchdog();
181         } else {
182             logger.warn("Module is already opened.");
183         }
184     }
185
186     /**
187      * Stops communication by disconnecting from the module and stopping all background tasks.
188      */
189     public void close() {
190         // first we clear watchdog field in the object
191         CommunicationWatchdog watchdog = null;
192         synchronized (this) {
193             if (this.communicationWatchdog != null) {
194                 watchdog = this.communicationWatchdog;
195                 this.communicationWatchdog = null;
196             }
197         }
198         // then, if watchdog exists, we close it
199         if (watchdog != null) {
200             watchdog.close();
201         }
202     }
203
204     /**
205      * Enqueues specified command in send queue if not already enqueued.
206      *
207      * @param cmd command to enqueue
208      * @return <code>true</code> if operation succeeded
209      */
210     public boolean sendCommand(SatelCommand cmd) {
211         return this.sendCommand(cmd, false);
212     }
213
214     /**
215      * Enqueues specified command in send queue.
216      *
217      * @param cmd command to enqueue
218      * @param force if <code>true</code> enqueues unconditionally
219      * @return <code>true</code> if operation succeeded
220      */
221     public boolean sendCommand(SatelCommand cmd, boolean force) {
222         try {
223             if (force || !this.sendQueue.contains(cmd)) {
224                 this.sendQueue.put(cmd);
225                 cmd.setState(State.ENQUEUED);
226                 logger.trace("Command enqueued: {}", cmd);
227             } else {
228                 logger.debug("Command already in the queue: {}", cmd);
229             }
230             return true;
231         } catch (InterruptedException e) {
232             return false;
233         }
234     }
235
236     @Override
237     public void incomingEvent(IntegraVersionEvent event) {
238         IntegraVersionEvent versionEvent = event;
239         this.integraType = IntegraType.valueOf(versionEvent.getType() & 0xFF);
240         this.integraVersion = versionEvent.getVersion();
241         logger.info("Connection to {} initialized. INTEGRA version: {}.", this.integraType.getName(),
242                 this.integraVersion);
243     }
244
245     private @Nullable SatelMessage readMessage() throws InterruptedException {
246         final CommunicationChannel channel = this.channel;
247         if (channel == null) {
248             logger.error("Reading attempt on closed channel.");
249             return null;
250         }
251
252         try {
253             final InputStream is = channel.getInputStream();
254             final ByteArrayOutputStream baos = new ByteArrayOutputStream();
255             boolean inMessage = false;
256             int syncBytes = 0;
257
258             while (true) {
259                 // if timed out, exit
260                 int c = is.read();
261                 if (c < 0) {
262                     return null;
263                 }
264
265                 byte b = (byte) c;
266
267                 if (b == FRAME_SYNC) {
268                     if (inMessage) {
269                         // syncBytes == 0 means special sequence or end of message
270                         // otherwise we discard all received bytes
271                         if (syncBytes != 0) {
272                             logger.warn("Received frame sync bytes, discarding input: {}", baos.size());
273                             // clear gathered bytes, we wait for new message
274                             inMessage = false;
275                             baos.reset();
276                         }
277                     }
278                     ++syncBytes;
279                 } else {
280                     if (inMessage) {
281                         if (syncBytes == 0) {
282                             // in sync, we have next message byte
283                             baos.write(b);
284                         } else if (syncBytes == 1) {
285                             if (b == FRAME_SYNC_ESC) {
286                                 baos.write(FRAME_SYNC);
287                             } else if (b == FRAME_END[1]) {
288                                 // end of message
289                                 break;
290                             } else {
291                                 logger.warn("Received invalid byte {}, discarding input: {}", String.format("%02X", b),
292                                         baos.size());
293                                 // clear gathered bytes, we have new message
294                                 inMessage = false;
295                                 baos.reset();
296                             }
297                         } else {
298                             logger.error("Sync bytes in message: {}", syncBytes);
299                         }
300                     } else if (syncBytes >= 2) {
301                         // synced, we have first message byte
302                         inMessage = true;
303                         baos.write(b);
304                     }
305                     // otherwise we ignore all bytes until synced
306                     syncBytes = 0;
307                 }
308
309                 // if meanwhile thread has been interrupted, exit the loop
310                 if (Thread.interrupted()) {
311                     throw new InterruptedException();
312                 }
313             }
314
315             // return read message
316             return SatelMessage.fromBytes(baos.toByteArray());
317         } catch (IOException e) {
318             if (!Thread.currentThread().isInterrupted()) {
319                 logger.error("Unexpected exception occurred during reading a message", e);
320             }
321         }
322
323         return null;
324     }
325
326     private boolean writeMessage(SatelMessage message) {
327         final CommunicationChannel channel = this.channel;
328         if (channel == null) {
329             logger.error("Writing attempt on closed channel.");
330             return false;
331         }
332
333         try {
334             final OutputStream os = channel.getOutputStream();
335
336             os.write(FRAME_START);
337             for (byte b : message.getBytes()) {
338                 os.write(b);
339                 if (b == FRAME_SYNC) {
340                     os.write(FRAME_SYNC_ESC);
341                 }
342             }
343             os.write(FRAME_END);
344             os.flush();
345             return true;
346         } catch (IOException e) {
347             if (!Thread.currentThread().isInterrupted()) {
348                 logger.error("Unexpected exception occurred during writing a message", e);
349             }
350         }
351
352         return false;
353     }
354
355     private synchronized void disconnect(@Nullable String reason) {
356         // remove all pending commands from the queue
357         // notifying about send failure
358         SatelCommand cmd;
359         while ((cmd = this.sendQueue.poll()) != null) {
360             cmd.setState(State.FAILED);
361         }
362         final CommunicationChannel channel = this.channel;
363         if (channel != null) {
364             channel.disconnect();
365             this.channel = null;
366             // notify about connection status change
367             this.dispatchEvent(new ConnectionStatusEvent(false, reason));
368         }
369     }
370
371     private void communicationLoop(TimeoutTimer timeoutTimer) {
372         long reconnectionTime = 10 * 1000;
373         boolean receivedResponse = false;
374         SatelCommand command = null;
375         String disconnectReason = null;
376
377         try {
378             while (!Thread.currentThread().isInterrupted()) {
379                 // connect, if not connected yet
380                 if (this.channel == null) {
381                     long connectStartTime = System.currentTimeMillis();
382                     try {
383                         synchronized (this) {
384                             this.channel = connect();
385                         }
386                     } catch (ConnectionFailureException e) {
387                         logger.debug("Connection failed", e);
388                         // notify about connection failure
389                         this.dispatchEvent(new ConnectionStatusEvent(false, e.getMessage()));
390                         // try to reconnect after a while, if connection hasn't
391                         // been established
392                         Thread.sleep(reconnectionTime - System.currentTimeMillis() + connectStartTime);
393                         continue;
394                     }
395                 }
396
397                 // get next command and send it
398                 command = this.sendQueue.take();
399                 logger.debug("Sending message: {}", command.getRequest());
400                 timeoutTimer.start();
401                 boolean sent = this.writeMessage(command.getRequest());
402                 timeoutTimer.stop();
403                 if (!sent) {
404                     break;
405                 }
406                 command.setState(State.SENT);
407
408                 SatelMessage response;
409                 do {
410                     // command sent, wait for response
411                     logger.trace("Waiting for response");
412                     timeoutTimer.start();
413                     response = this.readMessage();
414                     timeoutTimer.stop();
415                     if (response == null) {
416                         break;
417                     }
418                     logger.debug("Got response: {}", response);
419
420                     if (!receivedResponse) {
421                         receivedResponse = true;
422                         // notify about connection success after first
423                         // response from the module
424                         this.dispatchEvent(new ConnectionStatusEvent(true));
425                     }
426                     if (command.matches(response)) {
427                         break;
428                     }
429                     logger.info("Ignoring response, it does not match command {}: {}",
430                             String.format("%02X", command.getRequest().getCommand()), response);
431                 } while (!Thread.currentThread().isInterrupted());
432
433                 if (response == null) {
434                     break;
435                 }
436
437                 if (command.handleResponse(this, response)) {
438                     command.setState(State.SUCCEEDED);
439                 } else {
440                     command.setState(State.FAILED);
441                 }
442
443                 command = null;
444             }
445         } catch (InterruptedException e) {
446             // exit thread
447         } catch (Exception e) {
448             // unexpected error, log and exit thread
449             logger.info("Unhandled exception occurred in communication loop, disconnecting.", e);
450             disconnectReason = "Unhandled exception: " + e.toString();
451         } finally {
452             // stop counting if thread interrupted
453             timeoutTimer.stop();
454         }
455
456         // either send or receive failed
457         if (command != null) {
458             command.setState(State.FAILED);
459         }
460
461         disconnect(disconnectReason);
462     }
463
464     /*
465      * Respawns communication thread in case on any error and interrupts it in
466      * case read/write operations take too long.
467      */
468     private class CommunicationWatchdog extends Timer implements TimeoutTimer {
469         private @Nullable Thread thread;
470         private volatile long lastActivity;
471
472         public CommunicationWatchdog() {
473             this.thread = null;
474             this.lastActivity = 0;
475
476             this.schedule(new TimerTask() {
477                 @Override
478                 public void run() {
479                     CommunicationWatchdog.this.checkThread();
480                 }
481             }, 0, 1000);
482         }
483
484         @Override
485         public void start() {
486             this.lastActivity = System.currentTimeMillis();
487         }
488
489         @Override
490         public void stop() {
491             this.lastActivity = 0;
492         }
493
494         public synchronized void close() {
495             // cancel timer first to prevent reconnect
496             this.cancel();
497             // then stop communication thread
498             final Thread thread = this.thread;
499             if (thread != null) {
500                 thread.interrupt();
501                 try {
502                     thread.join();
503                 } catch (InterruptedException e) {
504                     // ignore
505                 }
506             }
507         }
508
509         private void startCommunication() {
510             Thread thread = this.thread;
511             if (thread != null && thread.isAlive()) {
512                 logger.error("Start communication canceled: communication thread is still alive");
513                 return;
514             }
515             // start new thread
516             thread = new Thread(new Runnable() {
517                 @Override
518                 public void run() {
519                     logger.debug("Communication thread started");
520                     SatelModule.this.communicationLoop(CommunicationWatchdog.this);
521                     logger.debug("Communication thread stopped");
522                 }
523             });
524             thread.start();
525             this.thread = thread;
526             // if module is not initialized yet, send version command
527             if (!SatelModule.this.isInitialized()) {
528                 SatelModule.this.sendCommand(new IntegraVersionCommand());
529             }
530         }
531
532         private void checkThread() {
533             final Thread thread = this.thread;
534             logger.trace("Checking communication thread: {}, {}", thread != null,
535                     Boolean.toString(thread != null && thread.isAlive()));
536             if (thread != null && thread.isAlive()) {
537                 final long timePassed = (this.lastActivity == 0) ? 0 : System.currentTimeMillis() - this.lastActivity;
538                 final CommunicationChannel channel = SatelModule.this.channel;
539
540                 if (channel != null && !channel.supportsReceiveTimeout() && timePassed > SatelModule.this.timeout) {
541                     logger.error("Send/receive timeout, disconnecting module.");
542                     stop();
543                     thread.interrupt();
544                     try {
545                         // wait for the thread to terminate
546                         thread.join(100);
547                     } catch (InterruptedException e) {
548                         // ignore
549                     }
550                     SatelModule.this.disconnect("Send/receive timeout");
551                 }
552             } else {
553                 startCommunication();
554             }
555         }
556     }
557 }