import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
private Set<String> networkInterfaceNames = Set.of();
private @Nullable ScheduledFuture<?> refreshJob;
protected @Nullable ExecutorService detectionExecutorService;
+ protected @Nullable ExecutorService waitForResultExecutorService;
private String dhcpState = "off";
int detectionChecks;
private String lastReachableNetworkInterfaceName = "";
}
}
+ private void stopDetection() {
+ ExecutorService detectionExecutorService = this.detectionExecutorService;
+ if (detectionExecutorService != null) {
+ logger.debug("Shutting down detectionExecutorService");
+ detectionExecutorService.shutdownNow();
+ this.detectionExecutorService = null;
+ }
+ ExecutorService waitForResultExecutorService = this.waitForResultExecutorService;
+ if (waitForResultExecutorService != null) {
+ logger.debug("Shutting down waitForResultExecutorService");
+ waitForResultExecutorService.shutdownNow();
+ this.waitForResultExecutorService = null;
+ }
+ }
+
/**
* Perform a presence detection with ICMP-, ARP ping and TCP connection attempts simultaneously.
* A fixed thread pool will be created with as many threads as necessary to perform all tests at once.
return CompletableFuture.completedFuture(pdv);
}
+ stopDetection();
+
ExecutorService detectionExecutorService = getThreadsFor(detectionChecks);
this.detectionExecutorService = detectionExecutorService;
+ ExecutorService waitForResultExecutorService = getThreadsFor(1);
+ this.waitForResultExecutorService = waitForResultExecutorService;
List<CompletableFuture<Void>> completableFutures = new ArrayList<>();
for (Integer tcpPort : tcpPorts) {
- completableFutures.add(CompletableFuture.runAsync(() -> {
+ addAsyncDetection(completableFutures, () -> {
Thread.currentThread().setName("presenceDetectionTCP_" + hostname + " " + tcpPort);
performServicePing(pdv, tcpPort);
- }, detectionExecutorService));
+ }, detectionExecutorService);
}
// ARP ping for IPv4 addresses. Use single executor for Windows tool and
// each own executor for each network interface for other tools
if (arpPingMethod == ArpPingUtilEnum.ELI_FULKERSON_ARP_PING_FOR_WINDOWS) {
- completableFutures.add(CompletableFuture.runAsync(() -> {
+ addAsyncDetection(completableFutures, () -> {
Thread.currentThread().setName("presenceDetectionARP_" + hostname + " ");
// arp-ping.exe tool capable of handling multiple interfaces by itself
performArpPing(pdv, "");
- }, detectionExecutorService));
+ }, detectionExecutorService);
} else if (interfaceNames != null) {
for (final String interfaceName : interfaceNames) {
- completableFutures.add(CompletableFuture.runAsync(() -> {
+ addAsyncDetection(completableFutures, () -> {
Thread.currentThread().setName("presenceDetectionARP_" + hostname + " " + interfaceName);
performArpPing(pdv, interfaceName);
- }, detectionExecutorService));
+ }, detectionExecutorService);
}
}
// ICMP ping
if (pingMethod != null) {
- completableFutures.add(CompletableFuture.runAsync(() -> {
+ addAsyncDetection(completableFutures, () -> {
Thread.currentThread().setName("presenceDetectionICMP_" + hostname);
if (pingMethod == IpPingMethodEnum.JAVA_PING) {
performJavaPing(pdv);
} else {
performSystemPing(pdv);
}
- }, detectionExecutorService));
+ }, detectionExecutorService);
}
return CompletableFuture.supplyAsync(() -> {
+ Thread.currentThread().setName("presenceDetectionResult_" + hostname);
logger.debug("Waiting for {} detection futures for {} to complete", completableFutures.size(), hostname);
- completableFutures.forEach(CompletableFuture::join);
+ completableFutures.forEach(completableFuture -> {
+ try {
+ completableFuture.join();
+ } catch (CancellationException | CompletionException e) {
+ logger.debug("Detection future failed to complete", e);
+ }
+ });
logger.debug("All {} detection futures for {} have completed", completableFutures.size(), hostname);
if (!pdv.isReachable()) {
detectionChecks = 0;
return pdv;
- }, scheduledExecutorService);
+ }, waitForResultExecutorService);
+ }
+
+ private void addAsyncDetection(List<CompletableFuture<Void>> completableFutures, Runnable detectionRunnable,
+ ExecutorService executorService) {
+ completableFutures.add(CompletableFuture.runAsync(detectionRunnable, executorService)
+ .orTimeout(timeout.plusSeconds(3).toMillis(), TimeUnit.MILLISECONDS));
}
/**
private @Mock @NonNullByDefault({}) Consumer<PresenceDetectionValue> callback;
private @Mock @NonNullByDefault({}) ExecutorService detectionExecutorService;
+ private @Mock @NonNullByDefault({}) ExecutorService waitForResultExecutorService;
private @Mock @NonNullByDefault({}) ScheduledExecutorService scheduledExecutorService;
private @Mock @NonNullByDefault({}) PresenceDetectionListener listener;
private @Mock @NonNullByDefault({}) NetworkUtils networkUtils;
doNothing().when(subject).performSystemPing(any());
doNothing().when(subject).performServicePing(any(), anyInt());
+ doReturn(waitForResultExecutorService).when(subject).getThreadsFor(1);
+
subject.getValue(callback -> {
});
// "Wait" for the presence detection to finish
ArgumentCaptor<Runnable> runnableCapture = ArgumentCaptor.forClass(Runnable.class);
- verify(scheduledExecutorService, times(1)).execute(runnableCapture.capture());
+ verify(waitForResultExecutorService, times(1)).execute(runnableCapture.capture());
runnableCapture.getValue().run();
assertThat(subject.detectionChecks, is(0));
anyString(), any(), any());
doReturn(pingResult).when(networkUtils).servicePing(anyString(), anyInt(), any());
- doReturn(detectionExecutorService).when(subject).getThreadsFor(anyInt());
+ doReturn(detectionExecutorService).when(subject).getThreadsFor(3);
+ doReturn(waitForResultExecutorService).when(subject).getThreadsFor(1);
subject.performPresenceDetection();
// "Wait" for the presence detection to finish
ArgumentCaptor<Runnable> runnableCapture = ArgumentCaptor.forClass(Runnable.class);
- verify(scheduledExecutorService, times(1)).execute(runnableCapture.capture());
+ verify(waitForResultExecutorService, times(1)).execute(runnableCapture.capture());
runnableCapture.getValue().run();
assertThat(subject.detectionChecks, is(0));
anyString(), any(), any());
doReturn(pingResult).when(networkUtils).servicePing(anyString(), anyInt(), any());
- doReturn(detectionExecutorService).when(subject).getThreadsFor(anyInt());
+ doReturn(detectionExecutorService).when(subject).getThreadsFor(3);
+ doReturn(waitForResultExecutorService).when(subject).getThreadsFor(1);
// We expect no valid value
assertTrue(subject.cache.isExpired());
// "Wait" for the presence detection to finish
capture = ArgumentCaptor.forClass(Runnable.class);
- verify(scheduledExecutorService, times(1)).execute(capture.capture());
+ verify(waitForResultExecutorService, times(1)).execute(capture.capture());
capture.getValue().run();
// Although there are multiple partial results and a final result,