2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.nest.internal.wwn.rest;
15 import static org.openhab.binding.nest.internal.wwn.WWNBindingConstants.KEEP_ALIVE_MILLIS;
17 import java.util.List;
18 import java.util.concurrent.CopyOnWriteArrayList;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.ScheduledFuture;
21 import java.util.concurrent.TimeUnit;
23 import javax.ws.rs.client.Client;
24 import javax.ws.rs.client.ClientBuilder;
25 import javax.ws.rs.sse.InboundSseEvent;
26 import javax.ws.rs.sse.SseEventSource;
28 import org.eclipse.jdt.annotation.NonNullByDefault;
29 import org.eclipse.jdt.annotation.Nullable;
30 import org.openhab.binding.nest.internal.wwn.WWNUtils;
31 import org.openhab.binding.nest.internal.wwn.dto.WWNTopLevelData;
32 import org.openhab.binding.nest.internal.wwn.dto.WWNTopLevelStreamingData;
33 import org.openhab.binding.nest.internal.wwn.exceptions.FailedResolvingWWNUrlException;
34 import org.openhab.binding.nest.internal.wwn.handler.WWNRedirectUrlSupplier;
35 import org.openhab.binding.nest.internal.wwn.listener.WWNStreamingDataListener;
36 import org.osgi.service.jaxrs.client.SseEventSourceFactory;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
41 * A client that generates events based on Nest streaming WWN REST API Server-Sent Events (SSE).
43 * @author Wouter Born - Initial contribution
44 * @author Wouter Born - Replace polling with REST streaming
47 public class WWNStreamingRestClient {
49 // Assume connection timeout when 2 keep alive message should have been received
50 private static final long CONNECTION_TIMEOUT_MILLIS = 2 * KEEP_ALIVE_MILLIS + KEEP_ALIVE_MILLIS / 2;
52 public static final String AUTH_REVOKED = "auth_revoked";
53 public static final String ERROR = "error";
54 public static final String KEEP_ALIVE = "keep-alive";
55 public static final String OPEN = "open";
56 public static final String PUT = "put";
58 private final Logger logger = LoggerFactory.getLogger(WWNStreamingRestClient.class);
60 private final String accessToken;
61 private final ClientBuilder clientBuilder;
62 private final SseEventSourceFactory eventSourceFactory;
63 private final WWNRedirectUrlSupplier redirectUrlSupplier;
64 private final ScheduledExecutorService scheduler;
66 private final Object startStopLock = new Object();
67 private final List<WWNStreamingDataListener> listeners = new CopyOnWriteArrayList<>();
69 private @Nullable ScheduledFuture<?> checkConnectionJob;
70 private boolean connected;
71 private @Nullable SseEventSource eventSource;
72 private long lastEventTimestamp;
73 private @Nullable WWNTopLevelData lastReceivedTopLevelData;
75 public WWNStreamingRestClient(String accessToken, ClientBuilder clientBuilder,
76 SseEventSourceFactory eventSourceFactory, WWNRedirectUrlSupplier redirectUrlSupplier,
77 ScheduledExecutorService scheduler) {
78 this.accessToken = accessToken;
79 this.clientBuilder = clientBuilder;
80 this.eventSourceFactory = eventSourceFactory;
81 this.redirectUrlSupplier = redirectUrlSupplier;
82 this.scheduler = scheduler;
85 private SseEventSource createEventSource() throws FailedResolvingWWNUrlException {
86 Client client = clientBuilder.register(new WWNStreamingRequestFilter(accessToken)).build();
87 SseEventSource eventSource = eventSourceFactory.newSource(client.target(redirectUrlSupplier.getRedirectUrl()));
88 eventSource.register(this::onEvent, this::onError);
92 private void checkConnection() {
93 long millisSinceLastEvent = System.currentTimeMillis() - lastEventTimestamp;
94 if (millisSinceLastEvent > CONNECTION_TIMEOUT_MILLIS) {
95 logger.debug("Check: Disconnected from streaming events, millisSinceLastEvent={}", millisSinceLastEvent);
96 synchronized (startStopLock) {
97 stopCheckConnectionJob(false);
100 listeners.forEach(listener -> listener.onDisconnected());
102 redirectUrlSupplier.resetCache();
104 startCheckConnectionJob();
107 logger.debug("Check: Receiving streaming events, millisSinceLastEvent={}", millisSinceLastEvent);
112 * Closes the existing EventSource and opens a new EventSource as workaround when the EventSource fails to reconnect
115 private void reopenEventSource() {
117 logger.debug("Reopening EventSource");
118 closeEventSource(10, TimeUnit.SECONDS);
120 logger.debug("Opening new EventSource");
121 SseEventSource localEventSource = createEventSource();
122 localEventSource.open();
124 eventSource = localEventSource;
125 } catch (FailedResolvingWWNUrlException e) {
126 logger.debug("Failed to resolve Nest redirect URL while opening new EventSource");
130 public void start() {
131 synchronized (startStopLock) {
132 logger.debug("Opening EventSource and starting checkConnection job");
134 startCheckConnectionJob();
135 logger.debug("Started");
140 synchronized (startStopLock) {
141 logger.debug("Closing EventSource and stopping checkConnection job");
142 stopCheckConnectionJob(true);
143 closeEventSource(0, TimeUnit.SECONDS);
144 logger.debug("Stopped");
148 private void closeEventSource(long timeout, TimeUnit timeoutUnit) {
149 SseEventSource localEventSource = eventSource;
150 if (localEventSource != null) {
151 if (!localEventSource.isOpen()) {
152 logger.debug("Existing EventSource is already closed");
153 } else if (localEventSource.close(timeout, timeoutUnit)) {
154 logger.debug("Succesfully closed existing EventSource");
156 logger.debug("Failed to close existing EventSource");
162 private void startCheckConnectionJob() {
163 ScheduledFuture<?> localCheckConnectionJob = checkConnectionJob;
164 if (localCheckConnectionJob == null || localCheckConnectionJob.isCancelled()) {
165 checkConnectionJob = scheduler.scheduleWithFixedDelay(this::checkConnection, CONNECTION_TIMEOUT_MILLIS,
166 KEEP_ALIVE_MILLIS, TimeUnit.MILLISECONDS);
170 private void stopCheckConnectionJob(boolean mayInterruptIfRunning) {
171 ScheduledFuture<?> localCheckConnectionJob = checkConnectionJob;
172 if (localCheckConnectionJob != null && !localCheckConnectionJob.isCancelled()) {
173 localCheckConnectionJob.cancel(mayInterruptIfRunning);
174 checkConnectionJob = null;
178 public boolean addStreamingDataListener(WWNStreamingDataListener listener) {
179 return listeners.add(listener);
182 public boolean removeStreamingDataListener(WWNStreamingDataListener listener) {
183 return listeners.remove(listener);
186 public @Nullable WWNTopLevelData getLastReceivedTopLevelData() {
187 return lastReceivedTopLevelData;
190 private void onEvent(InboundSseEvent inboundEvent) {
192 lastEventTimestamp = System.currentTimeMillis();
194 String name = inboundEvent.getName();
195 String data = inboundEvent.readData();
197 logger.debug("Received '{}' event, data: {}", name, data);
200 logger.debug("Connected to streaming events");
202 listeners.forEach(listener -> listener.onConnected());
205 if (AUTH_REVOKED.equals(name)) {
206 logger.debug("API authorization has been revoked for access token: {}", data);
207 listeners.forEach(listener -> listener.onAuthorizationRevoked(data));
208 } else if (ERROR.equals(name)) {
209 logger.warn("Error occurred: {}", data);
210 listeners.forEach(listener -> listener.onError(data));
211 } else if (KEEP_ALIVE.equals(name)) {
212 logger.debug("Received message to keep connection alive");
213 } else if (OPEN.equals(name)) {
214 logger.debug("Event stream opened");
215 } else if (PUT.equals(name)) {
216 logger.debug("Data has changed (or initial data sent)");
217 WWNTopLevelData topLevelData = WWNUtils.fromJson(data, WWNTopLevelStreamingData.class).getData();
218 lastReceivedTopLevelData = topLevelData;
219 listeners.forEach(listener -> listener.onNewTopLevelData(topLevelData));
221 logger.debug("Received unhandled event with name '{}' and data '{}'", name, data);
223 } catch (Exception e) {
224 // catch exceptions here otherwise they will be swallowed by the implementation
225 logger.warn("An exception occurred while processing the inbound event", e);
229 private void onError(Throwable error) {
230 logger.debug("Error occurred while receiving events", error);