/**
 * Copyright (c) 2010-2023 Contributors to the openHAB project
 *
 * See the NOTICE file(s) distributed with this work for additional
 * information.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0
 *
 * SPDX-License-Identifier: EPL-2.0
 */
13 package org.openhab.binding.upb.internal.handler;
15 import static java.nio.charset.StandardCharsets.US_ASCII;
16 import static java.util.concurrent.TimeUnit.MILLISECONDS;
18 import java.io.BufferedInputStream;
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.io.OutputStream;
22 import java.nio.charset.StandardCharsets;
23 import java.util.concurrent.CompletableFuture;
24 import java.util.concurrent.CompletionStage;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.RejectedExecutionException;
29 import java.util.concurrent.ThreadPoolExecutor;
30 import java.util.concurrent.TimeUnit;
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.openhab.binding.upb.internal.handler.UPBIoHandler.CmdStatus;
35 import org.openhab.binding.upb.internal.message.MessageParseException;
36 import org.openhab.binding.upb.internal.message.UPBMessage;
37 import org.openhab.core.common.NamedThreadFactory;
38 import org.openhab.core.io.transport.serial.SerialPort;
39 import org.openhab.core.thing.ThingUID;
40 import org.openhab.core.util.HexUtils;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
/**
 * Event loop for serial communications. Handles sending and receiving UPB messages.
 *
 * @author Marcus Better - Initial contribution
 */
51 public class SerialIoThread extends Thread {
// Capacity of the bounded queue that backs the write executor.
52 private static final int WRITE_QUEUE_LENGTH = 128;
// How long a write waits on its ACK latch before giving up, in milliseconds.
53 private static final int ACK_TIMEOUT_MS = 500;
// Bytes written to the port by enterMessageMode(): control character 0x17 followed by
// the ASCII text "70028E" and a line feed.
54 private static final byte[] ENABLE_MESSAGE_MODE_CMD = "\u001770028E\n".getBytes(StandardCharsets.US_ASCII);
// Read-ahead limit passed to mark() on the buffered serial input.
56 private static final int MAX_READ_SIZE = 128;
// ASCII carriage return. NOTE(review): no use of this constant appears in this listing;
// presumably each received byte is compared against it to detect the message terminator - confirm.
57 private static final int CR = 13;
59 private final Logger logger = LoggerFactory.getLogger(SerialIoThread.class);
// Receives every parsed message via incomingMessage() (see handleMessage).
60 private final MessageListener listener;
61 // Single-threaded executor for writes that serves to serialize writes.
// Configured with one core and one maximum thread, a 30 second keep-alive and a queue
// bounded to WRITE_QUEUE_LENGTH tasks; execute() can therefore be rejected when the queue
// is full, which enqueue(String, int) handles.
62 private final ExecutorService writeExecutor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS,
63 new LinkedBlockingQueue<>(WRITE_QUEUE_LENGTH), new NamedThreadFactory("upb-serial-writer", true)) {
65 protected void beforeExecute(final @Nullable Thread t, final @Nullable Runnable r) {
66 // ensure we have prepared the PIM before allowing any writes
67 super.beforeExecute(t, r);
// NOTE(review): the statement guarded by the catch below is not part of this listing;
// presumably it blocks on the `initialized` latch - confirm. Also confirm that the catch
// body restores the interrupt status.
70 } catch (final InterruptedException e) {
// Counted down by enterMessageMode() to signal that writes can proceed.
75 private final CountDownLatch initialized = new CountDownLatch(1);
76 private final SerialPort serialPort;
// Read by handleMessage() to deliver an ACK/NAK to a write task.
// NOTE(review): the assignment to this field is not visible in this listing; presumably
// WriteRunnable.run() sets it to itself before writing - confirm.
78 private volatile @Nullable WriteRunnable currentWrite;
// NOTE(review): neither reads nor writes of this flag are visible in this listing;
// presumably it is the stop flag of the read loop, set by terminate() - confirm.
79 private volatile boolean done;
// Creates the I/O thread for a serial port.
//
// serialPort - the port whose input and output streams this class reads and writes
// listener   - receives every successfully parsed incoming message
// thingUID   - used only to build the thread name "OH-binding-<thingUID>-serial-reader"
//
// NOTE(review): the end of the constructor is not part of this listing; further setup
// (for example a daemon flag) may follow the setName() call.
81 public SerialIoThread(final SerialPort serialPort, final MessageListener listener, final ThingUID thingUID) {
82 this.serialPort = serialPort;
83 this.listener = listener;
84 setName("OH-binding-" + thingUID + "-serial-reader");
// NOTE(review): the header of the enclosing method is not part of this listing. Since the
// class extends Thread and the log messages below mention the read thread, this is
// presumably the body of run() - confirm.
//
// Visible behaviour: opens the port's input stream, wraps it in a BufferedInputStream and
// reads it one byte at a time. A completed message is handed to processBuffer(); on exit
// the write executor is shut down.
91 try (final InputStream in = serialPort.getInputStream()) {
93 // should never happen
94 throw new IllegalStateException("serial port is not readable");
// Buffered wrapper so that the bytes of a message can be marked and replayed.
96 try (final InputStream bufIn = new BufferedInputStream(in)) {
// Remember the start of the first message.
97 bufIn.mark(MAX_READ_SIZE);
// NOTE(review): the loop header and the length counter `len` are declared on lines that
// are not part of this listing.
100 final int b = bufIn.read();
102 // the serial input returns -1 on receive timeout
107 // message terminator read, rewind the stream and parse the buffered message
110 processBuffer(bufIn, len);
111 } catch (final IOException e) {
112 logger.warn("buffer overrun, dropped long message", e);
// Start buffering the next message from the current stream position.
114 bufIn.mark(MAX_READ_SIZE);
120 } catch (final IOException e) {
121 logger.warn("Exception in UPB read thread", e);
123 logger.debug("shutting down receive thread");
// Stop the writer thread as part of shutting down the reader.
124 shutdownAndAwaitTermination(writeExecutor);
// NOTE(review): the statement guarded by this catch is not part of this listing.
127 } catch (final RuntimeException e) {
131 logger.debug("UPB read thread stopped");
// Reads `len` bytes from the stream into a fresh buffer, logs them at debug level and
// parses them with UPBMessage.parse(). Read errors, short reads and parse failures are
// logged as warnings.
// NOTE(review): the read call, the early returns after each warning and the hand-off of
// the parsed message (presumably to handleMessage) are on lines not part of this listing.
135 * Attempts to parse a message from the input stream.
137 * @param in the stream to read from
138 * @param len the number of bytes in the message
140 private void processBuffer(final InputStream in, final int len) {
141 final byte[] buf = new byte[len];
145 } catch (final IOException e) {
146 logger.warn("error reading message", e);
150 // should not happen when replaying the buffered input
151 logger.warn("truncated read, expected={} read={}", len, n);
// Guarded so that the message is only formatted when debug logging is on.
154 if (logger.isDebugEnabled()) {
155 logger.debug("UPB Message: {}", formatMessage(buf));
157 final UPBMessage msg;
159 msg = UPBMessage.parse(buf);
160 } catch (final MessageParseException e) {
// The unparseable bytes are logged in hex together with the exception.
161 logger.warn("failed to parse message: {}", HexUtils.bytesToHex(buf), e);
// Dispatches a parsed message by its type and then passes it to the listener.
// Two branches notify the write task held in `currentWrite`, one with a positive and one
// with a negative acknowledgement; a third only logs an ERROR response from the PIM.
// NOTE(review): the case labels are not part of this listing; presumably the first two
// branches are the ACK and NAK message types - confirm.
167 private void handleMessage(final UPBMessage msg) {
// Read the volatile field once so the null check and the call see the same task.
168 final WriteRunnable writeRunnable = currentWrite;
169 switch (msg.getType()) {
171 if (writeRunnable != null) {
172 writeRunnable.ackReceived(true);
176 if (writeRunnable != null) {
177 writeRunnable.ackReceived(false);
183 logger.debug("received ERROR response from PIM");
// Forward the message to the registered listener.
188 listener.incomingMessage(msg);
// Queues a message for writing to the serial port, starting with attempt number 1.
//
// msg - the message text to send
// Returns the stage produced by the two-argument overload, which reports the outcome of
// the write as a CmdStatus.
191 public CompletionStage<CmdStatus> enqueue(final String msg) {
192 return enqueue(msg, 1);
// Wraps the message in a WriteRunnable and submits it to the write executor.
//
// msg         - the message text to send
// numAttempts - the number of this attempt; the public overload passes 1
//
// If the executor rejects the task, the future is completed exceptionally with the
// RejectedExecutionException instead of throwing to the caller.
// NOTE(review): the return statement is not part of this listing; presumably `completion`
// is returned - confirm.
195 private CompletionStage<CmdStatus> enqueue(final String msg, int numAttempts) {
196 final CompletableFuture<CmdStatus> completion = new CompletableFuture<>();
197 final Runnable task = new WriteRunnable(msg, completion, numAttempts);
199 writeExecutor.execute(task);
200 } catch (final RejectedExecutionException e) {
201 completion.completeExceptionally(e);
206 // puts the PIM in message mode
// Writes ENABLE_MESSAGE_MODE_CMD to the port's output stream. A failure is logged as a
// warning and not rethrown.
// NOTE(review): the null check guarding the throw below, the statement after the write
// and the construct enclosing the countDown() call are on lines not part of this listing.
207 private void enterMessageMode() {
209 final OutputStream out = serialPort.getOutputStream();
211 throw new IOException("serial port is not writable");
213 out.write(ENABLE_MESSAGE_MODE_CMD);
215 } catch (final IOException e) {
216 logger.warn("error setting message mode", e);
218 // signal that writes can proceed
219 initialized.countDown();
// Waits for the given executor to terminate, in two stages of at most one second each,
// and logs a warning if it is still running after the second wait.
//
// pool - the executor to wait for
//
// NOTE(review): the calls that initiate the shutdown are on lines not part of this
// listing; presumably shutdown() precedes the first wait and shutdownNow() precedes the
// second wait and follows an interrupt - confirm.
223 void shutdownAndAwaitTermination(final ExecutorService pool) {
226 if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
228 if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
229 logger.warn("executor did not terminate");
232 } catch (final InterruptedException ie) {
// Restore the interrupt status for the calling thread.
234 Thread.currentThread().interrupt();
// Stops the I/O thread. A RuntimeException raised while closing the serial port is logged
// as a warning and not propagated.
// NOTE(review): the statements before the catch are not part of this listing; presumably
// they set the `done` flag and close the serial port - confirm.
238 public void terminate() {
242 } catch (final RuntimeException e) {
243 logger.warn("failed to close serial port", e);
247 // format a message for debug logging, include only printable characters
248 private static String formatMessage(byte[] buf) {
250 // omit the final newline
251 if (buf[buf.length - 1] == '\r') {
252 len = buf.length - 1;
256 final String s = new String(buf, 0, len, US_ASCII);
257 if (s.chars().allMatch(c -> c >= 32 && c < 127)) {
260 // presence of non-printable characters is either noise or a misconfiguration, log it in hex
261 return HexUtils.bytesToHex(buf);
// One write attempt for a single message. The task writes the message to the serial port,
// waits for the reader thread to report an ACK or NAK, and either completes the caller's
// future or re-queues the message for another attempt.
265 private class WriteRunnable implements Runnable {
// Upper bound on the attempt number: a retry is queued only while numAttempts is below
// this value, and the first attempt is numbered 1 by enqueue(String).
266 private static final int MAX_RETRIES = 3;
268 private final String msg;
// Completed with the final outcome of the write.
269 private final CompletableFuture<CmdStatus> completion;
// Released by ackReceived() when the reader thread sees a response for this write.
270 private final CountDownLatch ackLatch = new CountDownLatch(1);
271 private final int numAttempts;
// NOTE(review): the assignment in ackReceived() is not part of this listing; presumably
// this holds true for ACK, false for NAK, and null while no response has arrived.
273 private @Nullable Boolean ack;
275 public WriteRunnable(final String msg, final CompletableFuture<CmdStatus> completion, int numAttempts) {
277 this.completion = completion;
278 this.numAttempts = numAttempts;
281 // called by reader thread on ACK or NAK
282 public void ackReceived(final boolean ack) {
284 ackLatch.countDown();
// NOTE(review): the header of the method containing the following lines is not part of
// this listing; since the class implements Runnable this is presumably run() - confirm.
291 logger.debug("Writing bytes: {}", msg);
292 final OutputStream out = serialPort.getOutputStream();
294 throw new IOException("serial port is not writable");
// The message text is sent as US-ASCII bytes. Lines around this write are missing from
// the listing, so any framing bytes written before or after it are not visible here.
298 out.write(msg.getBytes(US_ASCII));
// Wait up to ACK_TIMEOUT_MS for the reader thread to call ackReceived().
301 final boolean latched = ackLatch.await(ACK_TIMEOUT_MS, MILLISECONDS);
// Copy the field into a local before inspecting it.
303 final Boolean ack = this.ack;
305 logger.debug("write not acked, attempt {}", numAttempts);
306 res = CmdStatus.WRITE_FAILED;
// Positive acknowledgement: report success to the caller.
308 completion.complete(CmdStatus.ACK);
311 logger.debug("NAK received, attempt {}", numAttempts);
315 logger.debug("ack timed out, attempt {}", numAttempts);
316 res = CmdStatus.WRITE_FAILED;
// Retry by queueing a new task with the next attempt number; the caller's future is
// completed with whatever status that retry produces.
318 if (numAttempts < MAX_RETRIES) {
319 enqueue(msg, numAttempts + 1).thenAccept(completion::complete);
// Otherwise report the status recorded for this attempt.
321 completion.complete(res);
// NOTE(review): if no Thread.currentThread().interrupt() follows on the lines not part of
// this listing, the interrupt status is lost when InterruptedException is caught here.
323 } catch (final IOException | InterruptedException e) {
324 logger.warn("error writing message", e);
325 completion.complete(CmdStatus.WRITE_FAILED);