]> git.basschouten.com Git - openhab-addons.git/blob
44d4c1322c3407b495ee268c6f66c452294f4779
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.milight.internal.protocol;
14
15 import java.io.Closeable;
16 import java.io.IOException;
17 import java.util.Iterator;
18 import java.util.NoSuchElementException;
19 import java.util.concurrent.BlockingQueue;
20 import java.util.concurrent.LinkedBlockingQueue;
21
22 import org.eclipse.jdt.annotation.NonNullByDefault;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 /**
28  * This implements a queue for UDP sending, where each item to be send is associated with an id.
29  * If a new item is added, that has the same id of an already queued item, it replaces the
30  * queued item. This is used for milight packets, where older bridges accept commands with a 100ms
31  * delay only. The user may issue absolute brightness or color changes faster than 1/10s though, and we don't
32  * want to just queue up those commands but apply the newest command only.
33  *
34  * @author David Graeff - Initial contribution
35  */
36 @NonNullByDefault
37 public class QueuedSend implements Runnable, Closeable {
38     private final Logger logger = LoggerFactory.getLogger(QueuedSend.class);
39
40     final BlockingQueue<QueueItem> queue = new LinkedBlockingQueue<>(20);
41     private boolean willbeclosed = false;
42     private @Nullable Thread thread;
43
44     public static final byte NO_CATEGORY = 0;
45
46     /**
47      * Start the send thread of this queue. Call dispose() to quit the thread.
48      */
49     public void start() {
50         willbeclosed = false;
51         thread = new Thread(this);
52         thread.start();
53     }
54
55     /**
56      * The queue process
57      */
58     @Override
59     public void run() {
60         QueueItem item = null;
61         while (!willbeclosed) {
62             // If the command belongs to a chain of commands, get the next command now.
63             if (item != null && item.next != null) {
64                 item = item.next;
65             } else {
66                 try {
67                     // block/wait for another item
68                     item = queue.take();
69                 } catch (InterruptedException e) {
70                     if (!willbeclosed) {
71                         logger.error("Queue take failed: {}", e.getLocalizedMessage());
72                     }
73                     break;
74                 }
75             }
76
77             if (item.isInvalid()) {
78                 // Just in case it is a command chain, set the item to null to not process any chained commands.
79                 item = null;
80                 continue;
81             }
82
83             try {
84                 for (int i = 0; i < (item.repeatable ? item.repeatCommands : 1); ++i) {
85                     item.socket.send(item.packet);
86
87                     if (ProtocolConstants.DEBUG_SESSION) {
88                         StringBuilder s = new StringBuilder();
89                         for (int c = 0; c < item.packet.getData().length; ++c) {
90                             s.append(String.format("%02X ", item.packet.getData()[c]));
91                         }
92                         logger.debug("Sent packet '{}' to bridge {}", s.toString(),
93                                 item.packet.getAddress().getHostAddress());
94                     }
95                 }
96             } catch (IOException e) {
97                 logger.warn("Failed to send Message to '{}': {}", item.packet.getAddress().getHostAddress(),
98                         e.getMessage());
99             }
100
101             try {
102                 Thread.sleep(item.delayTime);
103             } catch (InterruptedException e) {
104                 if (!willbeclosed) {
105                     logger.warn("Queue sleep failed: {}", e.getLocalizedMessage());
106                 }
107                 break;
108             }
109         }
110     }
111
112     /**
113      * Mark all commands in the queue invalid that have the same unique id as the given one. This does not synchronise
114      * with the sender thread. If an element has been started to being processed, this method has no more effect on that
115      * element. Command chains are always executed in a row. Even if the head of the command queue has been marked
116      * as invalid, if the processing has been started, the chain will be processed completely.
117      *
118      * @param uniqueCommandId
119      */
120     private void removeFromQueue(int uniqueCommandId) {
121         Iterator<QueueItem> iterator = queue.iterator();
122         while (iterator.hasNext()) {
123             try {
124                 QueueItem item = iterator.next();
125                 if (item.uniqueCommandId == uniqueCommandId) {
126                     item.makeInvalid();
127                 }
128             } catch (IllegalStateException e) {
129                 // Ignore threading errors
130             } catch (NoSuchElementException e) {
131                 // The element might have been processed already while iterate.
132                 // Ignore NoSuchElementException here.
133             }
134         }
135     }
136
137     /**
138      * Add data to the send queue.
139      *
140      * <p>
141      * You have to create your own QueueItem. This allows to you create a chain of commands. A chain will always
142      * executed in order and without interrupting the sequence with another command. A chain will be removed completely
143      * if another command with the same category is added except if the chain has been started to be processed.
144      * </p>
145      *
146      * @param item A queue item, cannot be null.
147      */
148     public void queue(QueueItem item) {
149         if (item.uniqueCommandId != NO_CATEGORY) {
150             removeFromQueue(item.uniqueCommandId);
151         }
152         queue.offer(item);
153     }
154
155     /**
156      * Once closed, this object can't be reused anymore.
157      */
158     @Override
159     public void close() throws IOException {
160         willbeclosed = true;
161         final Thread threadL = this.thread;
162         if (threadL != null) {
163             try {
164                 threadL.join(200);
165             } catch (InterruptedException e) {
166             }
167             threadL.interrupt();
168         }
169         this.thread = null;
170     }
171 }