]> git.basschouten.com Git - openhab-addons.git/blob
a17856ca893d32929c2c42f5defc1edebd7a4576
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2021 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.insteon.internal.driver;
14
15 import java.sql.Date;
16 import java.util.Iterator;
17 import java.util.SortedSet;
18 import java.util.TreeSet;
19
20 import org.eclipse.jdt.annotation.NonNullByDefault;
21 import org.eclipse.jdt.annotation.Nullable;
22 import org.openhab.binding.insteon.internal.device.InsteonDevice;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 /**
27  * This class manages the polling of all devices.
28  * Between successive polls of a any device there is a quiet time of
29  * at least MIN_MSEC_BETWEEN_POLLS. This avoids bunching up of poll messages
30  * and keeps the network bandwidth open for other messages.
31  *
32  * - An entry in the poll queue corresponds to a single device, i.e. each device should
33  * have exactly one entry in the poll queue. That entry is created when startPolling()
34  * is called, and then re-enqueued whenever it expires.
35  * - When a device comes up for polling, its doPoll() method is called, which in turn
36  * puts an entry into that devices request queue. So the Poller class actually never
37  * sends out messages directly. That is done by the device itself via its request
38  * queue. The poller just reminds the device to poll.
39  *
40  * @author Bernd Pfrommer - Initial contribution
41  * @author Rob Nielsen - Port to openHAB 2 insteon binding
42  */
43 @NonNullByDefault
44 public class Poller {
45     private static final long MIN_MSEC_BETWEEN_POLLS = 2000L;
46
47     private final Logger logger = LoggerFactory.getLogger(Poller.class);
48     private static Poller poller = new Poller(); // for singleton
49
50     private @Nullable Thread pollThread = null;
51     private TreeSet<PQEntry> pollQueue = new TreeSet<>();
52     private boolean keepRunning = true;
53
54     /**
55      * Constructor
56      */
57     private Poller() {
58     }
59
60     /**
61      * Get size of poll queue
62      *
63      * @return number of devices being polled
64      */
65     public int getSizeOfQueue() {
66         return (pollQueue.size());
67     }
68
69     /**
70      * Register a device for polling.
71      *
72      * @param d device to register for polling
73      * @param aNumDev approximate number of total devices
74      */
75     public void startPolling(InsteonDevice d, int aNumDev) {
76         logger.debug("start polling device {}", d);
77         synchronized (pollQueue) {
78             // try to spread out the scheduling when
79             // starting up
80             int n = pollQueue.size();
81             long pollDelay = n * d.getPollInterval() / (aNumDev > 0 ? aNumDev : 1);
82             addToPollQueue(d, System.currentTimeMillis() + pollDelay);
83             pollQueue.notify();
84         }
85     }
86
87     /**
88      * Start polling a given device
89      *
90      * @param d reference to the device to be polled
91      */
92     public void stopPolling(InsteonDevice d) {
93         synchronized (pollQueue) {
94             for (Iterator<PQEntry> i = pollQueue.iterator(); i.hasNext();) {
95                 if (i.next().getDevice().getAddress().equals(d.getAddress())) {
96                     i.remove();
97                     logger.debug("stopped polling device {}", d);
98                 }
99             }
100         }
101     }
102
103     /**
104      * Starts the poller thread
105      */
106     public void start() {
107         if (pollThread == null) {
108             pollThread = new Thread(new PollQueueReader());
109             setParamsAndStart(pollThread);
110         }
111     }
112
113     private void setParamsAndStart(@Nullable Thread thread) {
114         if (thread != null) {
115             thread.setName("Insteon Poll Queue Reader");
116             thread.setDaemon(true);
117             thread.start();
118         }
119     }
120
121     /**
122      * Stops the poller thread
123      */
124     public void stop() {
125         logger.debug("stopping poller!");
126         synchronized (pollQueue) {
127             pollQueue.clear();
128             keepRunning = false;
129             pollQueue.notify();
130         }
131         try {
132             Thread pollThread = this.pollThread;
133             if (pollThread != null) {
134                 pollThread.join();
135                 this.pollThread = null;
136             }
137             keepRunning = true;
138         } catch (InterruptedException e) {
139             logger.debug("got interrupted on exit: {}", e.getMessage());
140         }
141     }
142
143     /**
144      * Adds a device to the poll queue. After this call, the device's doPoll() method
145      * will be called according to the polling frequency set.
146      *
147      * @param d the device to poll periodically
148      * @param time the target time for the next poll to happen. Note that this time is merely
149      *            a suggestion, and may be adjusted, because there must be at least a minimum gap in polling.
150      */
151
152     private void addToPollQueue(InsteonDevice d, long time) {
153         long texp = findNextExpirationTime(d, time);
154         PQEntry ne = new PQEntry(d, texp);
155         logger.trace("added entry {} originally aimed at time {}", ne, String.format("%tc", new Date(time)));
156         pollQueue.add(ne);
157     }
158
159     /**
160      * Finds the best expiration time for a poll queue, i.e. a time slot that is after the
161      * desired expiration time, but does not collide with any of the already scheduled
162      * polls.
163      *
164      * @param d device to poll (for logging)
165      * @param aTime desired time after which the device should be polled
166      * @return the suggested time to poll
167      */
168
169     private long findNextExpirationTime(InsteonDevice d, long aTime) {
170         long expTime = aTime;
171         // tailSet finds all those that expire after aTime - buffer
172         SortedSet<PQEntry> ts = pollQueue.tailSet(new PQEntry(d, aTime - MIN_MSEC_BETWEEN_POLLS));
173         if (ts.isEmpty()) {
174             // all entries in the poll queue are ahead of the new element,
175             // go ahead and simply add it to the end
176             expTime = aTime;
177         } else {
178             Iterator<PQEntry> pqi = ts.iterator();
179             PQEntry prev = pqi.next();
180             if (prev.getExpirationTime() > aTime + MIN_MSEC_BETWEEN_POLLS) {
181                 // there is a time slot free before the head of the tail set
182                 expTime = aTime;
183             } else {
184                 // look for a gap where we can squeeze in
185                 // a new poll while maintaining MIN_MSEC_BETWEEN_POLLS
186                 while (pqi.hasNext()) {
187                     PQEntry pqe = pqi.next();
188                     long tcurr = pqe.getExpirationTime();
189                     long tprev = prev.getExpirationTime();
190                     if (tcurr - tprev >= 2 * MIN_MSEC_BETWEEN_POLLS) {
191                         // found gap
192                         logger.trace("dev {} time {} found slot between {} and {}", d, aTime, tprev, tcurr);
193                         break;
194                     }
195                     prev = pqe;
196                 }
197                 expTime = prev.getExpirationTime() + MIN_MSEC_BETWEEN_POLLS;
198             }
199         }
200         return expTime;
201     }
202
203     private class PollQueueReader implements Runnable {
204         @Override
205         public void run() {
206             logger.debug("starting poll thread.");
207             synchronized (pollQueue) {
208                 while (keepRunning) {
209                     try {
210                         readPollQueue();
211                     } catch (InterruptedException e) {
212                         logger.warn("poll queue reader thread interrupted!");
213                         break;
214                     }
215                 }
216             }
217             logger.debug("poll thread exiting");
218         }
219
220         /**
221          * Waits for first element of poll queue to become current,
222          * then process it.
223          *
224          * @throws InterruptedException
225          */
226         private void readPollQueue() throws InterruptedException {
227             while (pollQueue.isEmpty() && keepRunning) {
228                 pollQueue.wait();
229             }
230             if (!keepRunning) {
231                 return;
232             }
233             // something is in the queue
234             long now = System.currentTimeMillis();
235             PQEntry pqe = pollQueue.first();
236             long tfirst = pqe.getExpirationTime();
237             long dt = tfirst - now;
238             if (dt > 0) { // must wait for this item to expire
239                 logger.trace("waiting for {} msec until {} comes due", dt, pqe);
240                 pollQueue.wait(dt);
241             } else { // queue entry has expired, process it!
242                 logger.trace("entry {} expired at time {}", pqe, now);
243                 processQueue(now);
244             }
245         }
246
247         /**
248          * Takes first element off the poll queue, polls the corresponding device,
249          * and puts the device back into the poll queue to be polled again later.
250          *
251          * @param now the current time
252          */
253         private void processQueue(long now) {
254             processQueue(now, pollQueue.pollFirst());
255         }
256
257         private void processQueue(long now, @Nullable PQEntry pqe) {
258             if (pqe != null) {
259                 pqe.getDevice().doPoll(0);
260                 addToPollQueue(pqe.getDevice(), now + pqe.getDevice().getPollInterval());
261             }
262         }
263     }
264
265     /**
266      * A poll queue entry corresponds to a single device that needs
267      * to be polled.
268      *
269      * @author Bernd Pfrommer - Initial contribution
270      *
271      */
272     private static class PQEntry implements Comparable<PQEntry> {
273         private InsteonDevice dev;
274         private long expirationTime;
275
276         PQEntry(InsteonDevice dev, long time) {
277             this.dev = dev;
278             this.expirationTime = time;
279         }
280
281         long getExpirationTime() {
282             return expirationTime;
283         }
284
285         InsteonDevice getDevice() {
286             return dev;
287         }
288
289         @Override
290         public int compareTo(PQEntry b) {
291             return (int) (expirationTime - b.expirationTime);
292         }
293
294         @Override
295         public String toString() {
296             return dev.getAddress().toString() + "/" + String.format("%tc", new Date(expirationTime));
297         }
298     }
299
300     /**
301      * Singleton pattern instance() method
302      *
303      * @return the poller instance
304      */
305     public static synchronized Poller instance() {
306         poller.start();
307         return (poller);
308     }
309 }