2 * Copyright (c) 2010-2020 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.insteon.internal.device;
15 import java.util.HashMap;
17 import java.util.PriorityQueue;
18 import java.util.Queue;
20 import org.eclipse.jdt.annotation.NonNullByDefault;
21 import org.eclipse.jdt.annotation.Nullable;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
/**
 * Class that manages all the per-device request queues using a single thread.
 *
 * - Each device has its own request queue, and the RequestQueueManager keeps a
 * queue of these request queues (requestQueues).
 * - Each entry in requestQueues corresponds to a single device's request queue.
 * A device should never appear more than once in requestQueues.
 * - A hash map (requestQueueHash) is kept in sync with requestQueues for
 * faster lookup in case a request queue is modified and needs to be
 * rescheduled.
 *
 * @author Bernd Pfrommer - Initial contribution
 * @author Rob Nielsen - Port to openHAB 2 insteon binding
 */
40 public class RequestQueueManager {
41 private static @Nullable RequestQueueManager instance = null;
42 private final Logger logger = LoggerFactory.getLogger(RequestQueueManager.class);
43 private @Nullable Thread queueThread = null;
44 private Queue<RequestQueue> requestQueues = new PriorityQueue<>();
45 private Map<InsteonDevice, RequestQueue> requestQueueHash = new HashMap<>();
46 private boolean keepRunning = true;
/**
 * Creates the manager and immediately launches the thread that services
 * the request queues. The constructor is private; see {@link #instance()}.
 */
private RequestQueueManager() {
    Thread reader = new Thread(new RequestQueueReader());
    queueThread = reader;
    setParamsAndStart(reader);
}
53 private void setParamsAndStart(@Nullable Thread thread) {
55 thread.setName("Insteon Request Queue Reader");
56 thread.setDaemon(true);
62 * Add device to global request queue.
64 * @param dev the device to add
65 * @param time the time when the queue should be processed
67 public void addQueue(InsteonDevice dev, long time) {
68 synchronized (requestQueues) {
69 RequestQueue q = requestQueueHash.get(dev);
71 logger.trace("scheduling request for device {} in {} msec", dev.getAddress(),
72 time - System.currentTimeMillis());
73 q = new RequestQueue(dev, time);
75 logger.trace("queue for dev {} is already scheduled in {} msec", dev.getAddress(),
76 q.getExpirationTime() - System.currentTimeMillis());
77 if (!requestQueues.remove(q)) {
78 logger.warn("queue for {} should be there, report as bug!", dev);
80 requestQueueHash.remove(dev);
82 long expTime = q.getExpirationTime();
84 q.setExpirationTime(time);
86 // add the queue back in after (maybe) having modified
87 // the expiration time
89 requestQueueHash.put(dev, q);
90 requestQueues.notify();
95 * Stops request queue thread
97 private void stopThread() {
98 logger.debug("stopping thread");
99 Thread queueThread = this.queueThread;
100 if (queueThread != null) {
101 synchronized (requestQueues) {
103 requestQueues.notifyAll();
106 logger.debug("waiting for thread to join");
108 logger.debug("request queue thread exited!");
109 } catch (InterruptedException e) {
110 logger.warn("got interrupted waiting for thread exit ", e);
112 this.queueThread = null;
116 class RequestQueueReader implements Runnable {
119 logger.debug("starting request queue thread");
120 synchronized (requestQueues) {
121 while (keepRunning) {
124 while (keepRunning && (q = requestQueues.peek()) != null) {
125 long now = System.currentTimeMillis();
126 long expTime = q.getExpirationTime();
127 InsteonDevice dev = q.getDevice();
130 // The head of the queue is not up for processing yet, wait().
132 logger.trace("request queue head: {} must wait for {} msec", dev.getAddress(),
134 requestQueues.wait(expTime - now);
136 // note that the wait() can also return because of changes to
137 // the queue, not just because the time expired!
142 // The head of the queue has expired and can be processed!
144 q = requestQueues.poll(); // remove front element
145 requestQueueHash.remove(dev); // and remove from hash map
146 long nextExp = dev.processRequestQueue(now);
148 q = new RequestQueue(dev, nextExp);
149 requestQueues.add(q);
150 requestQueueHash.put(dev, q);
151 logger.trace("device queue for {} rescheduled in {} msec", dev.getAddress(),
154 // remove from hash since queue is no longer scheduled
155 logger.debug("device queue for {} is empty!", dev.getAddress());
158 logger.trace("waiting for request queues to fill");
159 requestQueues.wait();
160 } catch (InterruptedException e) {
161 logger.warn("request queue thread got interrupted, breaking..", e);
166 logger.debug("exiting request queue thread!");
170 public static class RequestQueue implements Comparable<RequestQueue> {
171 private InsteonDevice device;
172 private long expirationTime;
174 RequestQueue(InsteonDevice dev, long expirationTime) {
176 this.expirationTime = expirationTime;
179 public InsteonDevice getDevice() {
183 public long getExpirationTime() {
184 return expirationTime;
187 public void setExpirationTime(long t) {
192 public int compareTo(RequestQueue a) {
193 return (int) (expirationTime - a.expirationTime);
197 public static synchronized @Nullable RequestQueueManager instance() {
198 if (instance == null) {
199 instance = new RequestQueueManager();
204 public static synchronized void destroyInstance() {
205 RequestQueueManager instance = RequestQueueManager.instance;
206 if (instance != null) {
207 instance.stopThread();
208 RequestQueueManager.instance = null;