2 * Copyright (c) 2010-2024 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.hue.internal.connection;
15 import java.io.BufferedReader;
16 import java.io.ByteArrayInputStream;
17 import java.io.Closeable;
18 import java.io.IOException;
19 import java.io.InputStreamReader;
20 import java.io.Reader;
21 import java.net.InetSocketAddress;
22 import java.nio.ByteBuffer;
23 import java.nio.charset.StandardCharsets;
24 import java.time.Duration;
25 import java.time.Instant;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.List;
30 import java.util.Objects;
31 import java.util.Optional;
32 import java.util.Properties;
33 import java.util.concurrent.CompletableFuture;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.ExecutionException;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.Semaphore;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.TimeoutException;
40 import java.util.concurrent.locks.Lock;
41 import java.util.concurrent.locks.ReadWriteLock;
42 import java.util.concurrent.locks.ReentrantReadWriteLock;
43 import java.util.stream.Collectors;
45 import javax.ws.rs.core.MediaType;
47 import org.eclipse.jdt.annotation.NonNullByDefault;
48 import org.eclipse.jdt.annotation.Nullable;
49 import org.eclipse.jetty.client.HttpClient;
50 import org.eclipse.jetty.client.api.ContentResponse;
51 import org.eclipse.jetty.client.api.Request;
52 import org.eclipse.jetty.client.util.StringContentProvider;
53 import org.eclipse.jetty.http.HttpFields;
54 import org.eclipse.jetty.http.HttpHeader;
55 import org.eclipse.jetty.http.HttpMethod;
56 import org.eclipse.jetty.http.HttpStatus;
57 import org.eclipse.jetty.http.HttpURI;
58 import org.eclipse.jetty.http.HttpVersion;
59 import org.eclipse.jetty.http.MetaData;
60 import org.eclipse.jetty.http.MetaData.Response;
61 import org.eclipse.jetty.http2.ErrorCode;
62 import org.eclipse.jetty.http2.api.Session;
63 import org.eclipse.jetty.http2.api.Stream;
64 import org.eclipse.jetty.http2.client.HTTP2Client;
65 import org.eclipse.jetty.http2.frames.DataFrame;
66 import org.eclipse.jetty.http2.frames.GoAwayFrame;
67 import org.eclipse.jetty.http2.frames.HeadersFrame;
68 import org.eclipse.jetty.http2.frames.PingFrame;
69 import org.eclipse.jetty.http2.frames.ResetFrame;
70 import org.eclipse.jetty.util.Callback;
71 import org.eclipse.jetty.util.Promise.Completable;
72 import org.eclipse.jetty.util.ssl.SslContextFactory;
73 import org.openhab.binding.hue.internal.api.dto.clip1.CreateUserRequest;
74 import org.openhab.binding.hue.internal.api.dto.clip1.SuccessResponse;
75 import org.openhab.binding.hue.internal.api.dto.clip2.BridgeConfig;
76 import org.openhab.binding.hue.internal.api.dto.clip2.Event;
77 import org.openhab.binding.hue.internal.api.dto.clip2.Resource;
78 import org.openhab.binding.hue.internal.api.dto.clip2.ResourceReference;
79 import org.openhab.binding.hue.internal.api.dto.clip2.Resources;
80 import org.openhab.binding.hue.internal.api.dto.clip2.enums.ResourceType;
81 import org.openhab.binding.hue.internal.api.serialization.InstantDeserializer;
82 import org.openhab.binding.hue.internal.exceptions.ApiException;
83 import org.openhab.binding.hue.internal.exceptions.HttpUnauthorizedException;
84 import org.openhab.binding.hue.internal.handler.Clip2BridgeHandler;
85 import org.openhab.core.io.net.http.HttpClientFactory;
86 import org.openhab.core.io.net.http.HttpUtil;
87 import org.slf4j.Logger;
88 import org.slf4j.LoggerFactory;
90 import com.google.gson.Gson;
91 import com.google.gson.GsonBuilder;
92 import com.google.gson.JsonArray;
93 import com.google.gson.JsonElement;
94 import com.google.gson.JsonParseException;
95 import com.google.gson.JsonParser;
96 import com.google.gson.JsonSyntaxException;
99 * This class handles HTTP and SSE connections to/from a Hue Bridge running CLIP 2.
101 * It uses the following connection mechanisms:
104 * <li>The primary communication uses HTTP 2 streams over a shared permanent HTTP 2 session.</li>
105 * <li>The 'registerApplicationKey()' method uses HTTP/1.1 over the OH common Jetty client.</li>
106 * <li>The 'isClip2Supported()' static method uses HTTP/1.1 over the OH common Jetty client via 'HttpUtil'.</li>
109 * @author Andrew Fiddian-Green - Initial Contribution
112 public class Clip2Bridge implements Closeable {
115 * Base (abstract) adapter for listening to HTTP 2 stream events.
117 * It implements a CompletableFuture by means of which the caller can wait for the response data to come in. And
118 * which, in the case of fatal errors, gets completed exceptionally.
120 * It handles the following fatal error events by notifying the containing class:
122 * <li>onHeaders() HTTP unauthorized codes</li>
124 private abstract class BaseStreamListenerAdapter<T> extends Stream.Listener.Adapter {
125 protected final CompletableFuture<T> completable = new CompletableFuture<T>();
126 private String contentType = "UNDEFINED";
129 protected T awaitResult() throws ExecutionException, InterruptedException, TimeoutException {
130 return completable.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
134 * Return the HTTP content type.
136 * @return content type e.g. 'application/json'
138 protected String getContentType() {
143 * Return the HTTP status code.
145 * @return status code e.g. 200
147 protected int getStatus() {
152 * Handle an HTTP2 error.
154 * @param error the type of error.
155 * @param session the session on which the error occurred.
157 protected void handleHttp2Error(Http2Error error, Session session) {
158 Http2Exception e = new Http2Exception(error);
159 if (Http2Error.UNAUTHORIZED.equals(error)) {
160 // for external error handling, abstract authorization errors into a separate exception
161 completable.completeExceptionally(new HttpUnauthorizedException("HTTP 2 request not authorized"));
163 completable.completeExceptionally(e);
165 fatalErrorDelayed(this, e, session);
169 * Check the reply headers to see whether the request was authorised.
172 public void onHeaders(@Nullable Stream stream, @Nullable HeadersFrame frame) {
173 Objects.requireNonNull(stream);
174 Objects.requireNonNull(frame);
175 MetaData metaData = frame.getMetaData();
176 if (metaData.isResponse()) {
177 Response responseMetaData = (Response) metaData;
178 contentType = responseMetaData.getFields().get(HttpHeader.CONTENT_TYPE).toLowerCase();
179 status = responseMetaData.getStatus();
181 case HttpStatus.UNAUTHORIZED_401:
182 case HttpStatus.FORBIDDEN_403:
183 handleHttp2Error(Http2Error.UNAUTHORIZED, stream.getSession());
191 * Adapter for listening to regular HTTP 2 GET/PUT request stream events.
193 * It assembles the incoming text data into an HTTP 'content' entity. And when the last data frame arrives, it
194 * returns the full content by completing the CompletableFuture with that data.
196 * In addition to those handled by the parent, it handles the following fatal error events by notifying the
199 * <li>onIdleTimeout()</li>
200 * <li>onTimeout()</li>
202 private class ContentStreamListenerAdapter extends BaseStreamListenerAdapter<String> {
203 private final DataFrameCollector content = new DataFrameCollector();
206 public void onData(@Nullable Stream stream, @Nullable DataFrame frame, @Nullable Callback callback) {
207 Objects.requireNonNull(frame);
208 Objects.requireNonNull(callback);
209 synchronized (this) {
210 content.append(frame.getData());
211 if (frame.isEndStream() && !completable.isDone()) {
212 completable.complete(content.contentAsString().trim());
216 callback.succeeded();
220 public boolean onIdleTimeout(@Nullable Stream stream, @Nullable Throwable x) {
221 Objects.requireNonNull(stream);
222 handleHttp2Error(Http2Error.IDLE, stream.getSession());
227 public void onTimeout(@Nullable Stream stream, @Nullable Throwable x) {
228 Objects.requireNonNull(stream);
229 handleHttp2Error(Http2Error.TIMEOUT, stream.getSession());
234 * Class to collect incoming ByteBuffer data from HTTP 2 Data frames.
236 private static class DataFrameCollector {
237 private byte[] buffer = new byte[512];
238 private int usedSize = 0;
240 public void append(ByteBuffer data) {
241 int dataCapacity = data.capacity();
242 int neededSize = usedSize + dataCapacity;
243 if (neededSize > buffer.length) {
244 int newSize = (dataCapacity < 4096) ? neededSize : Math.max(2 * buffer.length, neededSize);
245 buffer = Arrays.copyOf(buffer, newSize);
247 data.get(buffer, usedSize, dataCapacity);
248 usedSize += dataCapacity;
251 public String contentAsString() {
252 return new String(buffer, 0, usedSize, StandardCharsets.UTF_8);
255 public Reader contentStreamReader() {
256 return new InputStreamReader(new ByteArrayInputStream(buffer, 0, usedSize), StandardCharsets.UTF_8);
259 public void reset() {
265 * Adapter for listening to SSE event stream events.
267 * It receives the incoming text lines. Receipt of the first data line causes the CompletableFuture to complete. It
268 * then parses subsequent data according to the SSE specification. If the line starts with a 'data:' message, it
269 * adds the data to the list of strings. And if the line is empty (i.e. the last line of an event), it passes the
270 * full set of strings to the owner via a call-back method.
272 * The stream must be permanently connected, so it ignores onIdleTimeout() events.
274 * The parent class handles most fatal errors, but since the event stream is supposed to be permanently connected,
275 * the following events are also considered as fatal:
277 * <li>onClosed()</li>
280 private class EventStreamListenerAdapter extends BaseStreamListenerAdapter<Boolean> {
281 private final DataFrameCollector eventData = new DataFrameCollector();
284 public void onClosed(@Nullable Stream stream) {
285 Objects.requireNonNull(stream);
286 handleHttp2Error(Http2Error.CLOSED, stream.getSession());
290 public void onData(@Nullable Stream stream, @Nullable DataFrame frame, @Nullable Callback callback) {
291 Objects.requireNonNull(frame);
292 Objects.requireNonNull(callback);
293 synchronized (this) {
294 eventData.append(frame.getData());
295 BufferedReader reader = new BufferedReader(eventData.contentStreamReader());
296 @SuppressWarnings("null")
297 List<String> receivedLines = reader.lines().collect(Collectors.toList());
299 // a blank line marks the end of an SSE message
300 boolean endOfMessage = !receivedLines.isEmpty()
301 && receivedLines.get(receivedLines.size() - 1).isBlank();
305 // receipt of ANY message means the event stream is established
306 if (!completable.isDone()) {
307 completable.complete(Boolean.TRUE);
309 // append any 'data' field values to the event message
310 StringBuilder eventContent = new StringBuilder();
311 for (String receivedLine : receivedLines) {
312 if (receivedLine.startsWith("data:")) {
313 eventContent.append(receivedLine.substring(5).stripLeading());
316 if (eventContent.length() > 0) {
317 onEventData(eventContent.toString().trim());
321 callback.succeeded();
325 public boolean onIdleTimeout(@Nullable Stream stream, @Nullable Throwable x) {
330 public void onReset(@Nullable Stream stream, @Nullable ResetFrame frame) {
331 Objects.requireNonNull(stream);
332 handleHttp2Error(Http2Error.RESET, stream.getSession());
337 * Enum of potential fatal HTTP 2 session/stream errors.
339 private enum Http2Error {
350 * Private exception for handling HTTP 2 stream and session errors.
352 @SuppressWarnings("serial")
353 private static class Http2Exception extends ApiException {
354 public final Http2Error error;
356 public Http2Exception(Http2Error error) {
360 public Http2Exception(Http2Error error, @Nullable Throwable cause) {
361 super("HTTP 2 stream " + error.toString().toLowerCase(), cause);
367 * Adapter for listening to HTTP 2 session status events.
369 * The session must be permanently connected, so it ignores onIdleTimeout() events.
370 * It also handles the following fatal events by notifying the containing class:
373 * <li>onFailure()</li>
374 * <li>onGoAway()</li>
377 private class SessionListenerAdapter extends Session.Listener.Adapter {
380 public void onClose(@Nullable Session session, @Nullable GoAwayFrame frame) {
381 Objects.requireNonNull(session);
382 fatalErrorDelayed(this, new Http2Exception(Http2Error.CLOSED), session);
386 public void onFailure(@Nullable Session session, @Nullable Throwable failure) {
387 Objects.requireNonNull(session);
388 fatalErrorDelayed(this, new Http2Exception(Http2Error.FAILURE), session);
392 * The Hue bridge uses the 'nginx' web server which sends HTTP2 GO_AWAY frames after a certain number (normally
393 * 999) of GET/PUT calls. This is normal behaviour so we just start a new thread to close and reopen the
397 public void onGoAway(@Nullable Session session, @Nullable GoAwayFrame frame) {
398 Objects.requireNonNull(session);
399 if (http2Session == session) {
400 Thread recreateThread = new Thread(() -> recreateSession());
401 Clip2Bridge.this.recreateThread = recreateThread;
402 recreateThread.start();
407 public boolean onIdleTimeout(@Nullable Session session) {
412 public void onPing(@Nullable Session session, @Nullable PingFrame frame) {
413 Objects.requireNonNull(session);
414 Objects.requireNonNull(frame);
415 if (http2Session == session) {
417 if (!frame.isReply()) {
418 session.ping(new PingFrame(true), Callback.NOOP);
424 public void onReset(@Nullable Session session, @Nullable ResetFrame frame) {
425 Objects.requireNonNull(session);
426 fatalErrorDelayed(this, new Http2Exception(Http2Error.RESET), session);
431 * Synchronizer for accessing the HTTP2 session object. This method wraps the 'sessionUseCreateLock' ReadWriteLock
432 * so that GET/PUT methods can access the session on multiple concurrent threads via the 'read' access lock, yet are
433 * forced to wait if the session is being created via its single thread access 'write' lock.
435 private class SessionSynchronizer implements AutoCloseable {
436 private final Optional<Lock> lockOptional;
438 SessionSynchronizer(boolean requireExclusiveAccess) throws InterruptedException {
439 Lock lock = requireExclusiveAccess ? sessionUseCreateLock.writeLock() : sessionUseCreateLock.readLock();
440 lockOptional = lock.tryLock(TIMEOUT_SECONDS, TimeUnit.SECONDS) ? Optional.of(lock) : Optional.empty();
444 public void close() {
445 lockOptional.ifPresent(lock -> lock.unlock());
450 * Enum showing the online state of the session connection.
458 * Session open for HTTP calls only
462 * Session open for HTTP calls and actively receiving SSE events
468 * Class for throttling HTTP GET and PUT requests to prevent overloading the Hue bridge.
470 * The Hue Bridge can get confused if it receives too many HTTP requests in a short period of time (e.g. on start
471 * up), or if too many HTTP sessions are opened at the same time, which causes it to respond with an HTML error page.
472 * So this class a) waits to acquire permitCount (or no more than MAX_CONCURRENT_STREAMS) stream permits, and b)
473 * throttles the requests to a maximum of one per REQUEST_INTERVAL_MILLISECS.
475 private class Throttler implements AutoCloseable {
476 private final int permitCount;
479 * @param permitCount indicates how many stream permits to be acquired.
480 * @throws InterruptedException
482 Throttler(int permitCount) throws InterruptedException {
483 this.permitCount = permitCount;
484 streamMutex.acquire(permitCount);
486 synchronized (Clip2Bridge.this) {
487 Instant now = Instant.now();
488 delay = lastRequestTime
489 .map(t -> Math.max(0, Duration.between(now, t).toMillis() + REQUEST_INTERVAL_MILLISECS))
491 lastRequestTime = Optional.of(now.plusMillis(delay));
497 public void close() {
498 streamMutex.release(permitCount);
502 private static final Logger LOGGER = LoggerFactory.getLogger(Clip2Bridge.class);
504 private static final String APPLICATION_ID = "org-openhab-binding-hue-clip2";
505 private static final String APPLICATION_KEY = "hue-application-key";
507 private static final String EVENT_STREAM_ID = "eventStream";
508 private static final String FORMAT_URL_CONFIG = "http://%s/api/0/config";
509 private static final String FORMAT_URL_RESOURCE = "https://%s/clip/v2/resource/";
510 private static final String FORMAT_URL_REGISTER = "http://%s/api";
511 private static final String FORMAT_URL_EVENTS = "https://%s/eventstream/clip/v2";
513 private static final long CLIP2_MINIMUM_VERSION = 1948086000L;
515 public static final int TIMEOUT_SECONDS = 10;
516 private static final int CHECK_ALIVE_SECONDS = 300;
517 private static final int REQUEST_INTERVAL_MILLISECS = 50;
518 private static final int MAX_CONCURRENT_STREAMS = 3;
520 private static final ResourceReference BRIDGE = new ResourceReference().setType(ResourceType.BRIDGE);
523 * Static method to attempt to connect to a Hue Bridge, get its software version, and check if it is high enough to
524 * support the CLIP 2 API.
526 * @param hostName the bridge IP address.
527 * @return true if bridge is online and it supports CLIP 2, or false if it is online and does not support CLIP 2.
528 * @throws IOException if unable to communicate with the bridge.
529 * @throws NumberFormatException if the bridge firmware version is invalid.
531 public static boolean isClip2Supported(String hostName) throws IOException {
533 Properties headers = new Properties();
534 headers.put(HttpHeader.ACCEPT, MediaType.APPLICATION_JSON);
535 response = HttpUtil.executeUrl("GET", String.format(FORMAT_URL_CONFIG, hostName), headers, null, null,
536 TIMEOUT_SECONDS * 1000);
537 BridgeConfig config = new Gson().fromJson(response, BridgeConfig.class);
538 if (Objects.nonNull(config)) {
539 String swVersion = config.swversion;
540 if (Objects.nonNull(swVersion)) {
542 if (Long.parseLong(swVersion) >= CLIP2_MINIMUM_VERSION) {
545 } catch (NumberFormatException e) {
546 LOGGER.debug("isClip2Supported() swVersion '{}' is not a number", swVersion);
553 private final HttpClient httpClient;
554 private final HTTP2Client http2Client;
555 private final String hostName;
556 private final String baseUrl;
557 private final String eventUrl;
558 private final String registrationUrl;
559 private final String applicationKey;
560 private final Clip2BridgeHandler bridgeHandler;
561 private final Gson jsonParser = new GsonBuilder().registerTypeAdapter(Instant.class, new InstantDeserializer())
563 private final Semaphore streamMutex = new Semaphore(MAX_CONCURRENT_STREAMS, true); // i.e. fair
564 private final ReadWriteLock sessionUseCreateLock = new ReentrantReadWriteLock(true); // i.e. fair
565 private final Map<Integer, Future<?>> fatalErrorTasks = new ConcurrentHashMap<>();
567 private boolean recreatingSession;
568 private boolean closing;
569 private State onlineState = State.CLOSED;
570 private Optional<Instant> lastRequestTime = Optional.empty();
571 private Instant sessionExpireTime = Instant.MAX;
573 private @Nullable Session http2Session;
574 private @Nullable Thread recreateThread;
575 private @Nullable Future<?> checkAliveTask;
580 * @param httpClientFactory the OH core HttpClientFactory.
581 * @param bridgeHandler the bridge handler.
582 * @param hostName the host name (ip address) of the Hue bridge
583 * @param applicationKey the application key.
584 * @throws ApiException if unable to open Jetty HTTP/2 client.
586 public Clip2Bridge(HttpClientFactory httpClientFactory, Clip2BridgeHandler bridgeHandler, String hostName,
587 String applicationKey) throws ApiException {
588 LOGGER.debug("Clip2Bridge()");
589 httpClient = httpClientFactory.getCommonHttpClient();
590 http2Client = httpClientFactory.createHttp2Client("hue-clip2", httpClient.getSslContextFactory());
591 http2Client.setConnectTimeout(Clip2Bridge.TIMEOUT_SECONDS * 1000);
592 http2Client.setIdleTimeout(-1);
594 this.bridgeHandler = bridgeHandler;
595 this.hostName = hostName;
596 this.applicationKey = applicationKey;
597 baseUrl = String.format(FORMAT_URL_RESOURCE, hostName);
598 eventUrl = String.format(FORMAT_URL_EVENTS, hostName);
599 registrationUrl = String.format(FORMAT_URL_REGISTER, hostName);
603 * Cancel the given task.
605 * @param cancelTask the task to be cancelled (may be null)
606 * @param mayInterrupt allows cancel() to interrupt the thread.
608 private void cancelTask(@Nullable Future<?> cancelTask, boolean mayInterrupt) {
609 if (Objects.nonNull(cancelTask)) {
610 cancelTask.cancel(mayInterrupt);
615 * Send a ping to the Hue bridge to check that the session is still alive.
617 private void checkAlive() {
618 if (onlineState == State.CLOSED) {
621 LOGGER.debug("checkAlive()");
622 Session session = http2Session;
623 if (Objects.nonNull(session)) {
624 session.ping(new PingFrame(false), Callback.NOOP);
625 if (Instant.now().isAfter(sessionExpireTime)) {
626 fatalError(this, new Http2Exception(Http2Error.TIMEOUT), session.hashCode());
632 * Connection is ok, so reschedule the session check alive expire time. Called in response to incoming ping frames
635 protected void checkAliveOk() {
636 LOGGER.debug("checkAliveOk()");
637 sessionExpireTime = Instant.now().plusSeconds(CHECK_ALIVE_SECONDS * 2);
641 * Close the connection.
644 public void close() {
646 Thread recreateThread = this.recreateThread;
647 if (Objects.nonNull(recreateThread) && recreateThread.isAlive()) {
648 recreateThread.interrupt();
653 } catch (ApiException e) {
658 * Private method to close the connection.
660 private void close2() {
661 synchronized (this) {
662 LOGGER.debug("close2()");
663 boolean notifyHandler = onlineState == State.ACTIVE && !closing && !recreatingSession;
664 onlineState = State.CLOSED;
665 synchronized (fatalErrorTasks) {
666 fatalErrorTasks.values().forEach(task -> cancelTask(task, true));
667 fatalErrorTasks.clear();
669 cancelTask(checkAliveTask, true);
670 checkAliveTask = null;
674 bridgeHandler.onConnectionOffline();
680 * Close the event stream(s) if necessary.
682 private void closeEventStream() {
683 Session session = http2Session;
684 if (Objects.nonNull(session)) {
685 final int sessionId = session.hashCode();
686 session.getStreams().stream().filter(s -> Objects.nonNull(s.getAttribute(EVENT_STREAM_ID)) && !s.isReset())
688 int streamId = s.getId();
689 LOGGER.debug("closeEventStream() sessionId:{}, streamId:{}", sessionId, streamId);
690 s.reset(new ResetFrame(streamId, ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
696 * Close the HTTP 2 session if necessary.
698 private void closeSession() {
699 Session session = http2Session;
700 if (Objects.nonNull(session)) {
701 LOGGER.debug("closeSession() sessionId:{}, openStreamCount:{}", session.hashCode(),
702 session.getStreams().size());
703 session.close(ErrorCode.NO_ERROR.code, "closeSession", Callback.NOOP);
709 * Close the given stream.
711 * @param stream to be closed.
713 private void closeStream(@Nullable Stream stream) {
714 if (Objects.nonNull(stream) && !stream.isReset()) {
715 stream.reset(new ResetFrame(stream.getId(), ErrorCode.NO_ERROR.code), Callback.NOOP);
720 * Method that is called back in case of fatal stream or session events. The error is only processed if the
721 * connection is online, not in process of closing, and the identities of the current session and the session that
722 * caused the error are the same. In other words it ignores errors relating to expired sessions.
724 * @param listener the entity that caused this method to be called.
725 * @param cause the type of exception that caused the error.
726 * @param sessionId the identity of the session on which the error occurred.
728 private synchronized void fatalError(Object listener, Http2Exception cause, int sessionId) {
729 if (onlineState == State.CLOSED || closing) {
732 Session session = http2Session;
733 if (Objects.isNull(session) || session.hashCode() != sessionId) {
736 String listenerId = listener.getClass().getSimpleName();
737 if (listener instanceof ContentStreamListenerAdapter) {
738 // on GET / PUT requests the caller handles errors and closes the stream; the session is still OK
739 LOGGER.debug("fatalError() listener:{}, sessionId:{}, error:{} => ignoring", listenerId, sessionId,
742 if (LOGGER.isDebugEnabled()) {
743 LOGGER.debug("fatalError() listener:{}, sessionId:{}, error:{} => closing", listenerId, sessionId,
746 LOGGER.warn("Fatal error '{}' from '{}' => closing session.", cause.error, listenerId);
753 * Method that is called back in case of fatal stream or session events. Schedules fatalError() to be called after a
754 * delay in order to prevent sequencing issues.
756 * @param listener the entity that caused this method to be called.
757 * @param cause the type of exception that caused the error.
758 * @param session the session on which the error occurred.
760 protected void fatalErrorDelayed(Object listener, Http2Exception cause, Session session) {
761 synchronized (fatalErrorTasks) {
762 final int index = fatalErrorTasks.size();
763 final int sessionId = session.hashCode();
764 fatalErrorTasks.put(index, bridgeHandler.getScheduler().schedule(() -> {
765 fatalError(listener, cause, sessionId);
766 fatalErrorTasks.remove(index);
767 }, 1, TimeUnit.SECONDS));
772 * HTTP GET a Resources object, for a given resource Reference, from the Hue Bridge. The reference is a class
773 * comprising a resource type and an id. If the id is a specific resource id then only the one specific resource
774 * is returned, whereas if it is null then all resources of the given resource type are returned.
776 * It wraps the getResourcesImpl() method in a try/catch block, and transposes any HttpUnAuthorizedException into an
777 * ApiException. Such transposition should never be required in reality since by the time this method is called, the
778 * connection will surely already have been authorised.
780 * @param reference the Reference class to get.
781 * @return a Resource object containing either a list of Resources or a list of Errors.
782 * @throws ApiException if anything fails.
783 * @throws InterruptedException
785 public Resources getResources(ResourceReference reference) throws ApiException, InterruptedException {
786 if (onlineState == State.CLOSED && !recreatingSession) {
787 throw new ApiException("Connection is closed");
789 return getResourcesImpl(reference);
793 * Internal method to send an HTTP 2 GET request to the Hue Bridge and process its response. Uses a Throttler to
794 * prevent too many concurrent calls, and to prevent too frequent calls on the Hue bridge server. Also uses a
795 * SessionSynchronizer to delay accessing the session while it is being recreated.
797 * @param reference the Reference class to get.
798 * @return a Resource object containing either a list of Resources or a list of Errors.
799 * @throws HttpUnauthorizedException if the request was refused as not authorised or forbidden.
800 * @throws ApiException if the communication failed, or an unexpected result occurred.
801 * @throws InterruptedException
803 private Resources getResourcesImpl(ResourceReference reference)
804 throws HttpUnauthorizedException, ApiException, InterruptedException {
805 // work around for issue #15468 (and similar)
806 ResourceType resourceType = reference.getType();
807 if (resourceType == ResourceType.ERROR) {
808 LOGGER.debug("Resource '{}' type '{}' unknown => GET aborted", reference.getId(), resourceType);
809 return new Resources();
811 Stream stream = null;
812 try (Throttler throttler = new Throttler(1);
813 SessionSynchronizer sessionSynchronizer = new SessionSynchronizer(false)) {
814 Session session = getSession();
815 String url = getUrl(reference);
816 LOGGER.trace("GET {} HTTP/2", url);
817 HeadersFrame headers = prepareHeaders(url, MediaType.APPLICATION_JSON);
818 Completable<@Nullable Stream> streamPromise = new Completable<>();
819 ContentStreamListenerAdapter contentStreamListener = new ContentStreamListenerAdapter();
820 session.newStream(headers, streamPromise, contentStreamListener);
821 // wait for stream to be opened
822 stream = Objects.requireNonNull(streamPromise.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
823 // wait for HTTP response contents
824 String contentJson = contentStreamListener.awaitResult();
825 String contentType = contentStreamListener.getContentType();
826 int status = contentStreamListener.getStatus();
827 LOGGER.trace("HTTP/2 {} (Content-Type: {}) << {}", status, contentType, contentJson);
828 if (status != HttpStatus.OK_200) {
829 throw new ApiException(String.format("Unexpected HTTP status '%d'", status));
831 if (!MediaType.APPLICATION_JSON.equals(contentType)) {
832 throw new ApiException("Unexpected Content-Type: " + contentType);
835 Resources resources = Objects.requireNonNull(jsonParser.fromJson(contentJson, Resources.class));
836 if (LOGGER.isDebugEnabled()) {
837 resources.getErrors().forEach(error -> LOGGER.debug("Resources error:{}", error));
840 } catch (JsonParseException e) {
841 throw new ApiException("Parsing error", e);
843 } catch (ExecutionException e) {
844 Throwable cause = e.getCause();
845 if (cause instanceof HttpUnauthorizedException) {
846 throw (HttpUnauthorizedException) cause;
848 throw new ApiException("Error sending request", e);
849 } catch (TimeoutException e) {
850 throw new ApiException("Error sending request", e);
857 * Safe access to the session object.
859 * @return the session.
860 * @throws ApiException if session is null or closed.
862 private Session getSession() throws ApiException {
863 Session session = http2Session;
864 if (Objects.isNull(session) || session.isClosed()) {
865 throw new ApiException("HTTP/2 session is null or closed");
871 * Build a full path to a server end point, based on a Reference class instance. If the reference contains only
872 * a resource type, the method returns the end point url to get all resources of the given resource type. Whereas if
873 * it also contains an id, the method returns the end point url to get the specific single resource with that type
876 * @param reference a Reference class instance.
877 * @return the complete end point url.
879 private String getUrl(ResourceReference reference) {
880 String url = baseUrl + reference.getType().name().toLowerCase();
881 String id = reference.getId();
882 return Objects.isNull(id) || id.isEmpty() ? url : url + "/" + id;
886 * The event stream calls this method when it has received text data. It parses the text as JSON into a list of
887 * Event entries, converts the list of events to a list of resources, and forwards that list to the bridge
890 * @param data the incoming (presumed to be JSON) text.
892 protected void onEventData(String data) {
893 if (onlineState != State.ACTIVE && !recreatingSession) {
896 if (LOGGER.isTraceEnabled()) {
897 LOGGER.trace("onEventData() data:{}", data);
899 LOGGER.debug("onEventData() data length:{}", data.length());
901 JsonElement jsonElement;
903 jsonElement = JsonParser.parseString(data);
904 } catch (JsonSyntaxException e) {
905 LOGGER.debug("onEventData() invalid data '{}'", data, e);
908 if (!(jsonElement instanceof JsonArray)) {
909 LOGGER.debug("onEventData() data is not a JsonArray {}", data);
914 events = jsonParser.fromJson(jsonElement, Event.EVENT_LIST_TYPE);
915 } catch (JsonParseException e) {
916 LOGGER.debug("onEventData() parsing error json:{}", data, e);
919 if (Objects.isNull(events) || events.isEmpty()) {
920 LOGGER.debug("onEventData() event list is null or empty");
923 List<Resource> resources = new ArrayList<>();
924 events.forEach(event -> resources.addAll(event.getData()));
925 if (resources.isEmpty()) {
926 LOGGER.debug("onEventData() resource list is empty");
929 resources.forEach(resource -> resource.markAsSparse());
930 bridgeHandler.onResourcesEvent(resources);
934 * Open the HTTP 2 session and the event stream.
936 * @throws ApiException if there was a communication error.
937 * @throws InterruptedException
939 public void open() throws ApiException, InterruptedException {
940 LOGGER.debug("open()");
// Invokes the onConnectionOnline() callback of the bridge handler.
943 bridgeHandler.onConnectionOnline();
947 * Make the session active, by opening an HTTP 2 SSE event stream (if necessary).
949 * @throws ApiException if an error was encountered.
950 * @throws InterruptedException
952 private void openActive() throws ApiException, InterruptedException {
// The monitor of this instance is taken before the state is switched to ACTIVE.
953 synchronized (this) {
955 onlineState = State.ACTIVE;
960 * Open the check alive task if necessary.
962 private void openCheckAliveTask() {
// Local snapshot of the field, used for the checks on the next line.
963 Future<?> task = checkAliveTask;
// Only (re)schedule when there is no task, or the previous one was cancelled or has completed.
964 if (Objects.isNull(task) || task.isCancelled() || task.isDone()) {
965 LOGGER.debug("openCheckAliveTask()");
// NOTE(review): this passes the field rather than the local snapshot read above - confirm intended.
966 cancelTask(checkAliveTask, false);
// checkAlive() is scheduled with CHECK_ALIVE_SECONDS as both the initial delay and the delay between runs.
967 checkAliveTask = bridgeHandler.getScheduler().scheduleWithFixedDelay(() -> checkAlive(),
968 CHECK_ALIVE_SECONDS, CHECK_ALIVE_SECONDS, TimeUnit.SECONDS);
973 * Implementation to open an HTTP 2 SSE event stream if necessary.
975 * @throws ApiException if an error was encountered.
976 * @throws InterruptedException
978 private void openEventStream() throws ApiException, InterruptedException {
979 Session session = getSession();
// Condition holds when any stream of the session already carries the EVENT_STREAM_ID attribute.
980 if (session.getStreams().stream().anyMatch(stream -> Objects.nonNull(stream.getAttribute(EVENT_STREAM_ID)))) {
983 LOGGER.trace("GET {} HTTP/2", eventUrl);
// Declared ahead of the calls below so that the catch clause can still reach the stream.
984 Stream stream = null;
// Request headers for a GET on the event url that accepts server sent events.
986 HeadersFrame headers = prepareHeaders(eventUrl, MediaType.SERVER_SENT_EVENTS);
987 Completable<@Nullable Stream> streamPromise = new Completable<>();
988 EventStreamListenerAdapter eventStreamListener = new EventStreamListenerAdapter();
989 session.newStream(headers, streamPromise, eventStreamListener);
990 // wait for stream to be opened
991 stream = Objects.requireNonNull(streamPromise.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
// NOTE(review): an idle timeout of 0 presumably disables the idle timeout for this stream - confirm in Jetty docs.
992 stream.setIdleTimeout(0);
// Tag the stream with EVENT_STREAM_ID; the check at the top of this method looks for that attribute.
993 stream.setAttribute(EVENT_STREAM_ID, session);
994 // wait for "hi" from the bridge
995 eventStreamListener.awaitResult();
996 LOGGER.debug("openEventStream() sessionId:{} streamId:{}", session.hashCode(), stream.getId());
997 } catch (ExecutionException | TimeoutException e) {
// A stream that was obtained and is not yet reset gets reset; the failure is reported as an ApiException.
998 if (Objects.nonNull(stream) && !stream.isReset()) {
999 stream.reset(new ResetFrame(stream.getId(), ErrorCode.HTTP_CONNECT_ERROR.code), Callback.NOOP);
1001 throw new ApiException("Error opening event stream", e);
1006 * Private method to open the HTTP 2 session in passive mode.
1008 * @throws ApiException if there was a communication error.
1009 * @throws InterruptedException
1011 private void openPassive() throws ApiException, InterruptedException {
1012 synchronized (this) {
1013 LOGGER.debug("openPassive()");
// The state is first set to CLOSED and only becomes PASSIVE after openCheckAliveTask() has been called.
1014 onlineState = State.CLOSED;
1016 openCheckAliveTask();
1017 onlineState = State.PASSIVE;
1022 * Open the HTTP 2 session if necessary.
1024 * @throws ApiException if it was not possible to create and connect the session.
1025 * @throws InterruptedException
1027 private void openSession() throws ApiException, InterruptedException {
1028 Session session = http2Session;
// Condition holds when a session already exists and is not closed.
1029 if (Objects.nonNull(session) && !session.isClosed()) {
// The connection target is port 443 of hostName.
1033 InetSocketAddress address = new InetSocketAddress(hostName, 443);
1034 SessionListenerAdapter sessionListener = new SessionListenerAdapter();
1035 Completable<@Nullable Session> sessionPromise = new Completable<>();
// The SslContextFactory is looked up from the beans of the HTTP/2 client.
1036 http2Client.connect(http2Client.getBean(SslContextFactory.class), address, sessionListener, sessionPromise);
1037 // wait for the (SSL) session to be opened
1038 session = Objects.requireNonNull(sessionPromise.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
1039 LOGGER.debug("openSession() sessionId:{}", session.hashCode());
// Store the new session in the field.
1040 http2Session = session;
1041 checkAliveOk(); // initialise the session timeout window
1042 } catch (ExecutionException | TimeoutException e) {
// Both failure types are wrapped in an ApiException that keeps the original cause.
1043 throw new ApiException("Error opening HTTP/2 session", e);
1048 * Helper method to create a HeadersFrame for a standard HTTP GET request.
1050 * @param url the server url.
1051 * @param acceptContentType the accepted content type for the response.
1052 * @return the HeadersFrame.
1054 private HeadersFrame prepareHeaders(String url, String acceptContentType) {
// Delegates to the five argument overload with method GET, content length -1 and no content type.
1055 return prepareHeaders(url, acceptContentType, "GET", -1, null);
1059 * Helper method to create a HeadersFrame for a more exotic HTTP request.
1061 * @param url the server url.
1062 * @param acceptContentType the accepted content type for the response.
1063 * @param method the HTTP request method.
1064 * @param contentLength the length of the content e.g. for a PUT call.
1065 * @param contentType the respective content type.
1066 * @return the HeadersFrame.
1068 private HeadersFrame prepareHeaders(String url, String acceptContentType, String method, long contentLength,
1069 @Nullable String contentType) {
1070 HttpFields fields = new HttpFields();
1071 fields.put(HttpHeader.ACCEPT, acceptContentType);
// The Content-Type header is only added when a content type was supplied.
1072 if (contentType != null) {
1073 fields.put(HttpHeader.CONTENT_TYPE, contentType);
// The Content-Length header is only added for non negative lengths.
1075 if (contentLength >= 0) {
1076 fields.putLongField(HttpHeader.CONTENT_LENGTH, contentLength);
// The application key is added as a header to every frame built by this method.
1078 fields.put(APPLICATION_KEY, applicationKey);
// The last constructor argument is true when contentLength is zero or negative.
// NOTE(review): in Jetty this argument is presumably the end-of-stream flag, i.e. no body follows - confirm.
1079 return new HeadersFrame(new MetaData.Request(method, new HttpURI(url), HttpVersion.HTTP_2, fields), null,
1080 contentLength <= 0);
1084 * Use an HTTP/2 PUT command to send a resource to the server. Uses a Throttler to prevent too many concurrent
1085 * calls, and to prevent too frequent calls on the Hue bridge server. Also uses a SessionSynchronizer to delay
1086 * accessing the session while it is being recreated.
1088 * @param resource the resource to put.
1089 * @return the resource, which may contain errors.
1090 * @throws ApiException if something fails.
1091 * @throws InterruptedException
1093 public Resources putResource(Resource resource) throws ApiException, InterruptedException {
// Declared ahead of the try so that the closeStream() call further down can still reach the stream.
1094 Stream stream = null;
// Both helpers are try-with-resources entries, so they are closed when the try block is left.
// The SessionSynchronizer is created with false here, whereas recreateSession() creates it with true.
1095 try (Throttler throttler = new Throttler(MAX_CONCURRENT_STREAMS);
1096 SessionSynchronizer sessionSynchronizer = new SessionSynchronizer(false)) {
1097 Session session = getSession();
// Serialize the resource to JSON and encode it as UTF-8 bytes for the request body.
1098 String requestJson = jsonParser.toJson(resource);
1099 ByteBuffer requestBytes = ByteBuffer.wrap(requestJson.getBytes(StandardCharsets.UTF_8));
// The target url is derived from the id and the type of the resource.
1100 String url = getUrl(new ResourceReference().setId(resource.getId()).setType(resource.getType()));
// For a buffer that wraps a whole array, capacity() equals the array length and is used as Content-Length.
1101 HeadersFrame headers = prepareHeaders(url, MediaType.APPLICATION_JSON, "PUT", requestBytes.capacity(),
1102 MediaType.APPLICATION_JSON);
1103 LOGGER.trace("PUT {} HTTP/2 >> {}", url, requestJson);
1104 Completable<@Nullable Stream> streamPromise = new Completable<>();
1105 ContentStreamListenerAdapter contentStreamListener = new ContentStreamListenerAdapter();
1106 session.newStream(headers, streamPromise, contentStreamListener);
1107 // wait for stream to be opened
1108 stream = Objects.requireNonNull(streamPromise.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
// The whole body is sent in a single DataFrame.
// NOTE(review): the final true argument presumably marks the end of the stream - confirm in Jetty docs.
1109 stream.data(new DataFrame(stream.getId(), requestBytes, true), Callback.NOOP);
1110 // wait for HTTP response
1111 String contentJson = contentStreamListener.awaitResult();
1112 String contentType = contentStreamListener.getContentType();
1113 int status = contentStreamListener.getStatus();
1114 LOGGER.trace("HTTP/2 {} (Content-Type: {}) << {}", status, contentType, contentJson);
// Response validation: success status, exact JSON content type, and a non empty payload.
1115 if (!HttpStatus.isSuccess(status)) {
1116 throw new ApiException(String.format("Unexpected HTTP status '%d'", status));
1118 if (!MediaType.APPLICATION_JSON.equals(contentType)) {
1119 throw new ApiException("Unexpected Content-Type: " + contentType);
1121 if (contentJson.isEmpty()) {
1122 throw new ApiException("Response payload is empty");
// Map the response JSON onto the Resources type; a null result raises NullPointerException.
1125 return Objects.requireNonNull(jsonParser.fromJson(contentJson, Resources.class));
1126 } catch (JsonParseException e) {
1127 LOGGER.debug("putResource() parsing error json:{}", contentJson, e);
1128 throw new ApiException("Parsing error", e);
1130 } catch (ExecutionException | TimeoutException e) {
1131 throw new ApiException("Error sending PUT request", e);
// closeStream() receives the stream, which is still null when it was never opened.
1133 closeStream(stream);
1138 * Close and re-open the session. Called when the server sends a GO_AWAY message. Acquires a SessionSynchronizer
1139 * 'write' lock to ensure single thread access while the new session is being created. Therefore it waits for any
1140 * already running GET/PUT method calls, which have a 'read' lock, to complete. And also causes any new GET/PUT
1141 * method calls to wait until this method releases the 'write' lock again. Whereby such GET/PUT calls are postponed
1142 * to the new session.
1144 private synchronized void recreateSession() {
// The SessionSynchronizer is created with true; the method documentation describes this as the write lock.
1145 try (SessionSynchronizer sessionSynchronizer = new SessionSynchronizer(true)) {
1146 LOGGER.debug("recreateSession()");
// This flag is also read by onEventData() as part of its entry condition.
1147 recreatingSession = true;
// Local snapshot of the field; it shadows the field name for the rest of this scope.
1148 State onlineState = this.onlineState;
// Branch taken when the state captured above was ACTIVE.
1154 if (onlineState == State.ACTIVE) {
// NOTE(review): InterruptedException is handled here; verify that the interrupt flag is restored where needed.
1157 } catch (ApiException | InterruptedException e) {
// The debug message includes the exception itself; the warn message carries only class name and message.
1158 if (LOGGER.isDebugEnabled()) {
1159 LOGGER.debug("recreateSession() exception", e);
1161 LOGGER.warn("recreateSession() {}: {}", e.getClass().getSimpleName(), e.getMessage());
1164 recreatingSession = false;
1165 LOGGER.debug("recreateSession() done");
1170 * Try to register the application key with the hub. Use the given application key if one is provided; otherwise the
1171 * hub will create a new one. Note: this requires an HTTP 1.1 client call.
1173 * @param oldApplicationKey existing application key if any i.e. may be empty.
1174 * @return the existing or a newly created application key.
1175 * @throws HttpUnauthorizedException if the registration failed.
1176 * @throws ApiException if there was a communications error.
1177 * @throws InterruptedException
1179 public String registerApplicationKey(@Nullable String oldApplicationKey)
1180 throws HttpUnauthorizedException, ApiException, InterruptedException {
1181 LOGGER.debug("registerApplicationKey()");
// A null or empty old key yields a request built from APPLICATION_ID only; otherwise the old key is passed too.
1182 String json = jsonParser.toJson((Objects.isNull(oldApplicationKey) || oldApplicationKey.isEmpty())
1183 ? new CreateUserRequest(APPLICATION_ID)
1184 : new CreateUserRequest(oldApplicationKey, APPLICATION_ID));
// POST to the registration url with the JSON body and a timeout of TIMEOUT_SECONDS seconds.
1185 Request httpRequest = httpClient.newRequest(registrationUrl).method(HttpMethod.POST)
1186 .timeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1187 .content(new StringContentProvider(json), MediaType.APPLICATION_JSON);
1188 ContentResponse contentResponse;
1190 LOGGER.trace("POST {} HTTP/1.1 >> {}", registrationUrl, json);
1191 contentResponse = httpRequest.send();
1192 } catch (TimeoutException | ExecutionException e) {
1193 throw new ApiException("HTTP processing error", e);
1195 int httpStatus = contentResponse.getStatus();
// From here on the json variable holds the trimmed response body instead of the request body.
1196 json = contentResponse.getContentAsString().trim();
// NOTE(review): the response body may contain the application key, which then lands in the trace log - confirm.
1197 LOGGER.trace("HTTP/1.1 {} {} << {}", httpStatus, contentResponse.getReason(), json);
// Any status other than 200 is reported as an ApiException.
1198 if (httpStatus != HttpStatus.OK_200) {
1199 throw new ApiException(String.format("HTTP bad response '%d'", httpStatus));
// The key is read from the username entry of the success map of the first list entry.
1202 List<SuccessResponse> entries = jsonParser.fromJson(json, SuccessResponse.GSON_TYPE);
1203 if (Objects.nonNull(entries) && !entries.isEmpty()) {
1204 SuccessResponse response = entries.get(0);
1205 Map<String, Object> responseSuccess = response.success;
1206 if (Objects.nonNull(responseSuccess)) {
// NOTE(review): plain cast from Object; a non String value would raise ClassCastException - confirm payload type.
1207 String newApplicationKey = (String) responseSuccess.get("username");
1208 if (Objects.nonNull(newApplicationKey)) {
1209 return newApplicationKey;
1213 } catch (JsonParseException e) {
1214 LOGGER.debug("registerApplicationKey() parsing error json:{}", json, e);
// Thrown on every path that did not return a key above.
1216 throw new HttpUnauthorizedException("Application key registration failed");
1219 private void startHttp2Client() throws ApiException {
// Starts the HTTP/2 client; any exception is wrapped in an ApiException that keeps the cause.
1221 http2Client.start();
1222 } catch (Exception e) {
1223 throw new ApiException("Error starting HTTP/2 client", e);
1227 private void stopHttp2Client() throws ApiException {
// Any exception raised by the preceding statements is wrapped in an ApiException that keeps the cause.
1230 } catch (Exception e) {
1231 throw new ApiException("Error stopping HTTP/2 client", e);
1236 * Test the Hue Bridge connection state by attempting to connect and trying to execute a basic command that requires
1239 * @throws HttpUnauthorizedException if it was possible to connect but not to authenticate.
1240 * @throws ApiException if it was not possible to connect.
1241 * @throws InterruptedException
1243 public void testConnectionState() throws HttpUnauthorizedException, ApiException, InterruptedException {
1244 LOGGER.debug("testConnectionState()");
1247 getResourcesImpl(BRIDGE);
1248 } catch (ApiException e) {