]> git.basschouten.com Git - openhab-addons.git/blob
4b9f29ff0306aba1cb3cbf0e5d398ad4b2a1efb6
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.insteon.internal.device;
14
15 import java.util.HashMap;
16 import java.util.Map;
17 import java.util.PriorityQueue;
18
19 import org.eclipse.jdt.annotation.NonNullByDefault;
20 import org.eclipse.jdt.annotation.Nullable;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 /**
25  * Class that manages all the per-device request queues using a single thread.
26  *
27  * - Each device has its own request queue, and the RequestQueueManager keeps a
28  * queue of queues.
29  * - Each entry in m_requestQueues corresponds to a single device's request queue.
30  * A device should never be more than once in m_requestQueues.
31  * - A hash map (m_requestQueueHash) is kept in sync with m_requestQueues for
32  * faster lookup in case a request queue is modified and needs to be
33  * rescheduled.
34  *
35  * @author Bernd Pfrommer - Initial contribution
36  * @author Rob Nielsen - Port to openHAB 2 insteon binding
37  */
38 @NonNullByDefault
39 @SuppressWarnings("null")
40 public class RequestQueueManager {
41     private static @Nullable RequestQueueManager instance = null;
42     private final Logger logger = LoggerFactory.getLogger(RequestQueueManager.class);
43     private @Nullable Thread queueThread = null;
44     private PriorityQueue<RequestQueue> requestQueues = new PriorityQueue<>();
45     private Map<InsteonDevice, RequestQueue> requestQueueHash = new HashMap<>();
46     private boolean keepRunning = true;
47
48     private RequestQueueManager() {
49         queueThread = new Thread(new RequestQueueReader());
50         queueThread.setName("Insteon Request Queue Reader");
51         queueThread.setDaemon(true);
52         queueThread.start();
53     }
54
55     /**
56      * Add device to global request queue.
57      *
58      * @param dev the device to add
59      * @param time the time when the queue should be processed
60      */
61     public void addQueue(InsteonDevice dev, long time) {
62         synchronized (requestQueues) {
63             RequestQueue q = requestQueueHash.get(dev);
64             if (q == null) {
65                 logger.trace("scheduling request for device {} in {} msec", dev.getAddress(),
66                         time - System.currentTimeMillis());
67                 q = new RequestQueue(dev, time);
68             } else {
69                 logger.trace("queue for dev {} is already scheduled in {} msec", dev.getAddress(),
70                         q.getExpirationTime() - System.currentTimeMillis());
71                 if (!requestQueues.remove(q)) {
72                     logger.warn("queue for {} should be there, report as bug!", dev);
73                 }
74                 requestQueueHash.remove(dev);
75             }
76             long expTime = q.getExpirationTime();
77             if (expTime > time) {
78                 q.setExpirationTime(time);
79             }
80             // add the queue back in after (maybe) having modified
81             // the expiration time
82             requestQueues.add(q);
83             requestQueueHash.put(dev, q);
84             requestQueues.notify();
85         }
86     }
87
88     /**
89      * Stops request queue thread
90      */
91     private void stopThread() {
92         logger.debug("stopping thread");
93         if (queueThread != null) {
94             synchronized (requestQueues) {
95                 keepRunning = false;
96                 requestQueues.notifyAll();
97             }
98             try {
99                 logger.debug("waiting for thread to join");
100                 queueThread.join();
101                 logger.debug("request queue thread exited!");
102             } catch (InterruptedException e) {
103                 logger.warn("got interrupted waiting for thread exit ", e);
104             }
105             queueThread = null;
106         }
107     }
108
109     @NonNullByDefault
110     class RequestQueueReader implements Runnable {
111         @Override
112         public void run() {
113             logger.debug("starting request queue thread");
114             synchronized (requestQueues) {
115                 while (keepRunning) {
116                     try {
117                         while (keepRunning && !requestQueues.isEmpty()) {
118                             RequestQueue q = requestQueues.peek();
119                             long now = System.currentTimeMillis();
120                             long expTime = q.getExpirationTime();
121                             InsteonDevice dev = q.getDevice();
122                             if (expTime > now) {
123                                 //
124                                 // The head of the queue is not up for processing yet, wait().
125                                 //
126                                 logger.trace("request queue head: {} must wait for {} msec", dev.getAddress(),
127                                         expTime - now);
128                                 requestQueues.wait(expTime - now);
129                                 //
130                                 // note that the wait() can also return because of changes to
131                                 // the queue, not just because the time expired!
132                                 //
133                                 continue;
134                             }
135                             //
136                             // The head of the queue has expired and can be processed!
137                             //
138                             q = requestQueues.poll(); // remove front element
139                             requestQueueHash.remove(dev); // and remove from hash map
140                             long nextExp = dev.processRequestQueue(now);
141                             if (nextExp > 0) {
142                                 q = new RequestQueue(dev, nextExp);
143                                 requestQueues.add(q);
144                                 requestQueueHash.put(dev, q);
145                                 logger.trace("device queue for {} rescheduled in {} msec", dev.getAddress(),
146                                         nextExp - now);
147                             } else {
148                                 // remove from hash since queue is no longer scheduled
149                                 logger.debug("device queue for {} is empty!", dev.getAddress());
150                             }
151                         }
152                         logger.trace("waiting for request queues to fill");
153                         requestQueues.wait();
154                     } catch (InterruptedException e) {
155                         logger.warn("request queue thread got interrupted, breaking..", e);
156                         break;
157                     }
158                 }
159             }
160             logger.debug("exiting request queue thread!");
161         }
162     }
163
164     @NonNullByDefault
165     public static class RequestQueue implements Comparable<RequestQueue> {
166         private InsteonDevice device;
167         private long expirationTime;
168
169         RequestQueue(InsteonDevice dev, long expirationTime) {
170             this.device = dev;
171             this.expirationTime = expirationTime;
172         }
173
174         public InsteonDevice getDevice() {
175             return device;
176         }
177
178         public long getExpirationTime() {
179             return expirationTime;
180         }
181
182         public void setExpirationTime(long t) {
183             expirationTime = t;
184         }
185
186         @Override
187         public int compareTo(RequestQueue a) {
188             return (int) (expirationTime - a.expirationTime);
189         }
190     }
191
192     @NonNullByDefault
193     public static synchronized @Nullable RequestQueueManager instance() {
194         if (instance == null) {
195             instance = new RequestQueueManager();
196         }
197         return (instance);
198     }
199
200     public static synchronized void destroyInstance() {
201         if (instance != null) {
202             instance.stopThread();
203             instance = null;
204         }
205     }
206 }