]> git.basschouten.com Git - openhab-addons.git/blob
686c7dee008006a48ab13c2ef20d88639e7df5d8
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.upb.internal.handler;
14
15 import static java.nio.charset.StandardCharsets.US_ASCII;
16 import static java.util.concurrent.TimeUnit.MILLISECONDS;
17
18 import java.io.BufferedInputStream;
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.io.OutputStream;
22 import java.nio.charset.StandardCharsets;
23 import java.util.concurrent.CompletableFuture;
24 import java.util.concurrent.CompletionStage;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.RejectedExecutionException;
29 import java.util.concurrent.ThreadPoolExecutor;
30 import java.util.concurrent.TimeUnit;
31
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.openhab.binding.upb.internal.handler.UPBIoHandler.CmdStatus;
35 import org.openhab.binding.upb.internal.message.MessageParseException;
36 import org.openhab.binding.upb.internal.message.UPBMessage;
37 import org.openhab.core.common.NamedThreadFactory;
38 import org.openhab.core.io.transport.serial.SerialPort;
39 import org.openhab.core.thing.ThingUID;
40 import org.openhab.core.util.HexUtils;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 /**
45  * Event loop for serial communications. Handles sending and receiving UPB messages.
46  *
47  * @author Marcus Better - Initial contribution
48  *
49  */
50 @NonNullByDefault
51 public class SerialIoThread extends Thread {
52     private static final int WRITE_QUEUE_LENGTH = 128;
53     private static final int ACK_TIMEOUT_MS = 500;
54     private static final byte[] ENABLE_MESSAGE_MODE_CMD = "\u001770028E\n".getBytes(StandardCharsets.US_ASCII);
55
56     private static final int MAX_READ_SIZE = 128;
57     private static final int CR = 13;
58
59     private final Logger logger = LoggerFactory.getLogger(SerialIoThread.class);
60     private final MessageListener listener;
61     // Single-threaded executor for writes that serves to serialize writes.
62     private final ExecutorService writeExecutor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS,
63             new LinkedBlockingQueue<>(WRITE_QUEUE_LENGTH), new NamedThreadFactory("upb-serial-writer", true)) {
64         @Override
65         protected void beforeExecute(final @Nullable Thread t, final @Nullable Runnable r) {
66             // ensure we have prepared the PIM before allowing any writes
67             super.beforeExecute(t, r);
68             try {
69                 initialized.await();
70             } catch (final InterruptedException e) {
71                 t.interrupt();
72             }
73         }
74     };
75     private final CountDownLatch initialized = new CountDownLatch(1);
76     private final SerialPort serialPort;
77
78     private volatile @Nullable WriteRunnable currentWrite;
79     private volatile boolean done;
80
81     public SerialIoThread(final SerialPort serialPort, final MessageListener listener, final ThingUID thingUID) {
82         this.serialPort = serialPort;
83         this.listener = listener;
84         setName("OH-binding-" + thingUID + "-serial-reader");
85         setDaemon(true);
86     }
87
88     @Override
89     public void run() {
90         enterMessageMode();
91         try (final InputStream in = serialPort.getInputStream()) {
92             if (in == null) {
93                 // should never happen
94                 throw new IllegalStateException("serial port is not readable");
95             }
96             try (final InputStream bufIn = new BufferedInputStream(in)) {
97                 bufIn.mark(MAX_READ_SIZE);
98                 int len = 0;
99                 while (!done) {
100                     final int b = bufIn.read();
101                     if (b == -1) {
102                         // the serial input returns -1 on receive timeout
103                         continue;
104                     }
105                     len++;
106                     if (b == CR) {
107                         // message terminator read, rewind the stream and parse the buffered message
108                         try {
109                             bufIn.reset();
110                             processBuffer(bufIn, len);
111                         } catch (final IOException e) {
112                             logger.warn("buffer overrun, dropped long message", e);
113                         } finally {
114                             bufIn.mark(MAX_READ_SIZE);
115                             len = 0;
116                         }
117                     }
118                 }
119             }
120         } catch (final IOException e) {
121             logger.warn("Exception in UPB read thread", e);
122         } finally {
123             logger.debug("shutting down receive thread");
124             shutdownAndAwaitTermination(writeExecutor);
125             try {
126                 serialPort.close();
127             } catch (final RuntimeException e) {
128                 // ignore
129             }
130         }
131         logger.debug("UPB read thread stopped");
132     }
133
134     /**
135      * Attempts to parse a message from the input stream.
136      *
137      * @param in the stream to read from
138      * @param len the number of bytes in the message
139      */
140     private void processBuffer(final InputStream in, final int len) {
141         final byte[] buf = new byte[len];
142         final int n;
143         try {
144             n = in.read(buf);
145         } catch (final IOException e) {
146             logger.warn("error reading message", e);
147             return;
148         }
149         if (n < len) {
150             // should not happen when replaying the buffered input
151             logger.warn("truncated read, expected={} read={}", len, n);
152             return;
153         }
154         if (logger.isDebugEnabled()) {
155             logger.debug("UPB Message: {}", formatMessage(buf));
156         }
157         final UPBMessage msg;
158         try {
159             msg = UPBMessage.parse(buf);
160         } catch (final MessageParseException e) {
161             logger.warn("failed to parse message: {}", HexUtils.bytesToHex(buf), e);
162             return;
163         }
164         handleMessage(msg);
165     }
166
167     private void handleMessage(final UPBMessage msg) {
168         final WriteRunnable writeRunnable = currentWrite;
169         switch (msg.getType()) {
170             case ACK:
171                 if (writeRunnable != null) {
172                     writeRunnable.ackReceived(true);
173                 }
174                 break;
175             case NAK:
176                 if (writeRunnable != null) {
177                     writeRunnable.ackReceived(false);
178                 }
179                 break;
180             case ACCEPT:
181                 break;
182             case ERROR:
183                 logger.debug("received ERROR response from PIM");
184                 break;
185             default:
186                 // ignore
187         }
188         listener.incomingMessage(msg);
189     }
190
191     public CompletionStage<CmdStatus> enqueue(final String msg) {
192         return enqueue(msg, 1);
193     }
194
195     private CompletionStage<CmdStatus> enqueue(final String msg, int numAttempts) {
196         final CompletableFuture<CmdStatus> completion = new CompletableFuture<>();
197         final Runnable task = new WriteRunnable(msg, completion, numAttempts);
198         try {
199             writeExecutor.execute(task);
200         } catch (final RejectedExecutionException e) {
201             completion.completeExceptionally(e);
202         }
203         return completion;
204     }
205
206     // puts the PIM is in message mode
207     private void enterMessageMode() {
208         try {
209             final OutputStream out = serialPort.getOutputStream();
210             if (out == null) {
211                 throw new IOException("serial port is not writable");
212             }
213             out.write(ENABLE_MESSAGE_MODE_CMD);
214             out.flush();
215         } catch (final IOException e) {
216             logger.warn("error setting message mode", e);
217         } finally {
218             // signal that writes can proceed
219             initialized.countDown();
220         }
221     }
222
223     void shutdownAndAwaitTermination(final ExecutorService pool) {
224         pool.shutdown();
225         try {
226             if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
227                 pool.shutdownNow();
228                 if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
229                     logger.warn("executor did not terminate");
230                 }
231             }
232         } catch (final InterruptedException ie) {
233             pool.shutdownNow();
234             Thread.currentThread().interrupt();
235         }
236     }
237
238     public void terminate() {
239         done = true;
240         try {
241             serialPort.close();
242         } catch (final RuntimeException e) {
243             logger.warn("failed to close serial port", e);
244         }
245     }
246
247     // format a message for debug logging, include only printable characters
248     private static String formatMessage(byte[] buf) {
249         final int len;
250         // omit the final newline
251         if (buf[buf.length - 1] == '\r') {
252             len = buf.length - 1;
253         } else {
254             len = buf.length;
255         }
256         final String s = new String(buf, 0, len, US_ASCII);
257         if (s.chars().allMatch(c -> c >= 32 && c < 127)) {
258             return s;
259         } else {
260             // presence of non-printable characters is either noise or a misconfiguration, log it in hex
261             return HexUtils.bytesToHex(buf);
262         }
263     }
264
265     private class WriteRunnable implements Runnable {
266         private static final int MAX_RETRIES = 3;
267
268         private final String msg;
269         private final CompletableFuture<CmdStatus> completion;
270         private final CountDownLatch ackLatch = new CountDownLatch(1);
271         private final int numAttempts;
272
273         private @Nullable Boolean ack;
274
275         public WriteRunnable(final String msg, final CompletableFuture<CmdStatus> completion, int numAttempts) {
276             this.msg = msg;
277             this.completion = completion;
278             this.numAttempts = numAttempts;
279         }
280
281         // called by reader thread on ACK or NAK
282         public void ackReceived(final boolean ack) {
283             this.ack = ack;
284             ackLatch.countDown();
285         }
286
287         @Override
288         public void run() {
289             currentWrite = this;
290             try {
291                 logger.debug("Writing bytes: {}", msg);
292                 final OutputStream out = serialPort.getOutputStream();
293                 if (out == null) {
294                     throw new IOException("serial port is not writable");
295                 }
296                 final CmdStatus res;
297                 out.write(0x14);
298                 out.write(msg.getBytes(US_ASCII));
299                 out.write(0x0d);
300                 out.flush();
301                 final boolean latched = ackLatch.await(ACK_TIMEOUT_MS, MILLISECONDS);
302                 if (latched) {
303                     final Boolean ack = this.ack;
304                     if (ack == null) {
305                         logger.debug("write not acked, attempt {}", numAttempts);
306                         res = CmdStatus.WRITE_FAILED;
307                     } else if (ack) {
308                         completion.complete(CmdStatus.ACK);
309                         return;
310                     } else {
311                         logger.debug("NAK received, attempt {}", numAttempts);
312                         res = CmdStatus.NAK;
313                     }
314                 } else {
315                     logger.debug("ack timed out, attempt {}", numAttempts);
316                     res = CmdStatus.WRITE_FAILED;
317                 }
318                 if (numAttempts < MAX_RETRIES) {
319                     enqueue(msg, numAttempts + 1).thenAccept(completion::complete);
320                 } else {
321                     completion.complete(res);
322                 }
323             } catch (final IOException | InterruptedException e) {
324                 logger.warn("error writing message", e);
325                 completion.complete(CmdStatus.WRITE_FAILED);
326             }
327         }
328     }
329 }