2 * Copyright (c) 2010-2024 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.insteon.internal.device;
15 import java.util.HashMap;
17 import java.util.PriorityQueue;
18 import java.util.Queue;
20 import org.eclipse.jdt.annotation.NonNullByDefault;
21 import org.eclipse.jdt.annotation.Nullable;
22 import org.openhab.binding.insteon.internal.InsteonBindingConstants;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
27 * Class that manages all the per-device request queues using a single thread.
29 * - Each device has its own request queue, and the RequestQueueManager keeps a
31 * - Each entry in m_requestQueues corresponds to a single device's request queue.
32 * A device should never be more than once in m_requestQueues.
33 * - A hash map (m_requestQueueHash) is kept in sync with m_requestQueues for
34 * faster lookup in case a request queue is modified and needs to be
37 * @author Bernd Pfrommer - Initial contribution
38 * @author Rob Nielsen - Port to openHAB 2 insteon binding
41 public class RequestQueueManager {
42 private static @Nullable RequestQueueManager instance = null;
43 private final Logger logger = LoggerFactory.getLogger(RequestQueueManager.class);
44 private @Nullable Thread queueThread = null;
45 private Queue<RequestQueue> requestQueues = new PriorityQueue<>();
46 private Map<InsteonDevice, RequestQueue> requestQueueHash = new HashMap<>();
47 private boolean keepRunning = true;
49 private RequestQueueManager() {
50 queueThread = new Thread(new RequestQueueReader());
51 setParamsAndStart(queueThread);
54 private void setParamsAndStart(@Nullable Thread thread) {
56 thread.setName("OH-binding-" + InsteonBindingConstants.BINDING_ID + "-requestQueueReader");
57 thread.setDaemon(true);
63 * Add device to global request queue.
65 * @param dev the device to add
66 * @param time the time when the queue should be processed
68 public void addQueue(InsteonDevice dev, long time) {
69 synchronized (requestQueues) {
70 RequestQueue q = requestQueueHash.get(dev);
72 logger.trace("scheduling request for device {} in {} msec", dev.getAddress(),
73 time - System.currentTimeMillis());
74 q = new RequestQueue(dev, time);
76 logger.trace("queue for dev {} is already scheduled in {} msec", dev.getAddress(),
77 q.getExpirationTime() - System.currentTimeMillis());
78 if (!requestQueues.remove(q)) {
79 logger.warn("queue for {} should be there, report as bug!", dev);
81 requestQueueHash.remove(dev);
83 long expTime = q.getExpirationTime();
85 q.setExpirationTime(time);
87 // add the queue back in after (maybe) having modified
88 // the expiration time
90 requestQueueHash.put(dev, q);
91 requestQueues.notify();
96 * Stops request queue thread
98 private void stopThread() {
99 logger.debug("stopping thread");
100 Thread queueThread = this.queueThread;
101 if (queueThread != null) {
102 synchronized (requestQueues) {
104 requestQueues.notifyAll();
107 logger.debug("waiting for thread to join");
109 logger.debug("request queue thread exited!");
110 } catch (InterruptedException e) {
111 logger.warn("got interrupted waiting for thread exit ", e);
113 this.queueThread = null;
117 class RequestQueueReader implements Runnable {
120 logger.debug("starting request queue thread");
121 synchronized (requestQueues) {
122 while (keepRunning) {
125 while (keepRunning && (q = requestQueues.peek()) != null) {
126 long now = System.currentTimeMillis();
127 long expTime = q.getExpirationTime();
128 InsteonDevice dev = q.getDevice();
131 // The head of the queue is not up for processing yet, wait().
133 logger.trace("request queue head: {} must wait for {} msec", dev.getAddress(),
135 requestQueues.wait(expTime - now);
137 // note that the wait() can also return because of changes to
138 // the queue, not just because the time expired!
143 // The head of the queue has expired and can be processed!
145 q = requestQueues.poll(); // remove front element
146 requestQueueHash.remove(dev); // and remove from hash map
147 long nextExp = dev.processRequestQueue(now);
149 q = new RequestQueue(dev, nextExp);
150 requestQueues.add(q);
151 requestQueueHash.put(dev, q);
152 logger.trace("device queue for {} rescheduled in {} msec", dev.getAddress(),
155 // remove from hash since queue is no longer scheduled
156 logger.debug("device queue for {} is empty!", dev.getAddress());
159 logger.trace("waiting for request queues to fill");
160 requestQueues.wait();
161 } catch (InterruptedException e) {
162 logger.warn("request queue thread got interrupted, breaking..", e);
167 logger.debug("exiting request queue thread!");
171 public static class RequestQueue implements Comparable<RequestQueue> {
172 private InsteonDevice device;
173 private long expirationTime;
175 RequestQueue(InsteonDevice dev, long expirationTime) {
177 this.expirationTime = expirationTime;
180 public InsteonDevice getDevice() {
184 public long getExpirationTime() {
185 return expirationTime;
188 public void setExpirationTime(long t) {
193 public int compareTo(RequestQueue a) {
194 return (int) (expirationTime - a.expirationTime);
198 public static synchronized @Nullable RequestQueueManager instance() {
199 if (instance == null) {
200 instance = new RequestQueueManager();
205 public static synchronized void destroyInstance() {
206 RequestQueueManager instance = RequestQueueManager.instance;
207 if (instance != null) {
208 instance.stopThread();
209 RequestQueueManager.instance = null;