* @return A future that completes normal on success and exceptionally on any errors.
*/
protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
- return availabilityStates.values().parallelStream().map(cChannel -> cChannel.start(connection, scheduler, 0))
- .collect(FutureCollector.allOf());
+ return availabilityStates.values().stream().map(cChannel -> {
+ final CompletableFuture<@Nullable Void> fut = cChannel == null ? CompletableFuture.completedFuture(null)
+ : cChannel.start(connection, scheduler, 0);
+ return fut;
+ }).collect(FutureCollector.allOf());
}
/**
this.connectionRef = new WeakReference<>(connection);
// Subscribe to the wildcard topic and start receive MQTT retained topics
- this.topics.parallelStream().map(t -> connection.subscribe(t, this)).collect(FutureCollector.allOf())
+ this.topics.stream().map(t -> connection.subscribe(t, this)).collect(FutureCollector.allOf())
.thenRun(this::subscribeSuccess).exceptionally(this::subscribeFail);
return discoverFinishedFuture;
if (connection != null && discoverTime > 0) {
this.stopDiscoveryFuture = scheduler.schedule(() -> {
this.stopDiscoveryFuture = null;
- this.topics.parallelStream().forEach(t -> connection.unsubscribe(t, this));
+ this.topics.stream().forEach(t -> connection.unsubscribe(t, this));
this.discoveredListener = null;
discoverFinishedFuture.complete(null);
}, discoverTime, TimeUnit.MILLISECONDS);
this.discoveredListener = null;
final MqttBrokerConnection connection = connectionRef.get();
if (connection != null) {
- this.topics.parallelStream().forEach(t -> connection.unsubscribe(t, this));
+ this.topics.stream().forEach(t -> connection.unsubscribe(t, this));
connectionRef.clear();
}
discoverFinishedFuture.completeExceptionally(e);
*/
public CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection, ScheduledExecutorService scheduler,
int timeout) {
- return channels.values().parallelStream().map(cChannel -> cChannel.start(connection, scheduler, timeout))
+ return channels.values().stream().map(cChannel -> cChannel.start(connection, scheduler, timeout))
.collect(FutureCollector.allOf());
}
* exceptionally on errors.
*/
public CompletableFuture<@Nullable Void> stop() {
- return channels.values().parallelStream().map(ComponentChannel::stop).collect(FutureCollector.allOf());
+ return channels.values().stream().map(ComponentChannel::stop).collect(FutureCollector.allOf());
}
/**
// Start all known components and channels within the components and put the Thing offline
// if any subscribing failed ( == broker connection lost)
CompletableFuture<@Nullable Void> future = CompletableFuture.allOf(super.start(connection),
- haComponents.values().parallelStream().map(e -> e.start(connection, scheduler, attributeReceiveTimeout))
+ haComponents.values().stream().map(e -> e.start(connection, scheduler, attributeReceiveTimeout))
.reduce(CompletableFuture.completedFuture(null), (a, v) -> a.thenCompose(b -> v)) // reduce to
// one
.exceptionally(e -> {
discoverComponents.stopDiscovery();
delayedProcessing.join();
// haComponents does not need to be synchronised -> the discovery thread is disabled
- haComponents.values().parallelStream().map(AbstractComponent::stop) //
+ haComponents.values().stream().map(AbstractComponent::stop) //
// we need to join all the stops, otherwise they might not be done when start is called
.collect(FutureCollector.allOf()).join();