* [homeconnect] Improve logging of SSE connection and add backoff interval in case of connection error
Signed-off-by: Jonas Brüstel <jonas@bruestel.net>
* [homeconnect] Fix SSE authorization header problem
Signed-off-by: Jonas Brüstel <jonas@bruestel.net>
* [homeconnect] Fix synchronized monitor
Signed-off-by: Jonas Brüstel <jonas@bruestel.net>
if (!eventSourceConnections.containsKey(eventListener)) {
logger.debug("Create new event source listener for '{}'.", haId);
- Client client = clientBuilder.readTimeout(SSE_REQUEST_READ_TIMEOUT, TimeUnit.SECONDS).register(
- new HomeConnectStreamingRequestFilter(HttpHelper.getAuthorizationHeader(oAuthClientService)))
- .build();
- SseEventSource eventSource = eventSourceFactory
- .newSource(client.target(apiUrl + "/api/homeappliances/" + haId + "/events"));
+ String target = apiUrl + "/api/homeappliances/" + haId + "/events";
+
+ Client client = createClient(target);
+
+ SseEventSource eventSource = eventSourceFactory.newSource(client.target(target));
HomeConnectEventSourceListener eventSourceListener = new HomeConnectEventSourceListener(haId, eventListener,
this, scheduler, eventQueue);
eventSource.register(eventSourceListener::onEvent, eventSourceListener::onError,
}
private void closeEventSource(SseEventSource eventSource, boolean immediate, boolean completed) {
- if (eventSource.isOpen() && !completed) {
- logger.debug("Close event source (immediate = {})", immediate);
- eventSource.close(immediate ? 0 : 10, TimeUnit.SECONDS);
+ var open = eventSource.isOpen();
+ logger.debug("Closing event source. open={}, completed={}, immediate={}", open, completed, immediate);
+ if (open && !completed) {
+ eventSource.close(immediate ? 0 : 5, TimeUnit.SECONDS);
+ logger.debug("Event source closed.");
}
HomeConnectEventSourceListener eventSourceListener = eventSourceListeners.get(eventSource);
if (eventSourceListener != null) {
}
}
+ private Client createClient(String target) throws CommunicationException, AuthorizationException {
+ boolean filterRegistered = clientBuilder.getConfiguration()
+ .isRegistered(HomeConnectStreamingRequestFilter.class);
+
+ Client client;
+ HomeConnectStreamingRequestFilter filter;
+ if (filterRegistered) {
+ filter = clientBuilder.getConfiguration().getInstances().stream()
+ .filter(instance -> instance instanceof HomeConnectStreamingRequestFilter)
+ .map(instance -> (HomeConnectStreamingRequestFilter) instance).findAny().orElseThrow();
+ client = clientBuilder.readTimeout(SSE_REQUEST_READ_TIMEOUT, TimeUnit.SECONDS).build();
+ } else {
+ filter = new HomeConnectStreamingRequestFilter();
+ client = clientBuilder.readTimeout(SSE_REQUEST_READ_TIMEOUT, TimeUnit.SECONDS).register(filter).build();
+ }
+ filter.setAuthorizationHeader(target, HttpHelper.getAuthorizationHeader(oAuthClientService));
+
+ return client;
+ }
+
/**
* Connection count.
*
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import javax.ws.rs.InternalServerErrorException;
import javax.ws.rs.NotAuthorizedException;
import javax.ws.rs.sse.InboundSseEvent;
// seconds. So we wait few seconds before trying again.
if (error instanceof NotAuthorizedException) {
logger.debug(
- "Event source listener connection failure due to unauthorized exception : wait 10 seconds... haId={}",
+ "Event source listener connection failure due to unauthorized exception : wait 20 seconds... haId={}",
haId);
- scheduledExecutorService.schedule(() -> eventListener.onClosed(), 10, TimeUnit.SECONDS);
+ scheduledExecutorService.schedule(() -> eventListener.onClosed(), 20, TimeUnit.SECONDS);
+ } else if (error instanceof InternalServerErrorException) {
+ logger.debug(
+ "Event source listener connection failure due to internal server exception : wait 2 seconds... haId={}",
+ haId);
+ scheduledExecutorService.schedule(() -> eventListener.onClosed(), 2, TimeUnit.SECONDS);
} else {
eventListener.onClosed();
}
package org.openhab.binding.homeconnect.internal.client;
import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
import javax.ws.rs.client.ClientRequestContext;
import javax.ws.rs.client.ClientRequestFilter;
+import javax.ws.rs.client.ClientResponseContext;
+import javax.ws.rs.client.ClientResponseFilter;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MultivaluedMap;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Inserts Authorization header for requests on the streaming REST API.
* @author Laurent Garnier - Initial contribution
*/
@NonNullByDefault
-public class HomeConnectStreamingRequestFilter implements ClientRequestFilter {
+public class HomeConnectStreamingRequestFilter implements ClientRequestFilter, ClientResponseFilter {
private static final String TEXT_EVENT_STREAM = "text/event-stream";
- private final String authorizationHeader;
-
- public HomeConnectStreamingRequestFilter(String authorizationHeader) {
- this.authorizationHeader = authorizationHeader;
- }
+ private final Logger logger = LoggerFactory.getLogger(HomeConnectStreamingRequestFilter.class);
+ private final ConcurrentHashMap<String, String> authorizationHeaders = new ConcurrentHashMap<>();
@Override
public void filter(@Nullable ClientRequestContext requestContext) throws IOException {
if (requestContext != null) {
MultivaluedMap<String, Object> headers = requestContext.getHeaders();
- headers.putSingle(HttpHeaders.AUTHORIZATION, authorizationHeader);
+ String authorizationHeader = authorizationHeaders.get(requestContext.getUri().toString());
+ if (authorizationHeader != null) {
+ headers.putSingle(HttpHeaders.AUTHORIZATION, authorizationHeader);
+ } else {
+ logger.warn("No authorization header set! uri={}", requestContext.getUri());
+ }
headers.putSingle(HttpHeaders.CACHE_CONTROL, "no-cache");
headers.putSingle(HttpHeaders.ACCEPT, TEXT_EVENT_STREAM);
}
}
+
+ @Override
+ public void filter(@Nullable ClientRequestContext requestContext, @Nullable ClientResponseContext responseContext)
+ throws IOException {
+ if (logger.isDebugEnabled() && requestContext != null) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("SSE connection: ");
+ sb.append(requestContext.getUri()).append("\n");
+ requestContext.getHeaders()
+ .forEach((name, value) -> sb.append("> ").append(name).append(": ").append(value).append("\n"));
+
+ if (responseContext != null) {
+ responseContext.getHeaders()
+ .forEach((name, value) -> sb.append("< ").append(name).append(": ").append(value).append("\n"));
+ }
+
+ logger.debug("{}", sb);
+ }
+ }
+
+ public void setAuthorizationHeader(String target, String header) {
+ logger.debug("Set authorization header. target={}, header={}", target, header);
+ authorizationHeaders.put(target, header);
+ }
}
private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create();
private static final JsonParser JSON_PARSER = new JsonParser();
private static final Map<String, Bucket> BUCKET_MAP = new HashMap<>();
+ private static final Object AUTHORIZATION_HEADER_MONITOR = new Object();
+ private static final Object BUCKET_MONITOR = new Object();
private static @Nullable String lastAccessToken = null;
public static String getAuthorizationHeader(OAuthClientService oAuthClientService)
throws AuthorizationException, CommunicationException {
- try {
- AccessTokenResponse accessTokenResponse = oAuthClientService.getAccessTokenResponse();
- // refresh the token if it's about to expire
- if (accessTokenResponse != null
- && accessTokenResponse.isExpired(LocalDateTime.now(), OAUTH_EXPIRE_BUFFER)) {
- LoggerFactory.getLogger(HttpHelper.class).debug("Requesting a refresh of the access token.");
- accessTokenResponse = oAuthClientService.refreshToken();
- }
+ synchronized (AUTHORIZATION_HEADER_MONITOR) {
+ try {
+ AccessTokenResponse accessTokenResponse = oAuthClientService.getAccessTokenResponse();
+ // refresh the token if it's about to expire
+ if (accessTokenResponse != null
+ && accessTokenResponse.isExpired(LocalDateTime.now(), OAUTH_EXPIRE_BUFFER)) {
+ LoggerFactory.getLogger(HttpHelper.class).debug("Requesting a refresh of the access token.");
+ accessTokenResponse = oAuthClientService.refreshToken();
+ }
- if (accessTokenResponse != null) {
- String lastToken = lastAccessToken;
- if (lastToken == null) {
- LoggerFactory.getLogger(HttpHelper.class).debug("The used access token was created at {}",
- accessTokenResponse.getCreatedOn().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
- } else if (!lastToken.equals(accessTokenResponse.getAccessToken())) {
- LoggerFactory.getLogger(HttpHelper.class).debug("The access token changed. New one created at {}",
- accessTokenResponse.getCreatedOn().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
+ if (accessTokenResponse != null) {
+ String lastToken = lastAccessToken;
+ if (lastToken == null) {
+ LoggerFactory.getLogger(HttpHelper.class).debug("The used access token was created at {}",
+ accessTokenResponse.getCreatedOn().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
+ } else if (!lastToken.equals(accessTokenResponse.getAccessToken())) {
+ LoggerFactory.getLogger(HttpHelper.class).debug(
+ "The access token changed. New one created at {}",
+ accessTokenResponse.getCreatedOn().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
+ }
+ lastAccessToken = accessTokenResponse.getAccessToken();
+
+ LoggerFactory.getLogger(HttpHelper.class).debug("Current access token: {}",
+ accessTokenResponse.getAccessToken());
+ return BEARER + accessTokenResponse.getAccessToken();
+ } else {
+ LoggerFactory.getLogger(HttpHelper.class).error("No access token available! Fatal error.");
+ throw new AuthorizationException("No access token available!");
}
- lastAccessToken = accessTokenResponse.getAccessToken();
- return BEARER + accessTokenResponse.getAccessToken();
- } else {
- LoggerFactory.getLogger(HttpHelper.class).error("No access token available! Fatal error.");
- throw new AuthorizationException("No access token available!");
+ } catch (IOException e) {
+ String errorMessage = e.getMessage();
+ throw new CommunicationException(errorMessage != null ? errorMessage : "IOException", e);
+ } catch (OAuthException | OAuthResponseException e) {
+ String errorMessage = e.getMessage();
+ throw new AuthorizationException(errorMessage != null ? errorMessage : "oAuth exception", e);
}
- } catch (IOException e) {
- String errorMessage = e.getMessage();
- throw new CommunicationException(errorMessage != null ? errorMessage : "IOException", e);
- } catch (OAuthException | OAuthResponseException e) {
- String errorMessage = e.getMessage();
- throw new AuthorizationException(errorMessage != null ? errorMessage : "oAuth exception", e);
}
}
return JSON_PARSER.parse(json);
}
- private static synchronized Bucket getBucket(String clientId) {
- Bucket bucket = null;
- if (BUCKET_MAP.containsKey(clientId)) {
- bucket = BUCKET_MAP.get(clientId);
- }
+ private static Bucket getBucket(String clientId) {
+ synchronized (BUCKET_MONITOR) {
+ Bucket bucket = null;
+ if (BUCKET_MAP.containsKey(clientId)) {
+ bucket = BUCKET_MAP.get(clientId);
+ }
- if (bucket == null) {
- bucket = Bucket4j.builder()
- // allows 50 tokens per minute (added 10 second buffer)
- .addLimit(classic(50, intervally(50, Duration.ofSeconds(70))).withInitialTokens(40))
- // but not often then 50 tokens per second
- .addLimit(classic(10, intervally(10, Duration.ofSeconds(1)))).build();
- BUCKET_MAP.put(clientId, bucket);
+ if (bucket == null) {
+ bucket = Bucket4j.builder()
+ // allows 50 tokens per minute (added 10 second buffer)
+ .addLimit(classic(50, intervally(50, Duration.ofSeconds(70))).withInitialTokens(40))
+ // but not often then 50 tokens per second
+ .addLimit(classic(10, intervally(10, Duration.ofSeconds(1)))).build();
+ BUCKET_MAP.put(clientId, bucket);
+ }
+ return bucket;
}
- return bucket;
}
}