]> git.basschouten.com Git - openhab-addons.git/commitdiff
[http] Add rate-limiting to channel refresh (#9509)
authorJ-N-K <J-N-K@users.noreply.github.com>
Mon, 28 Dec 2020 17:18:06 +0000 (18:18 +0100)
committerGitHub <noreply@github.com>
Mon, 28 Dec 2020 17:18:06 +0000 (09:18 -0800)
* add rate limiter for requests and catch transformation exception
* address review comment
* address review comments

Signed-off-by: Jan N. Klug <jan.n.klug@rub.de>
bundles/org.openhab.binding.http/README.md
bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/HttpHandlerFactory.java
bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/HttpThingHandler.java
bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/config/HttpThingConfig.java
bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/http/RateLimitedHttpClient.java [new file with mode: 0644]
bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/http/RefreshingUrlCache.java
bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/transform/CascadedValueTransformationImpl.java
bundles/org.openhab.binding.http/src/main/resources/OH-INF/thing/thing-types.xml

index 74b5084d4ac8dc8c5adf54b0f899256215fe96dc..44dce4aad075626f255542c6942f8192bc70b2c0 100644 (file)
@@ -15,6 +15,7 @@ It can be extended with different channels.
 | `refresh`         | no       |   30    | Time in seconds between two refresh calls for the channels of this thing. |
 | `timeout`         | no       |  3000   | Timeout for HTTP requests in ms. |
 | `bufferSize`      | no       |  2048   | The buffer size for the response data (in kB). |
+| `delay`           | no       |    0    | Delay between two requests in ms (advanced parameter). |
 | `username`        | yes      |    -    | Username for authentication (advanced parameter). |
 | `password`        | yes      |    -    | Password for authentication (advanced parameter). |
 | `authMode`        | no       |  BASIC  | Authentication mode, `BASIC` or `DIGEST` (advanced parameter). |
@@ -26,6 +27,8 @@ It can be extended with different channels.
 
 *Note:* optional "no" means that you have to configure a value unless a default is provided and you are ok with that setting.
 
+*Note:* If you rate-limit requests by using the `delay` parameter you have to make sure that the time between two refreshes is larger than the time needed for one refresh cycle.
+
 ## Channels
 
 Each item type has its own channel-type.
index 7b1444156746513ba20b1c88ae5c225bda6d557e..ae153ca51be855beeb87b1dedb82b45d5c27711b 100644 (file)
@@ -101,7 +101,7 @@ public class HttpHandlerFactory extends BaseThingHandlerFactory
 
     @Override
     public ValueTransformation getValueTransformation(@Nullable String pattern) {
-        if (pattern == null) {
+        if (pattern == null || pattern.isEmpty()) {
             return NoOpValueTransformation.getInstance();
         }
         return new CascadedValueTransformationImpl(pattern,
index f7ce74a0fad209019c7becdf7473ac76da59806c..343145a92e4779613bfcfe043530903dfcb57802 100644 (file)
@@ -44,10 +44,7 @@ import org.openhab.binding.http.internal.converter.ImageItemConverter;
 import org.openhab.binding.http.internal.converter.ItemValueConverter;
 import org.openhab.binding.http.internal.converter.PlayerItemConverter;
 import org.openhab.binding.http.internal.converter.RollershutterItemConverter;
-import org.openhab.binding.http.internal.http.Content;
-import org.openhab.binding.http.internal.http.HttpAuthException;
-import org.openhab.binding.http.internal.http.HttpResponseListener;
-import org.openhab.binding.http.internal.http.RefreshingUrlCache;
+import org.openhab.binding.http.internal.http.*;
 import org.openhab.binding.http.internal.transform.ValueTransformationProvider;
 import org.openhab.core.library.types.DateTimeType;
 import org.openhab.core.library.types.DecimalType;
@@ -81,6 +78,7 @@ public class HttpThingHandler extends BaseThingHandler {
     private final ValueTransformationProvider valueTransformationProvider;
     private final HttpClientProvider httpClientProvider;
     private HttpClient httpClient;
+    private RateLimitedHttpClient rateLimitedHttpClient;
     private final HttpDynamicStateDescriptionProvider httpDynamicStateDescriptionProvider;
 
     private HttpThingConfig config = new HttpThingConfig();
@@ -95,6 +93,7 @@ public class HttpThingHandler extends BaseThingHandler {
         super(thing);
         this.httpClientProvider = httpClientProvider;
         this.httpClient = httpClientProvider.getSecureClient();
+        this.rateLimitedHttpClient = new RateLimitedHttpClient(httpClient, scheduler);
         this.valueTransformationProvider = valueTransformationProvider;
         this.httpDynamicStateDescriptionProvider = httpDynamicStateDescriptionProvider;
     }
@@ -139,6 +138,26 @@ public class HttpThingHandler extends BaseThingHandler {
                     "Parameter baseURL must not be empty!");
             return;
         }
+
+        if (config.ignoreSSLErrors) {
+            logger.info("Using the insecure client for thing '{}'.", thing.getUID());
+            httpClient = httpClientProvider.getInsecureClient();
+        } else {
+            logger.info("Using the secure client for thing '{}'.", thing.getUID());
+            httpClient = httpClientProvider.getSecureClient();
+        }
+        rateLimitedHttpClient.setHttpClient(httpClient);
+        rateLimitedHttpClient.setDelay(config.delay);
+
+        int channelCount = thing.getChannels().size();
+        if (channelCount * config.delay > config.refresh * 1000) {
+            // this should prevent the rate limit queue from filling up
+            config.refresh = (channelCount * config.delay) / 1000 + 1;
+            logger.warn(
+                    "{} channels in thing {} with a delay of {} incompatible with the configured refresh time. Refresh-Time increased to the minimum of {}",
+                    channelCount, thing.getUID(), config.delay, config.refresh);
+        }
+
         authentication = null;
         if (!config.username.isEmpty()) {
             try {
@@ -170,14 +189,6 @@ public class HttpThingHandler extends BaseThingHandler {
             logger.debug("No authentication configured for thing '{}'", thing.getUID());
         }
 
-        if (config.ignoreSSLErrors) {
-            logger.info("Using the insecure client for thing '{}'.", thing.getUID());
-            httpClient = httpClientProvider.getInsecureClient();
-        } else {
-            logger.info("Using the secure client for thing '{}'.", thing.getUID());
-            httpClient = httpClientProvider.getSecureClient();
-        }
-
         thing.getChannels().forEach(this::createChannel);
 
         updateStatus(ThingStatus.ONLINE);
@@ -187,6 +198,7 @@ public class HttpThingHandler extends BaseThingHandler {
     public void dispose() {
         // stop update tasks
         urlHandlers.values().forEach(RefreshingUrlCache::stop);
+        rateLimitedHttpClient.shutdown();
 
         // clear lists
         urlHandlers.clear();
@@ -266,7 +278,9 @@ public class HttpThingHandler extends BaseThingHandler {
         channels.put(channelUID, itemValueConverter);
         if (channelConfig.mode != HttpChannelMode.WRITEONLY) {
             channelUrls.put(channelUID, stateUrl);
-            urlHandlers.computeIfAbsent(stateUrl, url -> new RefreshingUrlCache(scheduler, httpClient, url, config))
+            urlHandlers
+                    .computeIfAbsent(stateUrl,
+                            url -> new RefreshingUrlCache(scheduler, rateLimitedHttpClient, url, config))
                     .addConsumer(itemValueConverter::process);
         }
 
index 19aeae84d04eaf9f9ead8ff3cb7a9697ac315387..258f94466a97f0d0eb2bcb4193d3c384db44a112 100644 (file)
@@ -29,6 +29,7 @@ public class HttpThingConfig {
     public String baseURL = "";
     public int refresh = 30;
     public int timeout = 3000;
+    public int delay = 0;
 
     public String username = "";
     public String password = "";
diff --git a/bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/http/RateLimitedHttpClient.java b/bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/http/RateLimitedHttpClient.java
new file mode 100644 (file)
index 0000000..d639c04
--- /dev/null
@@ -0,0 +1,129 @@
+/**
+ * Copyright (c) 2010-2020 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.binding.http.internal.http;
+
+import java.net.URI;
+import java.util.concurrent.*;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.AuthenticationStore;
+import org.eclipse.jetty.client.api.Request;
+
+/**
+ * The {@link RateLimitedHttpClient} is a wrapper for a Jetty HTTP client that limits the number of requests by delaying
+ * the request creation
+ *
+ * @author Jan N. Klug - Initial contribution
+ */
+@NonNullByDefault
+public class RateLimitedHttpClient {
+    private static final int MAX_QUEUE_SIZE = 1000; // maximum queue size
+    private HttpClient httpClient;
+    private int delay = 0; // in ms
+    private final ScheduledExecutorService scheduler;
+    private final LinkedBlockingQueue<RequestQueueEntry> requestQueue = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE);
+
+    private @Nullable ScheduledFuture<?> processJob;
+
+    public RateLimitedHttpClient(HttpClient httpClient, ScheduledExecutorService scheduler) {
+        this.httpClient = httpClient;
+        this.scheduler = scheduler;
+    }
+
+    /**
+     * Stop processing the queue and clear it
+     */
+    public void shutdown() {
+        stopProcessJob();
+        requestQueue.forEach(queueEntry -> queueEntry.future.completeExceptionally(new CancellationException()));
+    }
+
+    /**
+     * Set a new delay
+     * 
+     * @param delay in ms between to requests
+     */
+    public void setDelay(int delay) {
+        if (delay < 0) {
+            throw new IllegalArgumentException("Delay needs to be larger or equal to zero");
+        }
+        this.delay = delay;
+        stopProcessJob();
+        if (delay != 0) {
+            processJob = scheduler.scheduleWithFixedDelay(this::processQueue, 0, delay, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Set the HTTP client
+     *
+     * @param httpClient secure or insecure Jetty http client
+     */
+    public void setHttpClient(HttpClient httpClient) {
+        this.httpClient = httpClient;
+    }
+
+    /**
+     * Create a new request to the given URL respecting rate-limits
+     *
+     * @param finalUrl the request URL
+     * @return a CompletableFuture that completes with the request
+     */
+    public CompletableFuture<Request> newRequest(URI finalUrl) {
+        // if no delay is set, return a completed CompletableFuture
+        if (delay == 0) {
+            return CompletableFuture.completedFuture(httpClient.newRequest(finalUrl));
+        }
+        CompletableFuture<Request> future = new CompletableFuture<>();
+        if (!requestQueue.offer(new RequestQueueEntry(finalUrl, future))) {
+            future.completeExceptionally(new RejectedExecutionException("Maximum queue size exceeded."));
+        }
+        return future;
+    }
+
+    /**
+     * Get the AuthenticationStore from the wrapped client
+     *
+     * @return
+     */
+    public AuthenticationStore getAuthenticationStore() {
+        return httpClient.getAuthenticationStore();
+    }
+
+    private void stopProcessJob() {
+        ScheduledFuture<?> processJob = this.processJob;
+        if (processJob != null) {
+            processJob.cancel(false);
+            this.processJob = null;
+        }
+    }
+
+    private void processQueue() {
+        RequestQueueEntry queueEntry = requestQueue.poll();
+        if (queueEntry != null) {
+            queueEntry.future.complete(httpClient.newRequest(queueEntry.finalUrl));
+        }
+    }
+
+    private static class RequestQueueEntry {
+        public URI finalUrl;
+        public CompletableFuture<Request> future;
+
+        public RequestQueueEntry(URI finalUrl, CompletableFuture<Request> future) {
+            this.finalUrl = finalUrl;
+            this.future = future;
+        }
+    }
+}
index 0a0a5a5de2063b6a4a012e0ab67d8a5665f834dc..f0ae9c9ec1e2eb5f048e3d6a4e0c36fa106ec621 100644 (file)
@@ -18,19 +18,13 @@ import java.util.Date;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.function.Consumer;
 
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
-import org.eclipse.jetty.client.HttpClient;
 import org.eclipse.jetty.client.api.Authentication;
 import org.eclipse.jetty.client.api.AuthenticationStore;
-import org.eclipse.jetty.client.api.Request;
 import org.openhab.binding.http.internal.Util;
 import org.openhab.binding.http.internal.config.HttpThingConfig;
 import org.slf4j.Logger;
@@ -47,7 +41,7 @@ public class RefreshingUrlCache {
     private final Logger logger = LoggerFactory.getLogger(RefreshingUrlCache.class);
 
     private final String url;
-    private final HttpClient httpClient;
+    private final RateLimitedHttpClient httpClient;
     private final int timeout;
     private final int bufferSize;
     private final @Nullable String fallbackEncoding;
@@ -57,7 +51,7 @@ public class RefreshingUrlCache {
     private final ScheduledFuture<?> future;
     private @Nullable Content lastContent;
 
-    public RefreshingUrlCache(ScheduledExecutorService executor, HttpClient httpClient, String url,
+    public RefreshingUrlCache(ScheduledExecutorService executor, RateLimitedHttpClient httpClient, String url,
             HttpThingConfig thingConfig) {
         this.httpClient = httpClient;
         this.url = url;
@@ -85,43 +79,53 @@ public class RefreshingUrlCache {
             URI finalUrl = new URI(String.format(this.url, new Date()));
 
             logger.trace("Requesting refresh (retry={}) from '{}' with timeout {}ms", isRetry, finalUrl, timeout);
-            Request request = httpClient.newRequest(finalUrl).timeout(timeout, TimeUnit.MILLISECONDS);
 
-            headers.forEach(header -> {
-                String[] keyValuePair = header.split("=", 2);
-                if (keyValuePair.length == 2) {
-                    request.header(keyValuePair[0].trim(), keyValuePair[1].trim());
-                } else {
-                    logger.warn("Splitting header '{}' failed. No '=' was found. Ignoring", header);
-                }
-            });
+            httpClient.newRequest(finalUrl).thenAccept(request -> {
+                request.timeout(timeout, TimeUnit.MILLISECONDS);
 
-            CompletableFuture<@Nullable Content> response = new CompletableFuture<>();
-            response.exceptionally(e -> {
-                if (e instanceof HttpAuthException) {
-                    if (isRetry) {
-                        logger.warn("Retry after authentication  failure failed again for '{}', failing here",
-                                finalUrl);
+                headers.forEach(header -> {
+                    String[] keyValuePair = header.split("=", 2);
+                    if (keyValuePair.length == 2) {
+                        request.header(keyValuePair[0].trim(), keyValuePair[1].trim());
                     } else {
-                        AuthenticationStore authStore = httpClient.getAuthenticationStore();
-                        Authentication.Result authResult = authStore.findAuthenticationResult(finalUrl);
-                        if (authResult != null) {
-                            authStore.removeAuthenticationResult(authResult);
-                            logger.debug("Cleared authentication result for '{}', retrying immediately", finalUrl);
-                            refresh(true);
+                        logger.warn("Splitting header '{}' failed. No '=' was found. Ignoring", header);
+                    }
+                });
+
+                CompletableFuture<@Nullable Content> response = new CompletableFuture<>();
+                response.exceptionally(e -> {
+                    if (e instanceof HttpAuthException) {
+                        if (isRetry) {
+                            logger.warn("Retry after authentication  failure failed again for '{}', failing here",
+                                    finalUrl);
                         } else {
-                            logger.warn("Could not find authentication result for '{}', failing here", finalUrl);
+                            AuthenticationStore authStore = httpClient.getAuthenticationStore();
+                            Authentication.Result authResult = authStore.findAuthenticationResult(finalUrl);
+                            if (authResult != null) {
+                                authStore.removeAuthenticationResult(authResult);
+                                logger.debug("Cleared authentication result for '{}', retrying immediately", finalUrl);
+                                refresh(true);
+                            } else {
+                                logger.warn("Could not find authentication result for '{}', failing here", finalUrl);
+                            }
                         }
                     }
-                }
-                return null;
-            }).thenAccept(this::processResult);
+                    return null;
+                }).thenAccept(this::processResult);
 
-            if (logger.isTraceEnabled()) {
-                logger.trace("Sending to '{}': {}", finalUrl, Util.requestToLogString(request));
-            }
+                if (logger.isTraceEnabled()) {
+                    logger.trace("Sending to '{}': {}", finalUrl, Util.requestToLogString(request));
+                }
 
-            request.send(new HttpResponseListener(response, fallbackEncoding, bufferSize));
+                request.send(new HttpResponseListener(response, fallbackEncoding, bufferSize));
+            }).exceptionally(e -> {
+                if (e instanceof CancellationException) {
+                    logger.debug("Request to URL {} was cancelled by thing handler.", finalUrl);
+                } else {
+                    logger.warn("Request to URL {} failed: {}", finalUrl, e.getMessage());
+                }
+                return null;
+            });
         } catch (IllegalArgumentException | URISyntaxException e) {
             logger.warn("Creating request for '{}' failed: {}", url, e.getMessage());
         }
index a163c9fdc7888f9b067ec8ea94529e6c381a6b87..e5e8c9c02acacc2a9c51eeb5305e820eb9e4aef3 100644 (file)
@@ -21,6 +21,8 @@ import java.util.stream.Collectors;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
 import org.openhab.core.transform.TransformationService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The {@link CascadedValueTransformationImpl} implements {@link ValueTransformation for a cascaded set of
@@ -30,13 +32,21 @@ import org.openhab.core.transform.TransformationService;
  */
 @NonNullByDefault
 public class CascadedValueTransformationImpl implements ValueTransformation {
+    private final Logger logger = LoggerFactory.getLogger(CascadedValueTransformationImpl.class);
     private final List<ValueTransformation> transformations;
 
     public CascadedValueTransformationImpl(String transformationString,
             Function<String, @Nullable TransformationService> transformationServiceSupplier) {
-        transformations = Arrays.stream(transformationString.split("∩")).filter(s -> !s.isEmpty())
-                .map(transformation -> new SingleValueTransformation(transformation, transformationServiceSupplier))
-                .collect(Collectors.toList());
+        List<ValueTransformation> transformations;
+        try {
+            transformations = Arrays.stream(transformationString.split("∩")).filter(s -> !s.isEmpty())
+                    .map(transformation -> new SingleValueTransformation(transformation, transformationServiceSupplier))
+                    .collect(Collectors.toList());
+        } catch (IllegalArgumentException e) {
+            transformations = List.of(NoOpValueTransformation.getInstance());
+            logger.warn("Transformation ignore, failed to parse {}: {}", transformationString, e.getMessage());
+        }
+        this.transformations = transformations;
     }
 
     @Override
index b3907c83c8710395816b41546b1e6d15b1157578..f298a53c5df81eb8de9b2d6e4b7ce20391ebbc4b 100644 (file)
                                <description>The timeout in ms for each request</description>
                                <default>3000</default>
                        </parameter>
+                       <parameter name="delay" type="integer" unit="ms" min="0">
+                               <label>Delay</label>
+                               <description>Delay between to requests</description>
+                               <default>0</default>
+                               <advanced>true</advanced>
+                       </parameter>
                        <parameter name="bufferSize" type="integer" min="0">
                                <label>Buffer Size</label>
                                <description>Size of the response buffer (default 2048 kB)</description>