@Override
public void dispose() {
logger.debug("Disposing remote openHAB handler for bridge {}", getThing().getUID());
- stopStreamingUpdates();
+ stopStreamingUpdates(false);
stopCheckConnectionJob();
channelsLastStates.clear();
}
if (localCheckConnectionJob == null || localCheckConnectionJob.isCancelled()) {
checkConnectionJob = scheduler.scheduleWithFixedDelay(() -> {
long millisSinceLastEvent = System.currentTimeMillis() - restClient.getLastEventTimestamp();
- if (aliveInterval == 0 || restClient.getLastEventTimestamp() == 0) {
+ if (getThing().getStatus() != ThingStatus.ONLINE || aliveInterval == 0
+ || restClient.getLastEventTimestamp() == 0) {
logger.debug("Time to check server accessibility");
checkConnection();
} else if (millisSinceLastEvent > (aliveInterval * 60000)) {
}
private void stopStreamingUpdates() {
+ stopStreamingUpdates(true);
+ }
+
+ private void stopStreamingUpdates(boolean waitingForCompletion) {
synchronized (restClient) {
- restClient.stop();
+ restClient.stop(waitingForCompletion);
restClient.removeStreamingDataListener(this);
restClient.removeItemsDataListener(this);
}
updateStatus(ThingStatus.ONLINE);
}
+ @Override
+ public void onDisconnected() {
+ updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, "Disconected from the remote server");
+ }
+
@Override
public void onError(String message) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, message);
private String accessToken;
private boolean trustedCertificate;
private boolean connected;
+ private boolean completed;
private @Nullable SseEventSource eventSource;
private long lastEventTimestamp;
}
}
- public void stop() {
+ public void stop(boolean waitingForCompletion) {
synchronized (startStopLock) {
logger.debug("Closing EventSource");
- closeEventSource(0, TimeUnit.SECONDS);
+ closeEventSource(waitingForCompletion);
logger.debug("EventSource stopped");
lastEventTimestamp = 0;
}
.register(new RemoteopenhabStreamingRequestFilter(accessToken)).build();
}
SseEventSource eventSource = eventSourceFactory.newSource(client.target(restSseUrl));
- eventSource.register(this::onEvent, this::onError);
+ eventSource.register(this::onEvent, this::onError, this::onComplete);
return eventSource;
}
return;
}
- closeEventSource(10, TimeUnit.SECONDS);
+ closeEventSource(true);
logger.debug("Opening new EventSource {}", url);
SseEventSource localEventSource = createEventSource(url);
eventSource = localEventSource;
}
- private void closeEventSource(long timeout, TimeUnit timeoutUnit) {
+ private void closeEventSource(boolean waitingForCompletion) {
SseEventSource localEventSource = eventSource;
if (localEventSource != null) {
- if (!localEventSource.isOpen()) {
+ if (!localEventSource.isOpen() || completed) {
logger.debug("Existing EventSource is already closed");
- } else if (localEventSource.close(timeout, timeoutUnit)) {
+ } else if (localEventSource.close(waitingForCompletion ? 10 : 0, TimeUnit.SECONDS)) {
logger.debug("Succesfully closed existing EventSource");
} else {
logger.debug("Failed to close existing EventSource");
}
}
+ private void onComplete() {
+ logger.debug("Disconnected from streaming events");
+ completed = true;
+ listeners.forEach(listener -> listener.onDisconnected());
+ }
+
private void onError(Throwable error) {
logger.debug("Error occurred while receiving events", error);
listeners.forEach(listener -> listener.onError("Error occurred while receiving events"));