]> git.basschouten.com Git - openhab-addons.git/blob
0d2dfbb7c99512d27de333d618431dd6c479e384
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.insteon.internal.device;
14
15 import java.util.HashMap;
16 import java.util.Map;
17 import java.util.PriorityQueue;
18 import java.util.Queue;
19
20 import org.eclipse.jdt.annotation.NonNullByDefault;
21 import org.eclipse.jdt.annotation.Nullable;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 /**
26  * Class that manages all the per-device request queues using a single thread.
27  *
28  * - Each device has its own request queue, and the RequestQueueManager keeps a
29  * queue of queues.
30  * - Each entry in m_requestQueues corresponds to a single device's request queue.
31  * A device should never be more than once in m_requestQueues.
32  * - A hash map (m_requestQueueHash) is kept in sync with m_requestQueues for
33  * faster lookup in case a request queue is modified and needs to be
34  * rescheduled.
35  *
36  * @author Bernd Pfrommer - Initial contribution
37  * @author Rob Nielsen - Port to openHAB 2 insteon binding
38  */
39 @NonNullByDefault
40 public class RequestQueueManager {
41     private static @Nullable RequestQueueManager instance = null;
42     private final Logger logger = LoggerFactory.getLogger(RequestQueueManager.class);
43     private @Nullable Thread queueThread = null;
44     private Queue<RequestQueue> requestQueues = new PriorityQueue<>();
45     private Map<InsteonDevice, RequestQueue> requestQueueHash = new HashMap<>();
46     private boolean keepRunning = true;
47
48     private RequestQueueManager() {
49         queueThread = new Thread(new RequestQueueReader());
50         setParamsAndStart(queueThread);
51     }
52
53     private void setParamsAndStart(@Nullable Thread thread) {
54         if (thread != null) {
55             thread.setName("Insteon Request Queue Reader");
56             thread.setDaemon(true);
57             thread.start();
58         }
59     }
60
61     /**
62      * Add device to global request queue.
63      *
64      * @param dev the device to add
65      * @param time the time when the queue should be processed
66      */
67     public void addQueue(InsteonDevice dev, long time) {
68         synchronized (requestQueues) {
69             RequestQueue q = requestQueueHash.get(dev);
70             if (q == null) {
71                 logger.trace("scheduling request for device {} in {} msec", dev.getAddress(),
72                         time - System.currentTimeMillis());
73                 q = new RequestQueue(dev, time);
74             } else {
75                 logger.trace("queue for dev {} is already scheduled in {} msec", dev.getAddress(),
76                         q.getExpirationTime() - System.currentTimeMillis());
77                 if (!requestQueues.remove(q)) {
78                     logger.warn("queue for {} should be there, report as bug!", dev);
79                 }
80                 requestQueueHash.remove(dev);
81             }
82             long expTime = q.getExpirationTime();
83             if (expTime > time) {
84                 q.setExpirationTime(time);
85             }
86             // add the queue back in after (maybe) having modified
87             // the expiration time
88             requestQueues.add(q);
89             requestQueueHash.put(dev, q);
90             requestQueues.notify();
91         }
92     }
93
94     /**
95      * Stops request queue thread
96      */
97     private void stopThread() {
98         logger.debug("stopping thread");
99         Thread queueThread = this.queueThread;
100         if (queueThread != null) {
101             synchronized (requestQueues) {
102                 keepRunning = false;
103                 requestQueues.notifyAll();
104             }
105             try {
106                 logger.debug("waiting for thread to join");
107                 queueThread.join();
108                 logger.debug("request queue thread exited!");
109             } catch (InterruptedException e) {
110                 logger.warn("got interrupted waiting for thread exit ", e);
111             }
112             this.queueThread = null;
113         }
114     }
115
116     class RequestQueueReader implements Runnable {
117         @Override
118         public void run() {
119             logger.debug("starting request queue thread");
120             synchronized (requestQueues) {
121                 while (keepRunning) {
122                     try {
123                         RequestQueue q;
124                         while (keepRunning && (q = requestQueues.peek()) != null) {
125                             long now = System.currentTimeMillis();
126                             long expTime = q.getExpirationTime();
127                             InsteonDevice dev = q.getDevice();
128                             if (expTime > now) {
129                                 //
130                                 // The head of the queue is not up for processing yet, wait().
131                                 //
132                                 logger.trace("request queue head: {} must wait for {} msec", dev.getAddress(),
133                                         expTime - now);
134                                 requestQueues.wait(expTime - now);
135                                 //
136                                 // note that the wait() can also return because of changes to
137                                 // the queue, not just because the time expired!
138                                 //
139                                 continue;
140                             }
141                             //
142                             // The head of the queue has expired and can be processed!
143                             //
144                             q = requestQueues.poll(); // remove front element
145                             requestQueueHash.remove(dev); // and remove from hash map
146                             long nextExp = dev.processRequestQueue(now);
147                             if (nextExp > 0) {
148                                 q = new RequestQueue(dev, nextExp);
149                                 requestQueues.add(q);
150                                 requestQueueHash.put(dev, q);
151                                 logger.trace("device queue for {} rescheduled in {} msec", dev.getAddress(),
152                                         nextExp - now);
153                             } else {
154                                 // remove from hash since queue is no longer scheduled
155                                 logger.debug("device queue for {} is empty!", dev.getAddress());
156                             }
157                         }
158                         logger.trace("waiting for request queues to fill");
159                         requestQueues.wait();
160                     } catch (InterruptedException e) {
161                         logger.warn("request queue thread got interrupted, breaking..", e);
162                         break;
163                     }
164                 }
165             }
166             logger.debug("exiting request queue thread!");
167         }
168     }
169
170     public static class RequestQueue implements Comparable<RequestQueue> {
171         private InsteonDevice device;
172         private long expirationTime;
173
174         RequestQueue(InsteonDevice dev, long expirationTime) {
175             this.device = dev;
176             this.expirationTime = expirationTime;
177         }
178
179         public InsteonDevice getDevice() {
180             return device;
181         }
182
183         public long getExpirationTime() {
184             return expirationTime;
185         }
186
187         public void setExpirationTime(long t) {
188             expirationTime = t;
189         }
190
191         @Override
192         public int compareTo(RequestQueue a) {
193             return (int) (expirationTime - a.expirationTime);
194         }
195     }
196
197     public static synchronized @Nullable RequestQueueManager instance() {
198         if (instance == null) {
199             instance = new RequestQueueManager();
200         }
201         return instance;
202     }
203
204     public static synchronized void destroyInstance() {
205         RequestQueueManager instance = RequestQueueManager.instance;
206         if (instance != null) {
207             instance.stopThread();
208             RequestQueueManager.instance = null;
209         }
210     }
211 }