2 * Copyright (c) 2010-2020 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.insteon.internal.driver;
16 import java.util.Iterator;
17 import java.util.SortedSet;
18 import java.util.TreeSet;
20 import org.eclipse.jdt.annotation.NonNullByDefault;
21 import org.eclipse.jdt.annotation.Nullable;
22 import org.openhab.binding.insteon.internal.device.InsteonDevice;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
27 * This class manages the polling of all devices.
28 * Between successive polls of any device there is a quiet time of
29 * at least MIN_MSEC_BETWEEN_POLLS. This avoids bunching up of poll messages
30 * and keeps the network bandwidth open for other messages.
32 * - An entry in the poll queue corresponds to a single device, i.e. each device should
33 * have exactly one entry in the poll queue. That entry is created when startPolling()
34 * is called, and then re-enqueued whenever it expires.
35 * - When a device comes up for polling, its doPoll() method is called, which in turn
36 * puts an entry into that device's request queue. So the Poller class actually never
37 * sends out messages directly. That is done by the device itself via its request
38 * queue. The poller just reminds the device to poll.
40 * @author Bernd Pfrommer - Initial contribution
41 * @author Rob Nielsen - Port to openHAB 2 insteon binding
// Quiet time in milliseconds that is kept between scheduled polls; used as the
// spacing unit when a slot is searched for in the poll queue.
45 private static final long MIN_MSEC_BETWEEN_POLLS = 2000L;
// Logger shared by the poller and its nested reader class.
47 private final Logger logger = LoggerFactory.getLogger(Poller.class);
// The one shared Poller instance, created eagerly when the class is initialized.
48 private static Poller poller = new Poller(); // for singleton
// Thread running the PollQueueReader; null until start() has created it.
50 private @Nullable Thread pollThread = null;
// Pending polls, sorted by PQEntry's natural ordering (its expiration time).
// The set is also the monitor object used by every synchronized block in this class.
51 private TreeSet<PQEntry> pollQueue = new TreeSet<>();
// Loop condition of the reader thread: the reader loops run only while this is true.
// NOTE(review): the code that clears this flag is not visible in this excerpt — confirm where it is reset.
52 private boolean keepRunning = true;
61 * Get size of poll queue
63 * @return number of devices being polled
65 public int getSizeOfQueue() {
66 return (pollQueue.size());
70 * Register a device for polling.
72 * @param d device to register for polling
73 * @param aNumDev approximate number of total devices
// Registers device d for polling: logs the request and, while holding the pollQueue
// monitor, schedules the device's first poll.
75 public void startPolling(InsteonDevice d, int aNumDev) {
76 logger.debug("start polling device {}", d);
77 synchronized (pollQueue) {
78 // try to spread out the scheduling when
// n is the number of entries that are already queued
80 int n = pollQueue.size();
// the first poll is pushed back by n/aNumDev of the device's poll interval;
// a non-positive aNumDev is replaced by 1 so the division cannot fail
81 long pollDelay = n * d.getPollInterval() / (aNumDev > 0 ? aNumDev : 1);
// the time passed here is only the requested slot; addToPollQueue() asks
// findNextExpirationTime() for the slot that is actually used
82 addToPollQueue(d, System.currentTimeMillis() + pollDelay);
// NOTE(review): the remaining lines of this method are not visible in this excerpt —
// confirm whether the reader thread is woken up after the entry has been added.
88 * Stop polling a given device
90 * @param d reference to the device to be polled
// Searches the poll queue, while holding the pollQueue monitor, for entries whose
// device address equals the address of d.
92 public void stopPolling(InsteonDevice d) {
93 synchronized (pollQueue) {
// an explicit Iterator is used instead of a for-each loop
// NOTE(review): presumably so that a matching entry can be dropped through the iterator;
// the statement that handles a match is not visible in this excerpt — confirm.
94 for (Iterator<PQEntry> i = pollQueue.iterator(); i.hasNext();) {
// devices are matched by address, not by object identity
95 if (i.next().getDevice().getAddress().equals(d.getAddress())) {
97 logger.debug("stopped polling device {}", d);
104 * Starts the poller thread
106 public void start() {
107 if (pollThread == null) {
108 pollThread = new Thread(new PollQueueReader());
109 setParamsAndStart(pollThread);
// Configures the given thread when it is non-null: gives it the name
// "Insteon Poll Queue Reader" and marks it as a daemon thread.
// NOTE(review): the method name suggests that the thread is also started here; that call
// is not visible in this excerpt — confirm.
113 private void setParamsAndStart(@Nullable Thread thread) {
// the parameter is declared @Nullable, hence the explicit null guard
114 if (thread != null) {
115 thread.setName("Insteon Poll Queue Reader");
116 thread.setDaemon(true);
122 * Stops the poller thread
// Shutdown sequence for the poll thread (the method header lies outside this excerpt).
125 logger.debug("stopping poller!");
// the pollQueue monitor is taken before shared poller state is touched
// NOTE(review): the statements inside this synchronized block are not visible here — confirm what they change.
126 synchronized (pollQueue) {
// copy the field into a local so the null check and the later uses see the same reference
132 Thread pollThread = this.pollThread;
133 if (pollThread != null) {
// clearing the field lets a later start() create a fresh thread (start() checks for null)
135 this.pollThread = null;
// an interruption is only logged at debug level; the visible code does not restore
// the interrupt flag — NOTE(review): confirm that this is intended
138 } catch (InterruptedException e) {
139 logger.debug("got interrupted on exit: {}", e.getMessage());
144 * Adds a device to the poll queue. After this call, the device's doPoll() method
145 * will be called according to the polling frequency set.
147 * @param d the device to poll periodically
148 * @param time the target time for the next poll to happen. Note that this time is merely
149 * a suggestion, and may be adjusted, because there must be at least a minimum gap in polling.
// Builds the queue entry for device d. 'time' is the requested poll time; the entry's
// expiration time is whatever findNextExpirationTime() returns for that request.
152 private void addToPollQueue(InsteonDevice d, long time) {
// texp may differ from 'time' when the requested slot is too close to another entry
153 long texp = findNextExpirationTime(d, time);
154 PQEntry ne = new PQEntry(d, texp);
// the trace message shows the new entry next to the originally requested time (%tc format)
155 logger.trace("added entry {} originally aimed at time {}", ne, String.format("%tc", new Date(time)));
// NOTE(review): the insertion of 'ne' into pollQueue is not visible in this excerpt — confirm.
160 * Finds the best expiration time for a poll queue, i.e. a time slot that is after the
161 * desired expiration time, but does not collide with any of the already scheduled
164 * @param d device to poll (for logging)
165 * @param aTime desired time after which the device should be polled
166 * @return the suggested time to poll
// Searches the poll queue for an expiration time at or after aTime that keeps
// MIN_MSEC_BETWEEN_POLLS of distance to the entries that are already scheduled.
// Device d is used for the probe entry and for trace logging.
169 private long findNextExpirationTime(InsteonDevice d, long aTime) {
// default answer: the requested time itself
170 long expTime = aTime;
171 // tailSet finds all those that expire after aTime - buffer
// the PQEntry built here is only a search key; since PQEntry orders by expiration time,
// the tail set holds the entries expiring at or after aTime - MIN_MSEC_BETWEEN_POLLS
// (TreeSet.tailSet is inclusive of the probe value)
172 SortedSet<PQEntry> ts = pollQueue.tailSet(new PQEntry(d, aTime - MIN_MSEC_BETWEEN_POLLS));
174 // all entries in the poll queue are ahead of the new element,
175 // go ahead and simply add it to the end
// NOTE(review): next() is called without a visible hasNext() check; presumably the
// empty-tail-set case is handled on lines that are not visible here — confirm.
178 Iterator<PQEntry> pqi = ts.iterator();
179 PQEntry prev = pqi.next();
// is the first entry of the tail set more than MIN_MSEC_BETWEEN_POLLS later than aTime?
180 if (prev.getExpirationTime() > aTime + MIN_MSEC_BETWEEN_POLLS) {
181 // there is a time slot free before the head of the tail set
184 // look for a gap where we can squeeze in
185 // a new poll while maintaining MIN_MSEC_BETWEEN_POLLS
186 while (pqi.hasNext()) {
187 PQEntry pqe = pqi.next();
188 long tcurr = pqe.getExpirationTime();
189 long tprev = prev.getExpirationTime();
// a gap of at least twice the minimum spacing leaves room for one more entry that is
// MIN_MSEC_BETWEEN_POLLS away from both neighbours
190 if (tcurr - tprev >= 2 * MIN_MSEC_BETWEEN_POLLS) {
192 logger.trace("dev {} time {} found slot between {} and {}", d, aTime, tprev, tcurr);
// schedule exactly one minimum spacing after 'prev'
// NOTE(review): how 'prev' advances inside the loop is not visible in this excerpt — confirm.
197 expTime = prev.getExpirationTime() + MIN_MSEC_BETWEEN_POLLS;
// Runnable executed by the poll thread (start() wraps an instance of it in a Thread).
// It works on the enclosing Poller's pollQueue, keepRunning flag and logger.
203 private class PollQueueReader implements Runnable {
206 logger.debug("starting poll thread.");
// the polling loop below sits inside this synchronized block on pollQueue
207 synchronized (pollQueue) {
// loop until keepRunning becomes false
208 while (keepRunning) {
// an interruption is reported with a warning
// NOTE(review): the statements following this log call are not visible here — confirm
// whether the loop continues or exits after an interrupt.
211 } catch (InterruptedException e) {
212 logger.warn("poll queue reader thread interrupted!");
217 logger.debug("poll thread exiting");
221 * Waits for first element of poll queue to become current,
224 * @throws InterruptedException
226 private void readPollQueue() throws InterruptedException {
// idle phase: nothing is queued and no shutdown was requested
227 while (pollQueue.isEmpty() && keepRunning) {
// NOTE(review): first() throws NoSuchElementException on an empty set; presumably the
// case "loop left because keepRunning is false" is handled on lines not visible here — confirm.
233 // something is in the queue
234 long now = System.currentTimeMillis();
// first() is the entry with the smallest expiration time (PQEntry orders by expiration time)
235 PQEntry pqe = pollQueue.first();
236 long tfirst = pqe.getExpirationTime();
// dt is the time in milliseconds until the earliest entry becomes due
237 long dt = tfirst - now;
238 if (dt > 0) { // must wait for this item to expire
239 logger.trace("waiting for {} msec until {} comes due", dt, pqe);
241 } else { // queue entry has expired, process it!
242 logger.trace("entry {} expired at time {}", pqe, now);
248 * Takes first element off the poll queue, polls the corresponding device,
249 * and puts the device back into the poll queue to be polled again later.
251 * @param now the current time
253 private void processQueue(long now) {
// pollFirst() removes and returns the earliest entry, or null when the set is empty
254 processQueue(now, pollQueue.pollFirst());
// Overload that takes the (possibly null) entry that was removed from the queue.
257 private void processQueue(long now, @Nullable PQEntry pqe) {
// ask the device to poll; the argument 0 is passed through to doPoll() unchanged
259 pqe.getDevice().doPoll(0);
// request the next poll one poll interval after 'now'
260 addToPollQueue(pqe.getDevice(), now + pqe.getDevice().getPollInterval());
266 * A poll queue entry corresponds to a single device that needs
269 * @author Bernd Pfrommer - Initial contribution
272 private static class PQEntry implements Comparable<PQEntry> {
273 private InsteonDevice dev;
274 private long expirationTime;
276 PQEntry(InsteonDevice dev, long time) {
278 this.expirationTime = time;
281 long getExpirationTime() {
282 return expirationTime;
285 InsteonDevice getDevice() {
290 public int compareTo(PQEntry b) {
291 return (int) (expirationTime - b.expirationTime);
295 public String toString() {
296 return dev.getAddress().toString() + "/" + String.format("%tc", new Date(expirationTime));
301 * Singleton pattern instance() method
303 * @return the poller instance
305 public static synchronized Poller instance() {