]> git.basschouten.com Git - openhab-addons.git/blob
3244ef66196b3adb8509fd79b0039e35edb57693
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2021 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.upb.internal.handler;
14
15 import static java.nio.charset.StandardCharsets.US_ASCII;
16 import static java.util.concurrent.TimeUnit.MILLISECONDS;
17
18 import java.io.BufferedInputStream;
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.io.OutputStream;
22 import java.nio.charset.StandardCharsets;
23 import java.util.concurrent.CompletableFuture;
24 import java.util.concurrent.CompletionStage;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.RejectedExecutionException;
29 import java.util.concurrent.ThreadPoolExecutor;
30 import java.util.concurrent.TimeUnit;
31
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.openhab.binding.upb.internal.handler.UPBIoHandler.CmdStatus;
35 import org.openhab.binding.upb.internal.message.MessageBuilder;
36 import org.openhab.binding.upb.internal.message.MessageParseException;
37 import org.openhab.binding.upb.internal.message.UPBMessage;
38 import org.openhab.core.common.NamedThreadFactory;
39 import org.openhab.core.io.transport.serial.SerialPort;
40 import org.openhab.core.thing.ThingUID;
41 import org.openhab.core.util.HexUtils;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * Event loop for serial communications. Handles sending and receiving UPB messages.
47  *
48  * @author Marcus Better - Initial contribution
49  *
50  */
51 @NonNullByDefault
52 public class SerialIoThread extends Thread {
53     private static final int WRITE_QUEUE_LENGTH = 128;
54     private static final int ACK_TIMEOUT_MS = 500;
55     private static final byte[] ENABLE_MESSAGE_MODE_CMD = "\u001770028E\n".getBytes(StandardCharsets.US_ASCII);
56
57     private static final int MAX_READ_SIZE = 128;
58     private static final int CR = 13;
59
60     private final Logger logger = LoggerFactory.getLogger(SerialIoThread.class);
61     private final MessageListener listener;
62     // Single-threaded executor for writes that serves to serialize writes.
63     private final ExecutorService writeExecutor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS,
64             new LinkedBlockingQueue<>(WRITE_QUEUE_LENGTH), new NamedThreadFactory("upb-serial-writer", true));
65     private final SerialPort serialPort;
66
67     private volatile @Nullable WriteRunnable currentWrite;
68     private volatile boolean done;
69
70     public SerialIoThread(final SerialPort serialPort, final MessageListener listener, final ThingUID thingUID) {
71         this.serialPort = serialPort;
72         this.listener = listener;
73         setName("OH-binding-" + thingUID + "-serial-reader");
74         setDaemon(true);
75     }
76
77     @Override
78     public void run() {
79         enterMessageMode();
80         try (final InputStream in = serialPort.getInputStream()) {
81             if (in == null) {
82                 // should never happen
83                 throw new IllegalStateException("serial port is not readable");
84             }
85             try (final InputStream bufIn = new BufferedInputStream(in)) {
86                 bufIn.mark(MAX_READ_SIZE);
87                 int len = 0;
88                 while (!done) {
89                     final int b = bufIn.read();
90                     if (b == -1) {
91                         // the serial input returns -1 on receive timeout
92                         continue;
93                     }
94                     len++;
95                     if (b == CR) {
96                         // message terminator read, rewind the stream and parse the buffered message
97                         try {
98                             bufIn.reset();
99                             processBuffer(bufIn, len);
100                         } catch (final IOException e) {
101                             logger.warn("buffer overrun, dropped long message", e);
102                         } finally {
103                             bufIn.mark(MAX_READ_SIZE);
104                             len = 0;
105                         }
106                     }
107                 }
108             }
109         } catch (final IOException e) {
110             logger.warn("Exception in UPB read thread", e);
111         } finally {
112             logger.debug("shutting down receive thread");
113             shutdownAndAwaitTermination(writeExecutor);
114             try {
115                 serialPort.close();
116             } catch (final RuntimeException e) {
117                 // ignore
118             }
119         }
120         logger.debug("UPB read thread stopped");
121     }
122
123     /**
124      * Attempts to parse a message from the input stream.
125      *
126      * @param in the stream to read from
127      * @param len the number of bytes in the message
128      */
129     private void processBuffer(final InputStream in, final int len) {
130         final byte[] buf = new byte[len];
131         final int n;
132         try {
133             n = in.read(buf);
134         } catch (final IOException e) {
135             logger.warn("error reading message", e);
136             return;
137         }
138         if (n < len) {
139             // should not happen when replaying the buffered input
140             logger.warn("truncated read, expected={} read={}", len, n);
141             return;
142         }
143         if (logger.isDebugEnabled()) {
144             logger.debug("UPB Message: {}", HexUtils.bytesToHex(buf));
145         }
146         final UPBMessage msg;
147         try {
148             msg = UPBMessage.parse(buf);
149         } catch (final MessageParseException e) {
150             logger.warn("failed to parse message: {}", HexUtils.bytesToHex(buf), e);
151             return;
152         }
153         handleMessage(msg);
154     }
155
156     private void handleMessage(final UPBMessage msg) {
157         final WriteRunnable writeRunnable = currentWrite;
158         switch (msg.getType()) {
159             case ACK:
160                 if (writeRunnable != null) {
161                     writeRunnable.ackReceived(true);
162                 }
163                 break;
164             case NAK:
165                 if (writeRunnable != null) {
166                     writeRunnable.ackReceived(false);
167                 }
168                 break;
169             case ACCEPT:
170                 break;
171             case ERROR:
172                 logger.debug("received ERROR response from PIM");
173                 break;
174             default:
175                 // ignore
176         }
177         listener.incomingMessage(msg);
178     }
179
180     public CompletionStage<CmdStatus> enqueue(final MessageBuilder msg) {
181         final CompletableFuture<CmdStatus> completion = new CompletableFuture<>();
182         final Runnable task = new WriteRunnable(msg.build(), completion);
183         try {
184             writeExecutor.execute(task);
185         } catch (final RejectedExecutionException e) {
186             completion.completeExceptionally(e);
187         }
188         return completion;
189     }
190
191     // puts the PIM is in message mode
192     private void enterMessageMode() {
193         try {
194             final OutputStream out = serialPort.getOutputStream();
195             if (out == null) {
196                 throw new IOException("serial port is not writable");
197             }
198             out.write(ENABLE_MESSAGE_MODE_CMD);
199             out.flush();
200         } catch (final IOException e) {
201             logger.warn("error setting message mode", e);
202         }
203     }
204
205     void shutdownAndAwaitTermination(final ExecutorService pool) {
206         pool.shutdown();
207         try {
208             if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
209                 pool.shutdownNow();
210                 if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
211                     logger.warn("executor did not terminate");
212                 }
213             }
214         } catch (final InterruptedException ie) {
215             pool.shutdownNow();
216             Thread.currentThread().interrupt();
217         }
218     }
219
220     public void terminate() {
221         done = true;
222         try {
223             serialPort.close();
224         } catch (final RuntimeException e) {
225             logger.warn("failed to close serial port", e);
226         }
227     }
228
229     private class WriteRunnable implements Runnable {
230         private static final int MAX_RETRIES = 3;
231
232         private final String msg;
233         private final CompletableFuture<CmdStatus> completion;
234         private final CountDownLatch ackLatch = new CountDownLatch(1);
235
236         private @Nullable Boolean ack;
237
238         public WriteRunnable(final String msg, final CompletableFuture<CmdStatus> completion) {
239             this.msg = msg;
240             this.completion = completion;
241         }
242
243         // called by reader thread on ACK or NAK
244         public void ackReceived(final boolean ack) {
245             if (logger.isDebugEnabled()) {
246                 if (ack) {
247                     logger.debug("ACK received");
248                 } else {
249                     logger.debug("NAK received");
250                 }
251             }
252             this.ack = ack;
253             ackLatch.countDown();
254         }
255
256         @Override
257         public void run() {
258             currentWrite = this;
259             try {
260                 logger.debug("Writing bytes: {}", msg);
261                 final OutputStream out = serialPort.getOutputStream();
262                 if (out == null) {
263                     throw new IOException("serial port is not writable");
264                 }
265                 for (int tries = 0; tries < MAX_RETRIES && ack == null; tries++) {
266                     out.write(0x14);
267                     out.write(msg.getBytes(US_ASCII));
268                     out.write(0x0d);
269                     out.flush();
270                     final boolean acked = ackLatch.await(ACK_TIMEOUT_MS, MILLISECONDS);
271                     if (acked) {
272                         break;
273                     }
274                     logger.debug("ack timed out, retrying ({} of {})", tries + 1, MAX_RETRIES);
275                 }
276                 final Boolean ack = this.ack;
277                 if (ack == null) {
278                     logger.debug("write not acked");
279                     completion.complete(CmdStatus.WRITE_FAILED);
280                 } else if (ack) {
281                     completion.complete(CmdStatus.ACK);
282                 } else {
283                     completion.complete(CmdStatus.NAK);
284                 }
285             } catch (final IOException | InterruptedException e) {
286                 logger.warn("error writing message", e);
287                 completion.complete(CmdStatus.WRITE_FAILED);
288             }
289         }
290     }
291 }