2 * Copyright (c) 2010-2024 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.boschshc.internal.devices.bridge;
15 import static org.eclipse.jetty.http.HttpMethod.POST;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.concurrent.TimeUnit;
20 import java.util.concurrent.TimeoutException;
21 import java.util.function.Consumer;
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.eclipse.jetty.client.api.Request;
26 import org.eclipse.jetty.client.api.Result;
27 import org.eclipse.jetty.client.util.BufferingResponseListener;
28 import org.openhab.binding.boschshc.internal.devices.bridge.dto.LongPollError;
29 import org.openhab.binding.boschshc.internal.devices.bridge.dto.LongPollResult;
30 import org.openhab.binding.boschshc.internal.devices.bridge.dto.SubscribeResult;
31 import org.openhab.binding.boschshc.internal.exceptions.BoschSHCException;
32 import org.openhab.binding.boschshc.internal.exceptions.LongPollingFailedException;
33 import org.openhab.binding.boschshc.internal.serialization.GsonUtils;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 import com.google.gson.JsonSyntaxException;
40 * Handles the long polling to the Smart Home Controller.
42 * @author Christian Oeing - Initial contribution
45 public class LongPolling {
47 private final Logger logger = LoggerFactory.getLogger(LongPolling.class);
50 * Executor to schedule long polls.
52 private final ScheduledExecutorService scheduler;
55 * Handler for long poll results.
57 private final Consumer<LongPollResult> handleResult;
 * Handler for unrecoverable errors.
62 private final Consumer<Throwable> handleFailure;
65 * Current running long polling request.
67 private @Nullable Request request;
70 * Indicates if long polling was aborted.
72 private boolean aborted = false;
/**
 * Creates a new long polling helper.
 *
 * @param scheduler executor on which long polls and the processing of their responses are run
 * @param handleResult consumer that receives every long poll result that carries a result payload
 * @param handleFailure consumer that receives failures long polling does not handle itself
 */
public LongPolling(ScheduledExecutorService scheduler, Consumer<LongPollResult> handleResult,
        Consumer<Throwable> handleFailure) {
    // Callbacks first, executor last; the assignments are independent of each other.
    this.handleFailure = handleFailure;
    this.handleResult = handleResult;
    this.scheduler = scheduler;
}
81 public void start(BoschHttpClient httpClient) throws LongPollingFailedException {
82 // Subscribe to state updates.
83 String subscriptionId = this.subscribe(httpClient);
84 this.executeLongPoll(httpClient, subscriptionId);
88 // Abort long polling.
90 Request request = this.request;
91 if (request != null) {
92 request.abort(new AbortLongPolling());
98 * Subscribe to events and store the subscription ID needed for long polling.
100 * @param httpClient Http client to use for sending subscription request
101 * @return Subscription id
103 private String subscribe(BoschHttpClient httpClient) throws LongPollingFailedException {
105 String url = httpClient.getBoschShcUrl("remote/json-rpc");
106 JsonRpcRequest subscriptionRequest = new JsonRpcRequest("2.0", "RE/subscribe",
107 new String[] { "com/bosch/sh/remote/*", null });
108 logger.debug("Subscribe: Sending request: {} - using httpClient {}", subscriptionRequest, httpClient);
109 Request httpRequest = httpClient.createRequest(url, POST, subscriptionRequest);
110 SubscribeResult response = httpClient.sendRequest(httpRequest, SubscribeResult.class,
111 SubscribeResult::isValid, null);
113 logger.debug("Subscribe: Got subscription ID: {} {}", response.getResult(), response.getJsonrpc());
114 return response.getResult();
115 } catch (TimeoutException | ExecutionException | BoschSHCException e) {
116 throw new LongPollingFailedException(
117 String.format("Error on subscribe (Http client: %s): %s", httpClient.toString(), e.getMessage()),
119 } catch (InterruptedException e) {
120 Thread.currentThread().interrupt();
121 throw new LongPollingFailedException(
122 String.format("Interrupted subscribe (Http client: %s): %s", httpClient.toString(), e.getMessage()),
128 * Create a new subscription for long polling.
130 * @param httpClient Http client to send requests to
132 private void resubscribe(BoschHttpClient httpClient) {
134 String subscriptionId = this.subscribe(httpClient);
135 this.executeLongPoll(httpClient, subscriptionId);
136 } catch (LongPollingFailedException e) {
137 this.handleFailure.accept(e);
141 private void executeLongPoll(BoschHttpClient httpClient, String subscriptionId) {
142 scheduler.execute(() -> this.longPoll(httpClient, subscriptionId));
146 * Start long polling the home controller. Once a long poll resolves, a new one
149 private void longPoll(BoschHttpClient httpClient, String subscriptionId) {
150 logger.debug("Sending long poll request");
152 JsonRpcRequest requestContent = new JsonRpcRequest("2.0", "RE/longPoll", new String[] { subscriptionId, "20" });
153 String url = httpClient.getBoschShcUrl("remote/json-rpc");
154 Request longPollRequest = httpClient.createRequest(url, POST, requestContent);
156 // Long polling responds after 20 seconds with an empty response if no update
157 // has happened. 10 second threshold was added to not time out if response
158 // from controller takes a bit longer than 20 seconds.
159 longPollRequest.timeout(30, TimeUnit.SECONDS);
161 this.request = longPollRequest;
162 LongPolling longPolling = this;
163 longPollRequest.send(new BufferingResponseListener() {
165 public void onComplete(@Nullable Result result) {
166 // NOTE: This handler runs inside the HTTP thread, so we schedule the response
167 // handling in a new thread because the HTTP thread is terminated after the
169 scheduler.execute(() -> longPolling.onLongPollComplete(httpClient, subscriptionId, result,
170 this.getContentAsString()));
176 * This is the handler for responses of long poll requests.
178 * @param httpClient HTTP client which received the response
179 * @param subscriptionId Id of subscription the response is for
180 * @param result Complete result of the response
181 * @param content Content of the response
183 private void onLongPollComplete(BoschHttpClient httpClient, String subscriptionId, @Nullable Result result,
184 @Nullable String content) {
185 // Check if thing is still online
187 logger.debug("Canceling long polling for subscription id {} because it was aborted", subscriptionId);
191 // Check if response was failure or success
192 Throwable failure = result != null ? result.getFailure() : null;
193 if (failure != null) {
194 handleLongPollFailure(subscriptionId, failure);
196 handleLongPollResponse(httpClient, subscriptionId, content);
201 * Attempts to parse and process the long poll response content.
203 * If the response cannot be parsed as {@link LongPollResult}, an attempt is made to parse a {@link LongPollError}.
204 * In case a {@link LongPollError} is present with the code <code>SUBSCRIPTION_INVALID</code>, a re-subscription is
207 * If the response does not contain syntactically valid JSON, a new subscription is attempted with a delay of 15
210 * @param httpClient HTTP client which received the response
211 * @param subscriptionId Id of subscription the response is for
212 * @param content Content of the response
214 private void handleLongPollResponse(BoschHttpClient httpClient, String subscriptionId, @Nullable String content) {
215 logger.debug("Long poll response: {}", content);
218 LongPollResult longPollResult = GsonUtils.DEFAULT_GSON_INSTANCE.fromJson(content, LongPollResult.class);
219 if (longPollResult != null && longPollResult.result != null) {
220 this.handleResult.accept(longPollResult);
222 logger.debug("Long poll response contained no result: {}", content);
224 // Check if we got a proper result from the SHC
225 LongPollError longPollError = GsonUtils.DEFAULT_GSON_INSTANCE.fromJson(content, LongPollError.class);
227 if (longPollError != null && longPollError.error != null) {
228 logger.debug("Got long poll error: {} (code: {})", longPollError.error.message,
229 longPollError.error.code);
231 if (longPollError.error.code == LongPollError.SUBSCRIPTION_INVALID) {
232 logger.debug("Subscription {} became invalid, subscribing again", subscriptionId);
233 this.resubscribe(httpClient);
238 } catch (JsonSyntaxException e) {
239 this.handleFailure.accept(
240 new LongPollingFailedException("Could not deserialize long poll response: '" + content + "'", e));
245 this.longPoll(httpClient, subscriptionId);
248 private void handleLongPollFailure(String subscriptionId, Throwable failure) {
249 if (failure instanceof ExecutionException) {
250 if (failure.getCause() instanceof AbortLongPolling) {
251 logger.debug("Canceling long polling for subscription id {} because it was aborted", subscriptionId);
253 this.handleFailure.accept(
254 new LongPollingFailedException("Unexpected exception during long polling request", failure));
257 this.handleFailure.accept(
258 new LongPollingFailedException("Unexpected exception during long polling request", failure));
262 @SuppressWarnings("serial")
263 private class AbortLongPolling extends BoschSHCException {