]> git.basschouten.com Git - openhab-addons.git/blob
125102b563d3266b0a2643ca5e17a151e8a4c1c9
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.insteon.internal.driver;
14
15 import java.io.IOException;
16 import java.util.ArrayList;
17 import java.util.Map;
18 import java.util.Random;
19 import java.util.concurrent.LinkedBlockingQueue;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.concurrent.atomic.AtomicBoolean;
22
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.openhab.binding.insteon.internal.device.DeviceType;
26 import org.openhab.binding.insteon.internal.device.DeviceTypeLoader;
27 import org.openhab.binding.insteon.internal.device.InsteonAddress;
28 import org.openhab.binding.insteon.internal.device.InsteonDevice;
29 import org.openhab.binding.insteon.internal.device.ModemDBBuilder;
30 import org.openhab.binding.insteon.internal.handler.InsteonDeviceHandler;
31 import org.openhab.binding.insteon.internal.message.FieldException;
32 import org.openhab.binding.insteon.internal.message.InvalidMessageTypeException;
33 import org.openhab.binding.insteon.internal.message.Msg;
34 import org.openhab.binding.insteon.internal.message.MsgFactory;
35 import org.openhab.binding.insteon.internal.message.MsgListener;
36 import org.openhab.binding.insteon.internal.utils.Utils;
37 import org.openhab.core.io.transport.serial.SerialPortManager;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 /**
42  * The Port class represents a port, that is a connection to either an Insteon modem either through
43  * a serial or USB port, or via an Insteon Hub.
44  * It does the initialization of the port, and (via its inner classes IOStreamReader and IOStreamWriter)
45  * manages the reading/writing of messages on the Insteon network.
46  *
47  * The IOStreamReader and IOStreamWriter class combined implement the somewhat tricky flow control protocol.
48  * In combination with the MsgFactory class, the incoming data stream is turned into a Msg structure
49  * for further processing by the upper layers (MsgListeners).
50  *
51  * A write queue is maintained to pace the flow of outgoing messages. Sending messages back-to-back
52  * can lead to dropped messages.
53  *
54  *
55  * @author Bernd Pfrommer - Initial contribution
56  * @author Daniel Pfrommer - openHAB 1 insteonplm binding
57  * @author Rob Nielsen - Port to openHAB 2 insteon binding
58  */
59 @NonNullByDefault
60 @SuppressWarnings("null")
61 public class Port {
62     private final Logger logger = LoggerFactory.getLogger(Port.class);
63
64     /**
65      * The ReplyType is used to keep track of the state of the serial port receiver
66      */
67     enum ReplyType {
68         GOT_ACK,
69         WAITING_FOR_ACK,
70         GOT_NACK
71     }
72
73     private IOStream ioStream;
74     private String devName;
75     private String logName;
76     private Modem modem;
77     private IOStreamReader reader;
78     private IOStreamWriter writer;
79     private final int readSize = 1024; // read buffer size
80     private @Nullable Thread readThread = null;
81     private @Nullable Thread writeThread = null;
82     private boolean running = false;
83     private boolean modemDBComplete = false;
84     private MsgFactory msgFactory = new MsgFactory();
85     private Driver driver;
86     private ModemDBBuilder mdbb;
87     private ArrayList<MsgListener> listeners = new ArrayList<>();
88     private LinkedBlockingQueue<Msg> writeQueue = new LinkedBlockingQueue<>();
89     private AtomicBoolean disconnected = new AtomicBoolean(false);
90
91     /**
92      * Constructor
93      *
94      * @param devName the name of the port, i.e. '/dev/insteon'
95      * @param d The Driver object that manages this port
96      */
97     public Port(String devName, Driver d, @Nullable SerialPortManager serialPortManager,
98             ScheduledExecutorService scheduler) {
99         this.devName = devName;
100         this.driver = d;
101         this.logName = Utils.redactPassword(devName);
102         this.modem = new Modem();
103         addListener(modem);
104         this.ioStream = IOStream.create(serialPortManager, devName);
105         this.reader = new IOStreamReader();
106         this.writer = new IOStreamWriter();
107         this.mdbb = new ModemDBBuilder(this, scheduler);
108     }
109
110     public boolean isModem(InsteonAddress a) {
111         return modem.getAddress().equals(a);
112     }
113
114     public synchronized boolean isModemDBComplete() {
115         return (modemDBComplete);
116     }
117
118     public boolean isRunning() {
119         return running;
120     }
121
122     public InsteonAddress getAddress() {
123         return modem.getAddress();
124     }
125
126     public String getDeviceName() {
127         return devName;
128     }
129
130     public Driver getDriver() {
131         return driver;
132     }
133
134     public void addListener(MsgListener l) {
135         synchronized (listeners) {
136             if (!listeners.contains(l)) {
137                 listeners.add(l);
138             }
139         }
140     }
141
142     public void removeListener(MsgListener l) {
143         synchronized (listeners) {
144             if (listeners.remove(l)) {
145                 logger.debug("removed listener from port");
146             }
147         }
148     }
149
150     /**
151      * Clear modem database that has been queried so far.
152      */
153     public void clearModemDB() {
154         logger.debug("clearing modem db!");
155         Map<InsteonAddress, @Nullable ModemDBEntry> dbes = getDriver().lockModemDBEntries();
156         for (InsteonAddress addr : dbes.keySet()) {
157             if (!dbes.get(addr).isModem()) {
158                 dbes.remove(addr);
159             }
160         }
161         getDriver().unlockModemDBEntries();
162     }
163
164     /**
165      * Starts threads necessary for reading and writing
166      */
167     public void start() {
168         logger.debug("starting port {}", logName);
169         if (running) {
170             logger.debug("port {} already running, not started again", logName);
171             return;
172         }
173
174         writeQueue.clear();
175         if (!ioStream.open()) {
176             logger.debug("failed to open port {}", logName);
177             return;
178         }
179         ioStream.start();
180         readThread = new Thread(reader);
181         readThread.setName("Insteon " + logName + " Reader");
182         readThread.setDaemon(true);
183         readThread.start();
184         writeThread = new Thread(writer);
185         writeThread.setName("Insteon " + logName + " Writer");
186         writeThread.setDaemon(true);
187         writeThread.start();
188
189         if (!mdbb.isComplete()) {
190             modem.initialize();
191             mdbb.start(); // start downloading the device list
192         }
193
194         running = true;
195         disconnected.set(false);
196     }
197
198     /**
199      * Stops all threads
200      */
201     public void stop() {
202         if (!running) {
203             logger.debug("port {} not running, no need to stop it", logName);
204             return;
205         }
206
207         running = false;
208         ioStream.stop();
209         ioStream.close();
210
211         if (readThread != null) {
212             readThread.interrupt();
213         }
214         if (writeThread != null) {
215             writeThread.interrupt();
216         }
217         logger.debug("waiting for read thread to exit for port {}", logName);
218         try {
219             if (readThread != null) {
220                 readThread.join();
221             }
222         } catch (InterruptedException e) {
223             logger.debug("got interrupted waiting for read thread to exit.");
224         }
225         logger.debug("waiting for write thread to exit for port {}", logName);
226         try {
227             if (writeThread != null) {
228                 writeThread.join();
229             }
230         } catch (InterruptedException e) {
231             logger.debug("got interrupted waiting for write thread to exit.");
232         }
233         readThread = null;
234         writeThread = null;
235
236         logger.debug("all threads for port {} stopped.", logName);
237     }
238
239     /**
240      * Adds message to the write queue
241      *
242      * @param m message to be added to the write queue
243      * @throws IOException
244      */
245     public void writeMessage(@Nullable Msg m) throws IOException {
246         if (m == null) {
247             logger.warn("trying to write null message!");
248             throw new IOException("trying to write null message!");
249         }
250         if (m.getData() == null) {
251             logger.warn("trying to write message without data!");
252             throw new IOException("trying to write message without data!");
253         }
254         try {
255             writeQueue.add(m);
256             logger.trace("enqueued msg: {}", m);
257         } catch (IllegalStateException e) {
258             logger.warn("cannot write message {}, write queue is full!", m);
259         }
260     }
261
262     /**
263      * Gets called by the modem database builder when the modem database is complete
264      */
265     public void modemDBComplete() {
266         synchronized (this) {
267             modemDBComplete = true;
268         }
269         driver.modemDBComplete(this);
270     }
271
272     public void disconnected() {
273         if (isRunning()) {
274             if (!disconnected.getAndSet(true)) {
275                 logger.warn("port {} disconnected", logName);
276                 driver.disconnected();
277             }
278         }
279     }
280
281     /**
282      * The IOStreamReader uses the MsgFactory to turn the incoming bytes into
283      * Msgs for the listeners. It also communicates with the IOStreamWriter
284      * to implement flow control (tell the IOStreamWriter that it needs to retransmit,
285      * or the reply message has been received correctly).
286      *
287      * @author Bernd Pfrommer - Initial contribution
288      */
289     @NonNullByDefault
290     class IOStreamReader implements Runnable {
291
292         private ReplyType reply = ReplyType.GOT_ACK;
293         private Object replyLock = new Object();
294         private boolean dropRandomBytes = false; // set to true for fault injection
295
296         /**
297          * Helper function for implementing synchronization between reader and writer
298          *
299          * @return reference to the RequesReplyLock
300          */
301         public Object getRequestReplyLock() {
302             return replyLock;
303         }
304
305         @Override
306         public void run() {
307             logger.debug("starting reader...");
308             byte[] buffer = new byte[2 * readSize];
309             Random rng = new Random();
310             try {
311                 for (int len = -1; (len = ioStream.read(buffer, 0, readSize)) > 0;) {
312                     if (dropRandomBytes && rng.nextInt(100) < 20) {
313                         len = dropBytes(buffer, len);
314                     }
315                     msgFactory.addData(buffer, len);
316                     processMessages();
317                 }
318             } catch (InterruptedException e) {
319                 logger.debug("reader thread got interrupted!");
320             } catch (IOException e) {
321                 logger.debug("got an io exception in the reader thread");
322                 disconnected();
323             }
324             logger.debug("reader thread exiting!");
325         }
326
327         private void processMessages() {
328             // must call processData() until msgFactory done fully processing buffer
329             while (!msgFactory.isDone()) {
330                 try {
331                     Msg msg = msgFactory.processData();
332                     if (msg != null) {
333                         toAllListeners(msg);
334                         notifyWriter(msg);
335                     }
336                 } catch (IOException e) {
337                     // got bad data from modem,
338                     // unblock those waiting for ack
339                     synchronized (getRequestReplyLock()) {
340                         if (reply == ReplyType.WAITING_FOR_ACK) {
341                             logger.debug("got bad data back, must assume message was acked.");
342                             reply = ReplyType.GOT_ACK;
343                             getRequestReplyLock().notify();
344                         }
345                     }
346                 }
347             }
348         }
349
350         private void notifyWriter(Msg msg) {
351             synchronized (getRequestReplyLock()) {
352                 if (reply == ReplyType.WAITING_FOR_ACK) {
353                     if (!msg.isUnsolicited()) {
354                         reply = (msg.isPureNack() ? ReplyType.GOT_NACK : ReplyType.GOT_ACK);
355                         logger.trace("signaling receipt of ack: {}", (reply == ReplyType.GOT_ACK));
356                         getRequestReplyLock().notify();
357                     } else if (msg.isPureNack()) {
358                         reply = ReplyType.GOT_NACK;
359                         logger.trace("signaling receipt of pure nack");
360                         getRequestReplyLock().notify();
361                     } else {
362                         logger.trace("got unsolicited message");
363                     }
364                 }
365             }
366         }
367
368         /**
369          * Drops bytes randomly from buffer to simulate errors seen
370          * from the InsteonHub using the raw interface
371          *
372          * @param buffer byte buffer from which to drop bytes
373          * @param len original number of valid bytes in buffer
374          * @return length of byte buffer after dropping from it
375          */
376         private int dropBytes(byte[] buffer, int len) {
377             final int dropRate = 2; // in percent
378             Random rng = new Random();
379             ArrayList<Byte> l = new ArrayList<>();
380             for (int i = 0; i < len; i++) {
381                 if (rng.nextInt(100) >= dropRate) {
382                     l.add(buffer[i]);
383                 }
384             }
385             for (int i = 0; i < l.size(); i++) {
386                 buffer[i] = l.get(i);
387             }
388             return (l.size());
389         }
390
391         @SuppressWarnings("unchecked")
392         private void toAllListeners(Msg msg) {
393             // When we deliver the message, the recipient
394             // may in turn call removeListener() or addListener(),
395             // thereby corrupting the very same list we are iterating
396             // through. That's why we make a copy of it, and
397             // iterate through the copy.
398             ArrayList<MsgListener> tempList = null;
399             synchronized (listeners) {
400                 tempList = (ArrayList<MsgListener>) listeners.clone();
401             }
402             for (MsgListener l : tempList) {
403                 l.msg(msg); // deliver msg to listener
404             }
405         }
406
407         /**
408          * Blocking wait for ack or nack from modem.
409          * Called by IOStreamWriter for flow control.
410          *
411          * @return true if retransmission is necessary
412          */
413         public boolean waitForReply() {
414             reply = ReplyType.WAITING_FOR_ACK;
415             while (reply == ReplyType.WAITING_FOR_ACK) {
416                 try {
417                     logger.trace("writer waiting for ack.");
418                     // There have been cases observed, in particular for
419                     // the Hub, where we get no ack or nack back, causing the binding
420                     // to hang in the wait() below, because unsolicited messages
421                     // do not trigger a notify(). For this reason we request retransmission
422                     // if the wait() times out.
423                     getRequestReplyLock().wait(30000); // be patient for 30 msec
424                     if (reply == ReplyType.WAITING_FOR_ACK) { // timeout expired without getting ACK or NACK
425                         logger.trace("writer timeout expired, asking for retransmit!");
426                         reply = ReplyType.GOT_NACK;
427                         break;
428                     } else {
429                         logger.trace("writer got ack: {}", (reply == ReplyType.GOT_ACK));
430                     }
431                 } catch (InterruptedException e) {
432                     break; // done for the day...
433                 }
434             }
435             return (reply == ReplyType.GOT_NACK);
436         }
437     }
438
439     /**
440      * Writes messages to the port. Flow control is implemented following Insteon
441      * documents to avoid over running the modem.
442      *
443      * @author Bernd Pfrommer - Initial contribution
444      */
445     @NonNullByDefault
446     class IOStreamWriter implements Runnable {
447         private static final int WAIT_TIME = 200; // milliseconds
448
449         @Override
450         public void run() {
451             logger.debug("starting writer...");
452             while (true) {
453                 try {
454                     // this call blocks until the lock on the queue is released
455                     logger.trace("writer checking message queue");
456                     Msg msg = writeQueue.take();
457                     if (msg.getData() == null) {
458                         logger.warn("found null message in write queue!");
459                     } else {
460                         logger.debug("writing ({}): {}", msg.getQuietTime(), msg);
461                         // To debug race conditions during startup (i.e. make the .items
462                         // file definitions be available *before* the modem link records,
463                         // slow down the modem traffic with the following statement:
464                         // Thread.sleep(500);
465                         synchronized (reader.getRequestReplyLock()) {
466                             ioStream.write(msg.getData());
467                             while (reader.waitForReply()) {
468                                 Thread.sleep(WAIT_TIME);
469                                 logger.trace("retransmitting msg: {}", msg);
470                                 ioStream.write(msg.getData());
471                             }
472
473                         }
474                         // if rate limited, need to sleep now.
475                         if (msg.getQuietTime() > 0) {
476                             Thread.sleep(msg.getQuietTime());
477                         }
478                     }
479                 } catch (InterruptedException e) {
480                     logger.debug("got interrupted exception in write thread");
481                     break;
482                 } catch (IOException e) {
483                     logger.debug("got an io exception in the write thread");
484                     disconnected();
485                     break;
486                 }
487             }
488             logger.debug("writer thread exiting!");
489         }
490     }
491
492     /**
493      * Class to get info about the modem
494      */
495     @NonNullByDefault
496     class Modem implements MsgListener {
497         private @Nullable InsteonDevice device = null;
498
499         InsteonAddress getAddress() {
500             return (device == null) ? new InsteonAddress() : (device.getAddress());
501         }
502
503         @Nullable
504         InsteonDevice getDevice() {
505             return device;
506         }
507
508         @Override
509         public void msg(Msg msg) {
510             try {
511                 if (msg.isPureNack()) {
512                     return;
513                 }
514                 if (msg.getByte("Cmd") == 0x60) {
515                     // add the modem to the device list
516                     InsteonAddress a = new InsteonAddress(msg.getAddress("IMAddress"));
517                     DeviceType dt = DeviceTypeLoader.instance().getDeviceType(InsteonDeviceHandler.PLM_PRODUCT_KEY);
518                     if (dt == null) {
519                         logger.warn("unknown modem product key: {} for modem: {}.",
520                                 InsteonDeviceHandler.PLM_PRODUCT_KEY, a);
521                     } else {
522                         device = InsteonDevice.makeDevice(dt);
523                         device.setAddress(a);
524                         device.setProductKey(InsteonDeviceHandler.PLM_PRODUCT_KEY);
525                         device.setDriver(driver);
526                         device.setIsModem(true);
527                         logger.debug("found modem {} in device_types: {}", a, device.toString());
528                         mdbb.updateModemDB(a, Port.this, null, true);
529                     }
530                     // can unsubscribe now
531                     removeListener(this);
532                 }
533             } catch (FieldException e) {
534                 logger.warn("error parsing im info reply field: ", e);
535             }
536         }
537
538         public void initialize() {
539             try {
540                 Msg m = Msg.makeMessage("GetIMInfo");
541                 writeMessage(m);
542             } catch (IOException e) {
543                 logger.warn("modem init failed!", e);
544             } catch (InvalidMessageTypeException e) {
545                 logger.warn("invalid message", e);
546             }
547         }
548     }
549 }