2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.mqtt.generic.tools;
15 import java.util.ArrayList;
16 import java.util.Collections;
17 import java.util.List;
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.concurrent.ScheduledFuture;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicReference;
22 import java.util.function.Consumer;
24 import org.eclipse.jdt.annotation.NonNullByDefault;
25 import org.eclipse.jdt.annotation.Nullable;
28 * Collects objects over time until a specified delay passed by.
29 * Then call the user back with a list of accumulated objects and start over again.
31 * @author David Graeff - Initial contribution
33 * @param <T> Any object
36 public class DelayedBatchProcessing<T> implements Consumer<T> {
37 private final int delay;
38 private final Consumer<List<T>> consumer;
39 private final List<T> queue = Collections.synchronizedList(new ArrayList<>());
40 private final ScheduledExecutorService executor;
41 protected final AtomicReference<@Nullable ScheduledFuture<?>> futureRef = new AtomicReference<>();
44 * Creates a {@link DelayedBatchProcessing}.
46 * @param delay A delay in milliseconds
47 * @param consumer A consumer of the list of collected objects
48 * @param executor A scheduled executor service
50 public DelayedBatchProcessing(int delay, Consumer<List<T>> consumer, ScheduledExecutorService executor) {
52 this.consumer = consumer;
53 this.executor = executor;
55 throw new IllegalArgumentException("Delay need to be greater than 0!");
60 * Add new object to the batch process list. Every time a new object is received,
61 * the delay timer is rescheduled.
66 public void accept(T t) {
68 cancel(futureRef.getAndSet(executor.schedule(this::run, delay, TimeUnit.MILLISECONDS)));
72 * Return the so far accumulated objects, but do not deliver them to the target consumer anymore.
74 * @return A list of accumulated objects
76 public List<T> join() {
77 cancel(futureRef.getAndSet(null));
78 List<T> lqueue = new ArrayList<>();
79 synchronized (queue) {
87 * Return true if there is a delayed processing going on.
89 public boolean isArmed() {
90 ScheduledFuture<?> scheduledFuture = this.futureRef.get();
91 return scheduledFuture != null && !scheduledFuture.isDone();
95 * Deliver queued items now to the target consumer.
97 public void forceProcessNow() {
98 cancel(futureRef.getAndSet(null));
103 List<T> lqueue = new ArrayList<>();
104 synchronized (queue) {
105 lqueue.addAll(queue);
109 if (!lqueue.isEmpty()) {
110 consumer.accept(lqueue);
114 private static void cancel(@Nullable ScheduledFuture<?> future) {
115 if (future != null) {
116 future.cancel(false);