*/
package org.openhab.binding.deconz.internal.handler;
-import static org.openhab.binding.deconz.internal.Util.buildUrl;
-
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.deconz.internal.dto.DeconzBaseMessage;
-import org.openhab.binding.deconz.internal.netutils.AsyncHttpClient;
import org.openhab.binding.deconz.internal.netutils.WebSocketConnection;
import org.openhab.binding.deconz.internal.netutils.WebSocketMessageListener;
import org.openhab.binding.deconz.internal.types.ResourceType;
private final Logger logger = LoggerFactory.getLogger(DeconzBaseThingHandler.class);
protected final ResourceType resourceType;
protected ThingConfig config = new ThingConfig();
- protected DeconzBridgeConfig bridgeConfig = new DeconzBridgeConfig();
protected final Gson gson;
private @Nullable ScheduledFuture<?> initializationJob;
protected @Nullable WebSocketConnection connection;
- protected @Nullable AsyncHttpClient http;
public DeconzBaseThingHandler(Thing thing, Gson gson, ResourceType resourceType) {
super(thing);
final WebSocketConnection webSocketConnection = bridgeHandler.getWebsocketConnection();
this.connection = webSocketConnection;
- this.http = bridgeHandler.getHttp();
- this.bridgeConfig = bridgeHandler.getBridgeConfig();
updateStatus(ThingStatus.UNKNOWN, ThingStatusDetail.NONE);
*/
protected void sendCommand(@Nullable Object object, Command originalCommand, ChannelUID channelUID,
String commandUrl, @Nullable Runnable acceptProcessing) {
- AsyncHttpClient asyncHttpClient = http;
- if (asyncHttpClient == null) {
+ DeconzBridgeHandler bridgeHandler = getBridgeHandler();
+ if (bridgeHandler == null) {
return;
}
- String url = buildUrl(bridgeConfig.host, bridgeConfig.httpPort, bridgeConfig.apikey,
- resourceType.getIdentifier(), config.id, commandUrl);
-
- String json = object == null ? null : gson.toJson(object);
- logger.trace("Sending {} to {} {} via {}", json, resourceType, config.id, url);
+ String endpoint = Stream.of(resourceType.getIdentifier(), config.id, commandUrl)
+ .collect(Collectors.joining("/"));
- asyncHttpClient.put(url, json, bridgeConfig.timeout).thenAccept(v -> {
+ bridgeHandler.sendObject(endpoint, object).thenAccept(v -> {
if (acceptProcessing != null) {
acceptProcessing.run();
}
private int websocketPort = 0;
/** Prevent a dispose/init cycle while this flag is set. Use for property updates */
private boolean ignoreConfigurationUpdate;
- private boolean websocketReconnect = false;
+ private boolean thingDisposing = false;
private final ExpiringCacheAsync<Optional<BridgeFullState>> fullStateCache = new ExpiringCacheAsync<>(1000);
@Override
public Collection<Class<? extends ThingHandlerService>> getServices() {
- return Collections.singleton(ThingDiscoveryService.class);
+ return Set.of(ThingDiscoveryService.class);
}
@Override
* @param r The response
*/
private void parseAPIKeyResponse(AsyncHttpClient.Result r) {
+ if (thingDisposing) {
+ // discard response if thing handler is already disposing
+ return;
+ }
if (r.getResponseCode() == 403) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_PENDING,
"Allow authentication for 3rd party apps. Trying again in " + POLL_FREQUENCY_SEC + " seconds");
stopTimer();
- scheduledFuture = scheduler.schedule(() -> requestApiKey(), POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
+ scheduledFuture = scheduler.schedule(this::requestApiKey, POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
} else if (r.getResponseCode() == 200) {
ApiKeyMessage[] response = Objects.requireNonNull(gson.fromJson(r.getBody(), ApiKeyMessage[].class));
if (response.length == 0) {
*/
private CompletableFuture<Optional<BridgeFullState>> refreshFullStateCache() {
logger.trace("{} starts refreshing the fullStateCache", thing.getUID());
- if (config.apikey == null) {
+ if (config.apikey == null || thingDisposing) {
return CompletableFuture.completedFuture(Optional.empty());
}
String url = buildUrl(config.getHostWithoutPort(), config.httpPort, config.apikey);
*/
public void initializeBridgeState() {
getBridgeFullState().thenAccept(fullState -> fullState.ifPresentOrElse(state -> {
+ if (thingDisposing) {
+ // discard response if thing handler is already disposing
+ return;
+ }
if (state.config.name.isEmpty()) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE,
"You are connected to a HUE bridge, not a deCONZ software!");
// Use requested websocket port if no specific port is given
websocketPort = config.port == 0 ? state.config.websocketport : config.port;
- websocketReconnect = true;
startWebsocket();
}, () -> {
// initial response was empty, re-trying in POLL_FREQUENCY_SEC seconds
- scheduledFuture = scheduler.schedule(this::initializeBridgeState, POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
+ if (!thingDisposing) {
+ scheduledFuture = scheduler.schedule(this::initializeBridgeState, POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
+ }
})).exceptionally(e -> {
if (e != null) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE, e.getMessage());
* {@link #initializeBridgeState} need to be called first to obtain the websocket port.
*/
private void startWebsocket() {
- if (websocket.isConnected() || websocketPort == 0 || websocketReconnect == false) {
+ if (websocket.isConnected() || websocketPort == 0 || thingDisposing) {
return;
}
* Perform a request to the REST API for generating an API key.
*
*/
- private CompletableFuture<?> requestApiKey() {
+ private void requestApiKey() {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_PENDING, "Requesting API Key");
stopTimer();
String url = buildUrl(config.getHostWithoutPort(), config.httpPort);
- return http.post(url, "{\"devicetype\":\"openHAB\"}", config.timeout).thenAccept(this::parseAPIKeyResponse)
+ http.post(url, "{\"devicetype\":\"openHAB\"}", config.timeout).thenAccept(this::parseAPIKeyResponse)
.exceptionally(e -> {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getMessage());
logger.warn("Authorisation failed", e);
@Override
public void initialize() {
- logger.debug("Start initializing!");
+ logger.debug("Start initializing bridge {}", thing.getUID());
+ thingDisposing = false;
config = getConfigAs(DeconzBridgeConfig.class);
if (config.apikey == null) {
requestApiKey();
@Override
public void dispose() {
- websocketReconnect = false;
+ thingDisposing = true;
stopTimer();
websocket.close();
}
- @Override
- public void connectionError(@Nullable Throwable e) {
- if (e != null) {
- updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getMessage());
- } else {
- updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, "Unknown reason");
- }
- stopTimer();
- // Wait for POLL_FREQUENCY_SEC after a connection error before trying again
- scheduledFuture = scheduler.schedule(this::startWebsocket, POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
- }
-
@Override
public void connectionEstablished() {
stopTimer();
@Override
public void connectionLost(String reason) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, reason);
- startWebsocket();
+
+ stopTimer();
+ // Wait for POLL_FREQUENCY_SEC after a connection was closed before trying again
+ scheduledFuture = scheduler.schedule(this::startWebsocket, POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
}
/**
}
/**
- * Return the http connection.
+ * Send an object to the gateway
+ *
+ * @param endPoint the endpoint (e.g. "lights/2/state")
+ * @param object the object (or null if no object)
+ * @return CompletableFuture of the result
*/
- public AsyncHttpClient getHttp() {
- return http;
- }
+ public CompletableFuture<AsyncHttpClient.Result> sendObject(String endPoint, @Nullable Object object) {
+ String json = object == null ? null : gson.toJson(object);
+ String url = buildUrl(config.host, config.httpPort, config.apikey, endPoint);
+ logger.trace("Sending {} via {}", json, url);
- /**
- * Return the bridge configuration.
- */
- public DeconzBridgeConfig getBridgeConfig() {
- return config;
+ return http.put(url, json, config.timeout);
}
}
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jdt.annotation.NonNullByDefault;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
@WebSocket
@NonNullByDefault
public class WebSocketConnection {
+ private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
private final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
private final WebSocketClient client;
private final Gson gson;
private final WebSocketConnectionListener connectionListener;
- private final Map<Map.Entry<ResourceType, String>, WebSocketMessageListener> listeners = new ConcurrentHashMap<>();
+ private final Map<String, WebSocketMessageListener> listeners = new ConcurrentHashMap<>();
private ConnectionState connectionState = ConnectionState.DISCONNECTED;
+ private @Nullable Session session;
public WebSocketConnection(WebSocketConnectionListener listener, WebSocketClient client, Gson gson) {
this.connectionListener = listener;
this.client = client;
this.client.setMaxIdleTimeout(0);
this.gson = gson;
- this.socketName = ((QueuedThreadPool) client.getExecutor()).getName() + "$" + this.hashCode();
+ this.socketName = "Websocket$" + System.currentTimeMillis() + "-" + INSTANCE_COUNTER.incrementAndGet();
}
public void start(String ip) {
} else if (connectionState == ConnectionState.CONNECTING) {
logger.debug("{} already connecting", socketName);
return;
+ } else if (connectionState == ConnectionState.DISCONNECTING) {
+ logger.warn("{} trying to re-connect while still disconnecting", socketName);
}
try {
URI destUri = URI.create("ws://" + ip);
logger.debug("Trying to connect {} to {}", socketName, destUri);
client.connect(this, destUri).get();
} catch (Exception e) {
- connectionListener.connectionError(e);
+ connectionListener.connectionLost("Error while connecting: " + e.getMessage());
}
}
}
public void registerListener(ResourceType resourceType, String sensorID, WebSocketMessageListener listener) {
- listeners.put(Map.entry(resourceType, sensorID), listener);
+ listeners.put(getListenerId(resourceType, sensorID), listener);
}
public void unregisterListener(ResourceType resourceType, String sensorID) {
- listeners.remove(Map.entry(resourceType, sensorID));
+ listeners.remove(getListenerId(resourceType, sensorID));
}
@SuppressWarnings("unused")
@OnWebSocketConnect
public void onConnect(Session session) {
connectionState = ConnectionState.CONNECTED;
- logger.debug("{} successfully connected to {}", socketName, session.getRemoteAddress().getAddress());
+ logger.debug("{} successfully connected to {}: {}", socketName, session.getRemoteAddress().getAddress(),
+ session.hashCode());
connectionListener.connectionEstablished();
+ this.session = session;
}
@SuppressWarnings({ "null", "unused" })
@OnWebSocketMessage
- public void onMessage(String message) {
- logger.trace("Raw data received by websocket {}: {}", socketName, message);
-
- DeconzBaseMessage changedMessage = Objects.requireNonNull(gson.fromJson(message, DeconzBaseMessage.class));
- if (changedMessage.r == ResourceType.UNKNOWN) {
- logger.trace("Received message has unknown resource type. Skipping message.");
+ public void onMessage(Session session, String message) {
+ if (!session.equals(this.session)) {
+ handleWrongSession(session, message);
return;
}
+ logger.trace("{} received raw data: {}", socketName, message);
- WebSocketMessageListener listener = listeners.get(Map.entry(changedMessage.r, changedMessage.id));
- if (listener == null) {
- logger.debug(
- "Couldn't find listener for id {} with resource type {}. Either no thing for this id has been defined or this is a bug.",
- changedMessage.id, changedMessage.r);
- return;
+ try {
+ DeconzBaseMessage changedMessage = Objects.requireNonNull(gson.fromJson(message, DeconzBaseMessage.class));
+ if (changedMessage.r == ResourceType.UNKNOWN) {
+ logger.trace("Received message has unknown resource type. Skipping message.");
+ return;
+ }
+
+ WebSocketMessageListener listener = listeners.get(getListenerId(changedMessage.r, changedMessage.id));
+ if (listener == null) {
+ logger.debug(
+ "Couldn't find listener for id {} with resource type {}. Either no thing for this id has been defined or this is a bug.",
+ changedMessage.id, changedMessage.r);
+ return;
+ }
+
+ Class<? extends DeconzBaseMessage> expectedMessageType = changedMessage.r.getExpectedMessageType();
+ if (expectedMessageType == null) {
+ logger.warn(
+ "BUG! Could not get expected message type for resource type {}. Please report this incident.",
+ changedMessage.r);
+ return;
+ }
+
+ DeconzBaseMessage deconzMessage = gson.fromJson(message, expectedMessageType);
+ if (deconzMessage != null) {
+ listener.messageReceived(changedMessage.id, deconzMessage);
+
+ }
+ } catch (RuntimeException e) {
+ // we need to catch all processing exceptions, otherwise they could affect the connection
+ logger.warn("{} encountered an error while processing the message {}: {}", socketName, message,
+ e.getMessage());
}
+ }
- Class<? extends DeconzBaseMessage> expectedMessageType = changedMessage.r.getExpectedMessageType();
- if (expectedMessageType == null) {
- logger.warn("BUG! Could not get expected message type for resource type {}. Please report this incident.",
- changedMessage.r);
+ @SuppressWarnings("unused")
+ @OnWebSocketError
+ public void onError(Session session, Throwable cause) {
+ if (!session.equals(this.session)) {
+ handleWrongSession(session, "Connection error: " + cause.getMessage());
return;
}
+ logger.warn("{} connection errored, closing: {}", socketName, cause.getMessage());
- DeconzBaseMessage deconzMessage = gson.fromJson(message, expectedMessageType);
- if (deconzMessage != null) {
- listener.messageReceived(changedMessage.id, deconzMessage);
+ Session storedSession = this.session;
+ if (storedSession != null && storedSession.isOpen()) {
+ storedSession.close(-1, "Processing error");
}
}
- @SuppressWarnings("unused")
- @OnWebSocketError
- public void onError(Throwable cause) {
- connectionState = ConnectionState.DISCONNECTED;
- connectionListener.connectionError(cause);
- }
-
@SuppressWarnings("unused")
@OnWebSocketClose
- public void onClose(int statusCode, String reason) {
+ public void onClose(Session session, int statusCode, String reason) {
+ if (!session.equals(this.session)) {
+ handleWrongSession(session, "Connection closed: " + statusCode + " / " + reason);
+ return;
+ }
+ logger.trace("{} closed connection: {} / {}", socketName, statusCode, reason);
connectionState = ConnectionState.DISCONNECTED;
+ this.session = null;
connectionListener.connectionLost(reason);
}
+ private void handleWrongSession(Session session, String message) {
+ logger.warn("{}/{} received and discarded message for other session {}: {}.", socketName, session.hashCode(),
+ session.hashCode(), message);
+ if (session.isOpen()) {
+ // Close the session if it is still open. It should already be closed anyway
+ session.close();
+ }
+ }
+
/**
* check connection state (successfully connected)
*
return connectionState == ConnectionState.CONNECTED;
}
+ /**
+ * create a unique identifier for a listener
+ *
+ * @param resourceType the listener resource-type (LIGHT, SENSOR, ...)
+ * @param id the listener id (same as deconz-id)
+ * @return a unique string for this listener
+ */
+ private String getListenerId(ResourceType resourceType, String id) {
+ return resourceType.name() + "$" + id;
+ }
+
/**
* used internally to represent the connection state
*/
*/
@NonNullByDefault
public interface WebSocketConnectionListener {
- /**
- * An error occurred during connection or while connecting.
- *
- * @param e The error
- */
- void connectionError(Throwable e);
-
/**
* Connection successfully established.
*/