]> git.basschouten.com Git - openhab-addons.git/blob
4462cfac20ab414c71924b46b0129a6448447dc8
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.elerotransmitterstick.internal.stick;
14
15 import java.io.IOException;
16 import java.util.ArrayList;
17 import java.util.Arrays;
18 import java.util.Collection;
19 import java.util.Comparator;
20 import java.util.HashMap;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.TreeSet;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.DelayQueue;
26 import java.util.concurrent.ScheduledExecutorService;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.atomic.AtomicBoolean;
29
30 import org.openhab.binding.elerotransmitterstick.internal.config.EleroTransmitterStickConfig;
31 import org.openhab.binding.elerotransmitterstick.internal.handler.StatusListener;
32 import org.openhab.core.io.transport.serial.SerialPortManager;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * @author Volker Bier - Initial contribution
38  */
39 public class TransmitterStick {
    /** Logger used by the stick and by its command worker. */
    private final Logger logger = LoggerFactory.getLogger(TransmitterStick.class);
    /** Status listeners keyed by channel id; accessed only while synchronizing on this map. */
    private final HashMap<Integer, ArrayList<StatusListener>> allListeners = new HashMap<>();
    /** Receiver of the connection established/dropped notifications sent by the worker. */
    private final StickListener listener;

    /** Configuration handed to {@code initialize}; reset to {@code null} by {@code dispose}. */
    private EleroTransmitterStickConfig config;
    /** Serial port manager handed to {@code initialize}; the worker passes it to its serial connection. */
    private SerialPortManager serialPortManager;
    /** Command worker created by {@code initialize}; {@code null} before that and after {@code dispose}. */
    private CommandWorker worker;
47
48     public TransmitterStick(StickListener l) {
49         listener = l;
50     }
51
52     public synchronized void initialize(EleroTransmitterStickConfig stickConfig, ScheduledExecutorService scheduler,
53             SerialPortManager serialPortManager) {
54         logger.debug("Initializing Transmitter Stick...");
55         config = stickConfig;
56         this.serialPortManager = serialPortManager;
57         worker = new CommandWorker();
58         scheduler.schedule(worker, 0, TimeUnit.MILLISECONDS);
59         logger.debug("Transmitter Stick initialized, worker running.");
60     }
61
62     public synchronized void dispose() {
63         logger.debug("Disposing Transmitter Stick...");
64         worker.terminateUpdates();
65         worker = null;
66         config = null;
67         logger.debug("Transmitter Stick disposed.");
68     }
69
70     public synchronized ArrayList<Integer> getKnownIds() {
71         if (worker != null) {
72             return worker.validIds;
73         }
74
75         return new ArrayList<>();
76     }
77
78     public synchronized void sendCommand(CommandType cmd, List<Integer> channelIds) {
79         if (worker != null) {
80             worker.executeCommand(cmd, channelIds);
81         }
82     }
83
84     public synchronized void requestUpdate(List<Integer> channelIds) {
85         if (worker != null) {
86             worker.requestUpdates(channelIds);
87         }
88     }
89
90     public void addStatusListener(int channelId, StatusListener listener) {
91         synchronized (allListeners) {
92             ArrayList<StatusListener> listeners = allListeners.get(channelId);
93             if (listeners == null) {
94                 listeners = new ArrayList<>();
95                 allListeners.put(channelId, listeners);
96             }
97             listeners.add(listener);
98         }
99     }
100
101     public void removeStatusListener(int channelId, StatusListener listener) {
102         synchronized (allListeners) {
103             ArrayList<StatusListener> listeners = allListeners.get(channelId);
104             if (listeners != null) {
105                 listeners.remove(listener);
106
107                 if (listeners.isEmpty()) {
108                     allListeners.remove(channelId);
109                 }
110             }
111         }
112     }
113
114     private void notifyListeners(int channelId, ResponseStatus status) {
115         synchronized (allListeners) {
116             ArrayList<StatusListener> listeners = allListeners.get(channelId);
117             if (listeners != null) {
118                 for (StatusListener l : listeners) {
119                     l.statusChanged(channelId, status);
120                 }
121             }
122         }
123     }
124
125     /**
126      * Make sure we have
127      * - only one INFO for the same channel ids
128      * - only one other command for the same channel ids
129      */
130     private static boolean prepareAddition(Command newCmd, Collection<Command> coll) {
131         Iterator<Command> queuedCommands = coll.iterator();
132         while (queuedCommands.hasNext()) {
133             Command existingCmd = queuedCommands.next();
134
135             if (Arrays.equals(newCmd.getChannelIds(), existingCmd.getChannelIds())) {
136                 // remove pending INFOs for same channel ids
137                 if (newCmd.getCommandType() == CommandType.INFO && existingCmd.getCommandType() == CommandType.INFO) {
138                     if (existingCmd.getPriority() < newCmd.priority) {
139                         // we have an older INFO command with same or lower priority, remove
140                         queuedCommands.remove();
141                     } else {
142                         // existing has higher priority, skip addition
143                         return false;
144                     }
145                 }
146
147                 if (newCmd.getCommandType() != CommandType.INFO && existingCmd.getCommandType() != CommandType.INFO) {
148                     // we have an older command for the same channels, remove
149                     queuedCommands.remove();
150                 }
151             }
152         }
153
154         return true;
155     }
156
157     static class DueCommandSet extends TreeSet<Command> {
158         private static final long serialVersionUID = -3216360253151368826L;
159
160         public DueCommandSet() {
161             super(new Comparator<>() {
162                 /**
163                  * Due commands are sorted by priority first and then by delay.
164                  */
165                 @Override
166                 public int compare(Command o1, Command o2) {
167                     if (o1.equals(o2)) {
168                         return 0;
169                     }
170
171                     int d = o2.getPriority() - o1.getPriority();
172                     if (d < 0) {
173                         return -1;
174                     }
175
176                     if (d == 0 && o1.getDelay(TimeUnit.MILLISECONDS) < o2.getDelay(TimeUnit.MILLISECONDS)) {
177                         return -1;
178                     }
179                     return 1;
180                 }
181             });
182         }
183
184         @Override
185         public boolean add(Command e) {
186             if (TransmitterStick.prepareAddition(e, this)) {
187                 return super.add(e);
188             }
189
190             return false;
191         }
192     }
193
194     class CommandWorker implements Runnable {
195         private ArrayList<Integer> validIds = new ArrayList<>();
196         private final AtomicBoolean terminated = new AtomicBoolean();
197         private final int updateInterval;
198         private final SerialConnection connection;
199
200         private final BlockingQueue<Command> cmdQueue = new DelayQueue<Command>() {
201             @Override
202             public boolean add(Command e) {
203                 if (TransmitterStick.prepareAddition(e, this)) {
204                     return super.add(e);
205                 }
206
207                 return false;
208             }
209         };
210
211         CommandWorker() {
212             connection = new SerialConnection(config.portName, serialPortManager);
213             updateInterval = config.updateInterval;
214         }
215
216         void terminateUpdates() {
217             terminated.set(true);
218
219             // add a NONE command to make the thread exit from the call to take()
220             cmdQueue.add(new Command(CommandType.NONE));
221         }
222
223         void requestUpdates(List<Integer> channelIds) {
224             // this is a workaround for a bug in the stick firmware that does not
225             // handle commands that are sent to multiple channels correctly
226             if (channelIds.size() > 1) {
227                 for (int channelId : channelIds) {
228                     requestUpdates(List.of(channelId));
229                 }
230             } else if (!channelIds.isEmpty()) {
231                 final Integer[] ids = channelIds.toArray(new Integer[channelIds.size()]);
232
233                 logger.debug("adding INFO command for channel id {} to queue...", Arrays.toString(ids));
234                 cmdQueue.add(new DelayedCommand(CommandType.INFO, 0, Command.FAST_INFO_PRIORITY, ids));
235             }
236         }
237
238         void executeCommand(CommandType command, List<Integer> channelIds) {
239             // this is a workaround for a bug in the stick firmware that does not
240             // handle commands that are sent to multiple channels correctly
241             if (channelIds.size() > 1) {
242                 for (int channelId : channelIds) {
243                     executeCommand(command, List.of(channelId));
244                 }
245             } else if (!channelIds.isEmpty()) {
246                 final Integer[] ids = channelIds.toArray(new Integer[channelIds.size()]);
247
248                 logger.debug("adding command {} for channel ids {} to queue...", command, Arrays.toString(ids));
249                 cmdQueue.add(new Command(command, ids));
250             }
251         }
252
253         @Override
254         public void run() {
255             try {
256                 queryChannels();
257                 doWork();
258             } catch (Throwable t) {
259                 logger.error("Worker stopped by unexpected exception", t);
260             } finally {
261                 connection.close();
262             }
263         }
264
265         private void doWork() {
266             // list of due commands sorted by priority
267             final DueCommandSet dueCommands = new DueCommandSet();
268
269             logger.debug("worker started.");
270             while (!terminated.get()) {
271                 waitConnected();
272
273                 try {
274                     // in case we have no commands that are currently due, wait for a new one
275                     if (dueCommands.isEmpty()) {
276                         logger.trace("No due commands, invoking take on queue...");
277                         dueCommands.add(cmdQueue.take());
278                         logger.trace("take returned {}", dueCommands.first());
279                     }
280
281                     if (!terminated.get()) {
282                         // take all commands that are due from the queue
283                         logger.trace("Draining all available commands...");
284                         Command cmd;
285                         int drainCount = 0;
286                         while ((cmd = cmdQueue.poll()) != null) {
287                             drainCount++;
288                             dueCommands.remove(cmd);
289                             dueCommands.add(cmd);
290                         }
291                         logger.trace("Drained {} commands, active queue size is {}, queue size is {}", drainCount,
292                                 dueCommands.size(), cmdQueue.size());
293
294                         // process the command with the highest priority
295                         cmd = dueCommands.first();
296                         logger.debug("active command is {}", cmd);
297
298                         if (cmd.getCommandType() != CommandType.NONE) {
299                             Response response = connection.sendPacket(CommandUtil.createPacket(cmd));
300                             // remove the command now we know it has been correctly processed
301                             dueCommands.pollFirst();
302
303                             if (response != null && response.hasStatus()) {
304                                 for (int id : response.getChannelIds()) {
305                                     notifyListeners(id, response.getStatus());
306                                 }
307                             }
308
309                             if (cmd instanceof TimedCommand timedCommand) {
310                                 long delay = 1000 * timedCommand.getDuration();
311                                 logger.debug("adding timed command STOP for channel ids {} to queue with delay {}...",
312                                         cmd.getChannelIds(), delay);
313
314                                 cmdQueue.add(new DelayedCommand(CommandType.STOP, delay, Command.TIMED_PRIORITY,
315                                         cmd.getChannelIds()));
316                             } else if (response != null && response.isMoving()) {
317                                 logger.debug("adding timed command INFO for channel ids {} to queue with delay 2000...",
318                                         cmd.getChannelIds());
319
320                                 cmdQueue.add(new DelayedCommand(CommandType.INFO, 2000, Command.FAST_INFO_PRIORITY,
321                                         cmd.getChannelIds()));
322                             } else if (cmd.getCommandType() == CommandType.INFO) {
323                                 logger.debug("adding timed command INFO for channel ids {} to queue with delay {}...",
324                                         cmd.getChannelIds(), updateInterval * 1000);
325
326                                 cmdQueue.add(new DelayedCommand(CommandType.INFO, updateInterval * 1000,
327                                         Command.INFO_PRIORITY, cmd.getChannelIds()));
328                             }
329                         } else {
330                             logger.trace("ignoring NONE command.");
331                         }
332                     }
333                 } catch (InterruptedException e) {
334                     logger.error("Got interrupt while waiting for next command time", e);
335                     Thread.currentThread().interrupt();
336                 } catch (IOException e) {
337                     logger.error("Got IOException communicating with the stick", e);
338                     listener.connectionDropped(e);
339                     connection.close();
340                 }
341             }
342             logger.debug("worker finished.");
343         }
344
345         private void queryChannels() {
346             logger.debug("querying available channels...");
347             while (!terminated.get()) {
348                 waitConnected();
349
350                 try {
351                     Response r = null;
352                     while (r == null && !terminated.get() && connection.isOpen()) {
353                         logger.debug("sending CHECK packet...");
354                         r = connection.sendPacket(CommandUtil.createPacket(CommandType.CHECK));
355
356                         if (r == null) {
357                             Thread.sleep(2000);
358                         }
359                     }
360
361                     if (r != null) {
362                         int[] knownIds = r.getChannelIds();
363                         logger.debug("Worker found channels: {} ", Arrays.toString(knownIds));
364
365                         for (int id : knownIds) {
366                             if (!validIds.contains(id)) {
367                                 validIds.add(id);
368                             }
369                         }
370
371                         requestUpdates(validIds);
372                         break;
373                     }
374                 } catch (IOException e) {
375                     logger.error("Got IOException communicating with the stick", e);
376                     listener.connectionDropped(e);
377                     connection.close();
378                 } catch (InterruptedException e) {
379                     logger.error("Got interrupt while waiting for next command time", e);
380                     Thread.currentThread().interrupt();
381                 }
382             }
383         }
384
385         private void waitConnected() {
386             if (!connection.isOpen()) {
387                 while (!connection.isOpen() && !terminated.get()) {
388                     try {
389                         connection.open();
390                         listener.connectionEstablished();
391                     } catch (ConnectException e1) {
392                         listener.connectionDropped(e1);
393                     }
394
395                     if (!connection.isOpen() && !terminated.get()) {
396                         try {
397                             Thread.sleep(2000);
398                         } catch (InterruptedException e) {
399                             logger.error("Got interrupt while waiting for next command time", e);
400                             Thread.currentThread().interrupt();
401                         }
402                     }
403                 }
404             }
405
406             logger.trace("finished waiting. connection open={}, terminated={}", connection.isOpen(), terminated.get());
407         }
408     }
409
    /**
     * Callback interface through which the command worker reports changes of the connection to the stick.
     */
    public interface StickListener {
        /**
         * Called by the worker after it has opened the serial connection.
         */
        void connectionEstablished();

        /**
         * Called by the worker when opening the connection failed or when communicating with the stick
         * raised an I/O error.
         *
         * @param e the exception that caused the notification
         */
        void connectionDropped(Exception e);
    }
415 }