}
rateLimitedHttpClient.setDelay(config.delay);
- int urlHandlerCount = urlHandlers.size();
- if (urlHandlerCount * config.delay > config.refresh * 1000) {
- // this should prevent the rate limit queue from filling up
- config.refresh = (urlHandlerCount * config.delay) / 1000 + 1;
- logger.warn(
- "{} channels in thing {} with a delay of {} incompatible with the configured refresh time. Refresh-Time increased to the minimum of {}",
- urlHandlerCount, thing.getUID(), config.delay, config.refresh);
- }
-
// remove empty headers
config.headers.removeIf(String::isBlank);
// create channels
thing.getChannels().forEach(this::createChannel);
+ int urlHandlerCount = urlHandlers.size();
+ if (urlHandlerCount * config.delay > config.refresh * 1000) {
+ // this should prevent the rate limit queue from filling up
+ config.refresh = (urlHandlerCount * config.delay) / 1000 + 1;
+ logger.warn(
+ "{} channels in thing {} with a delay of {} incompatible with the configured refresh time. Refresh-Time increased to the minimum of {}",
+ urlHandlerCount, thing.getUID(), config.delay, config.refresh);
+ }
+
+ urlHandlers.values().forEach(urlHandler -> urlHandler.start(scheduler, config.refresh));
+
updateStatus(ThingStatus.UNKNOWN);
}
// we need a key consisting of stateContent and URL, only if both are equal, we can use the same cache
String key = channelConfig.stateContent + "$" + stateUrl;
channelUrls.put(channelUID, key);
- Objects.requireNonNull(urlHandlers.computeIfAbsent(key,
- k -> new RefreshingUrlCache(scheduler, rateLimitedHttpClient, stateUrl, config,
- channelConfig.stateContent, config.contentType, this)))
+ Objects.requireNonNull(
+ urlHandlers.computeIfAbsent(key,
+ k -> new RefreshingUrlCache(rateLimitedHttpClient, stateUrl, config,
+ channelConfig.stateContent, config.contentType, this)))
.addConsumer(itemValueConverter::process);
}
private final @Nullable String httpContentType;
private final HttpStatusListener httpStatusListener;
- private final ScheduledFuture<?> future;
+ private @Nullable ScheduledFuture<?> future;
private @Nullable ChannelHandlerContent lastContent;
- public RefreshingUrlCache(ScheduledExecutorService executor, RateLimitedHttpClient httpClient, String url,
- HttpThingConfig thingConfig, String httpContent, @Nullable String httpContentType,
- HttpStatusListener httpStatusListener) {
+ public RefreshingUrlCache(RateLimitedHttpClient httpClient, String url, HttpThingConfig thingConfig,
+ String httpContent, @Nullable String httpContentType, HttpStatusListener httpStatusListener) {
this.httpClient = httpClient;
this.url = url;
this.strictErrorHandling = thingConfig.strictErrorHandling;
this.httpContentType = httpContentType;
this.httpStatusListener = httpStatusListener;
fallbackEncoding = thingConfig.encoding;
+ }
- future = executor.scheduleWithFixedDelay(this::refresh, 1, thingConfig.refresh, TimeUnit.SECONDS);
- logger.trace("Started refresh task for URL '{}' with interval {}s", url, thingConfig.refresh);
+ public void start(ScheduledExecutorService executor, int refreshTime) {
+ if (future != null) {
+ logger.warn("Starting refresh task requested but it is already started. This is bug.");
+ return;
+ }
+ future = executor.scheduleWithFixedDelay(this::refresh, 1, refreshTime, TimeUnit.SECONDS);
+ logger.trace("Started refresh task for URL '{}' with interval {}s", url, refreshTime);
+ }
+
+ public void stop() {
+ // clearing all listeners to prevent further updates
+ consumers.clear();
+ ScheduledFuture<?> future = this.future;
+ if (future != null) {
+ future.cancel(true);
+ logger.trace("Stopped refresh task for URL '{}'", url);
+ }
}
private void refresh() {
}
}
- public void stop() {
- // clearing all listeners to prevent further updates
- consumers.clear();
- future.cancel(false);
- logger.trace("Stopped refresh task for URL '{}'", url);
- }
-
public void addConsumer(Consumer<@Nullable ChannelHandlerContent> consumer) {
consumers.add(consumer);
}
assertThat((int) msBetween, allOf(greaterThanOrEqualTo(1000), lessThan(1100)));
}
- private List<Response> doLimitTest(int setDelay, List<Boolean> config) {
+ private void doLimitTest(int setDelay, List<Boolean> config) {
stubFor(get(urlEqualTo(TEST_LOCATION)).willReturn(aResponse().withBody(TEST_CONTENT)));
RateLimitedHttpClient rateLimitedHttpClient = new RateLimitedHttpClient(httpClient, scheduler);
// wait until we got all results
waitForAssert(() -> assertEquals(config.size(), responses.size()));
rateLimitedHttpClient.shutdown();
-
- return responses;
}
private static class Response {
* @return the cache object
*/
private RefreshingUrlCache getUrlCache(String content) {
- RefreshingUrlCache urlCache = new RefreshingUrlCache(scheduler, rateLimitedHttpClient, url, thingConfig,
- content, null, statusListener);
+ RefreshingUrlCache urlCache = new RefreshingUrlCache(rateLimitedHttpClient, url, thingConfig, content, null,
+ statusListener);
urlCache.addConsumer(contentWrappers::add);
-
+ urlCache.start(scheduler, thingConfig.refresh);
return urlCache;
}
}