2 * Copyright (c) 2010-2024 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.insteon.internal.driver;
15 import java.io.IOException;
16 import java.util.ArrayList;
18 import java.util.Map.Entry;
19 import java.util.concurrent.LinkedBlockingQueue;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.concurrent.atomic.AtomicBoolean;
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.openhab.binding.insteon.internal.device.DeviceType;
26 import org.openhab.binding.insteon.internal.device.DeviceTypeLoader;
27 import org.openhab.binding.insteon.internal.device.InsteonAddress;
28 import org.openhab.binding.insteon.internal.device.InsteonDevice;
29 import org.openhab.binding.insteon.internal.device.ModemDBBuilder;
30 import org.openhab.binding.insteon.internal.handler.InsteonDeviceHandler;
31 import org.openhab.binding.insteon.internal.message.FieldException;
32 import org.openhab.binding.insteon.internal.message.InvalidMessageTypeException;
33 import org.openhab.binding.insteon.internal.message.Msg;
34 import org.openhab.binding.insteon.internal.message.MsgFactory;
35 import org.openhab.binding.insteon.internal.message.MsgListener;
36 import org.openhab.binding.insteon.internal.utils.Utils;
37 import org.openhab.core.io.transport.serial.SerialPortManager;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
42 * The Port class represents a port, that is a connection to either an Insteon modem either through
43 * a serial or USB port, or via an Insteon Hub.
44 * It does the initialization of the port, and (via its inner classes IOStreamReader and IOStreamWriter)
45 * manages the reading/writing of messages on the Insteon network.
47 * The IOStreamReader and IOStreamWriter class combined implement the somewhat tricky flow control protocol.
48 * In combination with the MsgFactory class, the incoming data stream is turned into a Msg structure
49 * for further processing by the upper layers (MsgListeners).
51 * A write queue is maintained to pace the flow of outgoing messages. Sending messages back-to-back
52 * can lead to dropped messages.
55 * @author Bernd Pfrommer - Initial contribution
56 * @author Daniel Pfrommer - openHAB 1 insteonplm binding
57 * @author Rob Nielsen - Port to openHAB 2 insteon binding
61 private final Logger logger = LoggerFactory.getLogger(Port.class);
64 * The ReplyType is used to keep track of the state of the serial port receiver
72 private IOStream ioStream;
73 private String devName;
74 private String logName;
76 private IOStreamReader reader;
77 private IOStreamWriter writer;
78 private final int readSize = 1024; // read buffer size
79 private @Nullable Thread readThread = null;
80 private @Nullable Thread writeThread = null;
81 private boolean running = false;
82 private boolean modemDBComplete = false;
83 private MsgFactory msgFactory = new MsgFactory();
84 private Driver driver;
85 private ModemDBBuilder mdbb;
86 private ArrayList<MsgListener> listeners = new ArrayList<>();
87 private LinkedBlockingQueue<Msg> writeQueue = new LinkedBlockingQueue<>();
88 private AtomicBoolean disconnected = new AtomicBoolean(false);
93 * @param devName the name of the port, i.e. '/dev/insteon'
94 * @param d The Driver object that manages this port
96 public Port(String devName, Driver d, @Nullable SerialPortManager serialPortManager,
97 ScheduledExecutorService scheduler) {
98 this.devName = devName;
100 this.logName = Utils.redactPassword(devName);
101 this.modem = new Modem();
103 this.ioStream = IOStream.create(serialPortManager, devName);
104 this.reader = new IOStreamReader();
105 this.writer = new IOStreamWriter();
106 this.mdbb = new ModemDBBuilder(this, scheduler);
109 public boolean isModem(InsteonAddress a) {
110 return modem.getAddress().equals(a);
113 public synchronized boolean isModemDBComplete() {
114 return (modemDBComplete);
117 public boolean isRunning() {
121 public InsteonAddress getAddress() {
122 return modem.getAddress();
125 public String getDeviceName() {
129 public Driver getDriver() {
133 public void addListener(MsgListener l) {
134 synchronized (listeners) {
135 if (!listeners.contains(l)) {
141 public void removeListener(MsgListener l) {
142 synchronized (listeners) {
143 if (listeners.remove(l)) {
144 logger.debug("removed listener from port");
150 * Clear modem database that has been queried so far.
152 public void clearModemDB() {
153 logger.debug("clearing modem db!");
154 Map<InsteonAddress, ModemDBEntry> dbes = getDriver().lockModemDBEntries();
155 for (Entry<InsteonAddress, ModemDBEntry> entry : dbes.entrySet()) {
156 if (!entry.getValue().isModem()) {
157 dbes.remove(entry.getKey());
160 getDriver().unlockModemDBEntries();
164 * Starts threads necessary for reading and writing
166 public void start() {
167 logger.debug("starting port {}", logName);
169 logger.debug("port {} already running, not started again", logName);
174 if (!ioStream.open()) {
175 logger.debug("failed to open port {}", logName);
179 readThread = new Thread(reader);
180 setParamsAndStart(readThread, "Reader");
181 writeThread = new Thread(writer);
182 setParamsAndStart(writeThread, "Writer");
184 if (!mdbb.isComplete()) {
186 mdbb.start(); // start downloading the device list
190 disconnected.set(false);
193 private void setParamsAndStart(@Nullable Thread thread, String type) {
194 if (thread != null) {
195 thread.setName("OH-binding-Insteon " + logName + " " + type);
196 thread.setDaemon(true);
206 logger.debug("port {} not running, no need to stop it", logName);
214 Thread readThread = this.readThread;
215 if (readThread != null) {
216 readThread.interrupt();
218 Thread writeThread = this.writeThread;
219 if (writeThread != null) {
220 writeThread.interrupt();
222 logger.debug("waiting for read thread to exit for port {}", logName);
224 if (readThread != null) {
227 } catch (InterruptedException e) {
228 logger.debug("got interrupted waiting for read thread to exit.");
230 logger.debug("waiting for write thread to exit for port {}", logName);
232 if (writeThread != null) {
235 } catch (InterruptedException e) {
236 logger.debug("got interrupted waiting for write thread to exit.");
238 this.readThread = null;
239 this.writeThread = null;
241 logger.debug("all threads for port {} stopped.", logName);
245 * Adds message to the write queue
247 * @param m message to be added to the write queue
248 * @throws IOException
250 public void writeMessage(@Nullable Msg m) throws IOException {
252 logger.warn("trying to write null message!");
253 throw new IOException("trying to write null message!");
255 if (m.getData() == null) {
256 logger.warn("trying to write message without data!");
257 throw new IOException("trying to write message without data!");
261 logger.trace("enqueued msg: {}", m);
262 } catch (IllegalStateException e) {
263 logger.warn("cannot write message {}, write queue is full!", m);
268 * Gets called by the modem database builder when the modem database is complete
270 public void modemDBComplete() {
271 synchronized (this) {
272 modemDBComplete = true;
274 driver.modemDBComplete(this);
277 public void disconnected() {
279 if (!disconnected.getAndSet(true)) {
280 logger.warn("port {} disconnected", logName);
281 driver.disconnected();
287 * The IOStreamReader uses the MsgFactory to turn the incoming bytes into
288 * Msgs for the listeners. It also communicates with the IOStreamWriter
289 * to implement flow control (tell the IOStreamWriter that it needs to retransmit,
290 * or the reply message has been received correctly).
292 * @author Bernd Pfrommer - Initial contribution
294 class IOStreamReader implements Runnable {
296 private ReplyType reply = ReplyType.GOT_ACK;
297 private Object replyLock = new Object();
300 * Helper function for implementing synchronization between reader and writer
302 * @return reference to the RequesReplyLock
304 public Object getRequestReplyLock() {
310 logger.debug("starting reader...");
311 byte[] buffer = new byte[2 * readSize];
313 for (int len = -1; (len = ioStream.read(buffer, 0, readSize)) > 0;) {
314 msgFactory.addData(buffer, len);
317 } catch (InterruptedException e) {
318 logger.debug("reader thread got interrupted!");
319 } catch (IOException e) {
320 logger.debug("got an io exception in the reader thread");
323 logger.debug("reader thread exiting!");
326 private void processMessages() {
327 // must call processData() until msgFactory done fully processing buffer
328 while (!msgFactory.isDone()) {
330 Msg msg = msgFactory.processData();
335 } catch (IOException e) {
336 // got bad data from modem,
337 // unblock those waiting for ack
338 synchronized (getRequestReplyLock()) {
339 if (reply == ReplyType.WAITING_FOR_ACK) {
340 logger.debug("got bad data back, must assume message was acked.");
341 reply = ReplyType.GOT_ACK;
342 getRequestReplyLock().notify();
349 private void notifyWriter(Msg msg) {
350 synchronized (getRequestReplyLock()) {
351 if (reply == ReplyType.WAITING_FOR_ACK) {
352 if (!msg.isUnsolicited()) {
353 reply = (msg.isPureNack() ? ReplyType.GOT_NACK : ReplyType.GOT_ACK);
354 logger.trace("signaling receipt of ack: {}", (reply == ReplyType.GOT_ACK));
355 getRequestReplyLock().notify();
356 } else if (msg.isPureNack()) {
357 reply = ReplyType.GOT_NACK;
358 logger.trace("signaling receipt of pure nack");
359 getRequestReplyLock().notify();
361 logger.trace("got unsolicited message");
367 @SuppressWarnings("unchecked")
368 private void toAllListeners(Msg msg) {
369 // When we deliver the message, the recipient
370 // may in turn call removeListener() or addListener(),
371 // thereby corrupting the very same list we are iterating
372 // through. That's why we make a copy of it, and
373 // iterate through the copy.
374 ArrayList<MsgListener> tempList = null;
375 synchronized (listeners) {
376 tempList = (ArrayList<MsgListener>) listeners.clone();
378 for (MsgListener l : tempList) {
379 l.msg(msg); // deliver msg to listener
384 * Blocking wait for ack or nack from modem.
385 * Called by IOStreamWriter for flow control.
387 * @return true if retransmission is necessary
389 public boolean waitForReply() {
390 reply = ReplyType.WAITING_FOR_ACK;
391 while (reply == ReplyType.WAITING_FOR_ACK) {
393 logger.trace("writer waiting for ack.");
394 // There have been cases observed, in particular for
395 // the Hub, where we get no ack or nack back, causing the binding
396 // to hang in the wait() below, because unsolicited messages
397 // do not trigger a notify(). For this reason we request retransmission
398 // if the wait() times out.
399 getRequestReplyLock().wait(30000); // be patient for 30 msec
400 if (reply == ReplyType.WAITING_FOR_ACK) { // timeout expired without getting ACK or NACK
401 logger.trace("writer timeout expired, asking for retransmit!");
402 reply = ReplyType.GOT_NACK;
405 logger.trace("writer got ack: {}", (reply == ReplyType.GOT_ACK));
407 } catch (InterruptedException e) {
408 break; // done for the day...
411 return (reply == ReplyType.GOT_NACK);
416 * Writes messages to the port. Flow control is implemented following Insteon
417 * documents to avoid over running the modem.
419 * @author Bernd Pfrommer - Initial contribution
421 class IOStreamWriter implements Runnable {
422 private static final int WAIT_TIME = 200; // milliseconds
426 logger.debug("starting writer...");
429 // this call blocks until the lock on the queue is released
430 logger.trace("writer checking message queue");
431 Msg msg = writeQueue.take();
432 if (msg.getData() == null) {
433 logger.warn("found null message in write queue!");
435 logger.debug("writing ({}): {}", msg.getQuietTime(), msg);
436 // To debug race conditions during startup (i.e. make the .items
437 // file definitions be available *before* the modem link records,
438 // slow down the modem traffic with the following statement:
439 // Thread.sleep(500);
440 synchronized (reader.getRequestReplyLock()) {
441 ioStream.write(msg.getData());
442 while (reader.waitForReply()) {
443 Thread.sleep(WAIT_TIME);
444 logger.trace("retransmitting msg: {}", msg);
445 ioStream.write(msg.getData());
448 // if rate limited, need to sleep now.
449 if (msg.getQuietTime() > 0) {
450 Thread.sleep(msg.getQuietTime());
453 } catch (InterruptedException e) {
454 logger.debug("got interrupted exception in write thread");
456 } catch (IOException e) {
457 logger.debug("got an io exception in the write thread");
462 logger.debug("writer thread exiting!");
467 * Class to get info about the modem
469 class Modem implements MsgListener {
470 private @Nullable InsteonDevice device = null;
472 InsteonAddress getAddress() {
473 InsteonDevice device = this.device;
474 return (device == null) ? new InsteonAddress() : (device.getAddress());
478 InsteonDevice getDevice() {
483 public void msg(Msg msg) {
485 if (msg.isPureNack()) {
488 if (msg.getByte("Cmd") == 0x60) {
489 // add the modem to the device list
490 InsteonAddress a = new InsteonAddress(msg.getAddress("IMAddress"));
491 DeviceTypeLoader instance = DeviceTypeLoader.instance();
492 if (instance != null) {
493 DeviceType dt = instance.getDeviceType(InsteonDeviceHandler.PLM_PRODUCT_KEY);
495 logger.warn("unknown modem product key: {} for modem: {}.",
496 InsteonDeviceHandler.PLM_PRODUCT_KEY, a);
498 device = InsteonDevice.makeDevice(dt);
499 initDevice(a, device);
500 mdbb.updateModemDB(a, Port.this, null, true);
503 logger.warn("device type loader instance is null");
505 // can unsubscribe now
506 removeListener(this);
508 } catch (FieldException e) {
509 logger.warn("error parsing im info reply field: ", e);
513 private void initDevice(InsteonAddress a, @Nullable InsteonDevice device) {
514 if (device != null) {
515 device.setAddress(a);
516 device.setProductKey(InsteonDeviceHandler.PLM_PRODUCT_KEY);
517 device.setDriver(driver);
518 device.setIsModem(true);
519 logger.debug("found modem {} in device_types: {}", a, device.toString());
521 logger.warn("device is null");
525 public void initialize() {
527 Msg m = Msg.makeMessage("GetIMInfo");
529 } catch (IOException e) {
530 logger.warn("modem init failed!", e);
531 } catch (InvalidMessageTypeException e) {
532 logger.warn("invalid message", e);