2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.milight.internal.protocol;
15 import java.io.Closeable;
16 import java.io.IOException;
17 import java.util.Iterator;
18 import java.util.NoSuchElementException;
19 import java.util.concurrent.BlockingQueue;
20 import java.util.concurrent.LinkedBlockingQueue;
22 import org.eclipse.jdt.annotation.NonNullByDefault;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
28 * This implements a queue for UDP sending, where each item to be send is associated with an id.
29 * If a new item is added, that has the same id of an already queued item, it replaces the
30 * queued item. This is used for milight packets, where older bridges accept commands with a 100ms
31 * delay only. The user may issue absolute brightness or color changes faster than 1/10s though, and we don't
32 * want to just queue up those commands but apply the newest command only.
34 * @author David Graeff - Initial contribution
37 public class QueuedSend implements Runnable, Closeable {
38 private final Logger logger = LoggerFactory.getLogger(QueuedSend.class);
40 final BlockingQueue<QueueItem> queue = new LinkedBlockingQueue<>(20);
41 private boolean willbeclosed = false;
42 private @Nullable Thread thread;
44 public static final byte NO_CATEGORY = 0;
47 * Start the send thread of this queue. Call dispose() to quit the thread.
51 thread = new Thread(this);
60 QueueItem item = null;
61 while (!willbeclosed) {
62 // If the command belongs to a chain of commands, get the next command now.
63 if (item != null && item.next != null) {
67 // block/wait for another item
69 } catch (InterruptedException e) {
71 logger.error("Queue take failed: {}", e.getLocalizedMessage());
77 if (item.isInvalid()) {
78 // Just in case it is a command chain, set the item to null to not process any chained commands.
84 for (int i = 0; i < (item.repeatable ? item.repeatCommands : 1); ++i) {
85 item.socket.send(item.packet);
87 if (ProtocolConstants.DEBUG_SESSION) {
88 StringBuilder s = new StringBuilder();
89 for (int c = 0; c < item.packet.getData().length; ++c) {
90 s.append(String.format("%02X ", item.packet.getData()[c]));
92 logger.debug("Sent packet '{}' to bridge {}", s.toString(),
93 item.packet.getAddress().getHostAddress());
96 } catch (IOException e) {
97 logger.warn("Failed to send Message to '{}': {}", item.packet.getAddress().getHostAddress(),
102 Thread.sleep(item.delayTime);
103 } catch (InterruptedException e) {
105 logger.warn("Queue sleep failed: {}", e.getLocalizedMessage());
113 * Mark all commands in the queue invalid that have the same unique id as the given one. This does not synchronise
114 * with the sender thread. If an element has been started to being processed, this method has no more effect on that
115 * element. Command chains are always executed in a row. Even if the head of the command queue has been marked
116 * as invalid, if the processing has been started, the chain will be processed completely.
118 * @param uniqueCommandId
120 private void removeFromQueue(int uniqueCommandId) {
121 Iterator<QueueItem> iterator = queue.iterator();
122 while (iterator.hasNext()) {
124 QueueItem item = iterator.next();
125 if (item.uniqueCommandId == uniqueCommandId) {
128 } catch (IllegalStateException e) {
129 // Ignore threading errors
130 } catch (NoSuchElementException e) {
131 // The element might have been processed already while iterate.
132 // Ignore NoSuchElementException here.
138 * Add data to the send queue.
141 * You have to create your own QueueItem. This allows to you create a chain of commands. A chain will always
142 * executed in order and without interrupting the sequence with another command. A chain will be removed completely
143 * if another command with the same category is added except if the chain has been started to be processed.
146 * @param item A queue item, cannot be null.
148 public void queue(QueueItem item) {
149 if (item.uniqueCommandId != NO_CATEGORY) {
150 removeFromQueue(item.uniqueCommandId);
156 * Once closed, this object can't be reused anymore.
159 public void close() throws IOException {
161 final Thread threadL = this.thread;
162 if (threadL != null) {
165 } catch (InterruptedException e) {