]> git.basschouten.com Git - openhab-addons.git/commitdiff
[mqtt] Avoid parallel streams with common thread pool to avoid deadlocks (#13621)
authorSami Salonen <ssalonen@gmail.com>
Tue, 6 Dec 2022 12:02:45 +0000 (14:02 +0200)
committerGitHub <noreply@github.com>
Tue, 6 Dec 2022 12:02:45 +0000 (13:02 +0100)
To mitigate issue https://github.com/openhab/openhab-core/issues/3125
(common thread pool exhaustion when combining parallel streams with
synchronization or locks)

Signed-off-by: Sami Salonen <ssalonen@gmail.com>
bundles/org.openhab.binding.mqtt.generic/src/main/java/org/openhab/binding/mqtt/generic/AbstractMQTTThingHandler.java
bundles/org.openhab.binding.mqtt.homeassistant/src/main/java/org/openhab/binding/mqtt/homeassistant/internal/DiscoverComponents.java
bundles/org.openhab.binding.mqtt.homeassistant/src/main/java/org/openhab/binding/mqtt/homeassistant/internal/component/AbstractComponent.java
bundles/org.openhab.binding.mqtt.homeassistant/src/main/java/org/openhab/binding/mqtt/homeassistant/internal/handler/HomeAssistantThingHandler.java

index 9f7cc39be4d9ae59b22f38126e19173cfe8dfd4d..898463ef4f5d312da0b1e48758a2b9cb9181f0bb 100644 (file)
@@ -105,8 +105,11 @@ public abstract class AbstractMQTTThingHandler extends BaseThingHandler
      * @return A future that completes normal on success and exceptionally on any errors.
      */
     protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
-        return availabilityStates.values().parallelStream().map(cChannel -> cChannel.start(connection, scheduler, 0))
-                .collect(FutureCollector.allOf());
+        return availabilityStates.values().stream().map(cChannel -> {
+            final CompletableFuture<@Nullable Void> fut = cChannel == null ? CompletableFuture.completedFuture(null)
+                    : cChannel.start(connection, scheduler, 0);
+            return fut;
+        }).collect(FutureCollector.allOf());
     }
 
     /**
index 1e2077491d3e9889219122ad24744a032df62fe3..3c4bb42d6f7c1a759fca0b73faacd4f2367cdb44 100644 (file)
@@ -149,7 +149,7 @@ public class DiscoverComponents implements MqttMessageSubscriber {
         this.connectionRef = new WeakReference<>(connection);
 
         // Subscribe to the wildcard topic and start receive MQTT retained topics
-        this.topics.parallelStream().map(t -> connection.subscribe(t, this)).collect(FutureCollector.allOf())
+        this.topics.stream().map(t -> connection.subscribe(t, this)).collect(FutureCollector.allOf())
                 .thenRun(this::subscribeSuccess).exceptionally(this::subscribeFail);
 
         return discoverFinishedFuture;
@@ -161,7 +161,7 @@ public class DiscoverComponents implements MqttMessageSubscriber {
         if (connection != null && discoverTime > 0) {
             this.stopDiscoveryFuture = scheduler.schedule(() -> {
                 this.stopDiscoveryFuture = null;
-                this.topics.parallelStream().forEach(t -> connection.unsubscribe(t, this));
+                this.topics.stream().forEach(t -> connection.unsubscribe(t, this));
                 this.discoveredListener = null;
                 discoverFinishedFuture.complete(null);
             }, discoverTime, TimeUnit.MILLISECONDS);
@@ -180,7 +180,7 @@ public class DiscoverComponents implements MqttMessageSubscriber {
         this.discoveredListener = null;
         final MqttBrokerConnection connection = connectionRef.get();
         if (connection != null) {
-            this.topics.parallelStream().forEach(t -> connection.unsubscribe(t, this));
+            this.topics.stream().forEach(t -> connection.unsubscribe(t, this));
             connectionRef.clear();
         }
         discoverFinishedFuture.completeExceptionally(e);
index e5d8963394f6919a5b8f9fc4fb5b48bcf566329c..c286693e91fa3358db91c5b443193d47d94bc374 100644 (file)
@@ -120,7 +120,7 @@ public abstract class AbstractComponent<C extends AbstractChannelConfiguration>
      */
     public CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection, ScheduledExecutorService scheduler,
             int timeout) {
-        return channels.values().parallelStream().map(cChannel -> cChannel.start(connection, scheduler, timeout))
+        return channels.values().stream().map(cChannel -> cChannel.start(connection, scheduler, timeout))
                 .collect(FutureCollector.allOf());
     }
 
@@ -131,7 +131,7 @@ public abstract class AbstractComponent<C extends AbstractChannelConfiguration>
      *         exceptionally on errors.
      */
     public CompletableFuture<@Nullable Void> stop() {
-        return channels.values().parallelStream().map(ComponentChannel::stop).collect(FutureCollector.allOf());
+        return channels.values().stream().map(ComponentChannel::stop).collect(FutureCollector.allOf());
     }
 
     /**
index 89f560ab2e65a3faf858cb106d8bd7673d20a6ce..820d2b02eade2f917541f4faa0cac35b9e3a9342 100644 (file)
@@ -198,7 +198,7 @@ public class HomeAssistantThingHandler extends AbstractMQTTThingHandler
         // Start all known components and channels within the components and put the Thing offline
         // if any subscribing failed ( == broker connection lost)
         CompletableFuture<@Nullable Void> future = CompletableFuture.allOf(super.start(connection),
-                haComponents.values().parallelStream().map(e -> e.start(connection, scheduler, attributeReceiveTimeout))
+                haComponents.values().stream().map(e -> e.start(connection, scheduler, attributeReceiveTimeout))
                         .reduce(CompletableFuture.completedFuture(null), (a, v) -> a.thenCompose(b -> v)) // reduce to
                                                                                                           // one
                         .exceptionally(e -> {
@@ -216,7 +216,7 @@ public class HomeAssistantThingHandler extends AbstractMQTTThingHandler
             discoverComponents.stopDiscovery();
             delayedProcessing.join();
             // haComponents does not need to be synchronised -> the discovery thread is disabled
-            haComponents.values().parallelStream().map(AbstractComponent::stop) //
+            haComponents.values().stream().map(AbstractComponent::stop) //
                     // we need to join all the stops, otherwise they might not be done when start is called
                     .collect(FutureCollector.allOf()).join();