*/
package org.openhab.binding.mqtt.generic;
-import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
-import org.openhab.binding.mqtt.generic.utils.FutureCollector;
import org.openhab.binding.mqtt.generic.values.OnOffValue;
import org.openhab.binding.mqtt.generic.values.Value;
import org.openhab.binding.mqtt.handler.AbstractBrokerHandler;
// We do not set the thing to ONLINE here in the AbstractBase, that is the responsibility of a derived
// class.
try {
- Collection<CompletableFuture<@Nullable Void>> futures = availabilityStates.values().stream().map(s -> {
- if (s != null) {
- return s.start(connection, scheduler, 0);
- }
- return CompletableFuture.allOf();
- }).collect(Collectors.toList());
-
- futures.add(start(connection));
-
- futures.stream().collect(FutureCollector.allOf()).exceptionally(e -> {
- updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getLocalizedMessage());
- return null;
- }).get(subscribeTimeout, TimeUnit.MILLISECONDS);
+ start(connection).get(subscribeTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException ignored) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
"Did not receive all required topics");
*/
@Override
protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
+ // availability topics are also started asynchronously, so no problem here
+ clearAllAvailabilityTopics();
+ initializeAvailabilityTopicsFromConfig();
return channelStateByChannelUID.values().stream().map(c -> c.start(connection, scheduler, 0))
.collect(FutureCollector.allOf()).thenRun(this::calculateThingStatus);
}
@Override
public void initialize() {
- GenericThingConfiguration config = getConfigAs(GenericThingConfiguration.class);
-
- String availabilityTopic = config.availabilityTopic;
-
- if (availabilityTopic != null) {
- addAvailabilityTopic(availabilityTopic, config.payloadAvailable, config.payloadNotAvailable);
- } else {
- clearAllAvailabilityTopics();
- }
+ initializeAvailabilityTopicsFromConfig();
List<ChannelUID> configErrors = new ArrayList<>();
for (Channel channel : thing.getChannels()) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE);
}
}
+
+ private void initializeAvailabilityTopicsFromConfig() {
+ GenericThingConfiguration config = getConfigAs(GenericThingConfiguration.class);
+
+ String availabilityTopic = config.availabilityTopic;
+
+ if (availabilityTopic != null) {
+ addAvailabilityTopic(availabilityTopic, config.payloadAvailable, config.payloadNotAvailable);
+ } else {
+ clearAllAvailabilityTopics();
+ }
+ }
}
verify(callback).stateUpdated(eq(textChannelUID), argThat(arg -> "UPDATE".equals(arg.toString())));
assertThat(textValue.getChannelState().toString(), is("UPDATE"));
}
+
+ @Test
+ public void handleBridgeStatusChange() {
+ Configuration config = new Configuration();
+ config.put("availabilityTopic", "test/LWT");
+ when(thing.getConfiguration()).thenReturn(config);
+ thingHandler.initialize();
+ thingHandler
+ .bridgeStatusChanged(new ThingStatusInfo(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_OFFLINE, null));
+ thingHandler.bridgeStatusChanged(new ThingStatusInfo(ThingStatus.ONLINE, ThingStatusDetail.NONE, null));
+ verify(connection, times(2)).subscribe(eq("test/LWT"), any());
+ }
}