2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.elerotransmitterstick.internal.stick;
15 import java.io.IOException;
16 import java.util.ArrayList;
17 import java.util.Arrays;
18 import java.util.Collection;
19 import java.util.Comparator;
20 import java.util.HashMap;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.TreeSet;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.DelayQueue;
26 import java.util.concurrent.ScheduledExecutorService;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.atomic.AtomicBoolean;
30 import org.openhab.binding.elerotransmitterstick.internal.config.EleroTransmitterStickConfig;
31 import org.openhab.binding.elerotransmitterstick.internal.handler.StatusListener;
32 import org.openhab.core.io.transport.serial.SerialPortManager;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
37 * @author Volker Bier - Initial contribution
39 public class TransmitterStick {
40 private final Logger logger = LoggerFactory.getLogger(TransmitterStick.class);
41 private final HashMap<Integer, ArrayList<StatusListener>> allListeners = new HashMap<>();
42 private final StickListener listener;
44 private EleroTransmitterStickConfig config;
45 private SerialPortManager serialPortManager;
46 private CommandWorker worker;
48 public TransmitterStick(StickListener l) {
52 public synchronized void initialize(EleroTransmitterStickConfig stickConfig, ScheduledExecutorService scheduler,
53 SerialPortManager serialPortManager) {
54 logger.debug("Initializing Transmitter Stick...");
56 this.serialPortManager = serialPortManager;
57 worker = new CommandWorker();
58 scheduler.schedule(worker, 0, TimeUnit.MILLISECONDS);
59 logger.debug("Transmitter Stick initialized, worker running.");
62 public synchronized void dispose() {
63 logger.debug("Disposing Transmitter Stick...");
64 worker.terminateUpdates();
67 logger.debug("Transmitter Stick disposed.");
70 public synchronized ArrayList<Integer> getKnownIds() {
72 return worker.validIds;
75 return new ArrayList<>();
78 public synchronized void sendCommand(CommandType cmd, List<Integer> channelIds) {
80 worker.executeCommand(cmd, channelIds);
84 public synchronized void requestUpdate(List<Integer> channelIds) {
86 worker.requestUpdates(channelIds);
90 public void addStatusListener(int channelId, StatusListener listener) {
91 synchronized (allListeners) {
92 ArrayList<StatusListener> listeners = allListeners.get(channelId);
93 if (listeners == null) {
94 listeners = new ArrayList<>();
95 allListeners.put(channelId, listeners);
97 listeners.add(listener);
101 public void removeStatusListener(int channelId, StatusListener listener) {
102 synchronized (allListeners) {
103 ArrayList<StatusListener> listeners = allListeners.get(channelId);
104 if (listeners != null) {
105 listeners.remove(listener);
107 if (listeners.isEmpty()) {
108 allListeners.remove(channelId);
114 private void notifyListeners(int channelId, ResponseStatus status) {
115 synchronized (allListeners) {
116 ArrayList<StatusListener> listeners = allListeners.get(channelId);
117 if (listeners != null) {
118 for (StatusListener l : listeners) {
119 l.statusChanged(channelId, status);
127 * - only one INFO for the same channel ids
128 * - only one other command for the same channel ids
130 private static boolean prepareAddition(Command newCmd, Collection<Command> coll) {
131 Iterator<Command> queuedCommands = coll.iterator();
132 while (queuedCommands.hasNext()) {
133 Command existingCmd = queuedCommands.next();
135 if (Arrays.equals(newCmd.getChannelIds(), existingCmd.getChannelIds())) {
136 // remove pending INFOs for same channel ids
137 if (newCmd.getCommandType() == CommandType.INFO && existingCmd.getCommandType() == CommandType.INFO) {
138 if (existingCmd.getPriority() < newCmd.priority) {
139 // we have an older INFO command with same or lower priority, remove
140 queuedCommands.remove();
142 // existing has higher priority, skip addition
147 if (newCmd.getCommandType() != CommandType.INFO && existingCmd.getCommandType() != CommandType.INFO) {
148 // we have an older command for the same channels, remove
149 queuedCommands.remove();
157 static class DueCommandSet extends TreeSet<Command> {
158 private static final long serialVersionUID = -3216360253151368826L;
160 public DueCommandSet() {
161 super(new Comparator<>() {
163 * Due commands are sorted by priority first and then by delay.
166 public int compare(Command o1, Command o2) {
171 int d = o2.getPriority() - o1.getPriority();
176 if (d == 0 && o1.getDelay(TimeUnit.MILLISECONDS) < o2.getDelay(TimeUnit.MILLISECONDS)) {
185 public boolean add(Command e) {
186 if (TransmitterStick.prepareAddition(e, this)) {
194 class CommandWorker implements Runnable {
195 private ArrayList<Integer> validIds = new ArrayList<>();
196 private final AtomicBoolean terminated = new AtomicBoolean();
197 private final int updateInterval;
198 private final SerialConnection connection;
200 private final BlockingQueue<Command> cmdQueue = new DelayQueue<Command>() {
202 public boolean add(Command e) {
203 if (TransmitterStick.prepareAddition(e, this)) {
212 connection = new SerialConnection(config.portName, serialPortManager);
213 updateInterval = config.updateInterval;
216 void terminateUpdates() {
217 terminated.set(true);
219 // add a NONE command to make the thread exit from the call to take()
220 cmdQueue.add(new Command(CommandType.NONE));
223 void requestUpdates(List<Integer> channelIds) {
224 // this is a workaround for a bug in the stick firmware that does not
225 // handle commands that are sent to multiple channels correctly
226 if (channelIds.size() > 1) {
227 for (int channelId : channelIds) {
228 requestUpdates(List.of(channelId));
230 } else if (!channelIds.isEmpty()) {
231 final Integer[] ids = channelIds.toArray(new Integer[channelIds.size()]);
233 logger.debug("adding INFO command for channel id {} to queue...", Arrays.toString(ids));
234 cmdQueue.add(new DelayedCommand(CommandType.INFO, 0, Command.FAST_INFO_PRIORITY, ids));
238 void executeCommand(CommandType command, List<Integer> channelIds) {
239 // this is a workaround for a bug in the stick firmware that does not
240 // handle commands that are sent to multiple channels correctly
241 if (channelIds.size() > 1) {
242 for (int channelId : channelIds) {
243 executeCommand(command, List.of(channelId));
245 } else if (!channelIds.isEmpty()) {
246 final Integer[] ids = channelIds.toArray(new Integer[channelIds.size()]);
248 logger.debug("adding command {} for channel ids {} to queue...", command, Arrays.toString(ids));
249 cmdQueue.add(new Command(command, ids));
258 } catch (Throwable t) {
259 logger.error("Worker stopped by unexpected exception", t);
265 private void doWork() {
266 // list of due commands sorted by priority
267 final DueCommandSet dueCommands = new DueCommandSet();
269 logger.debug("worker started.");
270 while (!terminated.get()) {
274 // in case we have no commands that are currently due, wait for a new one
275 if (dueCommands.isEmpty()) {
276 logger.trace("No due commands, invoking take on queue...");
277 dueCommands.add(cmdQueue.take());
278 logger.trace("take returned {}", dueCommands.first());
281 if (!terminated.get()) {
282 // take all commands that are due from the queue
283 logger.trace("Draining all available commands...");
286 while ((cmd = cmdQueue.poll()) != null) {
288 dueCommands.remove(cmd);
289 dueCommands.add(cmd);
291 logger.trace("Drained {} commands, active queue size is {}, queue size is {}", drainCount,
292 dueCommands.size(), cmdQueue.size());
294 // process the command with the highest priority
295 cmd = dueCommands.first();
296 logger.debug("active command is {}", cmd);
298 if (cmd.getCommandType() != CommandType.NONE) {
299 Response response = connection.sendPacket(CommandUtil.createPacket(cmd));
300 // remove the command now we know it has been correctly processed
301 dueCommands.pollFirst();
303 if (response != null && response.hasStatus()) {
304 for (int id : response.getChannelIds()) {
305 notifyListeners(id, response.getStatus());
309 if (cmd instanceof TimedCommand timedCommand) {
310 long delay = 1000 * timedCommand.getDuration();
311 logger.debug("adding timed command STOP for channel ids {} to queue with delay {}...",
312 cmd.getChannelIds(), delay);
314 cmdQueue.add(new DelayedCommand(CommandType.STOP, delay, Command.TIMED_PRIORITY,
315 cmd.getChannelIds()));
316 } else if (response != null && response.isMoving()) {
317 logger.debug("adding timed command INFO for channel ids {} to queue with delay 2000...",
318 cmd.getChannelIds());
320 cmdQueue.add(new DelayedCommand(CommandType.INFO, 2000, Command.FAST_INFO_PRIORITY,
321 cmd.getChannelIds()));
322 } else if (cmd.getCommandType() == CommandType.INFO) {
323 logger.debug("adding timed command INFO for channel ids {} to queue with delay {}...",
324 cmd.getChannelIds(), updateInterval * 1000);
326 cmdQueue.add(new DelayedCommand(CommandType.INFO, updateInterval * 1000,
327 Command.INFO_PRIORITY, cmd.getChannelIds()));
330 logger.trace("ignoring NONE command.");
333 } catch (InterruptedException e) {
334 logger.error("Got interrupt while waiting for next command time", e);
335 Thread.currentThread().interrupt();
336 } catch (IOException e) {
337 logger.error("Got IOException communicating with the stick", e);
338 listener.connectionDropped(e);
342 logger.debug("worker finished.");
345 private void queryChannels() {
346 logger.debug("querying available channels...");
347 while (!terminated.get()) {
352 while (r == null && !terminated.get() && connection.isOpen()) {
353 logger.debug("sending CHECK packet...");
354 r = connection.sendPacket(CommandUtil.createPacket(CommandType.CHECK));
362 int[] knownIds = r.getChannelIds();
363 logger.debug("Worker found channels: {} ", Arrays.toString(knownIds));
365 for (int id : knownIds) {
366 if (!validIds.contains(id)) {
371 requestUpdates(validIds);
374 } catch (IOException e) {
375 logger.error("Got IOException communicating with the stick", e);
376 listener.connectionDropped(e);
378 } catch (InterruptedException e) {
379 logger.error("Got interrupt while waiting for next command time", e);
380 Thread.currentThread().interrupt();
385 private void waitConnected() {
386 if (!connection.isOpen()) {
387 while (!connection.isOpen() && !terminated.get()) {
390 listener.connectionEstablished();
391 } catch (ConnectException e1) {
392 listener.connectionDropped(e1);
395 if (!connection.isOpen() && !terminated.get()) {
398 } catch (InterruptedException e) {
399 logger.error("Got interrupt while waiting for next command time", e);
400 Thread.currentThread().interrupt();
406 logger.trace("finished waiting. connection open={}, terminated={}", connection.isOpen(), terminated.get());
410 public interface StickListener {
411 void connectionEstablished();
413 void connectionDropped(Exception e);