]> git.basschouten.com Git - openhab-addons.git/blob
c1b5a68041466ee1ec314d0328e4c2bbcbdb065b
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.hue.internal.connection;
14
15 import java.io.BufferedReader;
16 import java.io.ByteArrayInputStream;
17 import java.io.Closeable;
18 import java.io.IOException;
19 import java.io.InputStreamReader;
20 import java.io.Reader;
21 import java.net.InetSocketAddress;
22 import java.nio.ByteBuffer;
23 import java.nio.charset.StandardCharsets;
24 import java.time.Duration;
25 import java.time.Instant;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Objects;
31 import java.util.Optional;
32 import java.util.Properties;
33 import java.util.concurrent.CompletableFuture;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.ExecutionException;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.Semaphore;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.TimeoutException;
40 import java.util.stream.Collectors;
41
42 import javax.ws.rs.core.MediaType;
43
44 import org.eclipse.jdt.annotation.NonNullByDefault;
45 import org.eclipse.jdt.annotation.Nullable;
46 import org.eclipse.jetty.client.HttpClient;
47 import org.eclipse.jetty.client.api.ContentResponse;
48 import org.eclipse.jetty.client.api.Request;
49 import org.eclipse.jetty.client.util.StringContentProvider;
50 import org.eclipse.jetty.http.HttpFields;
51 import org.eclipse.jetty.http.HttpHeader;
52 import org.eclipse.jetty.http.HttpMethod;
53 import org.eclipse.jetty.http.HttpStatus;
54 import org.eclipse.jetty.http.HttpURI;
55 import org.eclipse.jetty.http.HttpVersion;
56 import org.eclipse.jetty.http.MetaData;
57 import org.eclipse.jetty.http.MetaData.Response;
58 import org.eclipse.jetty.http2.api.Session;
59 import org.eclipse.jetty.http2.api.Stream;
60 import org.eclipse.jetty.http2.client.HTTP2Client;
61 import org.eclipse.jetty.http2.frames.DataFrame;
62 import org.eclipse.jetty.http2.frames.GoAwayFrame;
63 import org.eclipse.jetty.http2.frames.HeadersFrame;
64 import org.eclipse.jetty.http2.frames.PingFrame;
65 import org.eclipse.jetty.http2.frames.ResetFrame;
66 import org.eclipse.jetty.util.Callback;
67 import org.eclipse.jetty.util.Promise.Completable;
68 import org.eclipse.jetty.util.ssl.SslContextFactory;
69 import org.openhab.binding.hue.internal.dto.CreateUserRequest;
70 import org.openhab.binding.hue.internal.dto.SuccessResponse;
71 import org.openhab.binding.hue.internal.dto.clip2.BridgeConfig;
72 import org.openhab.binding.hue.internal.dto.clip2.Event;
73 import org.openhab.binding.hue.internal.dto.clip2.Resource;
74 import org.openhab.binding.hue.internal.dto.clip2.ResourceReference;
75 import org.openhab.binding.hue.internal.dto.clip2.Resources;
76 import org.openhab.binding.hue.internal.dto.clip2.enums.ResourceType;
77 import org.openhab.binding.hue.internal.exceptions.ApiException;
78 import org.openhab.binding.hue.internal.exceptions.HttpUnauthorizedException;
79 import org.openhab.binding.hue.internal.handler.Clip2BridgeHandler;
80 import org.openhab.core.io.net.http.HttpClientFactory;
81 import org.openhab.core.io.net.http.HttpUtil;
82 import org.slf4j.Logger;
83 import org.slf4j.LoggerFactory;
84
85 import com.google.gson.Gson;
86 import com.google.gson.JsonArray;
87 import com.google.gson.JsonElement;
88 import com.google.gson.JsonParseException;
89 import com.google.gson.JsonParser;
90 import com.google.gson.JsonSyntaxException;
91
92 /**
93  * This class handles HTTP and SSE connections to/from a Hue Bridge running CLIP 2.
94  *
95  * It uses the following connection mechanisms:
96  *
97  * <li>The primary communication uses HTTP 2 streams over a shared permanent HTTP 2 session.</li>
98  * <li>The 'registerApplicationKey()' method uses HTTP/1.1 over the OH common Jetty client.</li>
99  * <li>The 'isClip2Supported()' static method uses HTTP/1.1 over the OH common Jetty client via 'HttpUtil'.</li>
100  *
101  * @author Andrew Fiddian-Green - Initial Contribution
102  */
103 @NonNullByDefault
104 public class Clip2Bridge implements Closeable {
105
106     /**
107      * Base (abstract) adapter for listening to HTTP 2 stream events.
108      *
109      * It implements a CompletableFuture by means of which the caller can wait for the response data to come in. And
110      * which, in the case of fatal errors, gets completed exceptionally.
111      *
112      * It handles the following fatal error events by notifying the containing class:
113      *
114      * <li>onHeaders() HTTP unauthorized codes</li>
115      */
116     private abstract class BaseStreamListenerAdapter<T> extends Stream.Listener.Adapter {
117         protected final CompletableFuture<T> completable = new CompletableFuture<T>();
118         private String contentType = "UNDEFINED";
119
120         protected T awaitResult() throws ExecutionException, InterruptedException, TimeoutException {
121             return completable.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
122         }
123
124         /**
125          * Return the HTTP content type.
126          *
127          * @return content type e.g. 'application/json'
128          */
129         protected String getContentType() {
130             return contentType;
131         }
132
133         protected void handleHttp2Error(Http2Error error) {
134             Http2Exception e = new Http2Exception(error);
135             if (Http2Error.UNAUTHORIZED.equals(error)) {
136                 // for external error handling, abstract authorization errors into a separate exception
137                 completable.completeExceptionally(new HttpUnauthorizedException("HTTP 2 request not authorized"));
138             } else {
139                 completable.completeExceptionally(e);
140             }
141             fatalErrorDelayed(this, e);
142         }
143
144         /**
145          * Check the reply headers to see whether the request was authorised.
146          */
147         @Override
148         public void onHeaders(@Nullable Stream stream, @Nullable HeadersFrame frame) {
149             Objects.requireNonNull(frame);
150             MetaData metaData = frame.getMetaData();
151             if (metaData.isResponse()) {
152                 Response responseMetaData = (Response) metaData;
153                 int httpStatus = responseMetaData.getStatus();
154                 switch (httpStatus) {
155                     case HttpStatus.UNAUTHORIZED_401:
156                     case HttpStatus.FORBIDDEN_403:
157                         handleHttp2Error(Http2Error.UNAUTHORIZED);
158                     default:
159                 }
160                 contentType = responseMetaData.getFields().get(HttpHeader.CONTENT_TYPE).toLowerCase();
161             }
162         }
163     }
164
165     /**
166      * Adapter for listening to regular HTTP 2 GET/PUT request stream events.
167      *
168      * It assembles the incoming text data into an HTTP 'content' entity. And when the last data frame arrives, it
169      * returns the full content by completing the CompletableFuture with that data.
170      *
171      * In addition to those handled by the parent, it handles the following fatal error events by notifying the
172      * containing class:
173      *
174      * <li>onIdleTimeout()</li>
175      * <li>onTimeout()</li>
176      */
177     private class ContentStreamListenerAdapter extends BaseStreamListenerAdapter<String> {
178         private final DataFrameCollector content = new DataFrameCollector();
179
180         @Override
181         public void onData(@Nullable Stream stream, @Nullable DataFrame frame, @Nullable Callback callback) {
182             Objects.requireNonNull(frame);
183             Objects.requireNonNull(callback);
184             synchronized (this) {
185                 content.append(frame.getData());
186                 if (frame.isEndStream() && !completable.isDone()) {
187                     completable.complete(content.contentAsString().trim());
188                     content.reset();
189                 }
190             }
191             callback.succeeded();
192         }
193
194         @Override
195         public boolean onIdleTimeout(@Nullable Stream stream, @Nullable Throwable x) {
196             handleHttp2Error(Http2Error.IDLE);
197             return true;
198         }
199
200         @Override
201         public void onTimeout(@Nullable Stream stream, @Nullable Throwable x) {
202             handleHttp2Error(Http2Error.TIMEOUT);
203         }
204     }
205
206     /**
207      * Class to collect incoming ByteBuffer data from HTTP 2 Data frames.
208      */
209     private static class DataFrameCollector {
210         private byte[] buffer = new byte[512];
211         private int usedSize = 0;
212
213         public void append(ByteBuffer data) {
214             int dataCapacity = data.capacity();
215             int neededSize = usedSize + dataCapacity;
216             if (neededSize > buffer.length) {
217                 int newSize = (dataCapacity < 4096) ? neededSize : Math.max(2 * buffer.length, neededSize);
218                 buffer = Arrays.copyOf(buffer, newSize);
219             }
220             data.get(buffer, usedSize, dataCapacity);
221             usedSize += dataCapacity;
222         }
223
224         public String contentAsString() {
225             return new String(buffer, 0, usedSize, StandardCharsets.UTF_8);
226         }
227
228         public Reader contentStreamReader() {
229             return new InputStreamReader(new ByteArrayInputStream(buffer, 0, usedSize), StandardCharsets.UTF_8);
230         }
231
232         public void reset() {
233             usedSize = 0;
234         }
235     }
236
237     /**
238      * Adapter for listening to SSE event stream events.
239      *
240      * It receives the incoming text lines. Receipt of the first data line causes the CompletableFuture to complete. It
241      * then parses subsequent data according to the SSE specification. If the line starts with a 'data:' message, it
242      * adds the data to the list of strings. And if the line is empty (i.e. the last line of an event), it passes the
243      * full set of strings to the owner via a call-back method.
244      *
245      * The stream must be permanently connected, so it ignores onIdleTimeout() events.
246      *
247      * The parent class handles most fatal errors, but since the event stream is supposed to be permanently connected,
248      * the following events are also considered as fatal:
249      *
250      * <li>onClosed()</li>
251      * <li>onReset()</li>
252      */
253     private class EventStreamListenerAdapter extends BaseStreamListenerAdapter<Boolean> {
254         private final DataFrameCollector eventData = new DataFrameCollector();
255
256         @Override
257         public void onClosed(@Nullable Stream stream) {
258             handleHttp2Error(Http2Error.CLOSED);
259         }
260
261         @Override
262         public void onData(@Nullable Stream stream, @Nullable DataFrame frame, @Nullable Callback callback) {
263             Objects.requireNonNull(frame);
264             Objects.requireNonNull(callback);
265             synchronized (this) {
266                 eventData.append(frame.getData());
267                 BufferedReader reader = new BufferedReader(eventData.contentStreamReader());
268                 @SuppressWarnings("null")
269                 List<String> receivedLines = reader.lines().collect(Collectors.toList());
270
271                 // a blank line marks the end of an SSE message
272                 boolean endOfMessage = !receivedLines.isEmpty()
273                         && receivedLines.get(receivedLines.size() - 1).isBlank();
274
275                 if (endOfMessage) {
276                     eventData.reset();
277                     // receipt of ANY message means the event stream is established
278                     if (!completable.isDone()) {
279                         completable.complete(Boolean.TRUE);
280                     }
281                     // append any 'data' field values to the event message
282                     StringBuilder eventContent = new StringBuilder();
283                     for (String receivedLine : receivedLines) {
284                         if (receivedLine.startsWith("data:")) {
285                             eventContent.append(receivedLine.substring(5).stripLeading());
286                         }
287                     }
288                     if (eventContent.length() > 0) {
289                         onEventData(eventContent.toString().trim());
290                     }
291                 }
292             }
293             callback.succeeded();
294         }
295
296         @Override
297         public boolean onIdleTimeout(@Nullable Stream stream, @Nullable Throwable x) {
298             return false;
299         }
300
301         @Override
302         public void onReset(@Nullable Stream stream, @Nullable ResetFrame frame) {
303             handleHttp2Error(Http2Error.RESET);
304         }
305     }
306
307     /**
308      * Enum of potential fatal HTTP 2 session/stream errors.
309      */
310     private enum Http2Error {
311         CLOSED,
312         FAILURE,
313         TIMEOUT,
314         RESET,
315         IDLE,
316         GO_AWAY,
317         UNAUTHORIZED;
318     }
319
320     /**
321      * Private exception for handling HTTP 2 stream and session errors.
322      */
323     @SuppressWarnings("serial")
324     private static class Http2Exception extends ApiException {
325         public final Http2Error error;
326
327         public Http2Exception(Http2Error error) {
328             this(error, null);
329         }
330
331         public Http2Exception(Http2Error error, @Nullable Throwable cause) {
332             super("HTTP 2 stream " + error.toString().toLowerCase(), cause);
333             this.error = error;
334         }
335     }
336
337     /**
338      * Adapter for listening to HTTP 2 session status events.
339      *
340      * The session must be permanently connected, so it ignores onIdleTimeout() events.
341      * It also handles the following fatal events by notifying the containing class:
342      *
343      * <li>onClose()</li>
344      * <li>onFailure()</li>
345      * <li>onGoAway()</li>
346      * <li>onReset()</li>
347      */
348     private class SessionListenerAdapter extends Session.Listener.Adapter {
349
350         @Override
351         public void onClose(@Nullable Session session, @Nullable GoAwayFrame frame) {
352             fatalErrorDelayed(this, new Http2Exception(Http2Error.CLOSED));
353         }
354
355         @Override
356         public void onFailure(@Nullable Session session, @Nullable Throwable failure) {
357             fatalErrorDelayed(this, new Http2Exception(Http2Error.FAILURE));
358         }
359
360         @Override
361         public void onGoAway(@Nullable Session session, @Nullable GoAwayFrame frame) {
362             fatalErrorDelayed(this, new Http2Exception(Http2Error.GO_AWAY));
363         }
364
365         @Override
366         public boolean onIdleTimeout(@Nullable Session session) {
367             return false;
368         }
369
370         @Override
371         public void onPing(@Nullable Session session, @Nullable PingFrame frame) {
372             checkAliveOk();
373             if (Objects.nonNull(session) && Objects.nonNull(frame) && !frame.isReply()) {
374                 session.ping(new PingFrame(true), Callback.NOOP);
375             }
376         }
377
378         @Override
379         public void onReset(@Nullable Session session, @Nullable ResetFrame frame) {
380             fatalErrorDelayed(this, new Http2Exception(Http2Error.RESET));
381         }
382     }
383
384     /**
385      * Enum showing the online state of the session connection.
386      */
387     private static enum State {
388         /**
389          * Session closed
390          */
391         CLOSED,
392         /**
393          * Session open for HTTP calls only
394          */
395         PASSIVE,
396         /**
397          * Session open for HTTP calls and actively receiving SSE events
398          */
399         ACTIVE;
400     }
401
402     private static final Logger LOGGER = LoggerFactory.getLogger(Clip2Bridge.class);
403
404     private static final String APPLICATION_ID = "org-openhab-binding-hue-clip2";
405     private static final String APPLICATION_KEY = "hue-application-key";
406
407     private static final String EVENT_STREAM_ID = "eventStream";
408     private static final String FORMAT_URL_CONFIG = "http://%s/api/0/config";
409     private static final String FORMAT_URL_RESOURCE = "https://%s/clip/v2/resource/";
410     private static final String FORMAT_URL_REGISTER = "http://%s/api";
411     private static final String FORMAT_URL_EVENTS = "https://%s/eventstream/clip/v2";
412
413     private static final long CLIP2_MINIMUM_VERSION = 1948086000L;
414
415     public static final int TIMEOUT_SECONDS = 10;
416     private static final int CHECK_ALIVE_SECONDS = 300;
417     private static final int REQUEST_INTERVAL_MILLISECS = 50;
418     private static final int MAX_CONCURRENT_STREAMS = 3;
419     private static final int RESTART_AFTER_SECONDS = 5;
420
421     private static final ResourceReference BRIDGE = new ResourceReference().setType(ResourceType.BRIDGE);
422
423     /**
424      * Static method to attempt to connect to a Hue Bridge, get its software version, and check if it is high enough to
425      * support the CLIP 2 API.
426      *
427      * @param hostName the bridge IP address.
428      * @return true if bridge is online and it supports CLIP 2, or false if it is online and does not support CLIP 2.
429      * @throws IOException if unable to communicate with the bridge.
430      * @throws NumberFormatException if the bridge firmware version is invalid.
431      */
432     public static boolean isClip2Supported(String hostName) throws IOException {
433         String response;
434         Properties headers = new Properties();
435         headers.put(HttpHeader.ACCEPT, MediaType.APPLICATION_JSON);
436         response = HttpUtil.executeUrl("GET", String.format(FORMAT_URL_CONFIG, hostName), headers, null, null,
437                 TIMEOUT_SECONDS * 1000);
438         BridgeConfig config = new Gson().fromJson(response, BridgeConfig.class);
439         if (Objects.nonNull(config)) {
440             String swVersion = config.swversion;
441             if (Objects.nonNull(swVersion)) {
442                 try {
443                     if (Long.parseLong(swVersion) >= CLIP2_MINIMUM_VERSION) {
444                         return true;
445                     }
446                 } catch (NumberFormatException e) {
447                     LOGGER.debug("isClip2Supported() swVersion '{}' is not a number", swVersion);
448                 }
449             }
450         }
451         return false;
452     }
453
454     private final HttpClient httpClient;
455     private final HTTP2Client http2Client;
456     private final String hostName;
457     private final String baseUrl;
458     private final String eventUrl;
459     private final String registrationUrl;
460     private final String applicationKey;
461     private final Clip2BridgeHandler bridgeHandler;
462     private final Gson jsonParser = new Gson();
463     private final Semaphore streamMutex = new Semaphore(MAX_CONCURRENT_STREAMS, true);
464
465     private boolean closing;
466     private boolean internalRestartScheduled;
467     private boolean externalRestartScheduled;
468     private State onlineState = State.CLOSED;
469     private Optional<Instant> lastRequestTime = Optional.empty();
470     private Instant sessionExpireTime = Instant.MAX;
471     private @Nullable Session http2Session;
472
473     private @Nullable Future<?> checkAliveTask;
474     private @Nullable Future<?> internalRestartTask;
475     private Map<Integer, Future<?>> fatalErrorTasks = new ConcurrentHashMap<>();
476
477     /**
478      * Constructor.
479      *
480      * @param httpClientFactory the OH core HttpClientFactory.
481      * @param bridgeHandler the bridge handler.
482      * @param hostName the host name (ip address) of the Hue bridge
483      * @param applicationKey the application key.
484      */
485     public Clip2Bridge(HttpClientFactory httpClientFactory, Clip2BridgeHandler bridgeHandler, String hostName,
486             String applicationKey) {
487         LOGGER.debug("Clip2Bridge()");
488         httpClient = httpClientFactory.getCommonHttpClient();
489         http2Client = httpClientFactory.createHttp2Client("hue-clip2", httpClient.getSslContextFactory());
490         http2Client.setConnectTimeout(Clip2Bridge.TIMEOUT_SECONDS * 1000);
491         http2Client.setIdleTimeout(-1);
492         this.bridgeHandler = bridgeHandler;
493         this.hostName = hostName;
494         this.applicationKey = applicationKey;
495         baseUrl = String.format(FORMAT_URL_RESOURCE, hostName);
496         eventUrl = String.format(FORMAT_URL_EVENTS, hostName);
497         registrationUrl = String.format(FORMAT_URL_REGISTER, hostName);
498     }
499
500     /**
501      * Cancel the given task.
502      *
503      * @param cancelTask the task to be cancelled (may be null)
504      * @param mayInterrupt allows cancel() to interrupt the thread.
505      */
506     private void cancelTask(@Nullable Future<?> cancelTask, boolean mayInterrupt) {
507         if (Objects.nonNull(cancelTask)) {
508             cancelTask.cancel(mayInterrupt);
509         }
510     }
511
512     /**
513      * Send a ping to the Hue bridge to check that the session is still alive.
514      */
515     private void checkAlive() {
516         if (onlineState == State.CLOSED) {
517             return;
518         }
519         LOGGER.debug("checkAlive()");
520         Session session = http2Session;
521         if (Objects.nonNull(session)) {
522             session.ping(new PingFrame(false), Callback.NOOP);
523         }
524         if (Instant.now().isAfter(sessionExpireTime)) {
525             fatalError(this, new Http2Exception(Http2Error.TIMEOUT));
526         }
527     }
528
529     /**
530      * Connection is ok, so reschedule the session check alive expire time. Called in response to incoming ping frames
531      * from the bridge.
532      */
533     protected void checkAliveOk() {
534         LOGGER.debug("checkAliveOk()");
535         sessionExpireTime = Instant.now().plusSeconds(CHECK_ALIVE_SECONDS * 2);
536     }
537
538     /**
539      * Close the connection.
540      */
541     @Override
542     public void close() {
543         closing = true;
544         externalRestartScheduled = false;
545         internalRestartScheduled = false;
546         close2();
547     }
548
549     /**
550      * Private method to close the connection.
551      */
552     private void close2() {
553         synchronized (this) {
554             LOGGER.debug("close2()");
555             boolean notifyHandler = onlineState == State.ACTIVE && !internalRestartScheduled
556                     && !externalRestartScheduled && !closing;
557             onlineState = State.CLOSED;
558             synchronized (fatalErrorTasks) {
559                 fatalErrorTasks.values().forEach(task -> cancelTask(task, true));
560                 fatalErrorTasks.clear();
561             }
562             if (!internalRestartScheduled) {
563                 // don't close the task if a restart is current
564                 cancelTask(internalRestartTask, true);
565                 internalRestartTask = null;
566             }
567             cancelTask(checkAliveTask, true);
568             checkAliveTask = null;
569             closeSession();
570             try {
571                 http2Client.stop();
572             } catch (Exception e) {
573                 // ignore
574             }
575             if (notifyHandler) {
576                 bridgeHandler.onConnectionOffline();
577             }
578         }
579     }
580
581     /**
582      * Close the HTTP 2 session if necessary.
583      */
584     private void closeSession() {
585         LOGGER.debug("closeSession()");
586         Session session = http2Session;
587         if (Objects.nonNull(session)) {
588             session.close(0, null, Callback.NOOP);
589         }
590         http2Session = null;
591     }
592
593     /**
594      * Method that is called back in case of fatal stream or session events. Note: under normal operation, the Hue
595      * Bridge sends a 'soft' GO_AWAY command every nine or ten hours, so we handle such soft errors by attempting to
596      * silently close and re-open the connection without notifying the handler of an actual 'hard' error.
597      *
598      * @param listener the entity that caused this method to be called.
599      * @param cause the exception that caused the error.
600      */
601     private synchronized void fatalError(Object listener, Http2Exception cause) {
602         if (externalRestartScheduled || internalRestartScheduled || onlineState == State.CLOSED || closing) {
603             return;
604         }
605         String causeId = listener.getClass().getSimpleName();
606         if (listener instanceof ContentStreamListenerAdapter) {
607             // on GET / PUT requests the caller handles errors and closes the stream; the session is still OK
608             LOGGER.debug("fatalError() {} {} ignoring", causeId, cause.error);
609         } else if (cause.error == Http2Error.GO_AWAY) {
610             LOGGER.debug("fatalError() {} {} scheduling reconnect", causeId, cause.error);
611
612             // schedule task to open again
613             internalRestartScheduled = true;
614             cancelTask(internalRestartTask, false);
615             internalRestartTask = bridgeHandler.getScheduler().schedule(
616                     () -> internalRestart(onlineState == State.ACTIVE), RESTART_AFTER_SECONDS, TimeUnit.SECONDS);
617
618             // force close immediately to be clean when internalRestart() starts
619             close2();
620         } else {
621             if (LOGGER.isDebugEnabled()) {
622                 LOGGER.debug("fatalError() {} {} closing", causeId, cause.error, cause);
623             } else {
624                 LOGGER.warn("Fatal error {} {} => closing session.", causeId, cause.error);
625             }
626             close2();
627         }
628     }
629
630     /**
631      * Method that is called back in case of fatal stream or session events. Schedules fatalError() to be called after a
632      * delay in order to prevent sequencing issues.
633      *
634      * @param listener the entity that caused this method to be called.
635      * @param cause the exception that caused the error.
636      */
637     protected void fatalErrorDelayed(Object listener, Http2Exception cause) {
638         synchronized (fatalErrorTasks) {
639             final int index = fatalErrorTasks.size();
640             fatalErrorTasks.put(index, bridgeHandler.getScheduler().schedule(() -> {
641                 fatalError(listener, cause);
642                 fatalErrorTasks.remove(index);
643             }, 1, TimeUnit.SECONDS));
644         }
645     }
646
647     /**
648      * HTTP GET a Resources object, for a given resource Reference, from the Hue Bridge. The reference is a class
649      * comprising a resource type and an id. If the id is a specific resource id then only the one specific resource
650      * is returned, whereas if it is null then all resources of the given resource type are returned.
651      *
652      * It wraps the getResourcesImpl() method in a try/catch block, and transposes any HttpUnAuthorizedException into an
653      * ApiException. Such transposition should never be required in reality since by the time this method is called, the
654      * connection will surely already have been authorised.
655      *
656      * @param reference the Reference class to get.
657      * @return a Resource object containing either a list of Resources or a list of Errors.
658      * @throws ApiException if anything fails.
659      * @throws InterruptedException
660      */
661     public Resources getResources(ResourceReference reference) throws ApiException, InterruptedException {
662         sleepDuringRestart();
663         if (onlineState == State.CLOSED) {
664             throw new ApiException("getResources() offline");
665         }
666         return getResourcesImpl(reference);
667     }
668
669     /**
670      * Internal method to send an HTTP 2 GET request to the Hue Bridge and process its response.
671      *
672      * @param reference the Reference class to get.
673      * @return a Resource object containing either a list of Resources or a list of Errors.
674      * @throws HttpUnauthorizedException if the request was refused as not authorised or forbidden.
675      * @throws ApiException if the communication failed, or an unexpected result occurred.
676      * @throws InterruptedException
677      */
678     private Resources getResourcesImpl(ResourceReference reference)
679             throws HttpUnauthorizedException, ApiException, InterruptedException {
680         Session session = http2Session;
681         if (Objects.isNull(session) || session.isClosed()) {
682             throw new ApiException("HTTP 2 session is null or closed");
683         }
684         throttle();
685         String url = getUrl(reference);
686         HeadersFrame headers = prepareHeaders(url, MediaType.APPLICATION_JSON);
687         LOGGER.trace("GET {} HTTP/2", url);
688         try {
689             Completable<@Nullable Stream> streamPromise = new Completable<>();
690             ContentStreamListenerAdapter contentStreamListener = new ContentStreamListenerAdapter();
691             session.newStream(headers, streamPromise, contentStreamListener);
692             // wait for stream to be opened
693             Objects.requireNonNull(streamPromise.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
694             // wait for HTTP response contents
695             String contentJson = contentStreamListener.awaitResult();
696             String contentType = contentStreamListener.getContentType();
697             LOGGER.trace("HTTP/2 200 OK (Content-Type: {}) << {}", contentType, contentJson);
698             if (!MediaType.APPLICATION_JSON.equals(contentType)) {
699                 throw new ApiException("Unexpected Content-Type: " + contentType);
700             }
701             try {
702                 Resources resources = Objects.requireNonNull(jsonParser.fromJson(contentJson, Resources.class));
703                 if (LOGGER.isDebugEnabled()) {
704                     resources.getErrors().forEach(error -> LOGGER.debug("Resources error:{}", error));
705                 }
706                 return resources;
707             } catch (JsonParseException e) {
708                 throw new ApiException("Parsing error", e);
709             }
710         } catch (ExecutionException e) {
711             Throwable cause = e.getCause();
712             if (cause instanceof HttpUnauthorizedException) {
713                 throw (HttpUnauthorizedException) cause;
714             }
715             throw new ApiException("Error sending request", e);
716         } catch (TimeoutException e) {
717             throw new ApiException("Error sending request", e);
718         } finally {
719             throttleDone();
720         }
721     }
722
723     /**
724      * Build a full path to a server end point, based on a Reference class instance. If the reference contains only
725      * a resource type, the method returns the end point url to get all resources of the given resource type. Whereas if
726      * it also contains an id, the method returns the end point url to get the specific single resource with that type
727      * and id.
728      *
729      * @param reference a Reference class instance.
730      * @return the complete end point url.
731      */
732     private String getUrl(ResourceReference reference) {
733         String url = baseUrl + reference.getType().name().toLowerCase();
734         String id = reference.getId();
735         return Objects.isNull(id) || id.isEmpty() ? url : url + "/" + id;
736     }
737
738     /**
739      * Restart the session.
740      *
741      * @param active boolean that selects whether to restart in active or passive mode.
742      */
743     private void internalRestart(boolean active) {
744         try {
745             openPassive();
746             if (active) {
747                 openActive();
748             }
749             internalRestartScheduled = false;
750         } catch (ApiException e) {
751             if (LOGGER.isDebugEnabled()) {
752                 LOGGER.debug("internalRestart() failed", e);
753             } else {
754                 LOGGER.warn("Scheduled reconnection task failed.");
755             }
756             internalRestartScheduled = false;
757             close2();
758         } catch (InterruptedException e) {
759         }
760     }
761
762     /**
763      * The event stream calls this method when it has received text data. It parses the text as JSON into a list of
764      * Event entries, converts the list of events to a list of resources, and forwards that list to the bridge
765      * handler.
766      *
767      * @param data the incoming (presumed to be JSON) text.
768      */
769     protected void onEventData(String data) {
770         if (onlineState != State.ACTIVE) {
771             return;
772         }
773         if (LOGGER.isTraceEnabled()) {
774             LOGGER.trace("onEventData() data:{}", data);
775         } else {
776             LOGGER.debug("onEventData() data length:{}", data.length());
777         }
778         JsonElement jsonElement;
779         try {
780             jsonElement = JsonParser.parseString(data);
781         } catch (JsonSyntaxException e) {
782             LOGGER.debug("onEventData() invalid data '{}'", data, e);
783             return;
784         }
785         if (!(jsonElement instanceof JsonArray)) {
786             LOGGER.debug("onEventData() data is not a JsonArray {}", data);
787             return;
788         }
789         List<Event> events;
790         try {
791             events = jsonParser.fromJson(jsonElement, Event.EVENT_LIST_TYPE);
792         } catch (JsonParseException e) {
793             LOGGER.debug("onEventData() parsing error json:{}", data, e);
794             return;
795         }
796         if (Objects.isNull(events) || events.isEmpty()) {
797             LOGGER.debug("onEventData() event list is null or empty");
798             return;
799         }
800         List<Resource> resources = new ArrayList<>();
801         events.forEach(event -> resources.addAll(event.getData()));
802         if (resources.isEmpty()) {
803             LOGGER.debug("onEventData() resource list is empty");
804             return;
805         }
806         resources.forEach(resource -> resource.markAsSparse());
807         bridgeHandler.onResourcesEvent(resources);
808     }
809
810     /**
811      * Open the HTTP 2 session and the event stream.
812      *
813      * @throws ApiException if there was a communication error.
814      * @throws InterruptedException
815      */
816     public void open() throws ApiException, InterruptedException {
817         LOGGER.debug("open()");
818         openPassive();
819         openActive();
820         bridgeHandler.onConnectionOnline();
821     }
822
823     /**
824      * Make the session active, by opening an HTTP 2 SSE event stream (if necessary).
825      *
826      * @throws ApiException if an error was encountered.
827      * @throws InterruptedException
828      */
829     private void openActive() throws ApiException, InterruptedException {
830         synchronized (this) {
831             openEventStream();
832             onlineState = State.ACTIVE;
833         }
834     }
835
836     /**
837      * Open the check alive task if necessary.
838      */
839     private void openCheckAliveTask() {
840         Future<?> task = checkAliveTask;
841         if (Objects.isNull(task) || task.isCancelled() || task.isDone()) {
842             LOGGER.debug("openCheckAliveTask()");
843             cancelTask(checkAliveTask, false);
844             checkAliveTask = bridgeHandler.getScheduler().scheduleWithFixedDelay(() -> checkAlive(),
845                     CHECK_ALIVE_SECONDS, CHECK_ALIVE_SECONDS, TimeUnit.SECONDS);
846         }
847     }
848
849     /**
850      * Implementation to open an HTTP 2 SSE event stream if necessary.
851      *
852      * @throws ApiException if an error was encountered.
853      * @throws InterruptedException
854      */
855     private void openEventStream() throws ApiException, InterruptedException {
856         Session session = http2Session;
857         if (Objects.isNull(session) || session.isClosed()) {
858             throw new ApiException("HTTP 2 session is null or closed");
859         }
860         if (session.getStreams().stream().anyMatch(stream -> Objects.nonNull(stream.getAttribute(EVENT_STREAM_ID)))) {
861             return;
862         }
863         LOGGER.debug("openEventStream()");
864         HeadersFrame headers = prepareHeaders(eventUrl, MediaType.SERVER_SENT_EVENTS);
865         LOGGER.trace("GET {} HTTP/2", eventUrl);
866         Stream stream = null;
867         try {
868             Completable<@Nullable Stream> streamPromise = new Completable<>();
869             EventStreamListenerAdapter eventStreamListener = new EventStreamListenerAdapter();
870             session.newStream(headers, streamPromise, eventStreamListener);
871             // wait for stream to be opened
872             stream = Objects.requireNonNull(streamPromise.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
873             stream.setIdleTimeout(0);
874             stream.setAttribute(EVENT_STREAM_ID, session);
875             // wait for "hi" from the bridge
876             eventStreamListener.awaitResult();
877         } catch (ExecutionException | TimeoutException e) {
878             if (Objects.nonNull(stream)) {
879                 stream.reset(new ResetFrame(stream.getId(), 0), Callback.NOOP);
880             }
881             throw new ApiException("Error opening event stream", e);
882         }
883     }
884
885     /**
886      * Private method to open the HTTP 2 session in passive mode.
887      *
888      * @throws ApiException if there was a communication error.
889      * @throws InterruptedException
890      */
891     private void openPassive() throws ApiException, InterruptedException {
892         synchronized (this) {
893             LOGGER.debug("openPassive()");
894             onlineState = State.CLOSED;
895             try {
896                 http2Client.start();
897             } catch (Exception e) {
898                 throw new ApiException("Error starting HTTP/2 client", e);
899             }
900             openSession();
901             openCheckAliveTask();
902             onlineState = State.PASSIVE;
903         }
904     }
905
906     /**
907      * Open the HTTP 2 session if necessary.
908      *
909      * @throws ApiException if it was not possible to create and connect the session.
910      * @throws InterruptedException
911      */
912     private void openSession() throws ApiException, InterruptedException {
913         Session session = http2Session;
914         if (Objects.nonNull(session) && !session.isClosed()) {
915             return;
916         }
917         LOGGER.debug("openSession()");
918         InetSocketAddress address = new InetSocketAddress(hostName, 443);
919         try {
920             SessionListenerAdapter sessionListener = new SessionListenerAdapter();
921             Completable<@Nullable Session> sessionPromise = new Completable<>();
922             http2Client.connect(http2Client.getBean(SslContextFactory.class), address, sessionListener, sessionPromise);
923             // wait for the (SSL) session to be opened
924             http2Session = Objects.requireNonNull(sessionPromise.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
925             checkAliveOk(); // initialise the session timeout window
926         } catch (ExecutionException | TimeoutException e) {
927             throw new ApiException("Error opening HTTP 2 session", e);
928         }
929     }
930
931     /**
932      * Helper class to create a HeadersFrame for a standard HTTP GET request.
933      *
934      * @param url the server url.
935      * @param acceptContentType the accepted content type for the response.
936      * @return the HeadersFrame.
937      */
938     private HeadersFrame prepareHeaders(String url, String acceptContentType) {
939         return prepareHeaders(url, acceptContentType, "GET", -1, null);
940     }
941
942     /**
943      * Helper class to create a HeadersFrame for a more exotic HTTP request.
944      *
945      * @param url the server url.
946      * @param acceptContentType the accepted content type for the response.
947      * @param method the HTTP request method.
948      * @param contentLength the length of the content e.g. for a PUT call.
949      * @param contentType the respective content type.
950      * @return the HeadersFrame.
951      */
952     private HeadersFrame prepareHeaders(String url, String acceptContentType, String method, long contentLength,
953             @Nullable String contentType) {
954         HttpFields fields = new HttpFields();
955         fields.put(HttpHeader.ACCEPT, acceptContentType);
956         if (contentType != null) {
957             fields.put(HttpHeader.CONTENT_TYPE, contentType);
958         }
959         if (contentLength >= 0) {
960             fields.putLongField(HttpHeader.CONTENT_LENGTH, contentLength);
961         }
962         fields.put(APPLICATION_KEY, applicationKey);
963         return new HeadersFrame(new MetaData.Request(method, new HttpURI(url), HttpVersion.HTTP_2, fields), null,
964                 contentLength <= 0);
965     }
966
967     /**
968      * Use an HTTP/2 PUT command to send a resource to the server.
969      *
970      * @param resource the resource to put.
971      * @throws ApiException if something fails.
972      * @throws InterruptedException
973      */
974     public void putResource(Resource resource) throws ApiException, InterruptedException {
975         sleepDuringRestart();
976         if (onlineState == State.CLOSED) {
977             return;
978         }
979         Session session = http2Session;
980         if (Objects.isNull(session) || session.isClosed()) {
981             throw new ApiException("HTTP 2 session is null or closed");
982         }
983         throttle();
984         String requestJson = jsonParser.toJson(resource);
985         ByteBuffer requestBytes = ByteBuffer.wrap(requestJson.getBytes(StandardCharsets.UTF_8));
986         String url = getUrl(new ResourceReference().setId(resource.getId()).setType(resource.getType()));
987         HeadersFrame headers = prepareHeaders(url, MediaType.APPLICATION_JSON, "PUT", requestBytes.capacity(),
988                 MediaType.APPLICATION_JSON);
989         LOGGER.trace("PUT {} HTTP/2 >> {}", url, requestJson);
990         try {
991             Completable<@Nullable Stream> streamPromise = new Completable<>();
992             ContentStreamListenerAdapter contentStreamListener = new ContentStreamListenerAdapter();
993             session.newStream(headers, streamPromise, contentStreamListener);
994             // wait for stream to be opened
995             Stream stream = Objects.requireNonNull(streamPromise.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
996             stream.data(new DataFrame(stream.getId(), requestBytes, true), Callback.NOOP);
997             // wait for HTTP response
998             String contentJson = contentStreamListener.awaitResult();
999             String contentType = contentStreamListener.getContentType();
1000             LOGGER.trace("HTTP/2 200 OK (Content-Type: {}) << {}", contentType, contentJson);
1001             if (!MediaType.APPLICATION_JSON.equals(contentType)) {
1002                 throw new ApiException("Unexpected Content-Type: " + contentType);
1003             }
1004             try {
1005                 Resources resources = Objects.requireNonNull(jsonParser.fromJson(contentJson, Resources.class));
1006                 if (LOGGER.isDebugEnabled()) {
1007                     resources.getErrors().forEach(error -> LOGGER.debug("putResource() resources error:{}", error));
1008                 }
1009             } catch (JsonParseException e) {
1010                 LOGGER.debug("putResource() parsing error json:{}", contentJson, e);
1011                 throw new ApiException("Parsing error", e);
1012             }
1013         } catch (ExecutionException | TimeoutException e) {
1014             throw new ApiException("putResource() error sending request", e);
1015         } finally {
1016             throttleDone();
1017         }
1018     }
1019
1020     /**
1021      * Try to register the application key with the hub. Use the given application key if one is provided; otherwise the
1022      * hub will create a new one. Note: this requires an HTTP 1.1 client call.
1023      *
1024      * @param oldApplicationKey existing application key if any i.e. may be empty.
1025      * @return the existing or a newly created application key.
1026      * @throws HttpUnauthorizedException if the registration failed.
1027      * @throws ApiException if there was a communications error.
1028      * @throws InterruptedException
1029      */
1030     public String registerApplicationKey(@Nullable String oldApplicationKey)
1031             throws HttpUnauthorizedException, ApiException, InterruptedException {
1032         LOGGER.debug("registerApplicationKey()");
1033         String json = jsonParser.toJson((Objects.isNull(oldApplicationKey) || oldApplicationKey.isEmpty())
1034                 ? new CreateUserRequest(APPLICATION_ID)
1035                 : new CreateUserRequest(oldApplicationKey, APPLICATION_ID));
1036         Request httpRequest = httpClient.newRequest(registrationUrl).method(HttpMethod.POST)
1037                 .timeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
1038                 .content(new StringContentProvider(json), MediaType.APPLICATION_JSON);
1039         ContentResponse contentResponse;
1040         try {
1041             LOGGER.trace("POST {} HTTP/1.1 >> {}", registrationUrl, json);
1042             contentResponse = httpRequest.send();
1043         } catch (TimeoutException | ExecutionException e) {
1044             throw new ApiException("HTTP processing error", e);
1045         }
1046         int httpStatus = contentResponse.getStatus();
1047         json = contentResponse.getContentAsString().trim();
1048         LOGGER.trace("HTTP/1.1 {} {} << {}", httpStatus, contentResponse.getReason(), json);
1049         if (httpStatus != HttpStatus.OK_200) {
1050             throw new ApiException("HTTP bad response");
1051         }
1052         try {
1053             List<SuccessResponse> entries = jsonParser.fromJson(json, SuccessResponse.GSON_TYPE);
1054             if (Objects.nonNull(entries) && !entries.isEmpty()) {
1055                 SuccessResponse response = entries.get(0);
1056                 Map<String, Object> responseSuccess = response.success;
1057                 if (Objects.nonNull(responseSuccess)) {
1058                     String newApplicationKey = (String) responseSuccess.get("username");
1059                     if (Objects.nonNull(newApplicationKey)) {
1060                         return newApplicationKey;
1061                     }
1062                 }
1063             }
1064         } catch (JsonParseException e) {
1065             LOGGER.debug("registerApplicationKey() parsing error json:{}", json, e);
1066         }
1067         throw new HttpUnauthorizedException("Application key registration failed");
1068     }
1069
1070     public void setExternalRestartScheduled() {
1071         externalRestartScheduled = true;
1072         internalRestartScheduled = false;
1073         cancelTask(internalRestartTask, false);
1074         internalRestartTask = null;
1075         close2();
1076     }
1077
1078     /**
1079      * Sleep the caller during any period when the connection is restarting.
1080      *
1081      * @throws ApiException if anything failed.
1082      * @throws InterruptedException
1083      */
1084     private void sleepDuringRestart() throws ApiException, InterruptedException {
1085         Future<?> restartTask = this.internalRestartTask;
1086         if (Objects.nonNull(restartTask)) {
1087             try {
1088                 restartTask.get(RESTART_AFTER_SECONDS * 2, TimeUnit.SECONDS);
1089             } catch (ExecutionException | TimeoutException e) {
1090                 throw new ApiException("sleepDuringRestart() error", e);
1091             }
1092         }
1093         internalRestartScheduled = false;
1094     }
1095
1096     /**
1097      * Test the Hue Bridge connection state by attempting to connect and trying to execute a basic command that requires
1098      * authentication.
1099      *
1100      * @throws HttpUnauthorizedException if it was possible to connect but not to authenticate.
1101      * @throws ApiException if it was not possible to connect.
1102      * @throws InterruptedException
1103      */
1104     public void testConnectionState() throws HttpUnauthorizedException, ApiException, InterruptedException {
1105         LOGGER.debug("testConnectionState()");
1106         try {
1107             openPassive();
1108             getResourcesImpl(BRIDGE);
1109         } catch (ApiException e) {
1110             close2();
1111             throw e;
1112         }
1113     }
1114
1115     /**
1116      * Hue Bridges get confused if they receive too many HTTP requests in a short period of time (e.g. on start up), or
1117      * if too many HTTP sessions are opened at the same time. So this method throttles the requests to a maximum of one
1118      * per REQUEST_INTERVAL_MILLISECS, and ensures that no more than MAX_CONCURRENT_SESSIONS sessions are started.
1119      *
1120      * @throws InterruptedException
1121      */
1122     private synchronized void throttle() throws InterruptedException {
1123         streamMutex.acquire();
1124         Instant now = Instant.now();
1125         if (lastRequestTime.isPresent()) {
1126             long delay = Duration.between(now, lastRequestTime.get()).toMillis() + REQUEST_INTERVAL_MILLISECS;
1127             if (delay > 0) {
1128                 Thread.sleep(delay);
1129             }
1130         }
1131         lastRequestTime = Optional.of(now);
1132     }
1133
1134     /**
1135      * Release the mutex.
1136      */
1137     private void throttleDone() {
1138         streamMutex.release();
1139     }
1140 }