]> git.basschouten.com Git - openhab-addons.git/blob
7924702d6be13ca43815e2943bfebbe47110e471
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2024 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.boschshc.internal.devices.bridge;
14
15 import static org.eclipse.jetty.http.HttpMethod.POST;
16
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.concurrent.TimeUnit;
20 import java.util.concurrent.TimeoutException;
21 import java.util.function.Consumer;
22
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.eclipse.jetty.client.api.Request;
26 import org.eclipse.jetty.client.api.Result;
27 import org.eclipse.jetty.client.util.BufferingResponseListener;
28 import org.openhab.binding.boschshc.internal.devices.bridge.dto.LongPollError;
29 import org.openhab.binding.boschshc.internal.devices.bridge.dto.LongPollResult;
30 import org.openhab.binding.boschshc.internal.devices.bridge.dto.SubscribeResult;
31 import org.openhab.binding.boschshc.internal.exceptions.BoschSHCException;
32 import org.openhab.binding.boschshc.internal.exceptions.LongPollingFailedException;
33 import org.openhab.binding.boschshc.internal.serialization.GsonUtils;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import com.google.gson.JsonSyntaxException;
38
39 /**
40  * Handles the long polling to the Smart Home Controller.
41  *
42  * @author Christian Oeing - Initial contribution
43  */
44 @NonNullByDefault
45 public class LongPolling {
46
47     private final Logger logger = LoggerFactory.getLogger(LongPolling.class);
48
49     /**
50      * Executor to schedule long polls.
51      */
52     private final ScheduledExecutorService scheduler;
53
54     /**
55      * Handler for long poll results.
56      */
57     private final Consumer<LongPollResult> handleResult;
58
59     /**
60      * Handler for unrecoverable.
61      */
62     private final Consumer<Throwable> handleFailure;
63
64     /**
65      * Current running long polling request.
66      */
67     private @Nullable Request request;
68
69     /**
70      * Indicates if long polling was aborted.
71      */
72     private boolean aborted = false;
73
74     public LongPolling(ScheduledExecutorService scheduler, Consumer<LongPollResult> handleResult,
75             Consumer<Throwable> handleFailure) {
76         this.scheduler = scheduler;
77         this.handleResult = handleResult;
78         this.handleFailure = handleFailure;
79     }
80
81     public void start(BoschHttpClient httpClient) throws LongPollingFailedException {
82         // Subscribe to state updates.
83         String subscriptionId = this.subscribe(httpClient);
84         this.executeLongPoll(httpClient, subscriptionId);
85     }
86
87     public void stop() {
88         // Abort long polling.
89         this.aborted = true;
90         Request request = this.request;
91         if (request != null) {
92             request.abort(new AbortLongPolling());
93             this.request = null;
94         }
95     }
96
97     /**
98      * Subscribe to events and store the subscription ID needed for long polling.
99      *
100      * @param httpClient Http client to use for sending subscription request
101      * @return Subscription id
102      */
103     private String subscribe(BoschHttpClient httpClient) throws LongPollingFailedException {
104         try {
105             String url = httpClient.getBoschShcUrl("remote/json-rpc");
106             JsonRpcRequest subscriptionRequest = new JsonRpcRequest("2.0", "RE/subscribe",
107                     new String[] { "com/bosch/sh/remote/*", null });
108             logger.debug("Subscribe: Sending request: {} - using httpClient {}", subscriptionRequest, httpClient);
109             Request httpRequest = httpClient.createRequest(url, POST, subscriptionRequest);
110             SubscribeResult response = httpClient.sendRequest(httpRequest, SubscribeResult.class,
111                     SubscribeResult::isValid, null);
112
113             logger.debug("Subscribe: Got subscription ID: {} {}", response.getResult(), response.getJsonrpc());
114             return response.getResult();
115         } catch (TimeoutException | ExecutionException | BoschSHCException e) {
116             throw new LongPollingFailedException(
117                     String.format("Error on subscribe (Http client: %s): %s", httpClient.toString(), e.getMessage()),
118                     e);
119         } catch (InterruptedException e) {
120             Thread.currentThread().interrupt();
121             throw new LongPollingFailedException(
122                     String.format("Interrupted subscribe (Http client: %s): %s", httpClient.toString(), e.getMessage()),
123                     e);
124         }
125     }
126
127     /**
128      * Create a new subscription for long polling.
129      *
130      * @param httpClient Http client to send requests to
131      */
132     private void resubscribe(BoschHttpClient httpClient) {
133         try {
134             String subscriptionId = this.subscribe(httpClient);
135             this.executeLongPoll(httpClient, subscriptionId);
136         } catch (LongPollingFailedException e) {
137             this.handleFailure.accept(e);
138         }
139     }
140
141     private void executeLongPoll(BoschHttpClient httpClient, String subscriptionId) {
142         scheduler.execute(() -> this.longPoll(httpClient, subscriptionId));
143     }
144
145     /**
146      * Start long polling the home controller. Once a long poll resolves, a new one
147      * is started.
148      */
149     private void longPoll(BoschHttpClient httpClient, String subscriptionId) {
150         logger.debug("Sending long poll request");
151
152         JsonRpcRequest requestContent = new JsonRpcRequest("2.0", "RE/longPoll", new String[] { subscriptionId, "20" });
153         String url = httpClient.getBoschShcUrl("remote/json-rpc");
154         Request longPollRequest = httpClient.createRequest(url, POST, requestContent);
155
156         // Long polling responds after 20 seconds with an empty response if no update
157         // has happened. 10 second threshold was added to not time out if response
158         // from controller takes a bit longer than 20 seconds.
159         longPollRequest.timeout(30, TimeUnit.SECONDS);
160
161         this.request = longPollRequest;
162         LongPolling longPolling = this;
163         longPollRequest.send(new BufferingResponseListener() {
164             @Override
165             public void onComplete(@Nullable Result result) {
166                 // NOTE: This handler runs inside the HTTP thread, so we schedule the response
167                 // handling in a new thread because the HTTP thread is terminated after the
168                 // timeout expires.
169                 scheduler.execute(() -> longPolling.onLongPollComplete(httpClient, subscriptionId, result,
170                         this.getContentAsString()));
171             }
172         });
173     }
174
175     /**
176      * This is the handler for responses of long poll requests.
177      *
178      * @param httpClient HTTP client which received the response
179      * @param subscriptionId Id of subscription the response is for
180      * @param result Complete result of the response
181      * @param content Content of the response
182      */
183     private void onLongPollComplete(BoschHttpClient httpClient, String subscriptionId, @Nullable Result result,
184             String content) {
185         // Check if thing is still online
186         if (this.aborted) {
187             logger.debug("Canceling long polling for subscription id {} because it was aborted", subscriptionId);
188             return;
189         }
190
191         // Check if response was failure or success
192         Throwable failure = result != null ? result.getFailure() : null;
193         if (failure != null) {
194             handleLongPollFailure(subscriptionId, failure);
195         } else {
196             handleLongPollResponse(httpClient, subscriptionId, content);
197         }
198     }
199
200     /**
201      * Attempts to parse and process the long poll response content.
202      * <p>
203      * If the response cannot be parsed as {@link LongPollResult}, an attempt is made to parse a {@link LongPollError}.
204      * In case a {@link LongPollError} is present with the code <code>SUBSCRIPTION_INVALID</code>, a re-subscription is
205      * initiated.
206      * <p>
207      * If the response does not contain syntactically valid JSON, a new subscription is attempted with a delay of 15
208      * seconds.
209      * 
210      * @param httpClient HTTP client which received the response
211      * @param subscriptionId Id of subscription the response is for
212      * @param content Content of the response
213      */
214     private void handleLongPollResponse(BoschHttpClient httpClient, String subscriptionId, String content) {
215         logger.debug("Long poll response: {}", content);
216
217         try {
218             LongPollResult longPollResult = GsonUtils.DEFAULT_GSON_INSTANCE.fromJson(content, LongPollResult.class);
219             if (longPollResult != null && longPollResult.result != null) {
220                 this.handleResult.accept(longPollResult);
221             } else {
222                 logger.debug("Long poll response contained no result: {}", content);
223
224                 // Check if we got a proper result from the SHC
225                 LongPollError longPollError = GsonUtils.DEFAULT_GSON_INSTANCE.fromJson(content, LongPollError.class);
226
227                 if (longPollError != null && longPollError.error != null) {
228                     logger.debug("Got long poll error: {} (code: {})", longPollError.error.message,
229                             longPollError.error.code);
230
231                     if (longPollError.error.code == LongPollError.SUBSCRIPTION_INVALID) {
232                         logger.debug("Subscription {} became invalid, subscribing again", subscriptionId);
233                         this.resubscribe(httpClient);
234                         return;
235                     }
236                 }
237             }
238         } catch (JsonSyntaxException e) {
239             this.handleFailure.accept(
240                     new LongPollingFailedException("Could not deserialize long poll response: '" + content + "'", e));
241             return;
242         }
243
244         // Execute next run
245         this.longPoll(httpClient, subscriptionId);
246     }
247
248     private void handleLongPollFailure(String subscriptionId, Throwable failure) {
249         if (failure instanceof ExecutionException) {
250             if (failure.getCause() instanceof AbortLongPolling) {
251                 logger.debug("Canceling long polling for subscription id {} because it was aborted", subscriptionId);
252             } else {
253                 this.handleFailure.accept(
254                         new LongPollingFailedException("Unexpected exception during long polling request", failure));
255             }
256         } else {
257             this.handleFailure.accept(
258                     new LongPollingFailedException("Unexpected exception during long polling request", failure));
259         }
260     }
261
262     @SuppressWarnings("serial")
263     private class AbortLongPolling extends BoschSHCException {
264     }
265 }