2 * Copyright (c) 2010-2021 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.webthing.internal;
15 import java.io.IOException;
17 import java.time.Duration;
18 import java.time.Instant;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ScheduledFuture;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.atomic.AtomicBoolean;
24 import java.util.concurrent.atomic.AtomicReference;
26 import org.eclipse.jdt.annotation.NonNullByDefault;
27 import org.eclipse.jdt.annotation.Nullable;
28 import org.eclipse.jetty.client.HttpClient;
29 import org.eclipse.jetty.websocket.client.WebSocketClient;
30 import org.openhab.binding.webthing.internal.channel.Channels;
31 import org.openhab.binding.webthing.internal.client.*;
32 import org.openhab.binding.webthing.internal.link.ChannelToPropertyLink;
33 import org.openhab.binding.webthing.internal.link.PropertyToChannelLink;
34 import org.openhab.binding.webthing.internal.link.UnknownPropertyException;
35 import org.openhab.core.thing.*;
36 import org.openhab.core.thing.binding.BaseThingHandler;
37 import org.openhab.core.types.Command;
38 import org.openhab.core.types.RefreshType;
39 import org.openhab.core.types.State;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
44 * The {@link WebThingHandler} is responsible for handling commands, which are
45 * sent to one of the channels.
47 * @author Gregor Roth - Initial contribution
50 public class WebThingHandler extends BaseThingHandler implements ChannelHandler {
51 private static final Duration RECONNECT_PERIOD = Duration.ofHours(23);
52 private static final Duration HEALTH_CHECK_PERIOD = Duration.ofSeconds(70);
53 private static final ItemChangedListener EMPTY_ITEM_CHANGED_LISTENER = (channelUID, stateCommand) -> {
56 private final Logger logger = LoggerFactory.getLogger(WebThingHandler.class);
57 private final HttpClient httpClient;
58 private final WebSocketClient webSocketClient;
59 private final AtomicBoolean isActivated = new AtomicBoolean(true);
60 private final Map<ChannelUID, ItemChangedListener> itemChangedListenerMap = new ConcurrentHashMap<>();
61 private final AtomicReference<Optional<ConsumedThing>> webThingConnectionRef = new AtomicReference<>(
63 private final AtomicReference<Instant> lastReconnect = new AtomicReference<>(Instant.now());
64 private final AtomicReference<Optional<ScheduledFuture<?>>> watchdogHandle = new AtomicReference<>(
66 private @Nullable URI webThingURI = null;
68 public WebThingHandler(Thing thing, HttpClient httpClient, WebSocketClient webSocketClient) {
70 this.httpClient = httpClient;
71 this.webSocketClient = webSocketClient;
74 private boolean isOnline() {
75 return getThing().getStatus() == ThingStatus.ONLINE;
78 private boolean isDisconnected() {
79 return (getThing().getStatus() == ThingStatus.OFFLINE) || (getThing().getStatus() == ThingStatus.UNKNOWN);
82 private boolean isAlive() {
83 return webThingConnectionRef.get().map(ConsumedThing::isAlive).orElse(false);
87 public void initialize() {
88 updateStatus(ThingStatus.UNKNOWN);
89 isActivated.set(true); // set with true, even though the connect may fail. In this case retries will be
92 // perform connect in background
93 scheduler.execute(() -> {
94 // WebThing URI present?
95 var uri = toUri(getConfigAs(WebThingConfiguration.class).webThingURI);
97 logger.debug("try to connect WebThing {}", uri);
98 var connected = tryReconnect(uri);
100 logger.debug("WebThing {} connected", getWebThingLabel());
103 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR,
104 "webThing uri has not been set");
105 logger.warn("could not initialize WebThing. URI is not set or invalid. {}", this.webThingURI);
109 // starting watchdog that checks the healthiness of the WebThing connection, periodically
111 .getAndSet(Optional.of(scheduler.scheduleWithFixedDelay(this::checkWebThingConnection,
112 HEALTH_CHECK_PERIOD.getSeconds(), HEALTH_CHECK_PERIOD.getSeconds(), TimeUnit.SECONDS)))
113 .ifPresent(future -> future.cancel(true));
116 private @Nullable URI toUri(@Nullable String uri) {
119 return URI.create(uri);
121 } catch (IllegalArgumentException illegalURIException) {
128 public void dispose() {
130 isActivated.set(false); // set to false to avoid reconnecting
132 // terminate WebThing connection as well as the alive watchdog
133 webThingConnectionRef.getAndSet(Optional.empty()).ifPresent(ConsumedThing::close);
134 watchdogHandle.getAndSet(Optional.empty()).ifPresent(future -> future.cancel(true));
140 private boolean tryReconnect(@Nullable URI uri) {
141 if (isActivated.get()) { // will try reconnect only, if activated
143 // create the client-side WebThing representation
145 var webThing = ConsumedThingFactory.instance().create(webSocketClient, httpClient, uri, scheduler,
147 this.webThingConnectionRef.getAndSet(Optional.of(webThing)).ifPresent(ConsumedThing::close);
149 // update the Thing structure based on the WebThing description
150 thingStructureChanged(webThing);
152 // link the Thing's channels with the WebThing properties to forward properties/item updates
153 establishWebThingChannelLinks(webThing);
155 lastReconnect.set(Instant.now());
156 updateStatus(ThingStatus.ONLINE);
159 } catch (IOException e) {
160 var msg = e.getMessage();
170 public void onError(String reason) {
171 var wasConnectedBefore = isOnline();
172 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, reason);
174 // close the WebThing connection. If the handler is still active, the WebThing connection
175 // will be re-established within the periodically watchdog task
176 webThingConnectionRef.getAndSet(Optional.empty()).ifPresent(ConsumedThing::close);
178 if (wasConnectedBefore) { // to reduce log messages, just log in case of connection state changed
179 logger.debug("WebThing {} disconnected {}. Try reconnect (each {} sec)", getWebThingLabel(), reason,
180 HEALTH_CHECK_PERIOD.getSeconds());
182 logger.debug("WebThing {} is offline {}. Try reconnect (each {} sec)", getWebThingLabel(), reason,
183 HEALTH_CHECK_PERIOD.getSeconds());
187 private String getWebThingLabel() {
188 if (getThing().getLabel() == null) {
189 return "" + webThingURI;
191 return "'" + getThing().getLabel() + "' (" + webThingURI + ")";
196 * updates the thing structure. Refer https://www.openhab.org/docs/developer/bindings/#updating-the-thing-structure
198 * @param webThing the WebThing that is used for the new structure
200 private void thingStructureChanged(ConsumedThing webThing) {
201 var thingBuilder = editThing().withLabel(webThing.getThingDescription().title);
203 // create a channel for each WebThing property
204 for (var entry : webThing.getThingDescription().properties.entrySet()) {
205 var channel = Channels.createChannel(thing.getUID(), entry.getKey(), entry.getValue());
206 // add channel (and remove a previous one, if exist)
207 thingBuilder.withoutChannel(channel.getUID()).withChannel(channel);
209 var thing = thingBuilder.build();
211 // and update the thing
216 * connects each WebThing property with a corresponding openHAB channel. After this changes will be synchronized
217 * between a WebThing property and the openHAB channel
219 * @param webThing the WebThing to be connected
220 * @throws IOException if the channels can not be connected
222 private void establishWebThingChannelLinks(ConsumedThing webThing) throws IOException {
223 // remove all registered listeners
224 itemChangedListenerMap.clear();
226 // create new links (listeners will be registered, implicitly)
227 for (var namePropertyPair : webThing.getThingDescription().properties.entrySet()) {
229 // determine the name of the associated channel
230 var channelUID = Channels.createChannelUID(getThing().getUID(), namePropertyPair.getKey());
232 // will try to establish a link, if channel is present
233 var channel = getThing().getChannel(channelUID);
234 if (channel != null) {
235 // establish downstream link
236 PropertyToChannelLink.establish(webThing, namePropertyPair.getKey(), this, channel);
238 // establish upstream link
239 if (!namePropertyPair.getValue().readOnly) {
240 ChannelToPropertyLink.establish(this, channel, webThing, namePropertyPair.getKey());
243 } catch (UnknownPropertyException upe) {
244 logger.warn("WebThing {} property {} could not be linked with a channel", getWebThingLabel(),
245 namePropertyPair.getKey(), upe);
251 public void handleCommand(ChannelUID channelUID, Command command) {
252 if (command instanceof State) {
253 itemChangedListenerMap.getOrDefault(channelUID, EMPTY_ITEM_CHANGED_LISTENER).onItemStateChanged(channelUID,
255 } else if (command instanceof RefreshType) {
256 tryReconnect(webThingURI);
261 // ChannelHandler methods
263 public void observeChannel(ChannelUID channelUID, ItemChangedListener listener) {
264 itemChangedListenerMap.put(channelUID, listener);
268 public void updateItemState(ChannelUID channelUID, Command command) {
269 if (isActivated.get()) {
270 postCommand(channelUID, command);
276 private void checkWebThingConnection() {
277 // try reconnect, if necessary
278 if (isDisconnected() || (isOnline() && !isAlive())) {
279 logger.debug("try reconnecting WebThing {}", getWebThingLabel());
280 if (tryReconnect(webThingURI)) {
281 logger.debug("WebThing {} reconnected", getWebThingLabel());
285 // force reconnecting periodically, to fix erroneous states that occurs for unknown reasons
286 var elapsedSinceLastReconnect = Duration.between(lastReconnect.get(), Instant.now());
287 if (isOnline() && (elapsedSinceLastReconnect.getSeconds() > RECONNECT_PERIOD.getSeconds())) {
288 if (tryReconnect(webThingURI)) {
289 logger.debug("WebThing {} reconnected. Initiated by periodic reconnect", getWebThingLabel());
291 logger.debug("could not reconnect WebThing {} (periodic reconnect failed). Next trial in {} sec",
292 getWebThingLabel(), HEALTH_CHECK_PERIOD.getSeconds());