]> git.basschouten.com Git - openhab-addons.git/blob
20e272725504efe0952f2898b4b73b9c522278df
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.satel.internal.protocol;
14
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.OutputStream;
19 import java.util.Timer;
20 import java.util.TimerTask;
21 import java.util.concurrent.BlockingQueue;
22 import java.util.concurrent.LinkedBlockingQueue;
23
24 import org.eclipse.jdt.annotation.NonNullByDefault;
25 import org.eclipse.jdt.annotation.Nullable;
26 import org.openhab.binding.satel.internal.command.IntegraVersionCommand;
27 import org.openhab.binding.satel.internal.command.SatelCommand;
28 import org.openhab.binding.satel.internal.command.SatelCommand.State;
29 import org.openhab.binding.satel.internal.event.ConnectionStatusEvent;
30 import org.openhab.binding.satel.internal.event.EventDispatcher;
31 import org.openhab.binding.satel.internal.event.IntegraVersionEvent;
32 import org.openhab.binding.satel.internal.event.SatelEventListener;
33 import org.openhab.binding.satel.internal.types.IntegraType;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * This class represents abstract communication module and is responsible for
39  * exchanging data between the binding and connected physical module.
40  * Communication happens by sending commands and receiving response from the
41  * module. Each command class must extend {@link SatelCommand}.
42  *
43  * @author Krzysztof Goworek - Initial contribution
44  */
45 @NonNullByDefault
46 public abstract class SatelModule extends EventDispatcher implements SatelEventListener {
47
48     private final Logger logger = LoggerFactory.getLogger(SatelModule.class);
49
50     private static final byte FRAME_SYNC = (byte) 0xfe;
51     private static final byte FRAME_SYNC_ESC = (byte) 0xf0;
52     private static final byte[] FRAME_START = { FRAME_SYNC, FRAME_SYNC };
53     private static final byte[] FRAME_END = { FRAME_SYNC, (byte) 0x0d };
54
55     private final BlockingQueue<SatelCommand> sendQueue = new LinkedBlockingQueue<>();
56
57     private final int timeout;
58     private volatile IntegraType integraType;
59     private volatile String integraVersion;
60     private final boolean extPayloadSupport;
61     private @Nullable CommunicationChannel channel;
62     private @Nullable CommunicationWatchdog communicationWatchdog;
63
64     /*
65      * Helper interface for connecting and disconnecting to specific module
66      * type. Each module type should implement these methods to provide input
67      * and output streams and way to disconnect from the module.
68      */
69     protected interface CommunicationChannel {
70
71         InputStream getInputStream() throws IOException;
72
73         OutputStream getOutputStream() throws IOException;
74
75         void disconnect();
76     }
77
78     /*
79      * Helper interface to handle communication timeouts.
80      */
81     protected interface TimeoutTimer {
82
83         void start();
84
85         void stop();
86     }
87
88     /*
89      * Thrown on connection failures.
90      */
91     protected static class ConnectionFailureException extends Exception {
92
93         private static final long serialVersionUID = 2L;
94
95         public ConnectionFailureException(String message) {
96             super(message);
97         }
98
99         public ConnectionFailureException(String message, Throwable cause) {
100             super(message, cause);
101         }
102     }
103
104     /**
105      * Creates new instance of the class.
106      *
107      * @param timeout timeout value in milliseconds for connect/read/write operations
108      * @param extPayloadSupport if <code>true</code>, the module supports extended command payload for reading
109      *            INTEGRA256 state
110      */
111     public SatelModule(int timeout, boolean extPayloadSupport) {
112         this.timeout = timeout;
113         this.integraType = IntegraType.UNKNOWN;
114         this.integraVersion = "";
115         this.extPayloadSupport = extPayloadSupport;
116
117         addEventListener(this);
118     }
119
120     /**
121      * Returns type of Integra connected to the module.
122      *
123      * @return Integra type
124      */
125     public IntegraType getIntegraType() {
126         return this.integraType;
127     }
128
129     /**
130      * Returns firmware revision of Integra connected to the module.
131      *
132      * @return version of Integra firmware
133      */
134     public String getIntegraVersion() {
135         return this.integraVersion;
136     }
137
138     /**
139      * Returns configured timeout value.
140      *
141      * @return timeout value as milliseconds
142      */
143     public int getTimeout() {
144         return this.timeout;
145     }
146
147     public boolean isConnected() {
148         return this.channel != null;
149     }
150
151     /**
152      * Returns status of initialization.
153      *
154      * @return <code>true</code> if module is properly initialized and ready for sending commands
155      */
156     public boolean isInitialized() {
157         return this.integraType != IntegraType.UNKNOWN;
158     }
159
160     /**
161      * Returns extended payload flag.
162      *
163      * @return <code>true</code> if the module supports extended (32-bit) payload for zones/outputs
164      */
165     public boolean hasExtPayloadSupport() {
166         return this.extPayloadSupport;
167     }
168
169     protected abstract CommunicationChannel connect() throws ConnectionFailureException;
170
171     /**
172      * Starts communication.
173      */
174     public synchronized void open() {
175         if (this.communicationWatchdog == null) {
176             this.communicationWatchdog = new CommunicationWatchdog();
177         } else {
178             logger.warn("Module is already opened.");
179         }
180     }
181
182     /**
183      * Stops communication by disconnecting from the module and stopping all background tasks.
184      */
185     public void close() {
186         // first we clear watchdog field in the object
187         CommunicationWatchdog watchdog = null;
188         synchronized (this) {
189             if (this.communicationWatchdog != null) {
190                 watchdog = this.communicationWatchdog;
191                 this.communicationWatchdog = null;
192             }
193         }
194         // then, if watchdog exists, we close it
195         if (watchdog != null) {
196             watchdog.close();
197         }
198     }
199
200     /**
201      * Enqueues specified command in send queue if not already enqueued.
202      *
203      * @param cmd command to enqueue
204      * @return <code>true</code> if operation succeeded
205      */
206     public boolean sendCommand(SatelCommand cmd) {
207         return this.sendCommand(cmd, false);
208     }
209
210     /**
211      * Enqueues specified command in send queue.
212      *
213      * @param cmd command to enqueue
214      * @param force if <code>true</code> enqueues unconditionally
215      * @return <code>true</code> if operation succeeded
216      */
217     public boolean sendCommand(SatelCommand cmd, boolean force) {
218         try {
219             if (force || !this.sendQueue.contains(cmd)) {
220                 this.sendQueue.put(cmd);
221                 cmd.setState(State.ENQUEUED);
222                 logger.trace("Command enqueued: {}", cmd);
223             } else {
224                 logger.debug("Command already in the queue: {}", cmd);
225             }
226             return true;
227         } catch (InterruptedException e) {
228             return false;
229         }
230     }
231
232     @Override
233     public void incomingEvent(IntegraVersionEvent event) {
234         IntegraVersionEvent versionEvent = event;
235         this.integraType = IntegraType.valueOf(versionEvent.getType() & 0xFF);
236         this.integraVersion = versionEvent.getVersion();
237         logger.info("Connection to {} initialized. INTEGRA version: {}.", this.integraType.getName(),
238                 this.integraVersion);
239     }
240
241     private @Nullable SatelMessage readMessage() throws InterruptedException {
242         final CommunicationChannel channel = this.channel;
243         if (channel == null) {
244             logger.error("Reading attempt on closed channel.");
245             return null;
246         }
247
248         try {
249             final InputStream is = channel.getInputStream();
250             final ByteArrayOutputStream baos = new ByteArrayOutputStream();
251             boolean inMessage = false;
252             int syncBytes = 0;
253
254             while (true) {
255                 // if timed out, exit
256                 int c = is.read();
257                 if (c < 0) {
258                     return null;
259                 }
260
261                 byte b = (byte) c;
262
263                 if (b == FRAME_SYNC) {
264                     if (inMessage) {
265                         // syncBytes == 0 means special sequence or end of message
266                         // otherwise we discard all received bytes
267                         if (syncBytes != 0) {
268                             logger.warn("Received frame sync bytes, discarding input: {}", baos.size());
269                             // clear gathered bytes, we wait for new message
270                             inMessage = false;
271                             baos.reset();
272                         }
273                     }
274                     ++syncBytes;
275                 } else {
276                     if (inMessage) {
277                         if (syncBytes == 0) {
278                             // in sync, we have next message byte
279                             baos.write(b);
280                         } else if (syncBytes == 1) {
281                             if (b == FRAME_SYNC_ESC) {
282                                 baos.write(FRAME_SYNC);
283                             } else if (b == FRAME_END[1]) {
284                                 // end of message
285                                 break;
286                             } else {
287                                 logger.warn("Received invalid byte {}, discarding input: {}", String.format("%02X", b),
288                                         baos.size());
289                                 // clear gathered bytes, we have new message
290                                 inMessage = false;
291                                 baos.reset();
292                             }
293                         } else {
294                             logger.error("Sync bytes in message: {}", syncBytes);
295                         }
296                     } else if (syncBytes >= 2) {
297                         // synced, we have first message byte
298                         inMessage = true;
299                         baos.write(b);
300                     }
301                     // otherwise we ignore all bytes until synced
302                     syncBytes = 0;
303                 }
304
305                 // if meanwhile thread has been interrupted, exit the loop
306                 if (Thread.interrupted()) {
307                     throw new InterruptedException();
308                 }
309             }
310
311             // return read message
312             return SatelMessage.fromBytes(baos.toByteArray());
313         } catch (IOException e) {
314             if (!Thread.currentThread().isInterrupted()) {
315                 logger.error("Unexpected exception occurred during reading a message", e);
316             }
317         }
318
319         return null;
320     }
321
322     private boolean writeMessage(SatelMessage message) {
323         final CommunicationChannel channel = this.channel;
324         if (channel == null) {
325             logger.error("Writing attempt on closed channel.");
326             return false;
327         }
328
329         try {
330             final OutputStream os = channel.getOutputStream();
331
332             os.write(FRAME_START);
333             for (byte b : message.getBytes()) {
334                 os.write(b);
335                 if (b == FRAME_SYNC) {
336                     os.write(FRAME_SYNC_ESC);
337                 }
338             }
339             os.write(FRAME_END);
340             os.flush();
341             return true;
342         } catch (IOException e) {
343             if (!Thread.currentThread().isInterrupted()) {
344                 logger.error("Unexpected exception occurred during writing a message", e);
345             }
346         }
347
348         return false;
349     }
350
351     private synchronized void disconnect(@Nullable String reason) {
352         // remove all pending commands from the queue
353         // notifying about send failure
354         SatelCommand cmd;
355         while ((cmd = this.sendQueue.poll()) != null) {
356             cmd.setState(State.FAILED);
357         }
358         final CommunicationChannel channel = this.channel;
359         if (channel != null) {
360             channel.disconnect();
361             this.channel = null;
362             // notify about connection status change
363             this.dispatchEvent(new ConnectionStatusEvent(false, reason));
364         }
365     }
366
367     private void communicationLoop(TimeoutTimer timeoutTimer) {
368         long reconnectionTime = 10 * 1000;
369         boolean receivedResponse = false;
370         SatelCommand command = null;
371         String disconnectReason = null;
372
373         try {
374             while (!Thread.currentThread().isInterrupted()) {
375                 // connect, if not connected yet
376                 if (this.channel == null) {
377                     long connectStartTime = System.currentTimeMillis();
378                     try {
379                         synchronized (this) {
380                             this.channel = connect();
381                         }
382                     } catch (ConnectionFailureException e) {
383                         logger.debug("Connection failed", e);
384                         // notify about connection failure
385                         this.dispatchEvent(new ConnectionStatusEvent(false, e.getMessage()));
386                         // try to reconnect after a while, if connection hasn't
387                         // been established
388                         Thread.sleep(reconnectionTime - System.currentTimeMillis() + connectStartTime);
389                         continue;
390                     }
391                 }
392
393                 // get next command and send it
394                 command = this.sendQueue.take();
395                 logger.debug("Sending message: {}", command.getRequest());
396                 timeoutTimer.start();
397                 boolean sent = this.writeMessage(command.getRequest());
398                 timeoutTimer.stop();
399                 if (!sent) {
400                     break;
401                 }
402                 command.setState(State.SENT);
403
404                 SatelMessage response;
405                 do {
406                     // command sent, wait for response
407                     logger.trace("Waiting for response");
408                     timeoutTimer.start();
409                     response = this.readMessage();
410                     timeoutTimer.stop();
411                     if (response == null) {
412                         break;
413                     }
414                     logger.debug("Got response: {}", response);
415
416                     if (!receivedResponse) {
417                         receivedResponse = true;
418                         // notify about connection success after first
419                         // response from the module
420                         this.dispatchEvent(new ConnectionStatusEvent(true));
421                     }
422                     if (command.matches(response)) {
423                         break;
424                     }
425                     logger.info("Ignoring response, it does not match command {}: {}",
426                             String.format("%02X", command.getRequest().getCommand()), response);
427                 } while (!Thread.currentThread().isInterrupted());
428
429                 if (response == null) {
430                     break;
431                 }
432
433                 if (command.handleResponse(this, response)) {
434                     command.setState(State.SUCCEEDED);
435                 } else {
436                     command.setState(State.FAILED);
437                 }
438
439                 command = null;
440             }
441         } catch (InterruptedException e) {
442             // exit thread
443         } catch (Exception e) {
444             // unexpected error, log and exit thread
445             logger.info("Unhandled exception occurred in communication loop, disconnecting.", e);
446             disconnectReason = "Unhandled exception: " + e.toString();
447         } finally {
448             // stop counting if thread interrupted
449             timeoutTimer.stop();
450         }
451
452         // either send or receive failed
453         if (command != null) {
454             command.setState(State.FAILED);
455         }
456
457         disconnect(disconnectReason);
458     }
459
460     /*
461      * Respawns communication thread in case on any error and interrupts it in
462      * case read/write operations take too long.
463      */
464     private class CommunicationWatchdog extends Timer implements TimeoutTimer {
465         private @Nullable Thread thread;
466         private volatile long lastActivity;
467
468         public CommunicationWatchdog() {
469             this.thread = null;
470             this.lastActivity = 0;
471
472             this.schedule(new TimerTask() {
473                 @Override
474                 public void run() {
475                     CommunicationWatchdog.this.checkThread();
476                 }
477             }, 0, 1000);
478         }
479
480         @Override
481         public void start() {
482             this.lastActivity = System.currentTimeMillis();
483         }
484
485         @Override
486         public void stop() {
487             this.lastActivity = 0;
488         }
489
490         public synchronized void close() {
491             // cancel timer first to prevent reconnect
492             this.cancel();
493             // then stop communication thread
494             final Thread thread = this.thread;
495             if (thread != null) {
496                 thread.interrupt();
497                 try {
498                     thread.join();
499                 } catch (InterruptedException e) {
500                     // ignore
501                 }
502             }
503         }
504
505         private void startCommunication() {
506             Thread thread = this.thread;
507             if (thread != null && thread.isAlive()) {
508                 logger.error("Start communication canceled: communication thread is still alive");
509                 return;
510             }
511             // start new thread
512             thread = new Thread(new Runnable() {
513                 @Override
514                 public void run() {
515                     logger.debug("Communication thread started");
516                     SatelModule.this.communicationLoop(CommunicationWatchdog.this);
517                     logger.debug("Communication thread stopped");
518                 }
519             });
520             thread.start();
521             this.thread = thread;
522             // if module is not initialized yet, send version command
523             if (!SatelModule.this.isInitialized()) {
524                 SatelModule.this.sendCommand(new IntegraVersionCommand());
525             }
526         }
527
528         private void checkThread() {
529             final Thread thread = this.thread;
530             logger.trace("Checking communication thread: {}, {}", thread != null,
531                     Boolean.toString(thread != null && thread.isAlive()));
532             if (thread != null && thread.isAlive()) {
533                 long timePassed = (this.lastActivity == 0) ? 0 : System.currentTimeMillis() - this.lastActivity;
534
535                 if (timePassed > SatelModule.this.timeout) {
536                     logger.error("Send/receive timeout, disconnecting module.");
537                     stop();
538                     thread.interrupt();
539                     try {
540                         // wait for the thread to terminate
541                         thread.join(100);
542                     } catch (InterruptedException e) {
543                         // ignore
544                     }
545                     SatelModule.this.disconnect("Send/receive timeout");
546                 }
547             } else {
548                 startCommunication();
549             }
550         }
551     }
552 }