| `refresh` | no | 30 | Time in seconds between two refresh calls for the channels of this thing. |
| `timeout` | no | 3000 | Timeout for HTTP requests in ms. |
| `bufferSize` | no | 2048 | The buffer size for the response data (in kB). |
+| `delay` | no | 0 | Delay between two requests in ms (advanced parameter). |
| `username` | yes | - | Username for authentication (advanced parameter). |
| `password` | yes | - | Password for authentication (advanced parameter). |
| `authMode` | no | BASIC | Authentication mode, `BASIC` or `DIGEST` (advanced parameter). |
*Note:* optional "no" means that you have to configure a value unless a default is provided and you are ok with that setting.
+*Note:* If you rate-limit requests by using the `delay` parameter you have to make sure that the time between two refreshes is larger than the time needed for one refresh cycle.
+
## Channels
Each item type has its own channel-type.
@Override
public ValueTransformation getValueTransformation(@Nullable String pattern) {
- if (pattern == null) {
+ if (pattern == null || pattern.isEmpty()) {
return NoOpValueTransformation.getInstance();
}
return new CascadedValueTransformationImpl(pattern,
import org.openhab.binding.http.internal.converter.ItemValueConverter;
import org.openhab.binding.http.internal.converter.PlayerItemConverter;
import org.openhab.binding.http.internal.converter.RollershutterItemConverter;
-import org.openhab.binding.http.internal.http.Content;
-import org.openhab.binding.http.internal.http.HttpAuthException;
-import org.openhab.binding.http.internal.http.HttpResponseListener;
-import org.openhab.binding.http.internal.http.RefreshingUrlCache;
+import org.openhab.binding.http.internal.http.*;
import org.openhab.binding.http.internal.transform.ValueTransformationProvider;
import org.openhab.core.library.types.DateTimeType;
import org.openhab.core.library.types.DecimalType;
private final ValueTransformationProvider valueTransformationProvider;
private final HttpClientProvider httpClientProvider;
private HttpClient httpClient;
+ private RateLimitedHttpClient rateLimitedHttpClient;
private final HttpDynamicStateDescriptionProvider httpDynamicStateDescriptionProvider;
private HttpThingConfig config = new HttpThingConfig();
super(thing);
this.httpClientProvider = httpClientProvider;
this.httpClient = httpClientProvider.getSecureClient();
+ this.rateLimitedHttpClient = new RateLimitedHttpClient(httpClient, scheduler);
this.valueTransformationProvider = valueTransformationProvider;
this.httpDynamicStateDescriptionProvider = httpDynamicStateDescriptionProvider;
}
"Parameter baseURL must not be empty!");
return;
}
+
+ if (config.ignoreSSLErrors) {
+ logger.info("Using the insecure client for thing '{}'.", thing.getUID());
+ httpClient = httpClientProvider.getInsecureClient();
+ } else {
+ logger.info("Using the secure client for thing '{}'.", thing.getUID());
+ httpClient = httpClientProvider.getSecureClient();
+ }
+ rateLimitedHttpClient.setHttpClient(httpClient);
+ rateLimitedHttpClient.setDelay(config.delay);
+
+ int channelCount = thing.getChannels().size();
+ if (channelCount * config.delay > config.refresh * 1000) {
+ // this should prevent the rate limit queue from filling up
+ config.refresh = (channelCount * config.delay) / 1000 + 1;
+ logger.warn(
+ "{} channels in thing {} with a delay of {} incompatible with the configured refresh time. Refresh-Time increased to the minimum of {}",
+ channelCount, thing.getUID(), config.delay, config.refresh);
+ }
+
authentication = null;
if (!config.username.isEmpty()) {
try {
logger.debug("No authentication configured for thing '{}'", thing.getUID());
}
- if (config.ignoreSSLErrors) {
- logger.info("Using the insecure client for thing '{}'.", thing.getUID());
- httpClient = httpClientProvider.getInsecureClient();
- } else {
- logger.info("Using the secure client for thing '{}'.", thing.getUID());
- httpClient = httpClientProvider.getSecureClient();
- }
-
thing.getChannels().forEach(this::createChannel);
updateStatus(ThingStatus.ONLINE);
public void dispose() {
// stop update tasks
urlHandlers.values().forEach(RefreshingUrlCache::stop);
+ rateLimitedHttpClient.shutdown();
// clear lists
urlHandlers.clear();
channels.put(channelUID, itemValueConverter);
if (channelConfig.mode != HttpChannelMode.WRITEONLY) {
channelUrls.put(channelUID, stateUrl);
- urlHandlers.computeIfAbsent(stateUrl, url -> new RefreshingUrlCache(scheduler, httpClient, url, config))
+ urlHandlers
+ .computeIfAbsent(stateUrl,
+ url -> new RefreshingUrlCache(scheduler, rateLimitedHttpClient, url, config))
.addConsumer(itemValueConverter::process);
}
public String baseURL = "";
public int refresh = 30;
public int timeout = 3000;
+ public int delay = 0;
public String username = "";
public String password = "";
--- /dev/null
+/**
+ * Copyright (c) 2010-2020 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.binding.http.internal.http;
+
+import java.net.URI;
+import java.util.concurrent.*;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.AuthenticationStore;
+import org.eclipse.jetty.client.api.Request;
+
+/**
+ * The {@link RateLimitedHttpClient} is a wrapper for a Jetty HTTP client that limits the number of requests by delaying
+ * the request creation
+ *
+ * @author Jan N. Klug - Initial contribution
+ */
+@NonNullByDefault
+public class RateLimitedHttpClient {
+ private static final int MAX_QUEUE_SIZE = 1000; // maximum queue size
+ private HttpClient httpClient;
+ private int delay = 0; // in ms
+ private final ScheduledExecutorService scheduler;
+ private final LinkedBlockingQueue<RequestQueueEntry> requestQueue = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE);
+
+ private @Nullable ScheduledFuture<?> processJob;
+
+ public RateLimitedHttpClient(HttpClient httpClient, ScheduledExecutorService scheduler) {
+ this.httpClient = httpClient;
+ this.scheduler = scheduler;
+ }
+
+ /**
+ * Stop processing the queue and clear it
+ */
+ public void shutdown() {
+ stopProcessJob();
+ requestQueue.forEach(queueEntry -> queueEntry.future.completeExceptionally(new CancellationException()));
+ }
+
+ /**
+ * Set a new delay
+ *
+ * @param delay in ms between to requests
+ */
+ public void setDelay(int delay) {
+ if (delay < 0) {
+ throw new IllegalArgumentException("Delay needs to be larger or equal to zero");
+ }
+ this.delay = delay;
+ stopProcessJob();
+ if (delay != 0) {
+ processJob = scheduler.scheduleWithFixedDelay(this::processQueue, 0, delay, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ /**
+ * Set the HTTP client
+ *
+ * @param httpClient secure or insecure Jetty http client
+ */
+ public void setHttpClient(HttpClient httpClient) {
+ this.httpClient = httpClient;
+ }
+
+ /**
+ * Create a new request to the given URL respecting rate-limits
+ *
+ * @param finalUrl the request URL
+ * @return a CompletableFuture that completes with the request
+ */
+ public CompletableFuture<Request> newRequest(URI finalUrl) {
+ // if no delay is set, return a completed CompletableFuture
+ if (delay == 0) {
+ return CompletableFuture.completedFuture(httpClient.newRequest(finalUrl));
+ }
+ CompletableFuture<Request> future = new CompletableFuture<>();
+ if (!requestQueue.offer(new RequestQueueEntry(finalUrl, future))) {
+ future.completeExceptionally(new RejectedExecutionException("Maximum queue size exceeded."));
+ }
+ return future;
+ }
+
+ /**
+ * Get the AuthenticationStore from the wrapped client
+ *
+ * @return
+ */
+ public AuthenticationStore getAuthenticationStore() {
+ return httpClient.getAuthenticationStore();
+ }
+
+ private void stopProcessJob() {
+ ScheduledFuture<?> processJob = this.processJob;
+ if (processJob != null) {
+ processJob.cancel(false);
+ this.processJob = null;
+ }
+ }
+
+ private void processQueue() {
+ RequestQueueEntry queueEntry = requestQueue.poll();
+ if (queueEntry != null) {
+ queueEntry.future.complete(httpClient.newRequest(queueEntry.finalUrl));
+ }
+ }
+
+ private static class RequestQueueEntry {
+ public URI finalUrl;
+ public CompletableFuture<Request> future;
+
+ public RequestQueueEntry(URI finalUrl, CompletableFuture<Request> future) {
+ this.finalUrl = finalUrl;
+ this.future = future;
+ }
+ }
+}
import java.util.List;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.util.function.Consumer;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
-import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Authentication;
import org.eclipse.jetty.client.api.AuthenticationStore;
-import org.eclipse.jetty.client.api.Request;
import org.openhab.binding.http.internal.Util;
import org.openhab.binding.http.internal.config.HttpThingConfig;
import org.slf4j.Logger;
private final Logger logger = LoggerFactory.getLogger(RefreshingUrlCache.class);
private final String url;
- private final HttpClient httpClient;
+ private final RateLimitedHttpClient httpClient;
private final int timeout;
private final int bufferSize;
private final @Nullable String fallbackEncoding;
private final ScheduledFuture<?> future;
private @Nullable Content lastContent;
- public RefreshingUrlCache(ScheduledExecutorService executor, HttpClient httpClient, String url,
+ public RefreshingUrlCache(ScheduledExecutorService executor, RateLimitedHttpClient httpClient, String url,
HttpThingConfig thingConfig) {
this.httpClient = httpClient;
this.url = url;
URI finalUrl = new URI(String.format(this.url, new Date()));
logger.trace("Requesting refresh (retry={}) from '{}' with timeout {}ms", isRetry, finalUrl, timeout);
- Request request = httpClient.newRequest(finalUrl).timeout(timeout, TimeUnit.MILLISECONDS);
- headers.forEach(header -> {
- String[] keyValuePair = header.split("=", 2);
- if (keyValuePair.length == 2) {
- request.header(keyValuePair[0].trim(), keyValuePair[1].trim());
- } else {
- logger.warn("Splitting header '{}' failed. No '=' was found. Ignoring", header);
- }
- });
+ httpClient.newRequest(finalUrl).thenAccept(request -> {
+ request.timeout(timeout, TimeUnit.MILLISECONDS);
- CompletableFuture<@Nullable Content> response = new CompletableFuture<>();
- response.exceptionally(e -> {
- if (e instanceof HttpAuthException) {
- if (isRetry) {
- logger.warn("Retry after authentication failure failed again for '{}', failing here",
- finalUrl);
+ headers.forEach(header -> {
+ String[] keyValuePair = header.split("=", 2);
+ if (keyValuePair.length == 2) {
+ request.header(keyValuePair[0].trim(), keyValuePair[1].trim());
} else {
- AuthenticationStore authStore = httpClient.getAuthenticationStore();
- Authentication.Result authResult = authStore.findAuthenticationResult(finalUrl);
- if (authResult != null) {
- authStore.removeAuthenticationResult(authResult);
- logger.debug("Cleared authentication result for '{}', retrying immediately", finalUrl);
- refresh(true);
+ logger.warn("Splitting header '{}' failed. No '=' was found. Ignoring", header);
+ }
+ });
+
+ CompletableFuture<@Nullable Content> response = new CompletableFuture<>();
+ response.exceptionally(e -> {
+ if (e instanceof HttpAuthException) {
+ if (isRetry) {
+ logger.warn("Retry after authentication failure failed again for '{}', failing here",
+ finalUrl);
} else {
- logger.warn("Could not find authentication result for '{}', failing here", finalUrl);
+ AuthenticationStore authStore = httpClient.getAuthenticationStore();
+ Authentication.Result authResult = authStore.findAuthenticationResult(finalUrl);
+ if (authResult != null) {
+ authStore.removeAuthenticationResult(authResult);
+ logger.debug("Cleared authentication result for '{}', retrying immediately", finalUrl);
+ refresh(true);
+ } else {
+ logger.warn("Could not find authentication result for '{}', failing here", finalUrl);
+ }
}
}
- }
- return null;
- }).thenAccept(this::processResult);
+ return null;
+ }).thenAccept(this::processResult);
- if (logger.isTraceEnabled()) {
- logger.trace("Sending to '{}': {}", finalUrl, Util.requestToLogString(request));
- }
+ if (logger.isTraceEnabled()) {
+ logger.trace("Sending to '{}': {}", finalUrl, Util.requestToLogString(request));
+ }
- request.send(new HttpResponseListener(response, fallbackEncoding, bufferSize));
+ request.send(new HttpResponseListener(response, fallbackEncoding, bufferSize));
+ }).exceptionally(e -> {
+ if (e instanceof CancellationException) {
+ logger.debug("Request to URL {} was cancelled by thing handler.", finalUrl);
+ } else {
+ logger.warn("Request to URL {} failed: {}", finalUrl, e.getMessage());
+ }
+ return null;
+ });
} catch (IllegalArgumentException | URISyntaxException e) {
logger.warn("Creating request for '{}' failed: {}", url, e.getMessage());
}
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.transform.TransformationService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The {@link CascadedValueTransformationImpl} implements {@link ValueTransformation for a cascaded set of
*/
@NonNullByDefault
public class CascadedValueTransformationImpl implements ValueTransformation {
+ private final Logger logger = LoggerFactory.getLogger(CascadedValueTransformationImpl.class);
private final List<ValueTransformation> transformations;
public CascadedValueTransformationImpl(String transformationString,
Function<String, @Nullable TransformationService> transformationServiceSupplier) {
- transformations = Arrays.stream(transformationString.split("∩")).filter(s -> !s.isEmpty())
- .map(transformation -> new SingleValueTransformation(transformation, transformationServiceSupplier))
- .collect(Collectors.toList());
+ List<ValueTransformation> transformations;
+ try {
+ transformations = Arrays.stream(transformationString.split("∩")).filter(s -> !s.isEmpty())
+ .map(transformation -> new SingleValueTransformation(transformation, transformationServiceSupplier))
+ .collect(Collectors.toList());
+ } catch (IllegalArgumentException e) {
+ transformations = List.of(NoOpValueTransformation.getInstance());
+ logger.warn("Transformation ignore, failed to parse {}: {}", transformationString, e.getMessage());
+ }
+ this.transformations = transformations;
}
@Override
<description>The timeout in ms for each request</description>
<default>3000</default>
</parameter>
+ <parameter name="delay" type="integer" unit="ms" min="0">
+ <label>Delay</label>
+ <description>Delay between to requests</description>
+ <default>0</default>
+ <advanced>true</advanced>
+ </parameter>
<parameter name="bufferSize" type="integer" min="0">
<label>Buffer Size</label>
<description>Size of the response buffer (default 2048 kB)</description>