]> git.basschouten.com Git - openhab-addons.git/blob
e5fd1842da26be11658f97e3fc3853e18a517b1b
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.boschshc.internal.devices.bridge;
14
import static org.eclipse.jetty.http.HttpMethod.POST;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.openhab.binding.boschshc.internal.devices.bridge.dto.LongPollError;
import org.openhab.binding.boschshc.internal.devices.bridge.dto.LongPollResult;
import org.openhab.binding.boschshc.internal.devices.bridge.dto.SubscribeResult;
import org.openhab.binding.boschshc.internal.exceptions.BoschSHCException;
import org.openhab.binding.boschshc.internal.exceptions.LongPollingFailedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
37
38 /**
39  * Handles the long polling to the Smart Home Controller.
40  *
41  * @author Christian Oeing - Initial contribution
42  */
43 @NonNullByDefault
44 public class LongPolling {
45
46     private final Logger logger = LoggerFactory.getLogger(LongPolling.class);
47
48     /**
49      * gson instance to convert a class to json string and back.
50      */
51     private final Gson gson = new Gson();
52
53     /**
54      * Executor to schedule long polls.
55      */
56     private final ScheduledExecutorService scheduler;
57
58     /**
59      * Handler for long poll results.
60      */
61     private final Consumer<LongPollResult> handleResult;
62
63     /**
64      * Handler for unrecoverable.
65      */
66     private final Consumer<Throwable> handleFailure;
67
68     /**
69      * Current running long polling request.
70      */
71     private @Nullable Request request;
72
73     /**
74      * Indicates if long polling was aborted.
75      */
76     private boolean aborted = false;
77
78     public LongPolling(ScheduledExecutorService scheduler, Consumer<LongPollResult> handleResult,
79             Consumer<Throwable> handleFailure) {
80         this.scheduler = scheduler;
81         this.handleResult = handleResult;
82         this.handleFailure = handleFailure;
83     }
84
85     public void start(BoschHttpClient httpClient) throws LongPollingFailedException {
86         // Subscribe to state updates.
87         String subscriptionId = this.subscribe(httpClient);
88         this.executeLongPoll(httpClient, subscriptionId);
89     }
90
91     public void stop() {
92         // Abort long polling.
93         this.aborted = true;
94         Request request = this.request;
95         if (request != null) {
96             request.abort(new AbortLongPolling());
97             this.request = null;
98         }
99     }
100
101     /**
102      * Subscribe to events and store the subscription ID needed for long polling.
103      *
104      * @param httpClient Http client to use for sending subscription request
105      * @return Subscription id
106      */
107     private String subscribe(BoschHttpClient httpClient) throws LongPollingFailedException {
108         try {
109             String url = httpClient.getBoschShcUrl("remote/json-rpc");
110             JsonRpcRequest request = new JsonRpcRequest("2.0", "RE/subscribe",
111                     new String[] { "com/bosch/sh/remote/*", null });
112             logger.debug("Subscribe: Sending request: {} - using httpClient {}", request.toString(),
113                     httpClient.toString());
114             Request httpRequest = httpClient.createRequest(url, POST, request);
115             SubscribeResult response = httpClient.sendRequest(httpRequest, SubscribeResult.class,
116                     SubscribeResult::isValid, null);
117
118             logger.debug("Subscribe: Got subscription ID: {} {}", response.getResult(), response.getJsonrpc());
119             String subscriptionId = response.getResult();
120             return subscriptionId;
121         } catch (TimeoutException | ExecutionException | BoschSHCException e) {
122             throw new LongPollingFailedException(
123                     String.format("Error on subscribe (Http client: %s): %s", httpClient.toString(), e.getMessage()),
124                     e);
125         } catch (InterruptedException e) {
126             Thread.currentThread().interrupt();
127             throw new LongPollingFailedException(
128                     String.format("Interrupted subscribe (Http client: %s): %s", httpClient.toString(), e.getMessage()),
129                     e);
130         }
131     }
132
133     /**
134      * Create a new subscription for long polling.
135      *
136      * @param httpClient Http client to send requests to
137      */
138     private void resubscribe(BoschHttpClient httpClient) {
139         try {
140             String subscriptionId = this.subscribe(httpClient);
141             this.executeLongPoll(httpClient, subscriptionId);
142         } catch (LongPollingFailedException e) {
143             this.handleFailure.accept(e);
144             return;
145         }
146     }
147
148     private void executeLongPoll(BoschHttpClient httpClient, String subscriptionId) {
149         scheduler.execute(() -> this.longPoll(httpClient, subscriptionId));
150     }
151
152     /**
153      * Start long polling the home controller. Once a long poll resolves, a new one is started.
154      */
155     private void longPoll(BoschHttpClient httpClient, String subscriptionId) {
156         logger.debug("Sending long poll request");
157
158         JsonRpcRequest requestContent = new JsonRpcRequest("2.0", "RE/longPoll", new String[] { subscriptionId, "20" });
159         String url = httpClient.getBoschShcUrl("remote/json-rpc");
160         Request request = httpClient.createRequest(url, POST, requestContent);
161
162         // Long polling responds after 20 seconds with an empty response if no update has happened.
163         // 10 second threshold was added to not time out if response from controller takes a bit longer than 20 seconds.
164         request.timeout(30, TimeUnit.SECONDS);
165
166         this.request = request;
167         LongPolling longPolling = this;
168         request.send(new BufferingResponseListener() {
169             @Override
170             public void onComplete(@Nullable Result result) {
171                 // NOTE: This handler runs inside the HTTP thread, so we schedule the response handling in a new thread
172                 // because the HTTP thread is terminated after the timeout expires.
173                 scheduler.execute(() -> longPolling.onLongPollComplete(httpClient, subscriptionId, result,
174                         this.getContentAsString()));
175             }
176         });
177     }
178
179     /**
180      * This is the handler for responses of long poll requests.
181      *
182      * @param httpClient HTTP client which received the response
183      * @param subscriptionId Id of subscription the response is for
184      * @param result Complete result of the response
185      * @param content Content of the response
186      */
187     private void onLongPollComplete(BoschHttpClient httpClient, String subscriptionId, @Nullable Result result,
188             String content) {
189         // Check if thing is still online
190         if (this.aborted) {
191             logger.debug("Canceling long polling for subscription id {} because it was aborted", subscriptionId);
192             return;
193         }
194
195         // Check if response was failure or success
196         Throwable failure = result != null ? result.getFailure() : null;
197         if (failure != null) {
198             if (failure instanceof ExecutionException) {
199                 if (failure.getCause() instanceof AbortLongPolling) {
200                     logger.debug("Canceling long polling for subscription id {} because it was aborted",
201                             subscriptionId);
202                 } else {
203                     this.handleFailure.accept(new LongPollingFailedException(
204                             "Unexpected exception during long polling request", failure));
205                 }
206             } else {
207                 this.handleFailure.accept(
208                         new LongPollingFailedException("Unexpected exception during long polling request", failure));
209             }
210         } else {
211             logger.debug("Long poll response: {}", content);
212
213             String nextSubscriptionId = subscriptionId;
214
215             LongPollResult longPollResult = gson.fromJson(content, LongPollResult.class);
216             if (longPollResult != null && longPollResult.result != null) {
217                 this.handleResult.accept(longPollResult);
218             } else {
219                 logger.debug("Long poll response contained no result: {}", content);
220
221                 // Check if we got a proper result from the SHC
222                 LongPollError longPollError = gson.fromJson(content, LongPollError.class);
223
224                 if (longPollError != null && longPollError.error != null) {
225                     logger.debug("Got long poll error: {} (code: {})", longPollError.error.message,
226                             longPollError.error.code);
227
228                     if (longPollError.error.code == LongPollError.SUBSCRIPTION_INVALID) {
229                         logger.debug("Subscription {} became invalid, subscribing again", subscriptionId);
230                         this.resubscribe(httpClient);
231                         return;
232                     }
233                 }
234             }
235
236             // Execute next run
237             this.longPoll(httpClient, nextSubscriptionId);
238         }
239     }
240
241     @SuppressWarnings("serial")
242     private class AbortLongPolling extends BoschSHCException {
243     }
244 }