/**
 * Copyright (c) 2010-2020 Contributors to the openHAB project
 *
 * See the NOTICE file(s) distributed with this work for additional
 * information.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0
 *
 * SPDX-License-Identifier: EPL-2.0
 */
13 package org.openhab.binding.insteon.internal.device;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Class that manages all the per-device request queues using a single thread.
 *
 * - Each device has its own request queue, and the RequestQueueManager keeps a
 * queue of these queues (requestQueues).
 * - Each entry in requestQueues corresponds to a single device's request queue.
 * A device should never be more than once in requestQueues.
 * - A hash map (requestQueueHash) is kept in sync with requestQueues for
 * faster lookup in case a request queue is modified and needs to be
 * rescheduled.
 *
 * @author Bernd Pfrommer - Initial contribution
 * @author Rob Nielsen - Port to openHAB 2 insteon binding
 */
39 @SuppressWarnings("null")
40 public class RequestQueueManager {
41 private static @Nullable RequestQueueManager instance = null;
42 private final Logger logger = LoggerFactory.getLogger(RequestQueueManager.class);
43 private @Nullable Thread queueThread = null;
44 private PriorityQueue<RequestQueue> requestQueues = new PriorityQueue<>();
45 private Map<InsteonDevice, RequestQueue> requestQueueHash = new HashMap<>();
46 private boolean keepRunning = true;
/**
 * Creates the manager and launches the daemon thread that services the request queues.
 * Private because instances are handed out by {@link #instance()}.
 */
private RequestQueueManager() {
    // Configure through a local so the @Nullable field is never dereferenced.
    Thread reader = new Thread(new RequestQueueReader());
    reader.setName("Insteon Request Queue Reader");
    reader.setDaemon(true);
    queueThread = reader;
    // The reader must actually be started, otherwise scheduled queues are never processed.
    reader.start();
}
56 * Add device to global request queue.
58 * @param dev the device to add
59 * @param time the time when the queue should be processed
61 public void addQueue(InsteonDevice dev, long time) {
62 synchronized (requestQueues) {
63 RequestQueue q = requestQueueHash.get(dev);
65 logger.trace("scheduling request for device {} in {} msec", dev.getAddress(),
66 time - System.currentTimeMillis());
67 q = new RequestQueue(dev, time);
69 logger.trace("queue for dev {} is already scheduled in {} msec", dev.getAddress(),
70 q.getExpirationTime() - System.currentTimeMillis());
71 if (!requestQueues.remove(q)) {
72 logger.warn("queue for {} should be there, report as bug!", dev);
74 requestQueueHash.remove(dev);
76 long expTime = q.getExpirationTime();
78 q.setExpirationTime(time);
80 // add the queue back in after (maybe) having modified
81 // the expiration time
83 requestQueueHash.put(dev, q);
84 requestQueues.notify();
89 * Stops request queue thread
91 private void stopThread() {
92 logger.debug("stopping thread");
93 if (queueThread != null) {
94 synchronized (requestQueues) {
96 requestQueues.notifyAll();
99 logger.debug("waiting for thread to join");
101 logger.debug("request queue thread exited!");
102 } catch (InterruptedException e) {
103 logger.warn("got interrupted waiting for thread exit ", e);
110 class RequestQueueReader implements Runnable {
113 logger.debug("starting request queue thread");
114 synchronized (requestQueues) {
115 while (keepRunning) {
117 while (keepRunning && !requestQueues.isEmpty()) {
118 RequestQueue q = requestQueues.peek();
119 long now = System.currentTimeMillis();
120 long expTime = q.getExpirationTime();
121 InsteonDevice dev = q.getDevice();
124 // The head of the queue is not up for processing yet, wait().
126 logger.trace("request queue head: {} must wait for {} msec", dev.getAddress(),
128 requestQueues.wait(expTime - now);
130 // note that the wait() can also return because of changes to
131 // the queue, not just because the time expired!
136 // The head of the queue has expired and can be processed!
138 q = requestQueues.poll(); // remove front element
139 requestQueueHash.remove(dev); // and remove from hash map
140 long nextExp = dev.processRequestQueue(now);
142 q = new RequestQueue(dev, nextExp);
143 requestQueues.add(q);
144 requestQueueHash.put(dev, q);
145 logger.trace("device queue for {} rescheduled in {} msec", dev.getAddress(),
148 // remove from hash since queue is no longer scheduled
149 logger.debug("device queue for {} is empty!", dev.getAddress());
152 logger.trace("waiting for request queues to fill");
153 requestQueues.wait();
154 } catch (InterruptedException e) {
155 logger.warn("request queue thread got interrupted, breaking..", e);
160 logger.debug("exiting request queue thread!");
165 public static class RequestQueue implements Comparable<RequestQueue> {
166 private InsteonDevice device;
167 private long expirationTime;
169 RequestQueue(InsteonDevice dev, long expirationTime) {
171 this.expirationTime = expirationTime;
174 public InsteonDevice getDevice() {
178 public long getExpirationTime() {
179 return expirationTime;
182 public void setExpirationTime(long t) {
187 public int compareTo(RequestQueue a) {
188 return (int) (expirationTime - a.expirationTime);
193 public static synchronized @Nullable RequestQueueManager instance() {
194 if (instance == null) {
195 instance = new RequestQueueManager();
200 public static synchronized void destroyInstance() {
201 if (instance != null) {
202 instance.stopThread();