2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.tesla.internal.throttler;
15 import java.util.HashMap;
17 import java.util.concurrent.BlockingQueue;
18 import java.util.concurrent.Executors;
19 import java.util.concurrent.Future;
20 import java.util.concurrent.FutureTask;
21 import java.util.concurrent.LinkedBlockingQueue;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.TimeUnit;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
29 * The {@link QueueChannelThrottler} implements a throttler that maintains
30 * multiple execution rates, and maintains the order of calls
32 * @author Karel Goderis - Initial contribution
34 public final class QueueChannelThrottler extends AbstractMultiRateChannelThrottler {
36 private final Logger logger = LoggerFactory.getLogger(QueueChannelThrottler.class);
38 private static final int MAX_QUEUE_LENGTH = 150;
39 private BlockingQueue<FutureTask<?>> tasks;
40 private final Rate overallRate;
42 private final Runnable processQueueTask = () -> {
43 FutureTask<?> task = tasks.poll();
44 if (task != null && !task.isCancelled()) {
49 public QueueChannelThrottler(Rate someRate) {
50 this(someRate, Executors.newScheduledThreadPool(1), new HashMap<>(), TimeProvider.SYSTEM_PROVIDER,
54 public QueueChannelThrottler(Rate someRate, ScheduledExecutorService scheduler) {
55 this(someRate, scheduler, new HashMap<>(), TimeProvider.SYSTEM_PROVIDER, MAX_QUEUE_LENGTH);
58 public QueueChannelThrottler(Rate someRate, ScheduledExecutorService scheduler, Map<Object, Rate> channels) {
59 this(someRate, scheduler, channels, TimeProvider.SYSTEM_PROVIDER, MAX_QUEUE_LENGTH);
62 public QueueChannelThrottler(Rate someRate, Map<Object, Rate> channels, int queueLength) {
63 this(someRate, Executors.newScheduledThreadPool(1), channels, TimeProvider.SYSTEM_PROVIDER, queueLength);
66 public QueueChannelThrottler(Rate someRate, ScheduledExecutorService scheduler, Map<Object, Rate> channels,
67 TimeProvider timeProvider, int queueLength) {
68 super(someRate, scheduler, channels, timeProvider);
69 overallRate = someRate;
70 tasks = new LinkedBlockingQueue<>(queueLength);
74 public Future<?> submit(Runnable task) {
75 return submit(null, task);
78 @SuppressWarnings({ "unchecked", "rawtypes" })
80 public Future<?> submit(Object channelKey, Runnable task) {
81 FutureTask runTask = new FutureTask(task, null);
83 if (tasks.offer(runTask, overallRate.timeInMillis(), TimeUnit.MILLISECONDS)) {
84 long throttledTime = channelKey == null ? callTime(null) : callTime(channels.get(channelKey));
85 long now = timeProvider.getCurrentTimeInMillis();
86 scheduler.schedule(processQueueTask, throttledTime < now ? 0 : throttledTime - now,
87 TimeUnit.MILLISECONDS);
90 logger.warn("The QueueThrottler can not take the task '{}' at this point in time", runTask.toString());
92 } catch (InterruptedException e) {
93 logger.error("An exception occurred while scheduling a new taks: '{}'", e.getMessage());