]> git.basschouten.com Git - openhab-addons.git/commitdiff
[upb] Fix retry logic (#11342)
authorMarcus Better <marcusb@users.noreply.github.com>
Sat, 9 Oct 2021 14:29:09 +0000 (10:29 -0400)
committerGitHub <noreply@github.com>
Sat, 9 Oct 2021 14:29:09 +0000 (16:29 +0200)
* [upb] Fix retry logic

The retry logic was broken so it never retried. This fixes it and adds
unit tests for the serial communication and retry behavior.

Signed-off-by: Marcus Better <marcus@better.se>
* Remove excessive log

Signed-off-by: Marcus Better <marcus@better.se>
bundles/org.openhab.binding.upb/src/main/java/org/openhab/binding/upb/internal/handler/SerialIoThread.java
bundles/org.openhab.binding.upb/src/main/java/org/openhab/binding/upb/internal/handler/SerialPIMHandler.java
bundles/org.openhab.binding.upb/src/test/java/org/openhab/binding/upb/internal/SerialIoThreadTest.java [new file with mode: 0644]

index 3244ef66196b3adb8509fd79b0039e35edb57693..813f95aa5fbd44f32eac443a4dd0b18b2820aff7 100644 (file)
@@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
 import org.openhab.binding.upb.internal.handler.UPBIoHandler.CmdStatus;
-import org.openhab.binding.upb.internal.message.MessageBuilder;
 import org.openhab.binding.upb.internal.message.MessageParseException;
 import org.openhab.binding.upb.internal.message.UPBMessage;
 import org.openhab.core.common.NamedThreadFactory;
@@ -177,9 +176,13 @@ public class SerialIoThread extends Thread {
         listener.incomingMessage(msg);
     }
 
-    public CompletionStage<CmdStatus> enqueue(final MessageBuilder msg) {
+    public CompletionStage<CmdStatus> enqueue(final String msg) {
+        return enqueue(msg, 1);
+    }
+
+    private CompletionStage<CmdStatus> enqueue(final String msg, int numAttempts) {
         final CompletableFuture<CmdStatus> completion = new CompletableFuture<>();
-        final Runnable task = new WriteRunnable(msg.build(), completion);
+        final Runnable task = new WriteRunnable(msg, completion, numAttempts);
         try {
             writeExecutor.execute(task);
         } catch (final RejectedExecutionException e) {
@@ -232,23 +235,18 @@ public class SerialIoThread extends Thread {
         private final String msg;
         private final CompletableFuture<CmdStatus> completion;
         private final CountDownLatch ackLatch = new CountDownLatch(1);
+        private final int numAttempts;
 
         private @Nullable Boolean ack;
 
-        public WriteRunnable(final String msg, final CompletableFuture<CmdStatus> completion) {
+        public WriteRunnable(final String msg, final CompletableFuture<CmdStatus> completion, int numAttempts) {
             this.msg = msg;
             this.completion = completion;
+            this.numAttempts = numAttempts;
         }
 
         // called by reader thread on ACK or NAK
         public void ackReceived(final boolean ack) {
-            if (logger.isDebugEnabled()) {
-                if (ack) {
-                    logger.debug("ACK received");
-                } else {
-                    logger.debug("NAK received");
-                }
-            }
             this.ack = ack;
             ackLatch.countDown();
         }
@@ -262,25 +260,32 @@ public class SerialIoThread extends Thread {
                 if (out == null) {
                     throw new IOException("serial port is not writable");
                 }
-                for (int tries = 0; tries < MAX_RETRIES && ack == null; tries++) {
-                    out.write(0x14);
-                    out.write(msg.getBytes(US_ASCII));
-                    out.write(0x0d);
-                    out.flush();
-                    final boolean acked = ackLatch.await(ACK_TIMEOUT_MS, MILLISECONDS);
-                    if (acked) {
-                        break;
+                final CmdStatus res;
+                out.write(0x14);
+                out.write(msg.getBytes(US_ASCII));
+                out.write(0x0d);
+                out.flush();
+                final boolean latched = ackLatch.await(ACK_TIMEOUT_MS, MILLISECONDS);
+                if (latched) {
+                    final Boolean ack = this.ack;
+                    if (ack == null) {
+                        logger.debug("write not acked, attempt {}", numAttempts);
+                        res = CmdStatus.WRITE_FAILED;
+                    } else if (ack) {
+                        completion.complete(CmdStatus.ACK);
+                        return;
+                    } else {
+                        logger.debug("NAK received, attempt {}", numAttempts);
+                        res = CmdStatus.NAK;
                     }
-                    logger.debug("ack timed out, retrying ({} of {})", tries + 1, MAX_RETRIES);
+                } else {
+                    logger.debug("ack timed out, attempt {}", numAttempts);
+                    res = CmdStatus.WRITE_FAILED;
                 }
-                final Boolean ack = this.ack;
-                if (ack == null) {
-                    logger.debug("write not acked");
-                    completion.complete(CmdStatus.WRITE_FAILED);
-                } else if (ack) {
-                    completion.complete(CmdStatus.ACK);
+                if (numAttempts < MAX_RETRIES) {
+                    enqueue(msg, numAttempts + 1).thenAccept(completion::complete);
                 } else {
-                    completion.complete(CmdStatus.NAK);
+                    completion.complete(res);
                 }
             } catch (final IOException | InterruptedException e) {
                 logger.warn("error writing message", e);
index 0117999b7da89c573c28b537caa86ddc81867206..a95abfb6ecdce55a30bbee1d10377e12017ab2ed 100644 (file)
@@ -155,7 +155,7 @@ public class SerialPIMHandler extends PIMHandler {
     public CompletionStage<CmdStatus> sendPacket(final MessageBuilder msg) {
         final SerialIoThread receiveThread = this.receiveThread;
         if (receiveThread != null) {
-            return receiveThread.enqueue(msg);
+            return receiveThread.enqueue(msg.build());
         } else {
             return exceptionallyCompletedFuture(new IllegalStateException("I/O thread not active"));
         }
diff --git a/bundles/org.openhab.binding.upb/src/test/java/org/openhab/binding/upb/internal/SerialIoThreadTest.java b/bundles/org.openhab.binding.upb/src/test/java/org/openhab/binding/upb/internal/SerialIoThreadTest.java
new file mode 100644 (file)
index 0000000..ec09fe8
--- /dev/null
@@ -0,0 +1,192 @@
+/**
+ * Copyright (c) 2010-2021 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.binding.upb.internal;
+
+import static java.nio.charset.StandardCharsets.US_ASCII;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.openhab.binding.upb.internal.handler.MessageListener;
+import org.openhab.binding.upb.internal.handler.SerialIoThread;
+import org.openhab.binding.upb.internal.handler.UPBIoHandler.CmdStatus;
+import org.openhab.binding.upb.internal.message.Command;
+import org.openhab.binding.upb.internal.message.MessageBuilder;
+import org.openhab.binding.upb.internal.message.UPBMessage;
+import org.openhab.core.io.transport.serial.SerialPort;
+import org.openhab.core.thing.ThingUID;
+
+/**
+ * @author Marcus Better - Initial contribution
+ */
+public class SerialIoThreadTest {
+
+    private static final String ENABLE_MESSAGE_MODE_CMD = "\u001770028E\n";
+
+    private final ThingUID thingUID = new ThingUID("a", "b", "c");
+    private final Listener msgListener = new Listener();
+    private final PipedOutputStream in = new PipedOutputStream();
+    private final OutputStreamWriter inbound = new OutputStreamWriter(in, US_ASCII);
+    private final PipedOutputStream out = new PipedOutputStream();
+
+    private @Mock SerialPort serialPort;
+    private SerialIoThread thread;
+    private InputStreamReader outbound;
+    final char[] buf = new char[256];
+
+    @BeforeEach
+    public void setup() throws IOException {
+        serialPort = mock(SerialPort.class);
+        outbound = new InputStreamReader(new PipedInputStream(out), US_ASCII);
+        when(serialPort.getInputStream()).thenReturn(new PipedInputStream(in));
+        when(serialPort.getOutputStream()).thenReturn(out);
+        thread = new SerialIoThread(serialPort, msgListener, thingUID);
+        thread.start();
+    }
+
+    @AfterEach
+    public void cleanup() {
+        thread.terminate();
+    }
+
+    @Test
+    public void testName() {
+        assertEquals("OH-binding-a:b:c-serial-reader", thread.getName());
+        assertTrue(thread.isDaemon());
+    }
+
+    @Test
+    public void receive() throws Exception {
+        writeInbound("PU8905FA011220FFFF47\r");
+        final UPBMessage msg = msgListener.readInbound();
+        assertEquals(Command.ACTIVATE, msg.getCommand());
+        assertEquals(1, msg.getDestination());
+        writeInbound("PU8905FA011221FFFF48\r");
+        final UPBMessage msg2 = msgListener.readInbound();
+        assertEquals(Command.DEACTIVATE, msg2.getCommand());
+        verifyMessageModeCmd();
+    }
+
+    @Test
+    public void send() throws Exception {
+        final String msg = MessageBuilder.forCommand(Command.GOTO).args((byte) 10).network((byte) 2)
+                .destination((byte) 5).build();
+        final CompletionStage<CmdStatus> fut = thread.enqueue(msg);
+        verifyMessageModeCmd();
+        final int n = outbound.read(buf);
+        assertEquals("\u001408100205FF220AB6\r", new String(buf, 0, n));
+        ack();
+        final CmdStatus res = fut.toCompletableFuture().join();
+        assertEquals(CmdStatus.ACK, res);
+    }
+
+    @Test
+    public void resend() throws Exception {
+        final String msg = MessageBuilder.forCommand(Command.GOTO).args((byte) 10).network((byte) 2)
+                .destination((byte) 5).build();
+        final CompletableFuture<CmdStatus> fut = thread.enqueue(msg).toCompletableFuture();
+        verifyMessageModeCmd();
+        int n = outbound.read(buf);
+        assertEquals("\u001408100205FF220AB6\r", new String(buf, 0, n));
+        nak();
+
+        // should re-send
+        n = outbound.read(buf);
+        assertEquals("\u001408100205FF220AB6\r", new String(buf, 0, n));
+        assertFalse(fut.isDone());
+        ack();
+        final CmdStatus res = fut.join();
+        assertEquals(CmdStatus.ACK, res);
+    }
+
+    @Test
+    public void resendMaxAttempts() throws Exception {
+        final String msg = MessageBuilder.forCommand(Command.GOTO).args((byte) 10).network((byte) 2)
+                .destination((byte) 5).build();
+        final CompletableFuture<CmdStatus> fut = thread.enqueue(msg).toCompletableFuture();
+        verifyMessageModeCmd();
+        int n = outbound.read(buf);
+        assertEquals("\u001408100205FF220AB6\r", new String(buf, 0, n));
+        nak();
+
+        // retry
+        n = outbound.read(buf);
+        assertEquals("\u001408100205FF220AB6\r", new String(buf, 0, n));
+        assertFalse(fut.isDone());
+        // no response - wait for ack timeout
+
+        // last retry
+        n = outbound.read(buf);
+        assertEquals("\u001408100205FF220AB6\r", new String(buf, 0, n));
+        assertFalse(fut.isDone());
+        nak();
+        final CmdStatus res = fut.join();
+        assertEquals(CmdStatus.NAK, res);
+    }
+
+    private void ack() throws IOException {
+        writeInbound("PK\r");
+    }
+
+    private void nak() throws IOException {
+        writeInbound("PN\r");
+    }
+
+    private void writeInbound(String s) throws IOException {
+        inbound.write(s);
+        inbound.flush();
+    }
+
+    private void verifyMessageModeCmd() throws IOException {
+        final int n = outbound.read(buf, 0, ENABLE_MESSAGE_MODE_CMD.length());
+        assertEquals(ENABLE_MESSAGE_MODE_CMD, new String(buf, 0, n));
+    }
+
+    private static class Listener implements MessageListener {
+
+        private final BlockingQueue<UPBMessage> messages = new LinkedBlockingQueue<>();
+
+        @Override
+        public void incomingMessage(final UPBMessage msg) {
+            messages.offer(msg);
+        }
+
+        @Override
+        public void onError(final Throwable t) {
+        }
+
+        public UPBMessage readInbound() {
+            try {
+                return messages.take();
+            } catch (InterruptedException e) {
+                return null;
+            }
+        }
+    }
+}