]> git.basschouten.com Git - openhab-addons.git/blob
f1a843af6b901d9a4dc772e3eec50a257a3cba0b
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.boschshc.internal.devices.bridge;
14
15 import static org.eclipse.jetty.http.HttpMethod.POST;
16
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.concurrent.TimeUnit;
20 import java.util.concurrent.TimeoutException;
21 import java.util.function.Consumer;
22
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.eclipse.jetty.client.api.Request;
26 import org.eclipse.jetty.client.api.Result;
27 import org.eclipse.jetty.client.util.BufferingResponseListener;
28 import org.openhab.binding.boschshc.internal.devices.bridge.dto.LongPollError;
29 import org.openhab.binding.boschshc.internal.devices.bridge.dto.LongPollResult;
30 import org.openhab.binding.boschshc.internal.devices.bridge.dto.SubscribeResult;
31 import org.openhab.binding.boschshc.internal.exceptions.BoschSHCException;
32 import org.openhab.binding.boschshc.internal.exceptions.LongPollingFailedException;
33 import org.openhab.binding.boschshc.internal.serialization.GsonUtils;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * Handles the long polling to the Smart Home Controller.
39  *
40  * @author Christian Oeing - Initial contribution
41  */
42 @NonNullByDefault
43 public class LongPolling {
44
45     private final Logger logger = LoggerFactory.getLogger(LongPolling.class);
46
47     /**
48      * Executor to schedule long polls.
49      */
50     private final ScheduledExecutorService scheduler;
51
52     /**
53      * Handler for long poll results.
54      */
55     private final Consumer<LongPollResult> handleResult;
56
57     /**
58      * Handler for unrecoverable.
59      */
60     private final Consumer<Throwable> handleFailure;
61
62     /**
63      * Current running long polling request.
64      */
65     private @Nullable Request request;
66
67     /**
68      * Indicates if long polling was aborted.
69      */
70     private boolean aborted = false;
71
72     public LongPolling(ScheduledExecutorService scheduler, Consumer<LongPollResult> handleResult,
73             Consumer<Throwable> handleFailure) {
74         this.scheduler = scheduler;
75         this.handleResult = handleResult;
76         this.handleFailure = handleFailure;
77     }
78
79     public void start(BoschHttpClient httpClient) throws LongPollingFailedException {
80         // Subscribe to state updates.
81         String subscriptionId = this.subscribe(httpClient);
82         this.executeLongPoll(httpClient, subscriptionId);
83     }
84
85     public void stop() {
86         // Abort long polling.
87         this.aborted = true;
88         Request request = this.request;
89         if (request != null) {
90             request.abort(new AbortLongPolling());
91             this.request = null;
92         }
93     }
94
95     /**
96      * Subscribe to events and store the subscription ID needed for long polling.
97      *
98      * @param httpClient Http client to use for sending subscription request
99      * @return Subscription id
100      */
101     private String subscribe(BoschHttpClient httpClient) throws LongPollingFailedException {
102         try {
103             String url = httpClient.getBoschShcUrl("remote/json-rpc");
104             JsonRpcRequest subscriptionRequest = new JsonRpcRequest("2.0", "RE/subscribe",
105                     new String[] { "com/bosch/sh/remote/*", null });
106             logger.debug("Subscribe: Sending request: {} - using httpClient {}", subscriptionRequest, httpClient);
107             Request httpRequest = httpClient.createRequest(url, POST, subscriptionRequest);
108             SubscribeResult response = httpClient.sendRequest(httpRequest, SubscribeResult.class,
109                     SubscribeResult::isValid, null);
110
111             logger.debug("Subscribe: Got subscription ID: {} {}", response.getResult(), response.getJsonrpc());
112             return response.getResult();
113         } catch (TimeoutException | ExecutionException | BoschSHCException e) {
114             throw new LongPollingFailedException(
115                     String.format("Error on subscribe (Http client: %s): %s", httpClient.toString(), e.getMessage()),
116                     e);
117         } catch (InterruptedException e) {
118             Thread.currentThread().interrupt();
119             throw new LongPollingFailedException(
120                     String.format("Interrupted subscribe (Http client: %s): %s", httpClient.toString(), e.getMessage()),
121                     e);
122         }
123     }
124
125     /**
126      * Create a new subscription for long polling.
127      *
128      * @param httpClient Http client to send requests to
129      */
130     private void resubscribe(BoschHttpClient httpClient) {
131         try {
132             String subscriptionId = this.subscribe(httpClient);
133             this.executeLongPoll(httpClient, subscriptionId);
134         } catch (LongPollingFailedException e) {
135             this.handleFailure.accept(e);
136         }
137     }
138
139     private void executeLongPoll(BoschHttpClient httpClient, String subscriptionId) {
140         scheduler.execute(() -> this.longPoll(httpClient, subscriptionId));
141     }
142
143     /**
144      * Start long polling the home controller. Once a long poll resolves, a new one is started.
145      */
146     private void longPoll(BoschHttpClient httpClient, String subscriptionId) {
147         logger.debug("Sending long poll request");
148
149         JsonRpcRequest requestContent = new JsonRpcRequest("2.0", "RE/longPoll", new String[] { subscriptionId, "20" });
150         String url = httpClient.getBoschShcUrl("remote/json-rpc");
151         Request longPollRequest = httpClient.createRequest(url, POST, requestContent);
152
153         // Long polling responds after 20 seconds with an empty response if no update has happened.
154         // 10 second threshold was added to not time out if response from controller takes a bit longer than 20 seconds.
155         longPollRequest.timeout(30, TimeUnit.SECONDS);
156
157         this.request = longPollRequest;
158         LongPolling longPolling = this;
159         longPollRequest.send(new BufferingResponseListener() {
160             @Override
161             public void onComplete(@Nullable Result result) {
162                 // NOTE: This handler runs inside the HTTP thread, so we schedule the response handling in a new thread
163                 // because the HTTP thread is terminated after the timeout expires.
164                 scheduler.execute(() -> longPolling.onLongPollComplete(httpClient, subscriptionId, result,
165                         this.getContentAsString()));
166             }
167         });
168     }
169
170     /**
171      * This is the handler for responses of long poll requests.
172      *
173      * @param httpClient HTTP client which received the response
174      * @param subscriptionId Id of subscription the response is for
175      * @param result Complete result of the response
176      * @param content Content of the response
177      */
178     private void onLongPollComplete(BoschHttpClient httpClient, String subscriptionId, @Nullable Result result,
179             String content) {
180         // Check if thing is still online
181         if (this.aborted) {
182             logger.debug("Canceling long polling for subscription id {} because it was aborted", subscriptionId);
183             return;
184         }
185
186         // Check if response was failure or success
187         Throwable failure = result != null ? result.getFailure() : null;
188         if (failure != null) {
189             handleLongPollFailure(subscriptionId, failure);
190         } else {
191             logger.debug("Long poll response: {}", content);
192
193             String nextSubscriptionId = subscriptionId;
194
195             LongPollResult longPollResult = GsonUtils.DEFAULT_GSON_INSTANCE.fromJson(content, LongPollResult.class);
196             if (longPollResult != null && longPollResult.result != null) {
197                 this.handleResult.accept(longPollResult);
198             } else {
199                 logger.debug("Long poll response contained no result: {}", content);
200
201                 // Check if we got a proper result from the SHC
202                 LongPollError longPollError = GsonUtils.DEFAULT_GSON_INSTANCE.fromJson(content, LongPollError.class);
203
204                 if (longPollError != null && longPollError.error != null) {
205                     logger.debug("Got long poll error: {} (code: {})", longPollError.error.message,
206                             longPollError.error.code);
207
208                     if (longPollError.error.code == LongPollError.SUBSCRIPTION_INVALID) {
209                         logger.debug("Subscription {} became invalid, subscribing again", subscriptionId);
210                         this.resubscribe(httpClient);
211                         return;
212                     }
213                 }
214             }
215
216             // Execute next run
217             this.longPoll(httpClient, nextSubscriptionId);
218         }
219     }
220
221     private void handleLongPollFailure(String subscriptionId, Throwable failure) {
222         if (failure instanceof ExecutionException) {
223             if (failure.getCause() instanceof AbortLongPolling) {
224                 logger.debug("Canceling long polling for subscription id {} because it was aborted", subscriptionId);
225             } else {
226                 this.handleFailure.accept(
227                         new LongPollingFailedException("Unexpected exception during long polling request", failure));
228             }
229         } else {
230             this.handleFailure.accept(
231                     new LongPollingFailedException("Unexpected exception during long polling request", failure));
232         }
233     }
234
235     @SuppressWarnings("serial")
236     private class AbortLongPolling extends BoschSHCException {
237     }
238 }