]> git.basschouten.com Git - openhab-addons.git/commitdiff
[network] Improve threading (#16315)
authorWouter Born <github@maindrain.net>
Tue, 6 Feb 2024 16:18:02 +0000 (17:18 +0100)
committerGitHub <noreply@github.com>
Tue, 6 Feb 2024 16:18:02 +0000 (17:18 +0100)
* [network] Improve threading

* Use timeouts with CompletableFutures
* Use seperate executor when waiting for results
* Catch exceptions when joining CompletableFutures
* Stop previous detection when starting a new one

Fixes #16305

Signed-off-by: Wouter Born <github@maindrain.net>
bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/PresenceDetection.java
bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/PresenceDetectionTest.java

index f6b4cd2b60e758d35a69472f992d6d4bade0e338..04541ebf26eedb8c5124808b9e5002c6fc381fc1 100644 (file)
@@ -25,7 +25,9 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -92,6 +94,7 @@ public class PresenceDetection implements IPRequestReceivedCallback {
     private Set<String> networkInterfaceNames = Set.of();
     private @Nullable ScheduledFuture<?> refreshJob;
     protected @Nullable ExecutorService detectionExecutorService;
+    protected @Nullable ExecutorService waitForResultExecutorService;
     private String dhcpState = "off";
     int detectionChecks;
     private String lastReachableNetworkInterfaceName = "";
@@ -295,6 +298,21 @@ public class PresenceDetection implements IPRequestReceivedCallback {
         }
     }
 
+    private void stopDetection() {
+        ExecutorService detectionExecutorService = this.detectionExecutorService;
+        if (detectionExecutorService != null) {
+            logger.debug("Shutting down detectionExecutorService");
+            detectionExecutorService.shutdownNow();
+            this.detectionExecutorService = null;
+        }
+        ExecutorService waitForResultExecutorService = this.waitForResultExecutorService;
+        if (waitForResultExecutorService != null) {
+            logger.debug("Shutting down waitForResultExecutorService");
+            waitForResultExecutorService.shutdownNow();
+            this.waitForResultExecutorService = null;
+        }
+    }
+
     /**
      * Perform a presence detection with ICMP-, ARP ping and TCP connection attempts simultaneously.
      * A fixed thread pool will be created with as many threads as necessary to perform all tests at once.
@@ -333,50 +351,61 @@ public class PresenceDetection implements IPRequestReceivedCallback {
             return CompletableFuture.completedFuture(pdv);
         }
 
+        stopDetection();
+
         ExecutorService detectionExecutorService = getThreadsFor(detectionChecks);
         this.detectionExecutorService = detectionExecutorService;
+        ExecutorService waitForResultExecutorService = getThreadsFor(1);
+        this.waitForResultExecutorService = waitForResultExecutorService;
 
         List<CompletableFuture<Void>> completableFutures = new ArrayList<>();
 
         for (Integer tcpPort : tcpPorts) {
-            completableFutures.add(CompletableFuture.runAsync(() -> {
+            addAsyncDetection(completableFutures, () -> {
                 Thread.currentThread().setName("presenceDetectionTCP_" + hostname + " " + tcpPort);
                 performServicePing(pdv, tcpPort);
-            }, detectionExecutorService));
+            }, detectionExecutorService);
         }
 
         // ARP ping for IPv4 addresses. Use single executor for Windows tool and
         // each own executor for each network interface for other tools
         if (arpPingMethod == ArpPingUtilEnum.ELI_FULKERSON_ARP_PING_FOR_WINDOWS) {
-            completableFutures.add(CompletableFuture.runAsync(() -> {
+            addAsyncDetection(completableFutures, () -> {
                 Thread.currentThread().setName("presenceDetectionARP_" + hostname + " ");
                 // arp-ping.exe tool capable of handling multiple interfaces by itself
                 performArpPing(pdv, "");
-            }, detectionExecutorService));
+            }, detectionExecutorService);
         } else if (interfaceNames != null) {
             for (final String interfaceName : interfaceNames) {
-                completableFutures.add(CompletableFuture.runAsync(() -> {
+                addAsyncDetection(completableFutures, () -> {
                     Thread.currentThread().setName("presenceDetectionARP_" + hostname + " " + interfaceName);
                     performArpPing(pdv, interfaceName);
-                }, detectionExecutorService));
+                }, detectionExecutorService);
             }
         }
 
         // ICMP ping
         if (pingMethod != null) {
-            completableFutures.add(CompletableFuture.runAsync(() -> {
+            addAsyncDetection(completableFutures, () -> {
                 Thread.currentThread().setName("presenceDetectionICMP_" + hostname);
                 if (pingMethod == IpPingMethodEnum.JAVA_PING) {
                     performJavaPing(pdv);
                 } else {
                     performSystemPing(pdv);
                 }
-            }, detectionExecutorService));
+            }, detectionExecutorService);
         }
 
         return CompletableFuture.supplyAsync(() -> {
+            Thread.currentThread().setName("presenceDetectionResult_" + hostname);
             logger.debug("Waiting for {} detection futures for {} to complete", completableFutures.size(), hostname);
-            completableFutures.forEach(CompletableFuture::join);
+            completableFutures.forEach(completableFuture -> {
+                try {
+                    completableFuture.join();
+                } catch (CancellationException | CompletionException e) {
+                    logger.debug("Detection future failed to complete", e);
+                }
+            });
             logger.debug("All {} detection futures for {} have completed", completableFutures.size(), hostname);
 
             if (!pdv.isReachable()) {
@@ -392,7 +421,13 @@ public class PresenceDetection implements IPRequestReceivedCallback {
             detectionChecks = 0;
 
             return pdv;
-        }, scheduledExecutorService);
+        }, waitForResultExecutorService);
+    }
+
+    private void addAsyncDetection(List<CompletableFuture<Void>> completableFutures, Runnable detectionRunnable,
+            ExecutorService executorService) {
+        completableFutures.add(CompletableFuture.runAsync(detectionRunnable, executorService)
+                .orTimeout(timeout.plusSeconds(3).toMillis(), TimeUnit.MILLISECONDS));
     }
 
     /**
index bf9d09fc6a8e765a088ce32ce3461ebd29dbde64..0f770e9157a218be30508665c3c01ae01c6da483 100644 (file)
@@ -53,6 +53,7 @@ public class PresenceDetectionTest {
 
     private @Mock @NonNullByDefault({}) Consumer<PresenceDetectionValue> callback;
     private @Mock @NonNullByDefault({}) ExecutorService detectionExecutorService;
+    private @Mock @NonNullByDefault({}) ExecutorService waitForResultExecutorService;
     private @Mock @NonNullByDefault({}) ScheduledExecutorService scheduledExecutorService;
     private @Mock @NonNullByDefault({}) PresenceDetectionListener listener;
     private @Mock @NonNullByDefault({}) NetworkUtils networkUtils;
@@ -90,6 +91,8 @@ public class PresenceDetectionTest {
         doNothing().when(subject).performSystemPing(any());
         doNothing().when(subject).performServicePing(any(), anyInt());
 
+        doReturn(waitForResultExecutorService).when(subject).getThreadsFor(1);
+
         subject.getValue(callback -> {
         });
 
@@ -99,7 +102,7 @@ public class PresenceDetectionTest {
 
         // "Wait" for the presence detection to finish
         ArgumentCaptor<Runnable> runnableCapture = ArgumentCaptor.forClass(Runnable.class);
-        verify(scheduledExecutorService, times(1)).execute(runnableCapture.capture());
+        verify(waitForResultExecutorService, times(1)).execute(runnableCapture.capture());
         runnableCapture.getValue().run();
 
         assertThat(subject.detectionChecks, is(0));
@@ -114,7 +117,8 @@ public class PresenceDetectionTest {
                 anyString(), any(), any());
         doReturn(pingResult).when(networkUtils).servicePing(anyString(), anyInt(), any());
 
-        doReturn(detectionExecutorService).when(subject).getThreadsFor(anyInt());
+        doReturn(detectionExecutorService).when(subject).getThreadsFor(3);
+        doReturn(waitForResultExecutorService).when(subject).getThreadsFor(1);
 
         subject.performPresenceDetection();
 
@@ -129,7 +133,7 @@ public class PresenceDetectionTest {
 
         // "Wait" for the presence detection to finish
         ArgumentCaptor<Runnable> runnableCapture = ArgumentCaptor.forClass(Runnable.class);
-        verify(scheduledExecutorService, times(1)).execute(runnableCapture.capture());
+        verify(waitForResultExecutorService, times(1)).execute(runnableCapture.capture());
         runnableCapture.getValue().run();
 
         assertThat(subject.detectionChecks, is(0));
@@ -154,7 +158,8 @@ public class PresenceDetectionTest {
                 anyString(), any(), any());
         doReturn(pingResult).when(networkUtils).servicePing(anyString(), anyInt(), any());
 
-        doReturn(detectionExecutorService).when(subject).getThreadsFor(anyInt());
+        doReturn(detectionExecutorService).when(subject).getThreadsFor(3);
+        doReturn(waitForResultExecutorService).when(subject).getThreadsFor(1);
 
         // We expect no valid value
         assertTrue(subject.cache.isExpired());
@@ -174,7 +179,7 @@ public class PresenceDetectionTest {
 
         // "Wait" for the presence detection to finish
         capture = ArgumentCaptor.forClass(Runnable.class);
-        verify(scheduledExecutorService, times(1)).execute(capture.capture());
+        verify(waitForResultExecutorService, times(1)).execute(capture.capture());
         capture.getValue().run();
 
         // Although there are multiple partial results and a final result,