From 7a5802b09a6498da441d5da45a613a8243b904d6 Mon Sep 17 00:00:00 2001 From: Wouter Born Date: Tue, 6 Feb 2024 17:18:02 +0100 Subject: [PATCH] [network] Improve threading (#16315) * [network] Improve threading * Use timeouts with CompletableFutures * Use seperate executor when waiting for results * Catch exceptions when joining CompletableFutures * Stop previous detection when starting a new one Fixes #16305 Signed-off-by: Wouter Born --- .../network/internal/PresenceDetection.java | 55 +++++++++++++++---- .../internal/PresenceDetectionTest.java | 15 +++-- 2 files changed, 55 insertions(+), 15 deletions(-) diff --git a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/PresenceDetection.java b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/PresenceDetection.java index f6b4cd2b60..04541ebf26 100644 --- a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/PresenceDetection.java +++ b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/PresenceDetection.java @@ -25,7 +25,9 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -92,6 +94,7 @@ public class PresenceDetection implements IPRequestReceivedCallback { private Set networkInterfaceNames = Set.of(); private @Nullable ScheduledFuture refreshJob; protected @Nullable ExecutorService detectionExecutorService; + protected @Nullable ExecutorService waitForResultExecutorService; private String dhcpState = "off"; int detectionChecks; private String lastReachableNetworkInterfaceName = ""; @@ -295,6 +298,21 @@ public class PresenceDetection implements IPRequestReceivedCallback { } } + private void stopDetection() { + ExecutorService detectionExecutorService = this.detectionExecutorService; + if (detectionExecutorService != null) { + logger.debug("Shutting down detectionExecutorService"); + detectionExecutorService.shutdownNow(); + this.detectionExecutorService = null; + } + ExecutorService waitForResultExecutorService = this.waitForResultExecutorService; + if (waitForResultExecutorService != null) { + logger.debug("Shutting down waitForResultExecutorService"); + waitForResultExecutorService.shutdownNow(); + this.waitForResultExecutorService = null; + } + } + /** * Perform a presence detection with ICMP-, ARP ping and TCP connection attempts simultaneously. * A fixed thread pool will be created with as many threads as necessary to perform all tests at once. @@ -333,50 +351,61 @@ public class PresenceDetection implements IPRequestReceivedCallback { return CompletableFuture.completedFuture(pdv); } + stopDetection(); + ExecutorService detectionExecutorService = getThreadsFor(detectionChecks); this.detectionExecutorService = detectionExecutorService; + ExecutorService waitForResultExecutorService = getThreadsFor(1); + this.waitForResultExecutorService = waitForResultExecutorService; List> completableFutures = new ArrayList<>(); for (Integer tcpPort : tcpPorts) { - completableFutures.add(CompletableFuture.runAsync(() -> { + addAsyncDetection(completableFutures, () -> { Thread.currentThread().setName("presenceDetectionTCP_" + hostname + " " + tcpPort); performServicePing(pdv, tcpPort); - }, detectionExecutorService)); + }, detectionExecutorService); } // ARP ping for IPv4 addresses. Use single executor for Windows tool and // each own executor for each network interface for other tools if (arpPingMethod == ArpPingUtilEnum.ELI_FULKERSON_ARP_PING_FOR_WINDOWS) { - completableFutures.add(CompletableFuture.runAsync(() -> { + addAsyncDetection(completableFutures, () -> { Thread.currentThread().setName("presenceDetectionARP_" + hostname + " "); // arp-ping.exe tool capable of handling multiple interfaces by itself performArpPing(pdv, ""); - }, detectionExecutorService)); + }, detectionExecutorService); } else if (interfaceNames != null) { for (final String interfaceName : interfaceNames) { - completableFutures.add(CompletableFuture.runAsync(() -> { + addAsyncDetection(completableFutures, () -> { Thread.currentThread().setName("presenceDetectionARP_" + hostname + " " + interfaceName); performArpPing(pdv, interfaceName); - }, detectionExecutorService)); + }, detectionExecutorService); } } // ICMP ping if (pingMethod != null) { - completableFutures.add(CompletableFuture.runAsync(() -> { + addAsyncDetection(completableFutures, () -> { Thread.currentThread().setName("presenceDetectionICMP_" + hostname); if (pingMethod == IpPingMethodEnum.JAVA_PING) { performJavaPing(pdv); } else { performSystemPing(pdv); } - }, detectionExecutorService)); + }, detectionExecutorService); } return CompletableFuture.supplyAsync(() -> { + Thread.currentThread().setName("presenceDetectionResult_" + hostname); logger.debug("Waiting for {} detection futures for {} to complete", completableFutures.size(), hostname); - completableFutures.forEach(CompletableFuture::join); + completableFutures.forEach(completableFuture -> { + try { + completableFuture.join(); + } catch (CancellationException | CompletionException e) { + logger.debug("Detection future failed to complete", e); + } + }); logger.debug("All {} detection futures for {} have completed", completableFutures.size(), hostname); if (!pdv.isReachable()) { @@ -392,7 +421,13 @@ public class PresenceDetection implements IPRequestReceivedCallback { detectionChecks = 0; return pdv; - }, scheduledExecutorService); + }, waitForResultExecutorService); + } + + private void addAsyncDetection(List> completableFutures, Runnable detectionRunnable, + ExecutorService executorService) { + completableFutures.add(CompletableFuture.runAsync(detectionRunnable, executorService) + .orTimeout(timeout.plusSeconds(3).toMillis(), TimeUnit.MILLISECONDS)); } /** diff --git a/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/PresenceDetectionTest.java b/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/PresenceDetectionTest.java index bf9d09fc6a..0f770e9157 100644 --- a/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/PresenceDetectionTest.java +++ b/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/PresenceDetectionTest.java @@ -53,6 +53,7 @@ public class PresenceDetectionTest { private @Mock @NonNullByDefault({}) Consumer callback; private @Mock @NonNullByDefault({}) ExecutorService detectionExecutorService; + private @Mock @NonNullByDefault({}) ExecutorService waitForResultExecutorService; private @Mock @NonNullByDefault({}) ScheduledExecutorService scheduledExecutorService; private @Mock @NonNullByDefault({}) PresenceDetectionListener listener; private @Mock @NonNullByDefault({}) NetworkUtils networkUtils; @@ -90,6 +91,8 @@ public class PresenceDetectionTest { doNothing().when(subject).performSystemPing(any()); doNothing().when(subject).performServicePing(any(), anyInt()); + doReturn(waitForResultExecutorService).when(subject).getThreadsFor(1); + subject.getValue(callback -> { }); @@ -99,7 +102,7 @@ public class PresenceDetectionTest { // "Wait" for the presence detection to finish ArgumentCaptor runnableCapture = ArgumentCaptor.forClass(Runnable.class); - verify(scheduledExecutorService, times(1)).execute(runnableCapture.capture()); + verify(waitForResultExecutorService, times(1)).execute(runnableCapture.capture()); runnableCapture.getValue().run(); assertThat(subject.detectionChecks, is(0)); @@ -114,7 +117,8 @@ public class PresenceDetectionTest { anyString(), any(), any()); doReturn(pingResult).when(networkUtils).servicePing(anyString(), anyInt(), any()); - doReturn(detectionExecutorService).when(subject).getThreadsFor(anyInt()); + doReturn(detectionExecutorService).when(subject).getThreadsFor(3); + doReturn(waitForResultExecutorService).when(subject).getThreadsFor(1); subject.performPresenceDetection(); @@ -129,7 +133,7 @@ public class PresenceDetectionTest { // "Wait" for the presence detection to finish ArgumentCaptor runnableCapture = ArgumentCaptor.forClass(Runnable.class); - verify(scheduledExecutorService, times(1)).execute(runnableCapture.capture()); + verify(waitForResultExecutorService, times(1)).execute(runnableCapture.capture()); runnableCapture.getValue().run(); assertThat(subject.detectionChecks, is(0)); @@ -154,7 +158,8 @@ public class PresenceDetectionTest { anyString(), any(), any()); doReturn(pingResult).when(networkUtils).servicePing(anyString(), anyInt(), any()); - doReturn(detectionExecutorService).when(subject).getThreadsFor(anyInt()); + doReturn(detectionExecutorService).when(subject).getThreadsFor(3); + doReturn(waitForResultExecutorService).when(subject).getThreadsFor(1); // We expect no valid value assertTrue(subject.cache.isExpired()); @@ -174,7 +179,7 @@ public class PresenceDetectionTest { // "Wait" for the presence detection to finish capture = ArgumentCaptor.forClass(Runnable.class); - verify(scheduledExecutorService, times(1)).execute(capture.capture()); + verify(waitForResultExecutorService, times(1)).execute(capture.capture()); capture.getValue().run(); // Although there are multiple partial results and a final result, -- 2.47.3