2 * Copyright (c) 2010-2021 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.upb.internal.handler;
15 import static java.nio.charset.StandardCharsets.US_ASCII;
16 import static java.util.concurrent.TimeUnit.MILLISECONDS;
18 import java.io.BufferedInputStream;
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.io.OutputStream;
22 import java.nio.charset.StandardCharsets;
23 import java.util.concurrent.CompletableFuture;
24 import java.util.concurrent.CompletionStage;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.RejectedExecutionException;
29 import java.util.concurrent.ThreadPoolExecutor;
30 import java.util.concurrent.TimeUnit;
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.openhab.binding.upb.internal.handler.UPBIoHandler.CmdStatus;
35 import org.openhab.binding.upb.internal.message.MessageParseException;
36 import org.openhab.binding.upb.internal.message.UPBMessage;
37 import org.openhab.core.common.NamedThreadFactory;
38 import org.openhab.core.io.transport.serial.SerialPort;
39 import org.openhab.core.thing.ThingUID;
40 import org.openhab.core.util.HexUtils;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
45 * Event loop for serial communications. Handles sending and receiving UPB messages.
47 * @author Marcus Better - Initial contribution
51 public class SerialIoThread extends Thread {
52 private static final int WRITE_QUEUE_LENGTH = 128;
53 private static final int ACK_TIMEOUT_MS = 500;
54 private static final byte[] ENABLE_MESSAGE_MODE_CMD = "\u001770028E\n".getBytes(StandardCharsets.US_ASCII);
56 private static final int MAX_READ_SIZE = 128;
57 private static final int CR = 13;
59 private final Logger logger = LoggerFactory.getLogger(SerialIoThread.class);
60 private final MessageListener listener;
61 // Single-threaded executor for writes that serves to serialize writes.
62 private final ExecutorService writeExecutor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS,
63 new LinkedBlockingQueue<>(WRITE_QUEUE_LENGTH), new NamedThreadFactory("upb-serial-writer", true));
64 private final SerialPort serialPort;
66 private volatile @Nullable WriteRunnable currentWrite;
67 private volatile boolean done;
69 public SerialIoThread(final SerialPort serialPort, final MessageListener listener, final ThingUID thingUID) {
70 this.serialPort = serialPort;
71 this.listener = listener;
72 setName("OH-binding-" + thingUID + "-serial-reader");
79 try (final InputStream in = serialPort.getInputStream()) {
81 // should never happen
82 throw new IllegalStateException("serial port is not readable");
84 try (final InputStream bufIn = new BufferedInputStream(in)) {
85 bufIn.mark(MAX_READ_SIZE);
88 final int b = bufIn.read();
90 // the serial input returns -1 on receive timeout
95 // message terminator read, rewind the stream and parse the buffered message
98 processBuffer(bufIn, len);
99 } catch (final IOException e) {
100 logger.warn("buffer overrun, dropped long message", e);
102 bufIn.mark(MAX_READ_SIZE);
108 } catch (final IOException e) {
109 logger.warn("Exception in UPB read thread", e);
111 logger.debug("shutting down receive thread");
112 shutdownAndAwaitTermination(writeExecutor);
115 } catch (final RuntimeException e) {
119 logger.debug("UPB read thread stopped");
123 * Attempts to parse a message from the input stream.
125 * @param in the stream to read from
126 * @param len the number of bytes in the message
128 private void processBuffer(final InputStream in, final int len) {
129 final byte[] buf = new byte[len];
133 } catch (final IOException e) {
134 logger.warn("error reading message", e);
138 // should not happen when replaying the buffered input
139 logger.warn("truncated read, expected={} read={}", len, n);
142 if (logger.isDebugEnabled()) {
143 logger.debug("UPB Message: {}", HexUtils.bytesToHex(buf));
145 final UPBMessage msg;
147 msg = UPBMessage.parse(buf);
148 } catch (final MessageParseException e) {
149 logger.warn("failed to parse message: {}", HexUtils.bytesToHex(buf), e);
155 private void handleMessage(final UPBMessage msg) {
156 final WriteRunnable writeRunnable = currentWrite;
157 switch (msg.getType()) {
159 if (writeRunnable != null) {
160 writeRunnable.ackReceived(true);
164 if (writeRunnable != null) {
165 writeRunnable.ackReceived(false);
171 logger.debug("received ERROR response from PIM");
176 listener.incomingMessage(msg);
179 public CompletionStage<CmdStatus> enqueue(final String msg) {
180 return enqueue(msg, 1);
183 private CompletionStage<CmdStatus> enqueue(final String msg, int numAttempts) {
184 final CompletableFuture<CmdStatus> completion = new CompletableFuture<>();
185 final Runnable task = new WriteRunnable(msg, completion, numAttempts);
187 writeExecutor.execute(task);
188 } catch (final RejectedExecutionException e) {
189 completion.completeExceptionally(e);
194 // puts the PIM is in message mode
195 private void enterMessageMode() {
197 final OutputStream out = serialPort.getOutputStream();
199 throw new IOException("serial port is not writable");
201 out.write(ENABLE_MESSAGE_MODE_CMD);
203 } catch (final IOException e) {
204 logger.warn("error setting message mode", e);
208 void shutdownAndAwaitTermination(final ExecutorService pool) {
211 if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
213 if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
214 logger.warn("executor did not terminate");
217 } catch (final InterruptedException ie) {
219 Thread.currentThread().interrupt();
223 public void terminate() {
227 } catch (final RuntimeException e) {
228 logger.warn("failed to close serial port", e);
232 private class WriteRunnable implements Runnable {
233 private static final int MAX_RETRIES = 3;
235 private final String msg;
236 private final CompletableFuture<CmdStatus> completion;
237 private final CountDownLatch ackLatch = new CountDownLatch(1);
238 private final int numAttempts;
240 private @Nullable Boolean ack;
242 public WriteRunnable(final String msg, final CompletableFuture<CmdStatus> completion, int numAttempts) {
244 this.completion = completion;
245 this.numAttempts = numAttempts;
248 // called by reader thread on ACK or NAK
249 public void ackReceived(final boolean ack) {
251 ackLatch.countDown();
258 logger.debug("Writing bytes: {}", msg);
259 final OutputStream out = serialPort.getOutputStream();
261 throw new IOException("serial port is not writable");
265 out.write(msg.getBytes(US_ASCII));
268 final boolean latched = ackLatch.await(ACK_TIMEOUT_MS, MILLISECONDS);
270 final Boolean ack = this.ack;
272 logger.debug("write not acked, attempt {}", numAttempts);
273 res = CmdStatus.WRITE_FAILED;
275 completion.complete(CmdStatus.ACK);
278 logger.debug("NAK received, attempt {}", numAttempts);
282 logger.debug("ack timed out, attempt {}", numAttempts);
283 res = CmdStatus.WRITE_FAILED;
285 if (numAttempts < MAX_RETRIES) {
286 enqueue(msg, numAttempts + 1).thenAccept(completion::complete);
288 completion.complete(res);
290 } catch (final IOException | InterruptedException e) {
291 logger.warn("error writing message", e);
292 completion.complete(CmdStatus.WRITE_FAILED);