2 * Copyright (c) 2010-2021 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.upb.internal.handler;
15 import static java.nio.charset.StandardCharsets.US_ASCII;
16 import static java.util.concurrent.TimeUnit.MILLISECONDS;
18 import java.io.BufferedInputStream;
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.io.OutputStream;
22 import java.nio.charset.StandardCharsets;
23 import java.util.concurrent.CompletableFuture;
24 import java.util.concurrent.CompletionStage;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.RejectedExecutionException;
29 import java.util.concurrent.ThreadPoolExecutor;
30 import java.util.concurrent.TimeUnit;
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.openhab.binding.upb.internal.handler.UPBIoHandler.CmdStatus;
35 import org.openhab.binding.upb.internal.message.MessageBuilder;
36 import org.openhab.binding.upb.internal.message.MessageParseException;
37 import org.openhab.binding.upb.internal.message.UPBMessage;
38 import org.openhab.core.common.NamedThreadFactory;
39 import org.openhab.core.io.transport.serial.SerialPort;
40 import org.openhab.core.thing.ThingUID;
41 import org.openhab.core.util.HexUtils;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
46 * Event loop for serial communications. Handles sending and receiving UPB messages.
48 * @author Marcus Better - Initial contribution
52 public class SerialIoThread extends Thread {
53 private static final int WRITE_QUEUE_LENGTH = 128;
54 private static final int ACK_TIMEOUT_MS = 500;
55 private static final byte[] ENABLE_MESSAGE_MODE_CMD = "\u001770028E\n".getBytes(StandardCharsets.US_ASCII);
57 private static final int MAX_READ_SIZE = 128;
58 private static final int CR = 13;
60 private final Logger logger = LoggerFactory.getLogger(SerialIoThread.class);
61 private final MessageListener listener;
62 // Single-threaded executor for writes that serves to serialize writes.
63 private final ExecutorService writeExecutor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS,
64 new LinkedBlockingQueue<>(WRITE_QUEUE_LENGTH), new NamedThreadFactory("upb-serial-writer", true));
65 private final SerialPort serialPort;
67 private volatile @Nullable WriteRunnable currentWrite;
68 private volatile boolean done;
70 public SerialIoThread(final SerialPort serialPort, final MessageListener listener, final ThingUID thingUID) {
71 this.serialPort = serialPort;
72 this.listener = listener;
73 setName("OH-binding-" + thingUID + "-serial-reader");
80 try (final InputStream in = serialPort.getInputStream()) {
82 // should never happen
83 throw new IllegalStateException("serial port is not readable");
85 try (final InputStream bufIn = new BufferedInputStream(in)) {
86 bufIn.mark(MAX_READ_SIZE);
89 final int b = bufIn.read();
91 // the serial input returns -1 on receive timeout
96 // message terminator read, rewind the stream and parse the buffered message
99 processBuffer(bufIn, len);
100 } catch (final IOException e) {
101 logger.warn("buffer overrun, dropped long message", e);
103 bufIn.mark(MAX_READ_SIZE);
109 } catch (final IOException e) {
110 logger.warn("Exception in UPB read thread", e);
112 logger.debug("shutting down receive thread");
113 shutdownAndAwaitTermination(writeExecutor);
116 } catch (final RuntimeException e) {
120 logger.debug("UPB read thread stopped");
124 * Attempts to parse a message from the input stream.
126 * @param in the stream to read from
127 * @param len the number of bytes in the message
129 private void processBuffer(final InputStream in, final int len) {
130 final byte[] buf = new byte[len];
134 } catch (final IOException e) {
135 logger.warn("error reading message", e);
139 // should not happen when replaying the buffered input
140 logger.warn("truncated read, expected={} read={}", len, n);
143 if (logger.isDebugEnabled()) {
144 logger.debug("UPB Message: {}", HexUtils.bytesToHex(buf));
146 final UPBMessage msg;
148 msg = UPBMessage.parse(buf);
149 } catch (final MessageParseException e) {
150 logger.warn("failed to parse message: {}", HexUtils.bytesToHex(buf), e);
156 private void handleMessage(final UPBMessage msg) {
157 final WriteRunnable writeRunnable = currentWrite;
158 switch (msg.getType()) {
160 if (writeRunnable != null) {
161 writeRunnable.ackReceived(true);
165 if (writeRunnable != null) {
166 writeRunnable.ackReceived(false);
172 logger.debug("received ERROR response from PIM");
177 listener.incomingMessage(msg);
180 public CompletionStage<CmdStatus> enqueue(final MessageBuilder msg) {
181 final CompletableFuture<CmdStatus> completion = new CompletableFuture<>();
182 final Runnable task = new WriteRunnable(msg.build(), completion);
184 writeExecutor.execute(task);
185 } catch (final RejectedExecutionException e) {
186 completion.completeExceptionally(e);
191 // puts the PIM into message mode
192 private void enterMessageMode() {
194 final OutputStream out = serialPort.getOutputStream();
196 throw new IOException("serial port is not writable");
198 out.write(ENABLE_MESSAGE_MODE_CMD);
200 } catch (final IOException e) {
201 logger.warn("error setting message mode", e);
205 void shutdownAndAwaitTermination(final ExecutorService pool) {
208 if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
210 if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
211 logger.warn("executor did not terminate");
214 } catch (final InterruptedException ie) {
216 Thread.currentThread().interrupt();
220 public void terminate() {
224 } catch (final RuntimeException e) {
225 logger.warn("failed to close serial port", e);
229 private class WriteRunnable implements Runnable {
230 private static final int MAX_RETRIES = 3;
232 private final String msg;
233 private final CompletableFuture<CmdStatus> completion;
234 private final CountDownLatch ackLatch = new CountDownLatch(1);
236 private @Nullable Boolean ack;
238 public WriteRunnable(final String msg, final CompletableFuture<CmdStatus> completion) {
240 this.completion = completion;
243 // called by reader thread on ACK or NAK
244 public void ackReceived(final boolean ack) {
245 if (logger.isDebugEnabled()) {
247 logger.debug("ACK received");
249 logger.debug("NAK received");
253 ackLatch.countDown();
260 logger.debug("Writing bytes: {}", msg);
261 final OutputStream out = serialPort.getOutputStream();
263 throw new IOException("serial port is not writable");
265 for (int tries = 0; tries < MAX_RETRIES && ack == null; tries++) {
267 out.write(msg.getBytes(US_ASCII));
270 final boolean acked = ackLatch.await(ACK_TIMEOUT_MS, MILLISECONDS);
274 logger.debug("ack timed out, retrying ({} of {})", tries + 1, MAX_RETRIES);
276 final Boolean ack = this.ack;
278 logger.debug("write not acked");
279 completion.complete(CmdStatus.WRITE_FAILED);
281 completion.complete(CmdStatus.ACK);
283 completion.complete(CmdStatus.NAK);
285 } catch (final IOException | InterruptedException e) {
286 logger.warn("error writing message", e);
287 completion.complete(CmdStatus.WRITE_FAILED);