]> git.basschouten.com Git - openhab-addons.git/blob
3e2686dc3755a100832943d4fcb47e0f71b9beb0
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.nest.internal.wwn.rest;
14
15 import static org.openhab.binding.nest.internal.wwn.WWNBindingConstants.KEEP_ALIVE_MILLIS;
16
17 import java.util.List;
18 import java.util.concurrent.CopyOnWriteArrayList;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.ScheduledFuture;
21 import java.util.concurrent.TimeUnit;
22
23 import javax.ws.rs.client.Client;
24 import javax.ws.rs.client.ClientBuilder;
25 import javax.ws.rs.sse.InboundSseEvent;
26 import javax.ws.rs.sse.SseEventSource;
27
28 import org.eclipse.jdt.annotation.NonNullByDefault;
29 import org.eclipse.jdt.annotation.Nullable;
30 import org.openhab.binding.nest.internal.wwn.WWNUtils;
31 import org.openhab.binding.nest.internal.wwn.dto.WWNTopLevelData;
32 import org.openhab.binding.nest.internal.wwn.dto.WWNTopLevelStreamingData;
33 import org.openhab.binding.nest.internal.wwn.exceptions.FailedResolvingWWNUrlException;
34 import org.openhab.binding.nest.internal.wwn.handler.WWNRedirectUrlSupplier;
35 import org.openhab.binding.nest.internal.wwn.listener.WWNStreamingDataListener;
36 import org.osgi.service.jaxrs.client.SseEventSourceFactory;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 /**
41  * A client that generates events based on Nest streaming WWN REST API Server-Sent Events (SSE).
42  *
43  * @author Wouter Born - Initial contribution
44  * @author Wouter Born - Replace polling with REST streaming
45  */
46 @NonNullByDefault
47 public class WWNStreamingRestClient {
48
49     // Assume connection timeout when 2 keep alive message should have been received
50     private static final long CONNECTION_TIMEOUT_MILLIS = 2 * KEEP_ALIVE_MILLIS + KEEP_ALIVE_MILLIS / 2;
51
52     public static final String AUTH_REVOKED = "auth_revoked";
53     public static final String ERROR = "error";
54     public static final String KEEP_ALIVE = "keep-alive";
55     public static final String OPEN = "open";
56     public static final String PUT = "put";
57
58     private final Logger logger = LoggerFactory.getLogger(WWNStreamingRestClient.class);
59
60     private final String accessToken;
61     private final ClientBuilder clientBuilder;
62     private final SseEventSourceFactory eventSourceFactory;
63     private final WWNRedirectUrlSupplier redirectUrlSupplier;
64     private final ScheduledExecutorService scheduler;
65
66     private final Object startStopLock = new Object();
67     private final List<WWNStreamingDataListener> listeners = new CopyOnWriteArrayList<>();
68
69     private @Nullable ScheduledFuture<?> checkConnectionJob;
70     private boolean connected;
71     private @Nullable SseEventSource eventSource;
72     private long lastEventTimestamp;
73     private @Nullable WWNTopLevelData lastReceivedTopLevelData;
74
75     public WWNStreamingRestClient(String accessToken, ClientBuilder clientBuilder,
76             SseEventSourceFactory eventSourceFactory, WWNRedirectUrlSupplier redirectUrlSupplier,
77             ScheduledExecutorService scheduler) {
78         this.accessToken = accessToken;
79         this.clientBuilder = clientBuilder;
80         this.eventSourceFactory = eventSourceFactory;
81         this.redirectUrlSupplier = redirectUrlSupplier;
82         this.scheduler = scheduler;
83     }
84
85     private SseEventSource createEventSource() throws FailedResolvingWWNUrlException {
86         Client client = clientBuilder.register(new WWNStreamingRequestFilter(accessToken)).build();
87         SseEventSource eventSource = eventSourceFactory.newSource(client.target(redirectUrlSupplier.getRedirectUrl()));
88         eventSource.register(this::onEvent, this::onError);
89         return eventSource;
90     }
91
92     private void checkConnection() {
93         long millisSinceLastEvent = System.currentTimeMillis() - lastEventTimestamp;
94         if (millisSinceLastEvent > CONNECTION_TIMEOUT_MILLIS) {
95             logger.debug("Check: Disconnected from streaming events, millisSinceLastEvent={}", millisSinceLastEvent);
96             synchronized (startStopLock) {
97                 stopCheckConnectionJob(false);
98                 if (connected) {
99                     connected = false;
100                     listeners.forEach(listener -> listener.onDisconnected());
101                 }
102                 redirectUrlSupplier.resetCache();
103                 reopenEventSource();
104                 startCheckConnectionJob();
105             }
106         } else {
107             logger.debug("Check: Receiving streaming events, millisSinceLastEvent={}", millisSinceLastEvent);
108         }
109     }
110
111     /**
112      * Closes the existing EventSource and opens a new EventSource as workaround when the EventSource fails to reconnect
113      * itself.
114      */
115     private void reopenEventSource() {
116         try {
117             logger.debug("Reopening EventSource");
118             closeEventSource(10, TimeUnit.SECONDS);
119
120             logger.debug("Opening new EventSource");
121             SseEventSource localEventSource = createEventSource();
122             localEventSource.open();
123
124             eventSource = localEventSource;
125         } catch (FailedResolvingWWNUrlException e) {
126             logger.debug("Failed to resolve Nest redirect URL while opening new EventSource");
127         }
128     }
129
130     public void start() {
131         synchronized (startStopLock) {
132             logger.debug("Opening EventSource and starting checkConnection job");
133             reopenEventSource();
134             startCheckConnectionJob();
135             logger.debug("Started");
136         }
137     }
138
139     public void stop() {
140         synchronized (startStopLock) {
141             logger.debug("Closing EventSource and stopping checkConnection job");
142             stopCheckConnectionJob(true);
143             closeEventSource(0, TimeUnit.SECONDS);
144             logger.debug("Stopped");
145         }
146     }
147
148     private void closeEventSource(long timeout, TimeUnit timeoutUnit) {
149         SseEventSource localEventSource = eventSource;
150         if (localEventSource != null) {
151             if (!localEventSource.isOpen()) {
152                 logger.debug("Existing EventSource is already closed");
153             } else if (localEventSource.close(timeout, timeoutUnit)) {
154                 logger.debug("Succesfully closed existing EventSource");
155             } else {
156                 logger.debug("Failed to close existing EventSource");
157             }
158             eventSource = null;
159         }
160     }
161
162     private void startCheckConnectionJob() {
163         ScheduledFuture<?> localCheckConnectionJob = checkConnectionJob;
164         if (localCheckConnectionJob == null || localCheckConnectionJob.isCancelled()) {
165             checkConnectionJob = scheduler.scheduleWithFixedDelay(this::checkConnection, CONNECTION_TIMEOUT_MILLIS,
166                     KEEP_ALIVE_MILLIS, TimeUnit.MILLISECONDS);
167         }
168     }
169
170     private void stopCheckConnectionJob(boolean mayInterruptIfRunning) {
171         ScheduledFuture<?> localCheckConnectionJob = checkConnectionJob;
172         if (localCheckConnectionJob != null && !localCheckConnectionJob.isCancelled()) {
173             localCheckConnectionJob.cancel(mayInterruptIfRunning);
174             checkConnectionJob = null;
175         }
176     }
177
178     public boolean addStreamingDataListener(WWNStreamingDataListener listener) {
179         return listeners.add(listener);
180     }
181
182     public boolean removeStreamingDataListener(WWNStreamingDataListener listener) {
183         return listeners.remove(listener);
184     }
185
186     public @Nullable WWNTopLevelData getLastReceivedTopLevelData() {
187         return lastReceivedTopLevelData;
188     }
189
190     private void onEvent(InboundSseEvent inboundEvent) {
191         try {
192             lastEventTimestamp = System.currentTimeMillis();
193
194             String name = inboundEvent.getName();
195             String data = inboundEvent.readData();
196
197             logger.debug("Received '{}' event, data: {}", name, data);
198
199             if (!connected) {
200                 logger.debug("Connected to streaming events");
201                 connected = true;
202                 listeners.forEach(listener -> listener.onConnected());
203             }
204
205             if (AUTH_REVOKED.equals(name)) {
206                 logger.debug("API authorization has been revoked for access token: {}", data);
207                 listeners.forEach(listener -> listener.onAuthorizationRevoked(data));
208             } else if (ERROR.equals(name)) {
209                 logger.warn("Error occurred: {}", data);
210                 listeners.forEach(listener -> listener.onError(data));
211             } else if (KEEP_ALIVE.equals(name)) {
212                 logger.debug("Received message to keep connection alive");
213             } else if (OPEN.equals(name)) {
214                 logger.debug("Event stream opened");
215             } else if (PUT.equals(name)) {
216                 logger.debug("Data has changed (or initial data sent)");
217                 WWNTopLevelData topLevelData = WWNUtils.fromJson(data, WWNTopLevelStreamingData.class).getData();
218                 lastReceivedTopLevelData = topLevelData;
219                 listeners.forEach(listener -> listener.onNewTopLevelData(topLevelData));
220             } else {
221                 logger.debug("Received unhandled event with name '{}' and data '{}'", name, data);
222             }
223         } catch (Exception e) {
224             // catch exceptions here otherwise they will be swallowed by the implementation
225             logger.warn("An exception occurred while processing the inbound event", e);
226         }
227     }
228
229     private void onError(Throwable error) {
230         logger.debug("Error occurred while receiving events", error);
231     }
232 }