]> git.basschouten.com Git - openhab-addons.git/blob
b1b3d88fa3a49fec964a250de11c956ee7db7c47
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.insteon.internal.device;
14
15 import java.util.HashMap;
16 import java.util.PriorityQueue;
17
18 import org.eclipse.jdt.annotation.NonNullByDefault;
19 import org.eclipse.jdt.annotation.Nullable;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22
23 /**
24  * Class that manages all the per-device request queues using a single thread.
25  *
26  * - Each device has its own request queue, and the RequestQueueManager keeps a
27  * queue of queues.
28  * - Each entry in m_requestQueues corresponds to a single device's request queue.
29  * A device should never be more than once in m_requestQueues.
30  * - A hash map (m_requestQueueHash) is kept in sync with m_requestQueues for
31  * faster lookup in case a request queue is modified and needs to be
32  * rescheduled.
33  *
34  * @author Bernd Pfrommer - Initial contribution
35  * @author Rob Nielsen - Port to openHAB 2 insteon binding
36  */
37 @NonNullByDefault
38 @SuppressWarnings("null")
39 public class RequestQueueManager {
40     private static @Nullable RequestQueueManager instance = null;
41     private final Logger logger = LoggerFactory.getLogger(RequestQueueManager.class);
42     private @Nullable Thread queueThread = null;
43     private PriorityQueue<RequestQueue> requestQueues = new PriorityQueue<>();
44     private HashMap<InsteonDevice, @Nullable RequestQueue> requestQueueHash = new HashMap<>();
45     private boolean keepRunning = true;
46
47     private RequestQueueManager() {
48         queueThread = new Thread(new RequestQueueReader());
49         queueThread.setName("Insteon Request Queue Reader");
50         queueThread.setDaemon(true);
51         queueThread.start();
52     }
53
54     /**
55      * Add device to global request queue.
56      *
57      * @param dev the device to add
58      * @param time the time when the queue should be processed
59      */
60     public void addQueue(InsteonDevice dev, long time) {
61         synchronized (requestQueues) {
62             RequestQueue q = requestQueueHash.get(dev);
63             if (q == null) {
64                 logger.trace("scheduling request for device {} in {} msec", dev.getAddress(),
65                         time - System.currentTimeMillis());
66                 q = new RequestQueue(dev, time);
67             } else {
68                 logger.trace("queue for dev {} is already scheduled in {} msec", dev.getAddress(),
69                         q.getExpirationTime() - System.currentTimeMillis());
70                 if (!requestQueues.remove(q)) {
71                     logger.warn("queue for {} should be there, report as bug!", dev);
72                 }
73                 requestQueueHash.remove(dev);
74             }
75             long expTime = q.getExpirationTime();
76             if (expTime > time) {
77                 q.setExpirationTime(time);
78             }
79             // add the queue back in after (maybe) having modified
80             // the expiration time
81             requestQueues.add(q);
82             requestQueueHash.put(dev, q);
83             requestQueues.notify();
84         }
85     }
86
87     /**
88      * Stops request queue thread
89      */
90     private void stopThread() {
91         logger.debug("stopping thread");
92         if (queueThread != null) {
93             synchronized (requestQueues) {
94                 keepRunning = false;
95                 requestQueues.notifyAll();
96             }
97             try {
98                 logger.debug("waiting for thread to join");
99                 queueThread.join();
100                 logger.debug("request queue thread exited!");
101             } catch (InterruptedException e) {
102                 logger.warn("got interrupted waiting for thread exit ", e);
103             }
104             queueThread = null;
105         }
106     }
107
108     @NonNullByDefault
109     class RequestQueueReader implements Runnable {
110         @Override
111         public void run() {
112             logger.debug("starting request queue thread");
113             synchronized (requestQueues) {
114                 while (keepRunning) {
115                     try {
116                         while (keepRunning && !requestQueues.isEmpty()) {
117                             RequestQueue q = requestQueues.peek();
118                             long now = System.currentTimeMillis();
119                             long expTime = q.getExpirationTime();
120                             InsteonDevice dev = q.getDevice();
121                             if (expTime > now) {
122                                 //
123                                 // The head of the queue is not up for processing yet, wait().
124                                 //
125                                 logger.trace("request queue head: {} must wait for {} msec", dev.getAddress(),
126                                         expTime - now);
127                                 requestQueues.wait(expTime - now);
128                                 //
129                                 // note that the wait() can also return because of changes to
130                                 // the queue, not just because the time expired!
131                                 //
132                                 continue;
133                             }
134                             //
135                             // The head of the queue has expired and can be processed!
136                             //
137                             q = requestQueues.poll(); // remove front element
138                             requestQueueHash.remove(dev); // and remove from hash map
139                             long nextExp = dev.processRequestQueue(now);
140                             if (nextExp > 0) {
141                                 q = new RequestQueue(dev, nextExp);
142                                 requestQueues.add(q);
143                                 requestQueueHash.put(dev, q);
144                                 logger.trace("device queue for {} rescheduled in {} msec", dev.getAddress(),
145                                         nextExp - now);
146                             } else {
147                                 // remove from hash since queue is no longer scheduled
148                                 logger.debug("device queue for {} is empty!", dev.getAddress());
149                             }
150                         }
151                         logger.trace("waiting for request queues to fill");
152                         requestQueues.wait();
153                     } catch (InterruptedException e) {
154                         logger.warn("request queue thread got interrupted, breaking..", e);
155                         break;
156                     }
157                 }
158             }
159             logger.debug("exiting request queue thread!");
160         }
161     }
162
163     @NonNullByDefault
164     public static class RequestQueue implements Comparable<RequestQueue> {
165         private InsteonDevice device;
166         private long expirationTime;
167
168         RequestQueue(InsteonDevice dev, long expirationTime) {
169             this.device = dev;
170             this.expirationTime = expirationTime;
171         }
172
173         public InsteonDevice getDevice() {
174             return device;
175         }
176
177         public long getExpirationTime() {
178             return expirationTime;
179         }
180
181         public void setExpirationTime(long t) {
182             expirationTime = t;
183         }
184
185         @Override
186         public int compareTo(RequestQueue a) {
187             return (int) (expirationTime - a.expirationTime);
188         }
189     }
190
191     @NonNullByDefault
192     public static synchronized @Nullable RequestQueueManager instance() {
193         if (instance == null) {
194             instance = new RequestQueueManager();
195         }
196         return (instance);
197     }
198
199     public static synchronized void destroyInstance() {
200         if (instance != null) {
201             instance.stopThread();
202             instance = null;
203         }
204     }
205 }