]> git.basschouten.com Git - openhab-addons.git/commitdiff
[homeconnect] Fix invalid authorization header during Server-Sent Events (SSE) client...
authorbruestel <jonas@bruestel.net>
Fri, 18 Jun 2021 09:25:13 +0000 (11:25 +0200)
committerGitHub <noreply@github.com>
Fri, 18 Jun 2021 09:25:13 +0000 (11:25 +0200)
* [homeconnect] Improve logging of SSE connection and add backoff interval in case of connection error

Signed-off-by: Jonas Brüstel <jonas@bruestel.net>
* [homeconnect] Fix SSE authorization header problem

Signed-off-by: Jonas Brüstel <jonas@bruestel.net>
* [homeconnect] Fix synchronized monitor

Signed-off-by: Jonas Brüstel <jonas@bruestel.net>
bundles/org.openhab.binding.homeconnect/src/main/java/org/openhab/binding/homeconnect/internal/client/HomeConnectEventSourceClient.java
bundles/org.openhab.binding.homeconnect/src/main/java/org/openhab/binding/homeconnect/internal/client/HomeConnectEventSourceListener.java
bundles/org.openhab.binding.homeconnect/src/main/java/org/openhab/binding/homeconnect/internal/client/HomeConnectStreamingRequestFilter.java
bundles/org.openhab.binding.homeconnect/src/main/java/org/openhab/binding/homeconnect/internal/client/HttpHelper.java

index ab0faf124d5a9c2b5051d9bc418780c9437b8511..94fcb5e587b41d78d8a4bbbf23fee81dad081ebc 100644 (file)
@@ -95,11 +95,11 @@ public class HomeConnectEventSourceClient {
 
         if (!eventSourceConnections.containsKey(eventListener)) {
             logger.debug("Create new event source listener for '{}'.", haId);
-            Client client = clientBuilder.readTimeout(SSE_REQUEST_READ_TIMEOUT, TimeUnit.SECONDS).register(
-                    new HomeConnectStreamingRequestFilter(HttpHelper.getAuthorizationHeader(oAuthClientService)))
-                    .build();
-            SseEventSource eventSource = eventSourceFactory
-                    .newSource(client.target(apiUrl + "/api/homeappliances/" + haId + "/events"));
+            String target = apiUrl + "/api/homeappliances/" + haId + "/events";
+
+            Client client = createClient(target);
+
+            SseEventSource eventSource = eventSourceFactory.newSource(client.target(target));
             HomeConnectEventSourceListener eventSourceListener = new HomeConnectEventSourceListener(haId, eventListener,
                     this, scheduler, eventQueue);
             eventSource.register(eventSourceListener::onEvent, eventSourceListener::onError,
@@ -149,9 +149,11 @@ public class HomeConnectEventSourceClient {
     }
 
     private void closeEventSource(SseEventSource eventSource, boolean immediate, boolean completed) {
-        if (eventSource.isOpen() && !completed) {
-            logger.debug("Close event source (immediate = {})", immediate);
-            eventSource.close(immediate ? 0 : 10, TimeUnit.SECONDS);
+        var open = eventSource.isOpen();
+        logger.debug("Closing event source. open={}, completed={}, immediate={}", open, completed, immediate);
+        if (open && !completed) {
+            eventSource.close(immediate ? 0 : 5, TimeUnit.SECONDS);
+            logger.debug("Event source closed.");
         }
         HomeConnectEventSourceListener eventSourceListener = eventSourceListeners.get(eventSource);
         if (eventSourceListener != null) {
@@ -159,6 +161,26 @@ public class HomeConnectEventSourceClient {
         }
     }
 
+    private Client createClient(String target) throws CommunicationException, AuthorizationException {
+        boolean filterRegistered = clientBuilder.getConfiguration()
+                .isRegistered(HomeConnectStreamingRequestFilter.class);
+
+        Client client;
+        HomeConnectStreamingRequestFilter filter;
+        if (filterRegistered) {
+            filter = clientBuilder.getConfiguration().getInstances().stream()
+                    .filter(instance -> instance instanceof HomeConnectStreamingRequestFilter)
+                    .map(instance -> (HomeConnectStreamingRequestFilter) instance).findAny().orElseThrow();
+            client = clientBuilder.readTimeout(SSE_REQUEST_READ_TIMEOUT, TimeUnit.SECONDS).build();
+        } else {
+            filter = new HomeConnectStreamingRequestFilter();
+            client = clientBuilder.readTimeout(SSE_REQUEST_READ_TIMEOUT, TimeUnit.SECONDS).register(filter).build();
+        }
+        filter.setAuthorizationHeader(target, HttpHelper.getAuthorizationHeader(oAuthClientService));
+
+        return client;
+    }
+
     /**
      * Connection count.
      *
index e33b0456835a17d84992377dec3f8634bab67b64..8341d5e47cb31f2f16a1d2ed879199e7e3cef907 100644 (file)
@@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
+import javax.ws.rs.InternalServerErrorException;
 import javax.ws.rs.NotAuthorizedException;
 import javax.ws.rs.sse.InboundSseEvent;
 
@@ -138,9 +139,14 @@ public class HomeConnectEventSourceListener {
                 // seconds. So we wait few seconds before trying again.
                 if (error instanceof NotAuthorizedException) {
                     logger.debug(
-                            "Event source listener connection failure due to unauthorized exception : wait 10 seconds... haId={}",
+                            "Event source listener connection failure due to unauthorized exception : wait 20 seconds... haId={}",
                             haId);
-                    scheduledExecutorService.schedule(() -> eventListener.onClosed(), 10, TimeUnit.SECONDS);
+                    scheduledExecutorService.schedule(() -> eventListener.onClosed(), 20, TimeUnit.SECONDS);
+                } else if (error instanceof InternalServerErrorException) {
+                    logger.debug(
+                            "Event source listener connection failure due to internal server exception : wait 2 seconds... haId={}",
+                            haId);
+                    scheduledExecutorService.schedule(() -> eventListener.onClosed(), 2, TimeUnit.SECONDS);
                 } else {
                     eventListener.onClosed();
                 }
index c93acd25bc135dd1fc4b89c4df4098773632558e..66984e8917894716bc379988b14ccd820e13f8d4 100644 (file)
 package org.openhab.binding.homeconnect.internal.client;
 
 import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.ws.rs.client.ClientRequestContext;
 import javax.ws.rs.client.ClientRequestFilter;
+import javax.ws.rs.client.ClientResponseContext;
+import javax.ws.rs.client.ClientResponseFilter;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MultivaluedMap;
 
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Inserts Authorization header for requests on the streaming REST API.
@@ -28,23 +33,49 @@ import org.eclipse.jdt.annotation.Nullable;
  * @author Laurent Garnier - Initial contribution
  */
 @NonNullByDefault
-public class HomeConnectStreamingRequestFilter implements ClientRequestFilter {
+public class HomeConnectStreamingRequestFilter implements ClientRequestFilter, ClientResponseFilter {
 
     private static final String TEXT_EVENT_STREAM = "text/event-stream";
 
-    private final String authorizationHeader;
-
-    public HomeConnectStreamingRequestFilter(String authorizationHeader) {
-        this.authorizationHeader = authorizationHeader;
-    }
+    private final Logger logger = LoggerFactory.getLogger(HomeConnectStreamingRequestFilter.class);
+    private final ConcurrentHashMap<String, String> authorizationHeaders = new ConcurrentHashMap<>();
 
     @Override
     public void filter(@Nullable ClientRequestContext requestContext) throws IOException {
         if (requestContext != null) {
             MultivaluedMap<String, Object> headers = requestContext.getHeaders();
-            headers.putSingle(HttpHeaders.AUTHORIZATION, authorizationHeader);
+            String authorizationHeader = authorizationHeaders.get(requestContext.getUri().toString());
+            if (authorizationHeader != null) {
+                headers.putSingle(HttpHeaders.AUTHORIZATION, authorizationHeader);
+            } else {
+                logger.warn("No authorization header set! uri={}", requestContext.getUri());
+            }
             headers.putSingle(HttpHeaders.CACHE_CONTROL, "no-cache");
             headers.putSingle(HttpHeaders.ACCEPT, TEXT_EVENT_STREAM);
         }
     }
+
+    @Override
+    public void filter(@Nullable ClientRequestContext requestContext, @Nullable ClientResponseContext responseContext)
+            throws IOException {
+        if (logger.isDebugEnabled() && requestContext != null) {
+            StringBuilder sb = new StringBuilder();
+            sb.append("SSE connection: ");
+            sb.append(requestContext.getUri()).append("\n");
+            requestContext.getHeaders()
+                    .forEach((name, value) -> sb.append("> ").append(name).append(": ").append(value).append("\n"));
+
+            if (responseContext != null) {
+                responseContext.getHeaders()
+                        .forEach((name, value) -> sb.append("< ").append(name).append(": ").append(value).append("\n"));
+            }
+
+            logger.debug("{}", sb);
+        }
+    }
+
+    public void setAuthorizationHeader(String target, String header) {
+        logger.debug("Set authorization header. target={}, header={}", target, header);
+        authorizationHeaders.put(target, header);
+    }
 }
index c03f589611599b0b681e9a08ed7692d7e7b04518..3fdaedb3807816389710f437fea3cb465b3a9c06 100644 (file)
@@ -60,6 +60,8 @@ public class HttpHelper {
     private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create();
     private static final JsonParser JSON_PARSER = new JsonParser();
     private static final Map<String, Bucket> BUCKET_MAP = new HashMap<>();
+    private static final Object AUTHORIZATION_HEADER_MONITOR = new Object();
+    private static final Object BUCKET_MONITOR = new Object();
 
     private static @Nullable String lastAccessToken = null;
 
@@ -90,36 +92,42 @@ public class HttpHelper {
 
     public static String getAuthorizationHeader(OAuthClientService oAuthClientService)
             throws AuthorizationException, CommunicationException {
-        try {
-            AccessTokenResponse accessTokenResponse = oAuthClientService.getAccessTokenResponse();
-            // refresh the token if it's about to expire
-            if (accessTokenResponse != null
-                    && accessTokenResponse.isExpired(LocalDateTime.now(), OAUTH_EXPIRE_BUFFER)) {
-                LoggerFactory.getLogger(HttpHelper.class).debug("Requesting a refresh of the access token.");
-                accessTokenResponse = oAuthClientService.refreshToken();
-            }
+        synchronized (AUTHORIZATION_HEADER_MONITOR) {
+            try {
+                AccessTokenResponse accessTokenResponse = oAuthClientService.getAccessTokenResponse();
+                // refresh the token if it's about to expire
+                if (accessTokenResponse != null
+                        && accessTokenResponse.isExpired(LocalDateTime.now(), OAUTH_EXPIRE_BUFFER)) {
+                    LoggerFactory.getLogger(HttpHelper.class).debug("Requesting a refresh of the access token.");
+                    accessTokenResponse = oAuthClientService.refreshToken();
+                }
 
-            if (accessTokenResponse != null) {
-                String lastToken = lastAccessToken;
-                if (lastToken == null) {
-                    LoggerFactory.getLogger(HttpHelper.class).debug("The used access token was created at {}",
-                            accessTokenResponse.getCreatedOn().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
-                } else if (!lastToken.equals(accessTokenResponse.getAccessToken())) {
-                    LoggerFactory.getLogger(HttpHelper.class).debug("The access token changed. New one created at {}",
-                            accessTokenResponse.getCreatedOn().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
+                if (accessTokenResponse != null) {
+                    String lastToken = lastAccessToken;
+                    if (lastToken == null) {
+                        LoggerFactory.getLogger(HttpHelper.class).debug("The used access token was created at {}",
+                                accessTokenResponse.getCreatedOn().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
+                    } else if (!lastToken.equals(accessTokenResponse.getAccessToken())) {
+                        LoggerFactory.getLogger(HttpHelper.class).debug(
+                                "The access token changed. New one created at {}",
+                                accessTokenResponse.getCreatedOn().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
+                    }
+                    lastAccessToken = accessTokenResponse.getAccessToken();
+
+                    LoggerFactory.getLogger(HttpHelper.class).debug("Current access token: {}",
+                            accessTokenResponse.getAccessToken());
+                    return BEARER + accessTokenResponse.getAccessToken();
+                } else {
+                    LoggerFactory.getLogger(HttpHelper.class).error("No access token available! Fatal error.");
+                    throw new AuthorizationException("No access token available!");
                 }
-                lastAccessToken = accessTokenResponse.getAccessToken();
-                return BEARER + accessTokenResponse.getAccessToken();
-            } else {
-                LoggerFactory.getLogger(HttpHelper.class).error("No access token available! Fatal error.");
-                throw new AuthorizationException("No access token available!");
+            } catch (IOException e) {
+                String errorMessage = e.getMessage();
+                throw new CommunicationException(errorMessage != null ? errorMessage : "IOException", e);
+            } catch (OAuthException | OAuthResponseException e) {
+                String errorMessage = e.getMessage();
+                throw new AuthorizationException(errorMessage != null ? errorMessage : "oAuth exception", e);
             }
-        } catch (IOException e) {
-            String errorMessage = e.getMessage();
-            throw new CommunicationException(errorMessage != null ? errorMessage : "IOException", e);
-        } catch (OAuthException | OAuthResponseException e) {
-            String errorMessage = e.getMessage();
-            throw new AuthorizationException(errorMessage != null ? errorMessage : "oAuth exception", e);
         }
     }
 
@@ -127,20 +135,22 @@ public class HttpHelper {
         return JSON_PARSER.parse(json);
     }
 
-    private static synchronized Bucket getBucket(String clientId) {
-        Bucket bucket = null;
-        if (BUCKET_MAP.containsKey(clientId)) {
-            bucket = BUCKET_MAP.get(clientId);
-        }
+    private static Bucket getBucket(String clientId) {
+        synchronized (BUCKET_MONITOR) {
+            Bucket bucket = null;
+            if (BUCKET_MAP.containsKey(clientId)) {
+                bucket = BUCKET_MAP.get(clientId);
+            }
 
-        if (bucket == null) {
-            bucket = Bucket4j.builder()
-                    // allows 50 tokens per minute (added 10 second buffer)
-                    .addLimit(classic(50, intervally(50, Duration.ofSeconds(70))).withInitialTokens(40))
-                    // but not often then 50 tokens per second
-                    .addLimit(classic(10, intervally(10, Duration.ofSeconds(1)))).build();
-            BUCKET_MAP.put(clientId, bucket);
+            if (bucket == null) {
+                bucket = Bucket4j.builder()
+                        // allows 50 tokens per minute (added 10 second buffer)
+                        .addLimit(classic(50, intervally(50, Duration.ofSeconds(70))).withInitialTokens(40))
+                        // but not often then 50 tokens per second
+                        .addLimit(classic(10, intervally(10, Duration.ofSeconds(1)))).build();
+                BUCKET_MAP.put(clientId, bucket);
+            }
+            return bucket;
         }
-        return bucket;
     }
 }