import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
+import org.openhab.binding.mqtt.generic.utils.FutureCollector;
import org.openhab.binding.mqtt.generic.values.OnOffValue;
import org.openhab.binding.mqtt.generic.values.Value;
import org.openhab.binding.mqtt.handler.AbstractBrokerHandler;
* @param connection A started broker connection
* @return A future that completes normal on success and exceptionally on any errors.
*/
- protected abstract CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection);
+ protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
+ return availabilityStates.values().parallelStream().map(cChannel -> cChannel.start(connection, scheduler, 0))
+ .collect(FutureCollector.allOf());
+ }
/**
* Called when the MQTT connection disappeared.
// Start all known components and channels within the components and put the Thing offline
// if any subscribing failed ( == broker connection lost)
- CompletableFuture<@Nullable Void> future = haComponents.values().parallelStream()
- .map(e -> e.start(connection, scheduler, attributeReceiveTimeout))
- .reduce(CompletableFuture.completedFuture(null), (a, v) -> a.thenCompose(b -> v)) // reduce to one
- .exceptionally(e -> {
- updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, e.getMessage());
- return null;
- });
+ CompletableFuture<@Nullable Void> future = CompletableFuture.allOf(super.start(connection),
+ haComponents.values().parallelStream().map(e -> e.start(connection, scheduler, attributeReceiveTimeout))
+ .reduce(CompletableFuture.completedFuture(null), (a, v) -> a.thenCompose(b -> v)) // reduce to
+ // one
+ .exceptionally(e -> {
+ updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, e.getMessage());
+ return null;
+ }));
return future
.thenCompose(b -> discoverComponents.startDiscovery(connection, 0, discoveryHomeAssistantIDs, this));