]> git.basschouten.com Git - openhab-addons.git/blob
72fc553abb6904f20c715accc91b5c20c92b3e50
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.generic.tools;
14
15 import java.util.ArrayList;
16 import java.util.Collections;
17 import java.util.List;
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.concurrent.ScheduledFuture;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicReference;
22 import java.util.function.Consumer;
23
24 import org.eclipse.jdt.annotation.NonNullByDefault;
25 import org.eclipse.jdt.annotation.Nullable;
26
27 /**
28  * Collects objects over time until a specified delay passed by.
29  * Then call the user back with a list of accumulated objects and start over again.
30  *
31  * @author David Graeff - Initial contribution
32  *
33  * @param <T> Any object
34  */
35 @NonNullByDefault
36 public class DelayedBatchProcessing<T> implements Consumer<T> {
37     private final int delay;
38     private final Consumer<List<T>> consumer;
39     private final List<T> queue = Collections.synchronizedList(new ArrayList<>());
40     private final ScheduledExecutorService executor;
41     protected final AtomicReference<@Nullable ScheduledFuture<?>> futureRef = new AtomicReference<>();
42
43     /**
44      * Creates a {@link DelayedBatchProcessing}.
45      *
46      * @param delay A delay in milliseconds
47      * @param consumer A consumer of the list of collected objects
48      * @param executor A scheduled executor service
49      */
50     public DelayedBatchProcessing(int delay, Consumer<List<T>> consumer, ScheduledExecutorService executor) {
51         this.delay = delay;
52         this.consumer = consumer;
53         this.executor = executor;
54         if (delay <= 0) {
55             throw new IllegalArgumentException("Delay need to be greater than 0!");
56         }
57     }
58
59     /**
60      * Add new object to the batch process list. Every time a new object is received,
61      * the delay timer is rescheduled.
62      *
63      * @param t An object
64      */
65     @Override
66     public void accept(T t) {
67         queue.add(t);
68         cancel(futureRef.getAndSet(executor.schedule(this::run, delay, TimeUnit.MILLISECONDS)));
69     }
70
71     /**
72      * Return the so far accumulated objects, but do not deliver them to the target consumer anymore.
73      *
74      * @return A list of accumulated objects
75      */
76     public List<T> join() {
77         cancel(futureRef.getAndSet(null));
78         List<T> lqueue = new ArrayList<>();
79         synchronized (queue) {
80             lqueue.addAll(queue);
81             queue.clear();
82         }
83         return lqueue;
84     }
85
86     /**
87      * Return true if there is a delayed processing going on.
88      */
89     public boolean isArmed() {
90         ScheduledFuture<?> scheduledFuture = this.futureRef.get();
91         return scheduledFuture != null && !scheduledFuture.isDone();
92     }
93
94     /**
95      * Deliver queued items now to the target consumer.
96      */
97     public void forceProcessNow() {
98         cancel(futureRef.getAndSet(null));
99         run();
100     }
101
102     private void run() {
103         List<T> lqueue = new ArrayList<>();
104         synchronized (queue) {
105             lqueue.addAll(queue);
106             queue.clear();
107         }
108
109         if (!lqueue.isEmpty()) {
110             consumer.accept(lqueue);
111         }
112     }
113
114     private static void cancel(@Nullable ScheduledFuture<?> future) {
115         if (future != null) {
116             future.cancel(false);
117         }
118     }
119 }