]> git.basschouten.com Git - openhab-addons.git/blob
4e6b898caece38668fbe86b43330179a131be27a
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2024 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.insteon.internal.driver;
14
15 import java.sql.Date;
16 import java.util.Iterator;
17 import java.util.SortedSet;
18 import java.util.TreeSet;
19
20 import org.eclipse.jdt.annotation.NonNullByDefault;
21 import org.eclipse.jdt.annotation.Nullable;
22 import org.openhab.binding.insteon.internal.InsteonBindingConstants;
23 import org.openhab.binding.insteon.internal.device.InsteonDevice;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 /**
28  * This class manages the polling of all devices.
29  * Between successive polls of any device there is a quiet time of
30  * at least MIN_MSEC_BETWEEN_POLLS. This avoids bunching up of poll messages
31  * and keeps the network bandwidth open for other messages.
32  *
33  * - An entry in the poll queue corresponds to a single device, i.e. each device should
34  * have exactly one entry in the poll queue. That entry is created when startPolling()
35  * is called, and then re-enqueued whenever it expires.
36  * - When a device comes up for polling, its doPoll() method is called, which in turn
37  * puts an entry into that devices request queue. So the Poller class actually never
38  * sends out messages directly. That is done by the device itself via its request
39  * queue. The poller just reminds the device to poll.
40  *
41  * @author Bernd Pfrommer - Initial contribution
42  * @author Rob Nielsen - Port to openHAB 2 insteon binding
43  */
44 @NonNullByDefault
45 public class Poller {
46     private static final long MIN_MSEC_BETWEEN_POLLS = 2000L;
47
48     private final Logger logger = LoggerFactory.getLogger(Poller.class);
49     private static Poller poller = new Poller(); // for singleton
50
51     private @Nullable Thread pollThread = null;
52     private TreeSet<PQEntry> pollQueue = new TreeSet<>();
53     private boolean keepRunning = true;
54
55     /**
56      * Constructor
57      */
58     private Poller() {
59     }
60
61     /**
62      * Get size of poll queue
63      *
64      * @return number of devices being polled
65      */
66     public int getSizeOfQueue() {
67         return (pollQueue.size());
68     }
69
70     /**
71      * Register a device for polling.
72      *
73      * @param d device to register for polling
74      * @param aNumDev approximate number of total devices
75      */
76     public void startPolling(InsteonDevice d, int aNumDev) {
77         logger.debug("start polling device {}", d);
78         synchronized (pollQueue) {
79             // try to spread out the scheduling when
80             // starting up
81             int n = pollQueue.size();
82             long pollDelay = n * d.getPollInterval() / (aNumDev > 0 ? aNumDev : 1);
83             addToPollQueue(d, System.currentTimeMillis() + pollDelay);
84             pollQueue.notify();
85         }
86     }
87
88     /**
89      * Start polling a given device
90      *
91      * @param d reference to the device to be polled
92      */
93     public void stopPolling(InsteonDevice d) {
94         synchronized (pollQueue) {
95             for (Iterator<PQEntry> i = pollQueue.iterator(); i.hasNext();) {
96                 if (i.next().getDevice().getAddress().equals(d.getAddress())) {
97                     i.remove();
98                     logger.debug("stopped polling device {}", d);
99                 }
100             }
101         }
102     }
103
104     /**
105      * Starts the poller thread
106      */
107     public void start() {
108         if (pollThread == null) {
109             pollThread = new Thread(new PollQueueReader());
110             setParamsAndStart(pollThread);
111         }
112     }
113
114     private void setParamsAndStart(@Nullable Thread thread) {
115         if (thread != null) {
116             thread.setName("OH-binding-" + InsteonBindingConstants.BINDING_ID + "-pollQueueReader");
117             thread.setDaemon(true);
118             thread.start();
119         }
120     }
121
122     /**
123      * Stops the poller thread
124      */
125     public void stop() {
126         logger.debug("stopping poller!");
127         synchronized (pollQueue) {
128             pollQueue.clear();
129             keepRunning = false;
130             pollQueue.notify();
131         }
132         try {
133             Thread pollThread = this.pollThread;
134             if (pollThread != null) {
135                 pollThread.join();
136                 this.pollThread = null;
137             }
138             keepRunning = true;
139         } catch (InterruptedException e) {
140             logger.debug("got interrupted on exit: {}", e.getMessage());
141         }
142     }
143
144     /**
145      * Adds a device to the poll queue. After this call, the device's doPoll() method
146      * will be called according to the polling frequency set.
147      *
148      * @param d the device to poll periodically
149      * @param time the target time for the next poll to happen. Note that this time is merely
150      *            a suggestion, and may be adjusted, because there must be at least a minimum gap in polling.
151      */
152
153     private void addToPollQueue(InsteonDevice d, long time) {
154         long texp = findNextExpirationTime(d, time);
155         PQEntry ne = new PQEntry(d, texp);
156         logger.trace("added entry {} originally aimed at time {}", ne, String.format("%tc", new Date(time)));
157         pollQueue.add(ne);
158     }
159
160     /**
161      * Finds the best expiration time for a poll queue, i.e. a time slot that is after the
162      * desired expiration time, but does not collide with any of the already scheduled
163      * polls.
164      *
165      * @param d device to poll (for logging)
166      * @param aTime desired time after which the device should be polled
167      * @return the suggested time to poll
168      */
169
170     private long findNextExpirationTime(InsteonDevice d, long aTime) {
171         long expTime = aTime;
172         // tailSet finds all those that expire after aTime - buffer
173         SortedSet<PQEntry> ts = pollQueue.tailSet(new PQEntry(d, aTime - MIN_MSEC_BETWEEN_POLLS));
174         if (ts.isEmpty()) {
175             // all entries in the poll queue are ahead of the new element,
176             // go ahead and simply add it to the end
177             expTime = aTime;
178         } else {
179             Iterator<PQEntry> pqi = ts.iterator();
180             PQEntry prev = pqi.next();
181             if (prev.getExpirationTime() > aTime + MIN_MSEC_BETWEEN_POLLS) {
182                 // there is a time slot free before the head of the tail set
183                 expTime = aTime;
184             } else {
185                 // look for a gap where we can squeeze in
186                 // a new poll while maintaining MIN_MSEC_BETWEEN_POLLS
187                 while (pqi.hasNext()) {
188                     PQEntry pqe = pqi.next();
189                     long tcurr = pqe.getExpirationTime();
190                     long tprev = prev.getExpirationTime();
191                     if (tcurr - tprev >= 2 * MIN_MSEC_BETWEEN_POLLS) {
192                         // found gap
193                         logger.trace("dev {} time {} found slot between {} and {}", d, aTime, tprev, tcurr);
194                         break;
195                     }
196                     prev = pqe;
197                 }
198                 expTime = prev.getExpirationTime() + MIN_MSEC_BETWEEN_POLLS;
199             }
200         }
201         return expTime;
202     }
203
204     private class PollQueueReader implements Runnable {
205         @Override
206         public void run() {
207             logger.debug("starting poll thread.");
208             synchronized (pollQueue) {
209                 while (keepRunning) {
210                     try {
211                         readPollQueue();
212                     } catch (InterruptedException e) {
213                         logger.warn("poll queue reader thread interrupted!");
214                         break;
215                     }
216                 }
217             }
218             logger.debug("poll thread exiting");
219         }
220
221         /**
222          * Waits for first element of poll queue to become current,
223          * then process it.
224          *
225          * @throws InterruptedException
226          */
227         private void readPollQueue() throws InterruptedException {
228             while (pollQueue.isEmpty() && keepRunning) {
229                 pollQueue.wait();
230             }
231             if (!keepRunning) {
232                 return;
233             }
234             // something is in the queue
235             long now = System.currentTimeMillis();
236             PQEntry pqe = pollQueue.first();
237             long tfirst = pqe.getExpirationTime();
238             long dt = tfirst - now;
239             if (dt > 0) { // must wait for this item to expire
240                 logger.trace("waiting for {} msec until {} comes due", dt, pqe);
241                 pollQueue.wait(dt);
242             } else { // queue entry has expired, process it!
243                 logger.trace("entry {} expired at time {}", pqe, now);
244                 processQueue(now);
245             }
246         }
247
248         /**
249          * Takes first element off the poll queue, polls the corresponding device,
250          * and puts the device back into the poll queue to be polled again later.
251          *
252          * @param now the current time
253          */
254         private void processQueue(long now) {
255             processQueue(now, pollQueue.pollFirst());
256         }
257
258         private void processQueue(long now, @Nullable PQEntry pqe) {
259             if (pqe != null) {
260                 pqe.getDevice().doPoll(0);
261                 addToPollQueue(pqe.getDevice(), now + pqe.getDevice().getPollInterval());
262             }
263         }
264     }
265
266     /**
267      * A poll queue entry corresponds to a single device that needs
268      * to be polled.
269      *
270      * @author Bernd Pfrommer - Initial contribution
271      *
272      */
273     private static class PQEntry implements Comparable<PQEntry> {
274         private InsteonDevice dev;
275         private long expirationTime;
276
277         PQEntry(InsteonDevice dev, long time) {
278             this.dev = dev;
279             this.expirationTime = time;
280         }
281
282         long getExpirationTime() {
283             return expirationTime;
284         }
285
286         InsteonDevice getDevice() {
287             return dev;
288         }
289
290         @Override
291         public int compareTo(PQEntry b) {
292             return (int) (expirationTime - b.expirationTime);
293         }
294
295         @Override
296         public String toString() {
297             return dev.getAddress().toString() + "/" + String.format("%tc", new Date(expirationTime));
298         }
299     }
300
301     /**
302      * Singleton pattern instance() method
303      *
304      * @return the poller instance
305      */
306     public static synchronized Poller instance() {
307         poller.start();
308         return (poller);
309     }
310 }