]> git.basschouten.com Git - openhab-addons.git/blob
85b0bd1119d5f418591271187f9aeb82dac3259f
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2024 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.insteon.internal.driver;
14
15 import java.io.IOException;
16 import java.util.ArrayList;
17 import java.util.Map;
18 import java.util.Map.Entry;
19 import java.util.concurrent.LinkedBlockingQueue;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.concurrent.atomic.AtomicBoolean;
22
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.openhab.binding.insteon.internal.device.DeviceType;
26 import org.openhab.binding.insteon.internal.device.DeviceTypeLoader;
27 import org.openhab.binding.insteon.internal.device.InsteonAddress;
28 import org.openhab.binding.insteon.internal.device.InsteonDevice;
29 import org.openhab.binding.insteon.internal.device.ModemDBBuilder;
30 import org.openhab.binding.insteon.internal.handler.InsteonDeviceHandler;
31 import org.openhab.binding.insteon.internal.message.FieldException;
32 import org.openhab.binding.insteon.internal.message.InvalidMessageTypeException;
33 import org.openhab.binding.insteon.internal.message.Msg;
34 import org.openhab.binding.insteon.internal.message.MsgFactory;
35 import org.openhab.binding.insteon.internal.message.MsgListener;
36 import org.openhab.binding.insteon.internal.utils.Utils;
37 import org.openhab.core.io.transport.serial.SerialPortManager;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 /**
42  * The Port class represents a port, that is, a connection to an Insteon modem, either through
43  * a serial or USB port, or via an Insteon Hub.
44  * It does the initialization of the port, and (via its inner classes IOStreamReader and IOStreamWriter)
45  * manages the reading/writing of messages on the Insteon network.
46  *
47  * The IOStreamReader and IOStreamWriter classes combined implement the somewhat tricky flow control protocol.
48  * In combination with the MsgFactory class, the incoming data stream is turned into a Msg structure
49  * for further processing by the upper layers (MsgListeners).
50  *
51  * A write queue is maintained to pace the flow of outgoing messages. Sending messages back-to-back
52  * can lead to dropped messages.
53  *
54  *
55  * @author Bernd Pfrommer - Initial contribution
56  * @author Daniel Pfrommer - openHAB 1 insteonplm binding
57  * @author Rob Nielsen - Port to openHAB 2 insteon binding
58  */
59 @NonNullByDefault
60 public class Port {
61     private final Logger logger = LoggerFactory.getLogger(Port.class);
62
63     /**
64      * The ReplyType is used to keep track of the state of the serial port receiver
65      */
66     enum ReplyType {
67         GOT_ACK,
68         WAITING_FOR_ACK,
69         GOT_NACK
70     }
71
72     private IOStream ioStream;
73     private String devName;
74     private String logName;
75     private Modem modem;
76     private IOStreamReader reader;
77     private IOStreamWriter writer;
78     private final int readSize = 1024; // read buffer size
79     private @Nullable Thread readThread = null;
80     private @Nullable Thread writeThread = null;
81     private boolean running = false;
82     private boolean modemDBComplete = false;
83     private MsgFactory msgFactory = new MsgFactory();
84     private Driver driver;
85     private ModemDBBuilder mdbb;
86     private ArrayList<MsgListener> listeners = new ArrayList<>();
87     private LinkedBlockingQueue<Msg> writeQueue = new LinkedBlockingQueue<>();
88     private AtomicBoolean disconnected = new AtomicBoolean(false);
89
90     /**
91      * Constructor
92      *
93      * @param devName the name of the port, i.e. '/dev/insteon'
94      * @param d The Driver object that manages this port
95      */
96     public Port(String devName, Driver d, @Nullable SerialPortManager serialPortManager,
97             ScheduledExecutorService scheduler) {
98         this.devName = devName;
99         this.driver = d;
100         this.logName = Utils.redactPassword(devName);
101         this.modem = new Modem();
102         addListener(modem);
103         this.ioStream = IOStream.create(serialPortManager, devName);
104         this.reader = new IOStreamReader();
105         this.writer = new IOStreamWriter();
106         this.mdbb = new ModemDBBuilder(this, scheduler);
107     }
108
109     public boolean isModem(InsteonAddress a) {
110         return modem.getAddress().equals(a);
111     }
112
113     public synchronized boolean isModemDBComplete() {
114         return (modemDBComplete);
115     }
116
117     public boolean isRunning() {
118         return running;
119     }
120
121     public InsteonAddress getAddress() {
122         return modem.getAddress();
123     }
124
125     public String getDeviceName() {
126         return devName;
127     }
128
129     public Driver getDriver() {
130         return driver;
131     }
132
133     public void addListener(MsgListener l) {
134         synchronized (listeners) {
135             if (!listeners.contains(l)) {
136                 listeners.add(l);
137             }
138         }
139     }
140
141     public void removeListener(MsgListener l) {
142         synchronized (listeners) {
143             if (listeners.remove(l)) {
144                 logger.debug("removed listener from port");
145             }
146         }
147     }
148
149     /**
150      * Clear modem database that has been queried so far.
151      */
152     public void clearModemDB() {
153         logger.debug("clearing modem db!");
154         Map<InsteonAddress, ModemDBEntry> dbes = getDriver().lockModemDBEntries();
155         for (Entry<InsteonAddress, ModemDBEntry> entry : dbes.entrySet()) {
156             if (!entry.getValue().isModem()) {
157                 dbes.remove(entry.getKey());
158             }
159         }
160         getDriver().unlockModemDBEntries();
161     }
162
163     /**
164      * Starts threads necessary for reading and writing
165      */
166     public void start() {
167         logger.debug("starting port {}", logName);
168         if (running) {
169             logger.debug("port {} already running, not started again", logName);
170             return;
171         }
172
173         writeQueue.clear();
174         if (!ioStream.open()) {
175             logger.debug("failed to open port {}", logName);
176             return;
177         }
178         ioStream.start();
179         readThread = new Thread(reader);
180         setParamsAndStart(readThread, "Reader");
181         writeThread = new Thread(writer);
182         setParamsAndStart(writeThread, "Writer");
183
184         if (!mdbb.isComplete()) {
185             modem.initialize();
186             mdbb.start(); // start downloading the device list
187         }
188
189         running = true;
190         disconnected.set(false);
191     }
192
193     private void setParamsAndStart(@Nullable Thread thread, String type) {
194         if (thread != null) {
195             thread.setName("OH-binding-Insteon " + logName + " " + type);
196             thread.setDaemon(true);
197             thread.start();
198         }
199     }
200
201     /**
202      * Stops all threads
203      */
204     public void stop() {
205         if (!running) {
206             logger.debug("port {} not running, no need to stop it", logName);
207             return;
208         }
209
210         running = false;
211         ioStream.stop();
212         ioStream.close();
213
214         Thread readThread = this.readThread;
215         if (readThread != null) {
216             readThread.interrupt();
217         }
218         Thread writeThread = this.writeThread;
219         if (writeThread != null) {
220             writeThread.interrupt();
221         }
222         logger.debug("waiting for read thread to exit for port {}", logName);
223         try {
224             if (readThread != null) {
225                 readThread.join();
226             }
227         } catch (InterruptedException e) {
228             logger.debug("got interrupted waiting for read thread to exit.");
229         }
230         logger.debug("waiting for write thread to exit for port {}", logName);
231         try {
232             if (writeThread != null) {
233                 writeThread.join();
234             }
235         } catch (InterruptedException e) {
236             logger.debug("got interrupted waiting for write thread to exit.");
237         }
238         this.readThread = null;
239         this.writeThread = null;
240
241         logger.debug("all threads for port {} stopped.", logName);
242     }
243
244     /**
245      * Adds message to the write queue
246      *
247      * @param m message to be added to the write queue
248      * @throws IOException
249      */
250     public void writeMessage(@Nullable Msg m) throws IOException {
251         if (m == null) {
252             logger.warn("trying to write null message!");
253             throw new IOException("trying to write null message!");
254         }
255         if (m.getData() == null) {
256             logger.warn("trying to write message without data!");
257             throw new IOException("trying to write message without data!");
258         }
259         try {
260             writeQueue.add(m);
261             logger.trace("enqueued msg: {}", m);
262         } catch (IllegalStateException e) {
263             logger.warn("cannot write message {}, write queue is full!", m);
264         }
265     }
266
267     /**
268      * Gets called by the modem database builder when the modem database is complete
269      */
270     public void modemDBComplete() {
271         synchronized (this) {
272             modemDBComplete = true;
273         }
274         driver.modemDBComplete(this);
275     }
276
277     public void disconnected() {
278         if (isRunning()) {
279             if (!disconnected.getAndSet(true)) {
280                 logger.warn("port {} disconnected", logName);
281                 driver.disconnected();
282             }
283         }
284     }
285
286     /**
287      * The IOStreamReader uses the MsgFactory to turn the incoming bytes into
288      * Msgs for the listeners. It also communicates with the IOStreamWriter
289      * to implement flow control (tell the IOStreamWriter that it needs to retransmit,
290      * or the reply message has been received correctly).
291      *
292      * @author Bernd Pfrommer - Initial contribution
293      */
294     class IOStreamReader implements Runnable {
295
296         private ReplyType reply = ReplyType.GOT_ACK;
297         private Object replyLock = new Object();
298
299         /**
300          * Helper function for implementing synchronization between reader and writer
301          *
302          * @return reference to the RequesReplyLock
303          */
304         public Object getRequestReplyLock() {
305             return replyLock;
306         }
307
308         @Override
309         public void run() {
310             logger.debug("starting reader...");
311             byte[] buffer = new byte[2 * readSize];
312             try {
313                 for (int len = -1; (len = ioStream.read(buffer, 0, readSize)) > 0;) {
314                     msgFactory.addData(buffer, len);
315                     processMessages();
316                 }
317             } catch (InterruptedException e) {
318                 logger.debug("reader thread got interrupted!");
319             } catch (IOException e) {
320                 logger.debug("got an io exception in the reader thread");
321                 disconnected();
322             }
323             logger.debug("reader thread exiting!");
324         }
325
326         private void processMessages() {
327             // must call processData() until msgFactory done fully processing buffer
328             while (!msgFactory.isDone()) {
329                 try {
330                     Msg msg = msgFactory.processData();
331                     if (msg != null) {
332                         toAllListeners(msg);
333                         notifyWriter(msg);
334                     }
335                 } catch (IOException e) {
336                     // got bad data from modem,
337                     // unblock those waiting for ack
338                     synchronized (getRequestReplyLock()) {
339                         if (reply == ReplyType.WAITING_FOR_ACK) {
340                             logger.debug("got bad data back, must assume message was acked.");
341                             reply = ReplyType.GOT_ACK;
342                             getRequestReplyLock().notify();
343                         }
344                     }
345                 }
346             }
347         }
348
349         private void notifyWriter(Msg msg) {
350             synchronized (getRequestReplyLock()) {
351                 if (reply == ReplyType.WAITING_FOR_ACK) {
352                     if (!msg.isUnsolicited()) {
353                         reply = (msg.isPureNack() ? ReplyType.GOT_NACK : ReplyType.GOT_ACK);
354                         logger.trace("signaling receipt of ack: {}", (reply == ReplyType.GOT_ACK));
355                         getRequestReplyLock().notify();
356                     } else if (msg.isPureNack()) {
357                         reply = ReplyType.GOT_NACK;
358                         logger.trace("signaling receipt of pure nack");
359                         getRequestReplyLock().notify();
360                     } else {
361                         logger.trace("got unsolicited message");
362                     }
363                 }
364             }
365         }
366
367         @SuppressWarnings("unchecked")
368         private void toAllListeners(Msg msg) {
369             // When we deliver the message, the recipient
370             // may in turn call removeListener() or addListener(),
371             // thereby corrupting the very same list we are iterating
372             // through. That's why we make a copy of it, and
373             // iterate through the copy.
374             ArrayList<MsgListener> tempList = null;
375             synchronized (listeners) {
376                 tempList = (ArrayList<MsgListener>) listeners.clone();
377             }
378             for (MsgListener l : tempList) {
379                 l.msg(msg); // deliver msg to listener
380             }
381         }
382
383         /**
384          * Blocking wait for ack or nack from modem.
385          * Called by IOStreamWriter for flow control.
386          *
387          * @return true if retransmission is necessary
388          */
389         public boolean waitForReply() {
390             reply = ReplyType.WAITING_FOR_ACK;
391             while (reply == ReplyType.WAITING_FOR_ACK) {
392                 try {
393                     logger.trace("writer waiting for ack.");
394                     // There have been cases observed, in particular for
395                     // the Hub, where we get no ack or nack back, causing the binding
396                     // to hang in the wait() below, because unsolicited messages
397                     // do not trigger a notify(). For this reason we request retransmission
398                     // if the wait() times out.
399                     getRequestReplyLock().wait(30000); // be patient for 30 msec
400                     if (reply == ReplyType.WAITING_FOR_ACK) { // timeout expired without getting ACK or NACK
401                         logger.trace("writer timeout expired, asking for retransmit!");
402                         reply = ReplyType.GOT_NACK;
403                         break;
404                     } else {
405                         logger.trace("writer got ack: {}", (reply == ReplyType.GOT_ACK));
406                     }
407                 } catch (InterruptedException e) {
408                     break; // done for the day...
409                 }
410             }
411             return (reply == ReplyType.GOT_NACK);
412         }
413     }
414
415     /**
416      * Writes messages to the port. Flow control is implemented following Insteon
417      * documents to avoid over running the modem.
418      *
419      * @author Bernd Pfrommer - Initial contribution
420      */
421     class IOStreamWriter implements Runnable {
422         private static final int WAIT_TIME = 200; // milliseconds
423
424         @Override
425         public void run() {
426             logger.debug("starting writer...");
427             while (true) {
428                 try {
429                     // this call blocks until the lock on the queue is released
430                     logger.trace("writer checking message queue");
431                     Msg msg = writeQueue.take();
432                     if (msg.getData() == null) {
433                         logger.warn("found null message in write queue!");
434                     } else {
435                         logger.debug("writing ({}): {}", msg.getQuietTime(), msg);
436                         // To debug race conditions during startup (i.e. make the .items
437                         // file definitions be available *before* the modem link records,
438                         // slow down the modem traffic with the following statement:
439                         // Thread.sleep(500);
440                         synchronized (reader.getRequestReplyLock()) {
441                             ioStream.write(msg.getData());
442                             while (reader.waitForReply()) {
443                                 Thread.sleep(WAIT_TIME);
444                                 logger.trace("retransmitting msg: {}", msg);
445                                 ioStream.write(msg.getData());
446                             }
447                         }
448                         // if rate limited, need to sleep now.
449                         if (msg.getQuietTime() > 0) {
450                             Thread.sleep(msg.getQuietTime());
451                         }
452                     }
453                 } catch (InterruptedException e) {
454                     logger.debug("got interrupted exception in write thread");
455                     break;
456                 } catch (IOException e) {
457                     logger.debug("got an io exception in the write thread");
458                     disconnected();
459                     break;
460                 }
461             }
462             logger.debug("writer thread exiting!");
463         }
464     }
465
466     /**
467      * Class to get info about the modem
468      */
469     class Modem implements MsgListener {
470         private @Nullable InsteonDevice device = null;
471
472         InsteonAddress getAddress() {
473             InsteonDevice device = this.device;
474             return (device == null) ? new InsteonAddress() : (device.getAddress());
475         }
476
477         @Nullable
478         InsteonDevice getDevice() {
479             return device;
480         }
481
482         @Override
483         public void msg(Msg msg) {
484             try {
485                 if (msg.isPureNack()) {
486                     return;
487                 }
488                 if (msg.getByte("Cmd") == 0x60) {
489                     // add the modem to the device list
490                     InsteonAddress a = new InsteonAddress(msg.getAddress("IMAddress"));
491                     DeviceTypeLoader instance = DeviceTypeLoader.instance();
492                     if (instance != null) {
493                         DeviceType dt = instance.getDeviceType(InsteonDeviceHandler.PLM_PRODUCT_KEY);
494                         if (dt == null) {
495                             logger.warn("unknown modem product key: {} for modem: {}.",
496                                     InsteonDeviceHandler.PLM_PRODUCT_KEY, a);
497                         } else {
498                             device = InsteonDevice.makeDevice(dt);
499                             initDevice(a, device);
500                             mdbb.updateModemDB(a, Port.this, null, true);
501                         }
502                     } else {
503                         logger.warn("device type loader instance is null");
504                     }
505                     // can unsubscribe now
506                     removeListener(this);
507                 }
508             } catch (FieldException e) {
509                 logger.warn("error parsing im info reply field: ", e);
510             }
511         }
512
513         private void initDevice(InsteonAddress a, @Nullable InsteonDevice device) {
514             if (device != null) {
515                 device.setAddress(a);
516                 device.setProductKey(InsteonDeviceHandler.PLM_PRODUCT_KEY);
517                 device.setDriver(driver);
518                 device.setIsModem(true);
519                 logger.debug("found modem {} in device_types: {}", a, device.toString());
520             } else {
521                 logger.warn("device is null");
522             }
523         }
524
525         public void initialize() {
526             try {
527                 Msg m = Msg.makeMessage("GetIMInfo");
528                 writeMessage(m);
529             } catch (IOException e) {
530                 logger.warn("modem init failed!", e);
531             } catch (InvalidMessageTypeException e) {
532                 logger.warn("invalid message", e);
533             }
534         }
535     }
536 }