2 * Copyright (c) 2010-2020 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.insteon.internal.driver;
15 import java.io.IOException;
16 import java.util.ArrayList;
18 import java.util.Random;
19 import java.util.concurrent.LinkedBlockingQueue;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.concurrent.atomic.AtomicBoolean;
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.openhab.binding.insteon.internal.device.DeviceType;
26 import org.openhab.binding.insteon.internal.device.DeviceTypeLoader;
27 import org.openhab.binding.insteon.internal.device.InsteonAddress;
28 import org.openhab.binding.insteon.internal.device.InsteonDevice;
29 import org.openhab.binding.insteon.internal.device.ModemDBBuilder;
30 import org.openhab.binding.insteon.internal.handler.InsteonDeviceHandler;
31 import org.openhab.binding.insteon.internal.message.FieldException;
32 import org.openhab.binding.insteon.internal.message.InvalidMessageTypeException;
33 import org.openhab.binding.insteon.internal.message.Msg;
34 import org.openhab.binding.insteon.internal.message.MsgFactory;
35 import org.openhab.binding.insteon.internal.message.MsgListener;
36 import org.openhab.binding.insteon.internal.utils.Utils;
37 import org.openhab.core.io.transport.serial.SerialPortManager;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
42 * The Port class represents a port, that is a connection to either an Insteon modem either through
43 * a serial or USB port, or via an Insteon Hub.
44 * It does the initialization of the port, and (via its inner classes IOStreamReader and IOStreamWriter)
45 * manages the reading/writing of messages on the Insteon network.
47 * The IOStreamReader and IOStreamWriter class combined implement the somewhat tricky flow control protocol.
48 * In combination with the MsgFactory class, the incoming data stream is turned into a Msg structure
49 * for further processing by the upper layers (MsgListeners).
51 * A write queue is maintained to pace the flow of outgoing messages. Sending messages back-to-back
52 * can lead to dropped messages.
55 * @author Bernd Pfrommer - Initial contribution
56 * @author Daniel Pfrommer - openHAB 1 insteonplm binding
57 * @author Rob Nielsen - Port to openHAB 2 insteon binding
60 @SuppressWarnings("null")
62 private final Logger logger = LoggerFactory.getLogger(Port.class);
65 * The ReplyType is used to keep track of the state of the serial port receiver
73 private IOStream ioStream;
74 private String devName;
75 private String logName;
77 private IOStreamReader reader;
78 private IOStreamWriter writer;
79 private final int readSize = 1024; // read buffer size
80 private @Nullable Thread readThread = null;
81 private @Nullable Thread writeThread = null;
82 private boolean running = false;
83 private boolean modemDBComplete = false;
84 private MsgFactory msgFactory = new MsgFactory();
85 private Driver driver;
86 private ModemDBBuilder mdbb;
87 private ArrayList<MsgListener> listeners = new ArrayList<>();
88 private LinkedBlockingQueue<Msg> writeQueue = new LinkedBlockingQueue<>();
89 private AtomicBoolean disconnected = new AtomicBoolean(false);
94 * @param devName the name of the port, i.e. '/dev/insteon'
95 * @param d The Driver object that manages this port
97 public Port(String devName, Driver d, @Nullable SerialPortManager serialPortManager,
98 ScheduledExecutorService scheduler) {
99 this.devName = devName;
101 this.logName = Utils.redactPassword(devName);
102 this.modem = new Modem();
104 this.ioStream = IOStream.create(serialPortManager, devName);
105 this.reader = new IOStreamReader();
106 this.writer = new IOStreamWriter();
107 this.mdbb = new ModemDBBuilder(this, scheduler);
110 public boolean isModem(InsteonAddress a) {
111 return modem.getAddress().equals(a);
114 public synchronized boolean isModemDBComplete() {
115 return (modemDBComplete);
118 public boolean isRunning() {
122 public InsteonAddress getAddress() {
123 return modem.getAddress();
126 public String getDeviceName() {
130 public Driver getDriver() {
134 public void addListener(MsgListener l) {
135 synchronized (listeners) {
136 if (!listeners.contains(l)) {
142 public void removeListener(MsgListener l) {
143 synchronized (listeners) {
144 if (listeners.remove(l)) {
145 logger.debug("removed listener from port");
151 * Clear modem database that has been queried so far.
153 public void clearModemDB() {
154 logger.debug("clearing modem db!");
155 Map<InsteonAddress, ModemDBEntry> dbes = getDriver().lockModemDBEntries();
156 for (InsteonAddress addr : dbes.keySet()) {
157 if (!dbes.get(addr).isModem()) {
161 getDriver().unlockModemDBEntries();
165 * Starts threads necessary for reading and writing
167 public void start() {
168 logger.debug("starting port {}", logName);
170 logger.debug("port {} already running, not started again", logName);
175 if (!ioStream.open()) {
176 logger.debug("failed to open port {}", logName);
180 readThread = new Thread(reader);
181 readThread.setName("Insteon " + logName + " Reader");
182 readThread.setDaemon(true);
184 writeThread = new Thread(writer);
185 writeThread.setName("Insteon " + logName + " Writer");
186 writeThread.setDaemon(true);
189 if (!mdbb.isComplete()) {
191 mdbb.start(); // start downloading the device list
195 disconnected.set(false);
203 logger.debug("port {} not running, no need to stop it", logName);
211 if (readThread != null) {
212 readThread.interrupt();
214 if (writeThread != null) {
215 writeThread.interrupt();
217 logger.debug("waiting for read thread to exit for port {}", logName);
219 if (readThread != null) {
222 } catch (InterruptedException e) {
223 logger.debug("got interrupted waiting for read thread to exit.");
225 logger.debug("waiting for write thread to exit for port {}", logName);
227 if (writeThread != null) {
230 } catch (InterruptedException e) {
231 logger.debug("got interrupted waiting for write thread to exit.");
236 logger.debug("all threads for port {} stopped.", logName);
240 * Adds message to the write queue
242 * @param m message to be added to the write queue
243 * @throws IOException
245 public void writeMessage(@Nullable Msg m) throws IOException {
247 logger.warn("trying to write null message!");
248 throw new IOException("trying to write null message!");
250 if (m.getData() == null) {
251 logger.warn("trying to write message without data!");
252 throw new IOException("trying to write message without data!");
256 logger.trace("enqueued msg: {}", m);
257 } catch (IllegalStateException e) {
258 logger.warn("cannot write message {}, write queue is full!", m);
263 * Gets called by the modem database builder when the modem database is complete
265 public void modemDBComplete() {
266 synchronized (this) {
267 modemDBComplete = true;
269 driver.modemDBComplete(this);
272 public void disconnected() {
274 if (!disconnected.getAndSet(true)) {
275 logger.warn("port {} disconnected", logName);
276 driver.disconnected();
282 * The IOStreamReader uses the MsgFactory to turn the incoming bytes into
283 * Msgs for the listeners. It also communicates with the IOStreamWriter
284 * to implement flow control (tell the IOStreamWriter that it needs to retransmit,
285 * or the reply message has been received correctly).
287 * @author Bernd Pfrommer - Initial contribution
290 class IOStreamReader implements Runnable {
292 private ReplyType reply = ReplyType.GOT_ACK;
293 private Object replyLock = new Object();
294 private boolean dropRandomBytes = false; // set to true for fault injection
297 * Helper function for implementing synchronization between reader and writer
299 * @return reference to the RequesReplyLock
301 public Object getRequestReplyLock() {
307 logger.debug("starting reader...");
308 byte[] buffer = new byte[2 * readSize];
309 Random rng = new Random();
311 for (int len = -1; (len = ioStream.read(buffer, 0, readSize)) > 0;) {
312 if (dropRandomBytes && rng.nextInt(100) < 20) {
313 len = dropBytes(buffer, len);
315 msgFactory.addData(buffer, len);
318 } catch (InterruptedException e) {
319 logger.debug("reader thread got interrupted!");
320 } catch (IOException e) {
321 logger.debug("got an io exception in the reader thread");
324 logger.debug("reader thread exiting!");
327 private void processMessages() {
328 // must call processData() until msgFactory done fully processing buffer
329 while (!msgFactory.isDone()) {
331 Msg msg = msgFactory.processData();
336 } catch (IOException e) {
337 // got bad data from modem,
338 // unblock those waiting for ack
339 synchronized (getRequestReplyLock()) {
340 if (reply == ReplyType.WAITING_FOR_ACK) {
341 logger.debug("got bad data back, must assume message was acked.");
342 reply = ReplyType.GOT_ACK;
343 getRequestReplyLock().notify();
350 private void notifyWriter(Msg msg) {
351 synchronized (getRequestReplyLock()) {
352 if (reply == ReplyType.WAITING_FOR_ACK) {
353 if (!msg.isUnsolicited()) {
354 reply = (msg.isPureNack() ? ReplyType.GOT_NACK : ReplyType.GOT_ACK);
355 logger.trace("signaling receipt of ack: {}", (reply == ReplyType.GOT_ACK));
356 getRequestReplyLock().notify();
357 } else if (msg.isPureNack()) {
358 reply = ReplyType.GOT_NACK;
359 logger.trace("signaling receipt of pure nack");
360 getRequestReplyLock().notify();
362 logger.trace("got unsolicited message");
369 * Drops bytes randomly from buffer to simulate errors seen
370 * from the InsteonHub using the raw interface
372 * @param buffer byte buffer from which to drop bytes
373 * @param len original number of valid bytes in buffer
374 * @return length of byte buffer after dropping from it
376 private int dropBytes(byte[] buffer, int len) {
377 final int dropRate = 2; // in percent
378 Random rng = new Random();
379 ArrayList<Byte> l = new ArrayList<>();
380 for (int i = 0; i < len; i++) {
381 if (rng.nextInt(100) >= dropRate) {
385 for (int i = 0; i < l.size(); i++) {
386 buffer[i] = l.get(i);
391 @SuppressWarnings("unchecked")
392 private void toAllListeners(Msg msg) {
393 // When we deliver the message, the recipient
394 // may in turn call removeListener() or addListener(),
395 // thereby corrupting the very same list we are iterating
396 // through. That's why we make a copy of it, and
397 // iterate through the copy.
398 ArrayList<MsgListener> tempList = null;
399 synchronized (listeners) {
400 tempList = (ArrayList<MsgListener>) listeners.clone();
402 for (MsgListener l : tempList) {
403 l.msg(msg); // deliver msg to listener
408 * Blocking wait for ack or nack from modem.
409 * Called by IOStreamWriter for flow control.
411 * @return true if retransmission is necessary
413 public boolean waitForReply() {
414 reply = ReplyType.WAITING_FOR_ACK;
415 while (reply == ReplyType.WAITING_FOR_ACK) {
417 logger.trace("writer waiting for ack.");
418 // There have been cases observed, in particular for
419 // the Hub, where we get no ack or nack back, causing the binding
420 // to hang in the wait() below, because unsolicited messages
421 // do not trigger a notify(). For this reason we request retransmission
422 // if the wait() times out.
423 getRequestReplyLock().wait(30000); // be patient for 30 msec
424 if (reply == ReplyType.WAITING_FOR_ACK) { // timeout expired without getting ACK or NACK
425 logger.trace("writer timeout expired, asking for retransmit!");
426 reply = ReplyType.GOT_NACK;
429 logger.trace("writer got ack: {}", (reply == ReplyType.GOT_ACK));
431 } catch (InterruptedException e) {
432 break; // done for the day...
435 return (reply == ReplyType.GOT_NACK);
440 * Writes messages to the port. Flow control is implemented following Insteon
441 * documents to avoid over running the modem.
443 * @author Bernd Pfrommer - Initial contribution
446 class IOStreamWriter implements Runnable {
447 private static final int WAIT_TIME = 200; // milliseconds
451 logger.debug("starting writer...");
454 // this call blocks until the lock on the queue is released
455 logger.trace("writer checking message queue");
456 Msg msg = writeQueue.take();
457 if (msg.getData() == null) {
458 logger.warn("found null message in write queue!");
460 logger.debug("writing ({}): {}", msg.getQuietTime(), msg);
461 // To debug race conditions during startup (i.e. make the .items
462 // file definitions be available *before* the modem link records,
463 // slow down the modem traffic with the following statement:
464 // Thread.sleep(500);
465 synchronized (reader.getRequestReplyLock()) {
466 ioStream.write(msg.getData());
467 while (reader.waitForReply()) {
468 Thread.sleep(WAIT_TIME);
469 logger.trace("retransmitting msg: {}", msg);
470 ioStream.write(msg.getData());
474 // if rate limited, need to sleep now.
475 if (msg.getQuietTime() > 0) {
476 Thread.sleep(msg.getQuietTime());
479 } catch (InterruptedException e) {
480 logger.debug("got interrupted exception in write thread");
482 } catch (IOException e) {
483 logger.debug("got an io exception in the write thread");
488 logger.debug("writer thread exiting!");
493 * Class to get info about the modem
496 class Modem implements MsgListener {
497 private @Nullable InsteonDevice device = null;
499 InsteonAddress getAddress() {
500 return (device == null) ? new InsteonAddress() : (device.getAddress());
504 InsteonDevice getDevice() {
509 public void msg(Msg msg) {
511 if (msg.isPureNack()) {
514 if (msg.getByte("Cmd") == 0x60) {
515 // add the modem to the device list
516 InsteonAddress a = new InsteonAddress(msg.getAddress("IMAddress"));
517 DeviceType dt = DeviceTypeLoader.instance().getDeviceType(InsteonDeviceHandler.PLM_PRODUCT_KEY);
519 logger.warn("unknown modem product key: {} for modem: {}.",
520 InsteonDeviceHandler.PLM_PRODUCT_KEY, a);
522 device = InsteonDevice.makeDevice(dt);
523 device.setAddress(a);
524 device.setProductKey(InsteonDeviceHandler.PLM_PRODUCT_KEY);
525 device.setDriver(driver);
526 device.setIsModem(true);
527 logger.debug("found modem {} in device_types: {}", a, device.toString());
528 mdbb.updateModemDB(a, Port.this, null, true);
530 // can unsubscribe now
531 removeListener(this);
533 } catch (FieldException e) {
534 logger.warn("error parsing im info reply field: ", e);
538 public void initialize() {
540 Msg m = Msg.makeMessage("GetIMInfo");
542 } catch (IOException e) {
543 logger.warn("modem init failed!", e);
544 } catch (InvalidMessageTypeException e) {
545 logger.warn("invalid message", e);