]> git.basschouten.com Git - openhab-addons.git/blob
cc0cca96b00adb54ef6d3620c288ce37e3315b7f
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.tesla.internal.throttler;
14
15 import java.util.HashMap;
16 import java.util.Map;
17 import java.util.concurrent.BlockingQueue;
18 import java.util.concurrent.Executors;
19 import java.util.concurrent.Future;
20 import java.util.concurrent.FutureTask;
21 import java.util.concurrent.LinkedBlockingQueue;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.TimeUnit;
24
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 /**
29  * The {@link QueueChannelThrottler} implements a throttler that maintains
30  * multiple execution rates, and maintains the order of calls
31  *
32  * @author Karel Goderis - Initial contribution
33  */
34 public final class QueueChannelThrottler extends AbstractMultiRateChannelThrottler {
35
36     private final Logger logger = LoggerFactory.getLogger(QueueChannelThrottler.class);
37
38     private static final int MAX_QUEUE_LENGTH = 150;
39     private BlockingQueue<FutureTask<?>> tasks;
40     private final Rate overallRate;
41
42     private final Runnable processQueueTask = () -> {
43         FutureTask<?> task = tasks.poll();
44         if (task != null && !task.isCancelled()) {
45             task.run();
46         }
47     };
48
49     public QueueChannelThrottler(Rate someRate) {
50         this(someRate, Executors.newScheduledThreadPool(1), new HashMap<>(), TimeProvider.SYSTEM_PROVIDER,
51                 MAX_QUEUE_LENGTH);
52     }
53
54     public QueueChannelThrottler(Rate someRate, ScheduledExecutorService scheduler) {
55         this(someRate, scheduler, new HashMap<>(), TimeProvider.SYSTEM_PROVIDER, MAX_QUEUE_LENGTH);
56     }
57
58     public QueueChannelThrottler(Rate someRate, ScheduledExecutorService scheduler, Map<Object, Rate> channels) {
59         this(someRate, scheduler, channels, TimeProvider.SYSTEM_PROVIDER, MAX_QUEUE_LENGTH);
60     }
61
62     public QueueChannelThrottler(Rate someRate, Map<Object, Rate> channels, int queueLength) {
63         this(someRate, Executors.newScheduledThreadPool(1), channels, TimeProvider.SYSTEM_PROVIDER, queueLength);
64     }
65
66     public QueueChannelThrottler(Rate someRate, ScheduledExecutorService scheduler, Map<Object, Rate> channels,
67             TimeProvider timeProvider, int queueLength) {
68         super(someRate, scheduler, channels, timeProvider);
69         overallRate = someRate;
70         tasks = new LinkedBlockingQueue<>(queueLength);
71     }
72
73     @Override
74     public Future<?> submit(Runnable task) {
75         return submit(null, task);
76     }
77
78     @SuppressWarnings({ "unchecked", "rawtypes" })
79     @Override
80     public Future<?> submit(Object channelKey, Runnable task) {
81         FutureTask runTask = new FutureTask(task, null);
82         try {
83             if (tasks.offer(runTask, overallRate.timeInMillis(), TimeUnit.MILLISECONDS)) {
84                 long throttledTime = channelKey == null ? callTime(null) : callTime(channels.get(channelKey));
85                 long now = timeProvider.getCurrentTimeInMillis();
86                 scheduler.schedule(processQueueTask, throttledTime < now ? 0 : throttledTime - now,
87                         TimeUnit.MILLISECONDS);
88                 return runTask;
89             } else {
90                 logger.warn("The QueueThrottler can not take the task '{}' at this point in time", runTask.toString());
91             }
92         } catch (InterruptedException e) {
93             logger.error("An exception occurred while scheduling a new taks: '{}'", e.getMessage());
94         }
95
96         return null;
97     }
98 }