}
private void internalStop() {
- logger.debug("Unsubscribed channel {} form topic: {}", this.channelUID, config.stateTopic);
+ logger.debug("Unsubscribed channel {} from topic: {}", this.channelUID, config.stateTopic);
this.connection = null;
this.channelStateUpdateListener = null;
hasSubscribed = false;
}
String valueStr = new String(payload, StandardCharsets.UTF_8);
+ String originalValueStr = valueStr;
// Check if there is a manipulation annotation attached to the field
- final MQTTvalueTransform transform = field.getAnnotation(MQTTvalueTransform.class);
- Object value;
- if (transform != null) {
- // Add a prefix/suffix to the value
- valueStr = transform.prefix() + valueStr + transform.suffix();
- // Split the value if the field is an array. Convert numbers/enums if necessary.
- value = field.getType().isArray() ? valueStr.split(transform.splitCharacter())
- : numberConvert(valueStr, field.getType());
- } else if (field.getType().isArray()) {
- throw new IllegalArgumentException("No split character defined!");
- } else {
- // Convert numbers/enums if necessary
- value = numberConvert(valueStr, field.getType());
+ try {
+ final MQTTvalueTransform transform = field.getAnnotation(MQTTvalueTransform.class);
+ Object value;
+ if (transform != null) {
+ // Add a prefix/suffix to the value
+ valueStr = transform.prefix() + valueStr + transform.suffix();
+ // Split the value if the field is an array. Convert numbers/enums if necessary.
+ value = field.getType().isArray() ? valueStr.split(transform.splitCharacter())
+ : numberConvert(valueStr, field.getType());
+ } else if (field.getType().isArray()) {
+ throw new IllegalArgumentException("No split character defined!");
+ } else {
+ // Convert numbers/enums if necessary
+ value = numberConvert(valueStr, field.getType());
+ }
+ receivedValue = true;
+ changeConsumer.fieldChanged(field, value);
+ future.complete(null);
+ } catch (IllegalArgumentException e) {
+ if (mandatory) {
+ future.completeExceptionally(e);
+ } else {
+ logger.warn("Unable to interpret {} from topic {}", originalValueStr, topic);
+ future.complete(null);
+ }
}
- receivedValue = true;
- changeConsumer.fieldChanged(field, value);
- future.complete(null);
}
void timeoutReached() {
assertThat(future.isDone(), is(true));
}
+
+ @Test
+ public void ignoresInvalidEnum() throws IllegalArgumentException, IllegalAccessException {
+ final Attributes attributes = spy(new Attributes());
+
+ doAnswer(this::createSubscriberAnswer).when(attributes).createSubscriber(any(), any(), anyString(),
+ anyBoolean());
+
+ verify(connection, times(0)).subscribe(anyString(), any());
+
+ // Subscribe now to all fields
+ CompletableFuture<Void> future = attributes.subscribeAndReceive(connection, executor, "homie/device123",
+ fieldChangedObserver, 10);
+ assertThat(future.isDone(), is(true));
+
+ SubscribeFieldToMQTTtopic field = attributes.subscriptions.stream().filter(f -> f.field.getName() == "state")
+ .findFirst().get();
+ field.processMessage(field.topic, "garbage".getBytes());
+ verify(fieldChangedObserver, times(0)).attributeChanged(any(), any(), any(), any(), anyBoolean());
+ assertThat(attributes.state.toString(), is("unknown"));
+ }
}
@Override
public void initialize() {
- logger.debug("About to initialize Homie device {}", device.attributes.name);
config = getConfigAs(HandlerConfiguration.class);
+ logger.debug("About to initialize Homie device {}", config.deviceid);
if (config.deviceid.isEmpty()) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, "Object ID unknown");
return;
@Override
protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
- logger.debug("About to start Homie device {}", device.attributes.name);
+ logger.debug("About to start Homie device {}", config.deviceid);
if (connection.getQos() != 1) {
// QoS 1 is required.
logger.warn(
return device.subscribe(connection, scheduler, attributeReceiveTimeout).thenCompose((Void v) -> {
return device.startChannels(connection, scheduler, attributeReceiveTimeout, this);
}).thenRun(() -> {
- logger.debug("Homie device {} fully attached (start)", device.attributes.name);
+ logger.debug("Homie device {} fully attached (start)", config.deviceid);
});
}
@Override
protected void stop() {
- logger.debug("About to stop Homie device {}", device.attributes.name);
+ logger.debug("About to stop Homie device {}", config.deviceid);
final ScheduledFuture<?> heartBeatTimer = this.heartBeatTimer;
if (heartBeatTimer != null) {
heartBeatTimer.cancel(false);
final MqttBrokerConnection connection = this.connection;
if (connection != null) {
device.startChannels(connection, scheduler, attributeReceiveTimeout, this).thenRun(() -> {
- logger.debug("Homie device {} fully attached (accept)", device.attributes.name);
+ logger.debug("Homie device {} fully attached (accept)", config.deviceid);
});
}
}