2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.elerotransmitterstick.internal.stick;
15 import java.io.IOException;
16 import java.util.ArrayList;
17 import java.util.Arrays;
18 import java.util.Collection;
19 import java.util.Collections;
20 import java.util.Comparator;
21 import java.util.HashMap;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.TreeSet;
25 import java.util.concurrent.BlockingQueue;
26 import java.util.concurrent.DelayQueue;
27 import java.util.concurrent.ScheduledExecutorService;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicBoolean;
31 import org.openhab.binding.elerotransmitterstick.internal.config.EleroTransmitterStickConfig;
32 import org.openhab.binding.elerotransmitterstick.internal.handler.StatusListener;
33 import org.openhab.core.io.transport.serial.SerialPortManager;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
38 * @author Volker Bier - Initial contribution
// Front end for the transmitter stick: keeps the status listeners registered per channel id
// and forwards commands and update requests to a CommandWorker instance.
40 public class TransmitterStick {
41 private final Logger logger = LoggerFactory.getLogger(TransmitterStick.class);
// status listeners keyed by channel id; the accessor methods visible in this class all
// synchronize on the map itself
42 private final HashMap<Integer, ArrayList<StatusListener>> allListeners = new HashMap<>();
// receives connectionEstablished()/connectionDropped() callbacks from the worker code
43 private final StickListener listener;
// read inside CommandWorker (config.portName, config.updateInterval)
45 private EleroTransmitterStickConfig config;
// handed to the SerialConnection that CommandWorker creates
46 private SerialPortManager serialPortManager;
// created and scheduled in initialize()
47 private CommandWorker worker;
// NOTE(review): the constructor body is not part of this excerpt; presumably it stores l in
// the listener field - confirm
49 public TransmitterStick(StickListener l) {
// Starts the stick: remembers the serial port manager, creates a CommandWorker and submits it
// to the given scheduler with a delay of 0 ms.
// NOTE(review): stickConfig is not used in the lines visible here; presumably it is stored in
// the config field before the worker is created - confirm.
53 public synchronized void initialize(EleroTransmitterStickConfig stickConfig, ScheduledExecutorService scheduler,
54 SerialPortManager serialPortManager) {
55 logger.debug("Initializing Transmitter Stick...");
57 this.serialPortManager = serialPortManager;
58 worker = new CommandWorker();
// one-shot submission: schedule(Runnable, delay, unit) runs the worker once, not periodically
59 scheduler.schedule(worker, 0, TimeUnit.MILLISECONDS);
60 logger.debug("Transmitter Stick initialized, worker running.");
// Stops the stick by asking the worker to terminate its processing.
// NOTE(review): no null check on worker is visible here - confirm that dispose() cannot run
// before initialize() has created the worker.
63 public synchronized void dispose() {
64 logger.debug("Disposing Transmitter Stick...");
// sets the worker's terminated flag and wakes it up if it is blocked on the queue
65 worker.terminateUpdates();
68 logger.debug("Transmitter Stick disposed.");
// Returns the channel ids held by the worker, or a new empty list.
// NOTE(review): the condition selecting between the two returns is not visible in this
// excerpt (presumably a null check on worker) - confirm.
71 public synchronized ArrayList<Integer> getKnownIds() {
// hands out the worker's own validIds list, not a copy
73 return worker.validIds;
76 return new ArrayList<>();
// Hands a command for the given channel ids to the worker, which queues it via executeCommand().
// NOTE(review): any guard around the call (e.g. a null check on worker) is not visible in
// this excerpt - confirm.
79 public synchronized void sendCommand(CommandType cmd, List<Integer> channelIds) {
81 worker.executeCommand(cmd, channelIds);
// Asks the worker to queue INFO commands for the given channel ids via requestUpdates().
// NOTE(review): any guard around the call (e.g. a null check on worker) is not visible in
// this excerpt - confirm.
85 public synchronized void requestUpdate(List<Integer> channelIds) {
87 worker.requestUpdates(channelIds);
91 public void addStatusListener(int channelId, StatusListener listener) {
92 synchronized (allListeners) {
93 ArrayList<StatusListener> listeners = allListeners.get(channelId);
94 if (listeners == null) {
95 listeners = new ArrayList<>();
96 allListeners.put(channelId, listeners);
98 listeners.add(listener);
102 public void removeStatusListener(int channelId, StatusListener listener) {
103 synchronized (allListeners) {
104 ArrayList<StatusListener> listeners = allListeners.get(channelId);
105 if (listeners != null) {
106 listeners.remove(listener);
108 if (listeners.isEmpty()) {
109 allListeners.remove(channelId);
115 private void notifyListeners(int channelId, ResponseStatus status) {
116 synchronized (allListeners) {
117 ArrayList<StatusListener> listeners = allListeners.get(channelId);
118 if (listeners != null) {
119 for (StatusListener l : listeners) {
120 l.statusChanged(channelId, status);
128 * - only one INFO for the same channel ids
129 * - only one other command for the same channel ids
// Prunes coll before newCmd is added to it. Entries are removed through the iterator, so coll
// must supply an iterator that supports remove().
// The boolean result is tested by the add() overrides in this file before they proceed.
// NOTE(review): the return statements are not visible in this excerpt; presumably false means
// "do not add newCmd" and true means "go ahead" - confirm.
131 private static boolean prepareAddition(Command newCmd, Collection<Command> coll) {
132 Iterator<Command> queuedCommands = coll.iterator();
133 while (queuedCommands.hasNext()) {
134 Command existingCmd = queuedCommands.next();
// only commands addressing exactly the same channel ids, in the same order, are treated as duplicates
136 if (Arrays.equals(newCmd.getChannelIds(), existingCmd.getChannelIds())) {
137 // remove pending INFOs for same channel ids
138 if (newCmd.getCommandType() == CommandType.INFO && existingCmd.getCommandType() == CommandType.INFO) {
// strict comparison: an existing INFO with an equal priority value is NOT removed here
139 if (existingCmd.getPriority() < newCmd.priority) {
140 // we have an older INFO command with strictly lower priority, remove
141 queuedCommands.remove();
143 // existing has higher priority, skip addition
// any two non-INFO commands for the same channels replace each other, regardless of type or priority
148 if (newCmd.getCommandType() != CommandType.INFO && existingCmd.getCommandType() != CommandType.INFO) {
149 // we have an older command for the same channels, remove
150 queuedCommands.remove();
// Sorted set holding the commands that are currently due. add() first runs
// TransmitterStick.prepareAddition against the set's current contents.
158 static class DueCommandSet extends TreeSet<Command> {
159 private static final long serialVersionUID = -3216360253151368826L;
161 public DueCommandSet() {
162 super(new Comparator<>() {
164 * Due commands are sorted by priority first and then by delay.
167 public int compare(Command o1, Command o2) {
// d is negative when o1 has the larger priority value.
// NOTE(review): presumably d is what gets returned, so larger priority values sort first -
// the return statements are not visible here. Int subtraction can overflow for extreme
// values; Integer.compare(o2.getPriority(), o1.getPriority()) would be the overflow-safe form.
172 int d = o2.getPriority() - o1.getPriority();
// equal priority: the remaining delay in milliseconds is compared as a tie-breaker
177 if (d == 0 && o1.getDelay(TimeUnit.MILLISECONDS) < o2.getDelay(TimeUnit.MILLISECONDS)) {
// prunes superseded commands for the same channel ids before the new one is considered
186 public boolean add(Command e) {
187 if (TransmitterStick.prepareAddition(e, this)) {
// Runnable that owns the serial connection and the command queue of the stick. It is an inner
// (non-static) class and uses the enclosing TransmitterStick's logger, listener and config.
195 class CommandWorker implements Runnable {
// channel ids used by queryChannels(); also handed out by TransmitterStick.getKnownIds()
196 private ArrayList<Integer> validIds = new ArrayList<>();
// set to true by terminateUpdates(); polled by the worker loops
197 private final AtomicBoolean terminated = new AtomicBoolean();
// copied from config.updateInterval; doWork() multiplies it by 1000 for periodic INFO commands
198 private final int updateInterval;
199 private final SerialConnection connection;
// queue of pending commands; add() is overridden to run prepareAddition on the queue first
201 private final BlockingQueue<Command> cmdQueue = new DelayQueue<Command>() {
203 public boolean add(Command e) {
204 if (TransmitterStick.prepareAddition(e, this)) {
// NOTE(review): the two assignments below presumably sit in the CommandWorker constructor,
// whose header is not visible in this excerpt
213 connection = new SerialConnection(config.portName, serialPortManager);
214 updateInterval = config.updateInterval;
// Requests the worker to stop: raises the terminated flag first, then queues a NONE command so
// that a worker blocked in cmdQueue.take() wakes up and can see the flag.
217 void terminateUpdates() {
218 terminated.set(true);
220 // add a NONE command to make the thread exit from the call to take()
221 cmdQueue.add(new Command(CommandType.NONE));
224 void requestUpdates(List<Integer> channelIds) {
225 // this is a workaround for a bug in the stick firmware that does not
226 // handle commands that are sent to multiple channels correctly
227 if (channelIds.size() > 1) {
228 for (int channelId : channelIds) {
229 requestUpdates(Collections.singletonList(channelId));
231 } else if (!channelIds.isEmpty()) {
232 final Integer[] ids = channelIds.toArray(new Integer[channelIds.size()]);
234 logger.debug("adding INFO command for channel id {} to queue...", Arrays.toString(ids));
235 cmdQueue.add(new DelayedCommand(CommandType.INFO, 0, Command.FAST_INFO_PRIORITY, ids));
239 void executeCommand(CommandType command, List<Integer> channelIds) {
240 // this is a workaround for a bug in the stick firmware that does not
241 // handle commands that are sent to multiple channels correctly
242 if (channelIds.size() > 1) {
243 for (int channelId : channelIds) {
244 executeCommand(command, Collections.singletonList(channelId));
246 } else if (!channelIds.isEmpty()) {
247 final Integer[] ids = channelIds.toArray(new Integer[channelIds.size()]);
249 logger.debug("adding command {} for channel ids {} to queue...", command, Arrays.toString(ids));
250 cmdQueue.add(new Command(command, ids));
// NOTE(review): the enclosing method header and try block are not visible in this excerpt;
// since CommandWorker implements Runnable this is presumably the tail of run() - confirm.
// Any Throwable escaping the try block is logged here at error level.
259 } catch (Throwable t) {
260 logger.error("Worker stopped by unexpected exception", t);
// Main processing loop of the worker. Until terminated it moves due commands from cmdQueue into
// a sorted set, sends the first command of that set over the serial connection, reports the
// response status to the channel listeners and queues follow-up commands (STOP for timed
// commands, INFO polls otherwise).
266 private void doWork() {
267 // list of due commands sorted by priority
268 final DueCommandSet dueCommands = new DueCommandSet();
270 logger.debug("worker started.");
271 while (!terminated.get()) {
275 // in case we have no commands that are currently due, wait for a new one
276 if (dueCommands.isEmpty()) {
277 logger.trace("No due commands, invoking take on queue...");
// blocks until a queued command's delay has expired; terminateUpdates() unblocks it with a NONE command
278 dueCommands.add(cmdQueue.take());
279 logger.trace("take returned {}", dueCommands.first());
// re-check: the flag may have been set while this thread was blocked in take()
282 if (!terminated.get()) {
283 // take all commands that are due from the queue
284 logger.trace("Draining all available commands...");
// poll() does not block; it returns null once no further command is due
287 while ((cmd = cmdQueue.poll()) != null) {
// remove-then-add, so that the freshly polled command takes the place of an entry the set
// treats as equal (a plain add would keep the old entry)
289 dueCommands.remove(cmd);
290 dueCommands.add(cmd);
292 logger.trace("Drained {} commands, active queue size is {}, queue size is {}", drainCount,
293 dueCommands.size(), cmdQueue.size());
295 // process the command with the highest priority
296 cmd = dueCommands.first();
297 logger.debug("active command is {}", cmd);
299 if (cmd.getCommandType() != CommandType.NONE) {
// the command is only removed after sendPacket returned; if sendPacket throws, it stays in dueCommands
300 Response response = connection.sendPacket(CommandUtil.createPacket(cmd));
301 // remove the command now we know it has been correctly processed
302 dueCommands.pollFirst();
// the same status is reported once for every channel id contained in the response
304 if (response != null && response.hasStatus()) {
305 for (int id : response.getChannelIds()) {
306 notifyListeners(id, response.getStatus());
// follow-up scheduling, first match wins: timed command -> STOP after its duration;
// moving channel -> fast INFO poll after 2000; plain INFO -> next periodic INFO
310 if (cmd instanceof TimedCommand) {
// NOTE(review): presumably getDuration() is in seconds and the delay in milliseconds; if
// getDuration() returns int the multiplication happens in int before widening - confirm
311 long delay = 1000 * ((TimedCommand) cmd).getDuration();
312 logger.debug("adding timed command STOP for channel ids {} to queue with delay {}...",
313 cmd.getChannelIds(), delay);
315 cmdQueue.add(new DelayedCommand(CommandType.STOP, delay, Command.TIMED_PRIORITY,
316 cmd.getChannelIds()));
317 } else if (response != null && response.isMoving()) {
318 logger.debug("adding timed command INFO for channel ids {} to queue with delay 2000...",
319 cmd.getChannelIds());
321 cmdQueue.add(new DelayedCommand(CommandType.INFO, 2000, Command.FAST_INFO_PRIORITY,
322 cmd.getChannelIds()));
323 } else if (cmd.getCommandType() == CommandType.INFO) {
324 logger.debug("adding timed command INFO for channel ids {} to queue with delay {}...",
325 cmd.getChannelIds(), updateInterval * 1000);
// re-arms the periodic poll with the lower-urgency INFO_PRIORITY constant
327 cmdQueue.add(new DelayedCommand(CommandType.INFO, updateInterval * 1000,
328 Command.INFO_PRIORITY, cmd.getChannelIds()));
331 logger.trace("ignoring NONE command.");
// the interrupt flag is restored after logging.
// NOTE(review): the visible loop condition only checks terminated - confirm that an interrupt
// without termination cannot make the loop spin
334 } catch (InterruptedException e) {
335 logger.error("Got interrupt while waiting for next command time", e);
336 Thread.currentThread().interrupt();
// I/O failure: logged and reported to the stick listener
337 } catch (IOException e) {
338 logger.error("Got IOException communicating with the stick", e);
339 listener.connectionDropped(e);
343 logger.debug("worker finished.");
// Asks the stick which channels it knows by sending CHECK packets, compares the reported ids
// with validIds and requests an update for the ids in validIds.
346 private void queryChannels() {
347 logger.debug("querying available channels...");
348 while (!terminated.get()) {
// keeps sending CHECK while no response was received, the worker is not terminated and the
// connection is open
353 while (r == null && !terminated.get() && connection.isOpen()) {
354 logger.debug("sending CHECK packet...");
355 r = connection.sendPacket(CommandUtil.createPacket(CommandType.CHECK));
// NOTE(review): the loop above can also end with r == null (termination, closed connection);
// presumably the lines not visible here guard against that before r is dereferenced - confirm
363 int[] knownIds = r.getChannelIds();
364 logger.debug("Worker found channels: {} ", Arrays.toString(knownIds));
366 for (int id : knownIds) {
// NOTE(review): the statement executed for unknown ids is not visible; presumably validIds.add(id)
367 if (!validIds.contains(id)) {
// queues a zero-delay INFO command per valid channel
372 requestUpdates(validIds);
// I/O failure: logged and reported to the stick listener
375 } catch (IOException e) {
376 logger.error("Got IOException communicating with the stick", e);
377 listener.connectionDropped(e);
// the interrupt flag is restored after logging
379 } catch (InterruptedException e) {
380 logger.error("Got interrupt while waiting for next command time", e);
381 Thread.currentThread().interrupt();
// Loops until the serial connection is open or the worker has been terminated, reporting
// connection success and failure to the stick listener.
// NOTE(review): the statement that actually opens the connection is not visible in this
// excerpt (presumably a call on connection inside the try block) - confirm.
386 private void waitConnected() {
387 if (!connection.isOpen()) {
388 while (!connection.isOpen() && !terminated.get()) {
391 listener.connectionEstablished();
// ConnectException is not among the visible imports, so it is presumably a type of this package
392 } catch (ConnectException e1) {
393 listener.connectionDropped(e1);
// still not connected and not terminated.
// NOTE(review): the body is not visible, presumably a pause before the next attempt - confirm
396 if (!connection.isOpen() && !terminated.get()) {
// the interrupt flag is restored after logging
399 } catch (InterruptedException e) {
400 logger.error("Got interrupt while waiting for next command time", e);
401 Thread.currentThread().interrupt();
407 logger.trace("finished waiting. connection open={}, terminated={}", connection.isOpen(), terminated.get());
// Callback interface through which the stick reports the state of its serial connection.
411 public interface StickListener {
// called from waitConnected() in the branch where no ConnectException was raised
412 void connectionEstablished();
// called with the causing exception when connecting fails or an IOException occurs while
// communicating with the stick
414 void connectionDropped(Exception e);