]> git.basschouten.com Git - openhab-addons.git/blob
813f95aa5fbd44f32eac443a4dd0b18b2820aff7
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2021 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.upb.internal.handler;
14
15 import static java.nio.charset.StandardCharsets.US_ASCII;
16 import static java.util.concurrent.TimeUnit.MILLISECONDS;
17
18 import java.io.BufferedInputStream;
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.io.OutputStream;
22 import java.nio.charset.StandardCharsets;
23 import java.util.concurrent.CompletableFuture;
24 import java.util.concurrent.CompletionStage;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.RejectedExecutionException;
29 import java.util.concurrent.ThreadPoolExecutor;
30 import java.util.concurrent.TimeUnit;
31
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.openhab.binding.upb.internal.handler.UPBIoHandler.CmdStatus;
35 import org.openhab.binding.upb.internal.message.MessageParseException;
36 import org.openhab.binding.upb.internal.message.UPBMessage;
37 import org.openhab.core.common.NamedThreadFactory;
38 import org.openhab.core.io.transport.serial.SerialPort;
39 import org.openhab.core.thing.ThingUID;
40 import org.openhab.core.util.HexUtils;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 /**
45  * Event loop for serial communications. Handles sending and receiving UPB messages.
46  *
47  * @author Marcus Better - Initial contribution
48  *
49  */
50 @NonNullByDefault
51 public class SerialIoThread extends Thread {
52     private static final int WRITE_QUEUE_LENGTH = 128;
53     private static final int ACK_TIMEOUT_MS = 500;
54     private static final byte[] ENABLE_MESSAGE_MODE_CMD = "\u001770028E\n".getBytes(StandardCharsets.US_ASCII);
55
56     private static final int MAX_READ_SIZE = 128;
57     private static final int CR = 13;
58
59     private final Logger logger = LoggerFactory.getLogger(SerialIoThread.class);
60     private final MessageListener listener;
61     // Single-threaded executor for writes that serves to serialize writes.
62     private final ExecutorService writeExecutor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS,
63             new LinkedBlockingQueue<>(WRITE_QUEUE_LENGTH), new NamedThreadFactory("upb-serial-writer", true));
64     private final SerialPort serialPort;
65
66     private volatile @Nullable WriteRunnable currentWrite;
67     private volatile boolean done;
68
69     public SerialIoThread(final SerialPort serialPort, final MessageListener listener, final ThingUID thingUID) {
70         this.serialPort = serialPort;
71         this.listener = listener;
72         setName("OH-binding-" + thingUID + "-serial-reader");
73         setDaemon(true);
74     }
75
76     @Override
77     public void run() {
78         enterMessageMode();
79         try (final InputStream in = serialPort.getInputStream()) {
80             if (in == null) {
81                 // should never happen
82                 throw new IllegalStateException("serial port is not readable");
83             }
84             try (final InputStream bufIn = new BufferedInputStream(in)) {
85                 bufIn.mark(MAX_READ_SIZE);
86                 int len = 0;
87                 while (!done) {
88                     final int b = bufIn.read();
89                     if (b == -1) {
90                         // the serial input returns -1 on receive timeout
91                         continue;
92                     }
93                     len++;
94                     if (b == CR) {
95                         // message terminator read, rewind the stream and parse the buffered message
96                         try {
97                             bufIn.reset();
98                             processBuffer(bufIn, len);
99                         } catch (final IOException e) {
100                             logger.warn("buffer overrun, dropped long message", e);
101                         } finally {
102                             bufIn.mark(MAX_READ_SIZE);
103                             len = 0;
104                         }
105                     }
106                 }
107             }
108         } catch (final IOException e) {
109             logger.warn("Exception in UPB read thread", e);
110         } finally {
111             logger.debug("shutting down receive thread");
112             shutdownAndAwaitTermination(writeExecutor);
113             try {
114                 serialPort.close();
115             } catch (final RuntimeException e) {
116                 // ignore
117             }
118         }
119         logger.debug("UPB read thread stopped");
120     }
121
122     /**
123      * Attempts to parse a message from the input stream.
124      *
125      * @param in the stream to read from
126      * @param len the number of bytes in the message
127      */
128     private void processBuffer(final InputStream in, final int len) {
129         final byte[] buf = new byte[len];
130         final int n;
131         try {
132             n = in.read(buf);
133         } catch (final IOException e) {
134             logger.warn("error reading message", e);
135             return;
136         }
137         if (n < len) {
138             // should not happen when replaying the buffered input
139             logger.warn("truncated read, expected={} read={}", len, n);
140             return;
141         }
142         if (logger.isDebugEnabled()) {
143             logger.debug("UPB Message: {}", HexUtils.bytesToHex(buf));
144         }
145         final UPBMessage msg;
146         try {
147             msg = UPBMessage.parse(buf);
148         } catch (final MessageParseException e) {
149             logger.warn("failed to parse message: {}", HexUtils.bytesToHex(buf), e);
150             return;
151         }
152         handleMessage(msg);
153     }
154
155     private void handleMessage(final UPBMessage msg) {
156         final WriteRunnable writeRunnable = currentWrite;
157         switch (msg.getType()) {
158             case ACK:
159                 if (writeRunnable != null) {
160                     writeRunnable.ackReceived(true);
161                 }
162                 break;
163             case NAK:
164                 if (writeRunnable != null) {
165                     writeRunnable.ackReceived(false);
166                 }
167                 break;
168             case ACCEPT:
169                 break;
170             case ERROR:
171                 logger.debug("received ERROR response from PIM");
172                 break;
173             default:
174                 // ignore
175         }
176         listener.incomingMessage(msg);
177     }
178
179     public CompletionStage<CmdStatus> enqueue(final String msg) {
180         return enqueue(msg, 1);
181     }
182
183     private CompletionStage<CmdStatus> enqueue(final String msg, int numAttempts) {
184         final CompletableFuture<CmdStatus> completion = new CompletableFuture<>();
185         final Runnable task = new WriteRunnable(msg, completion, numAttempts);
186         try {
187             writeExecutor.execute(task);
188         } catch (final RejectedExecutionException e) {
189             completion.completeExceptionally(e);
190         }
191         return completion;
192     }
193
194     // puts the PIM is in message mode
195     private void enterMessageMode() {
196         try {
197             final OutputStream out = serialPort.getOutputStream();
198             if (out == null) {
199                 throw new IOException("serial port is not writable");
200             }
201             out.write(ENABLE_MESSAGE_MODE_CMD);
202             out.flush();
203         } catch (final IOException e) {
204             logger.warn("error setting message mode", e);
205         }
206     }
207
208     void shutdownAndAwaitTermination(final ExecutorService pool) {
209         pool.shutdown();
210         try {
211             if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
212                 pool.shutdownNow();
213                 if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
214                     logger.warn("executor did not terminate");
215                 }
216             }
217         } catch (final InterruptedException ie) {
218             pool.shutdownNow();
219             Thread.currentThread().interrupt();
220         }
221     }
222
223     public void terminate() {
224         done = true;
225         try {
226             serialPort.close();
227         } catch (final RuntimeException e) {
228             logger.warn("failed to close serial port", e);
229         }
230     }
231
232     private class WriteRunnable implements Runnable {
233         private static final int MAX_RETRIES = 3;
234
235         private final String msg;
236         private final CompletableFuture<CmdStatus> completion;
237         private final CountDownLatch ackLatch = new CountDownLatch(1);
238         private final int numAttempts;
239
240         private @Nullable Boolean ack;
241
242         public WriteRunnable(final String msg, final CompletableFuture<CmdStatus> completion, int numAttempts) {
243             this.msg = msg;
244             this.completion = completion;
245             this.numAttempts = numAttempts;
246         }
247
248         // called by reader thread on ACK or NAK
249         public void ackReceived(final boolean ack) {
250             this.ack = ack;
251             ackLatch.countDown();
252         }
253
254         @Override
255         public void run() {
256             currentWrite = this;
257             try {
258                 logger.debug("Writing bytes: {}", msg);
259                 final OutputStream out = serialPort.getOutputStream();
260                 if (out == null) {
261                     throw new IOException("serial port is not writable");
262                 }
263                 final CmdStatus res;
264                 out.write(0x14);
265                 out.write(msg.getBytes(US_ASCII));
266                 out.write(0x0d);
267                 out.flush();
268                 final boolean latched = ackLatch.await(ACK_TIMEOUT_MS, MILLISECONDS);
269                 if (latched) {
270                     final Boolean ack = this.ack;
271                     if (ack == null) {
272                         logger.debug("write not acked, attempt {}", numAttempts);
273                         res = CmdStatus.WRITE_FAILED;
274                     } else if (ack) {
275                         completion.complete(CmdStatus.ACK);
276                         return;
277                     } else {
278                         logger.debug("NAK received, attempt {}", numAttempts);
279                         res = CmdStatus.NAK;
280                     }
281                 } else {
282                     logger.debug("ack timed out, attempt {}", numAttempts);
283                     res = CmdStatus.WRITE_FAILED;
284                 }
285                 if (numAttempts < MAX_RETRIES) {
286                     enqueue(msg, numAttempts + 1).thenAccept(completion::complete);
287                 } else {
288                     completion.complete(res);
289                 }
290             } catch (final IOException | InterruptedException e) {
291                 logger.warn("error writing message", e);
292                 completion.complete(CmdStatus.WRITE_FAILED);
293             }
294         }
295     }
296 }