2 * Copyright (c) 2010-2020 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.insteon.internal.device;
15 import java.util.HashMap;
16 import java.util.PriorityQueue;
18 import org.eclipse.jdt.annotation.NonNullByDefault;
19 import org.eclipse.jdt.annotation.Nullable;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
24 * Class that manages all the per-device request queues using a single thread.
26 * - Each device has its own request queue, and the RequestQueueManager keeps a
28 * - Each entry in m_requestQueues corresponds to a single device's request queue.
29 * A device should never be more than once in m_requestQueues.
30 * - A hash map (m_requestQueueHash) is kept in sync with m_requestQueues for
31 * faster lookup in case a request queue is modified and needs to be
34 * @author Bernd Pfrommer - Initial contribution
35 * @author Rob Nielsen - Port to openHAB 2 insteon binding
38 @SuppressWarnings("null")
39 public class RequestQueueManager {
40 private static @Nullable RequestQueueManager instance = null;
41 private final Logger logger = LoggerFactory.getLogger(RequestQueueManager.class);
42 private @Nullable Thread queueThread = null;
43 private PriorityQueue<RequestQueue> requestQueues = new PriorityQueue<>();
44 private HashMap<InsteonDevice, @Nullable RequestQueue> requestQueueHash = new HashMap<>();
45 private boolean keepRunning = true;
47 private RequestQueueManager() {
48 queueThread = new Thread(new RequestQueueReader());
49 queueThread.setName("Insteon Request Queue Reader");
50 queueThread.setDaemon(true);
55 * Add device to global request queue.
57 * @param dev the device to add
58 * @param time the time when the queue should be processed
60 public void addQueue(InsteonDevice dev, long time) {
61 synchronized (requestQueues) {
62 RequestQueue q = requestQueueHash.get(dev);
64 logger.trace("scheduling request for device {} in {} msec", dev.getAddress(),
65 time - System.currentTimeMillis());
66 q = new RequestQueue(dev, time);
68 logger.trace("queue for dev {} is already scheduled in {} msec", dev.getAddress(),
69 q.getExpirationTime() - System.currentTimeMillis());
70 if (!requestQueues.remove(q)) {
71 logger.warn("queue for {} should be there, report as bug!", dev);
73 requestQueueHash.remove(dev);
75 long expTime = q.getExpirationTime();
77 q.setExpirationTime(time);
79 // add the queue back in after (maybe) having modified
80 // the expiration time
82 requestQueueHash.put(dev, q);
83 requestQueues.notify();
88 * Stops request queue thread
90 private void stopThread() {
91 logger.debug("stopping thread");
92 if (queueThread != null) {
93 synchronized (requestQueues) {
95 requestQueues.notifyAll();
98 logger.debug("waiting for thread to join");
100 logger.debug("request queue thread exited!");
101 } catch (InterruptedException e) {
102 logger.warn("got interrupted waiting for thread exit ", e);
109 class RequestQueueReader implements Runnable {
112 logger.debug("starting request queue thread");
113 synchronized (requestQueues) {
114 while (keepRunning) {
116 while (keepRunning && !requestQueues.isEmpty()) {
117 RequestQueue q = requestQueues.peek();
118 long now = System.currentTimeMillis();
119 long expTime = q.getExpirationTime();
120 InsteonDevice dev = q.getDevice();
123 // The head of the queue is not up for processing yet, wait().
125 logger.trace("request queue head: {} must wait for {} msec", dev.getAddress(),
127 requestQueues.wait(expTime - now);
129 // note that the wait() can also return because of changes to
130 // the queue, not just because the time expired!
135 // The head of the queue has expired and can be processed!
137 q = requestQueues.poll(); // remove front element
138 requestQueueHash.remove(dev); // and remove from hash map
139 long nextExp = dev.processRequestQueue(now);
141 q = new RequestQueue(dev, nextExp);
142 requestQueues.add(q);
143 requestQueueHash.put(dev, q);
144 logger.trace("device queue for {} rescheduled in {} msec", dev.getAddress(),
147 // remove from hash since queue is no longer scheduled
148 logger.debug("device queue for {} is empty!", dev.getAddress());
151 logger.trace("waiting for request queues to fill");
152 requestQueues.wait();
153 } catch (InterruptedException e) {
154 logger.warn("request queue thread got interrupted, breaking..", e);
159 logger.debug("exiting request queue thread!");
164 public static class RequestQueue implements Comparable<RequestQueue> {
165 private InsteonDevice device;
166 private long expirationTime;
168 RequestQueue(InsteonDevice dev, long expirationTime) {
170 this.expirationTime = expirationTime;
173 public InsteonDevice getDevice() {
177 public long getExpirationTime() {
178 return expirationTime;
181 public void setExpirationTime(long t) {
186 public int compareTo(RequestQueue a) {
187 return (int) (expirationTime - a.expirationTime);
192 public static synchronized @Nullable RequestQueueManager instance() {
193 if (instance == null) {
194 instance = new RequestQueueManager();
199 public static synchronized void destroyInstance() {
200 if (instance != null) {
201 instance.stopThread();