]> git.basschouten.com Git - openhab-addons.git/blob
35ca545793f6ca1e470b7905aee698b675b4408e
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.elerotransmitterstick.internal.stick;
14
15 import java.io.IOException;
16 import java.util.ArrayList;
17 import java.util.Arrays;
18 import java.util.Collection;
19 import java.util.Collections;
20 import java.util.Comparator;
21 import java.util.HashMap;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.TreeSet;
25 import java.util.concurrent.BlockingQueue;
26 import java.util.concurrent.DelayQueue;
27 import java.util.concurrent.ScheduledExecutorService;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicBoolean;
30
31 import org.openhab.binding.elerotransmitterstick.internal.config.EleroTransmitterStickConfig;
32 import org.openhab.binding.elerotransmitterstick.internal.handler.StatusListener;
33 import org.openhab.core.io.transport.serial.SerialPortManager;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * @author Volker Bier - Initial contribution
39  */
40 public class TransmitterStick {
41     private final Logger logger = LoggerFactory.getLogger(TransmitterStick.class);
42     private final HashMap<Integer, ArrayList<StatusListener>> allListeners = new HashMap<>();
43     private final StickListener listener;
44
45     private EleroTransmitterStickConfig config;
46     private SerialPortManager serialPortManager;
47     private CommandWorker worker;
48
49     public TransmitterStick(StickListener l) {
50         listener = l;
51     }
52
53     public synchronized void initialize(EleroTransmitterStickConfig stickConfig, ScheduledExecutorService scheduler,
54             SerialPortManager serialPortManager) {
55         logger.debug("Initializing Transmitter Stick...");
56         config = stickConfig;
57         this.serialPortManager = serialPortManager;
58         worker = new CommandWorker();
59         scheduler.schedule(worker, 0, TimeUnit.MILLISECONDS);
60         logger.debug("Transmitter Stick initialized, worker running.");
61     }
62
63     public synchronized void dispose() {
64         logger.debug("Disposing Transmitter Stick...");
65         worker.terminateUpdates();
66         worker = null;
67         config = null;
68         logger.debug("Transmitter Stick disposed.");
69     }
70
71     public synchronized ArrayList<Integer> getKnownIds() {
72         if (worker != null) {
73             return worker.validIds;
74         }
75
76         return new ArrayList<>();
77     }
78
79     public synchronized void sendCommand(CommandType cmd, List<Integer> channelIds) {
80         if (worker != null) {
81             worker.executeCommand(cmd, channelIds);
82         }
83     }
84
85     public synchronized void requestUpdate(List<Integer> channelIds) {
86         if (worker != null) {
87             worker.requestUpdates(channelIds);
88         }
89     }
90
91     public void addStatusListener(int channelId, StatusListener listener) {
92         synchronized (allListeners) {
93             ArrayList<StatusListener> listeners = allListeners.get(channelId);
94             if (listeners == null) {
95                 listeners = new ArrayList<>();
96                 allListeners.put(channelId, listeners);
97             }
98             listeners.add(listener);
99         }
100     }
101
102     public void removeStatusListener(int channelId, StatusListener listener) {
103         synchronized (allListeners) {
104             ArrayList<StatusListener> listeners = allListeners.get(channelId);
105             if (listeners != null) {
106                 listeners.remove(listener);
107
108                 if (listeners.isEmpty()) {
109                     allListeners.remove(channelId);
110                 }
111             }
112         }
113     }
114
115     private void notifyListeners(int channelId, ResponseStatus status) {
116         synchronized (allListeners) {
117             ArrayList<StatusListener> listeners = allListeners.get(channelId);
118             if (listeners != null) {
119                 for (StatusListener l : listeners) {
120                     l.statusChanged(channelId, status);
121                 }
122             }
123         }
124     }
125
126     /**
127      * Make sure we have
128      * - only one INFO for the same channel ids
129      * - only one other command for the same channel ids
130      */
131     private static boolean prepareAddition(Command newCmd, Collection<Command> coll) {
132         Iterator<Command> queuedCommands = coll.iterator();
133         while (queuedCommands.hasNext()) {
134             Command existingCmd = queuedCommands.next();
135
136             if (Arrays.equals(newCmd.getChannelIds(), existingCmd.getChannelIds())) {
137                 // remove pending INFOs for same channel ids
138                 if (newCmd.getCommandType() == CommandType.INFO && existingCmd.getCommandType() == CommandType.INFO) {
139                     if (existingCmd.getPriority() < newCmd.priority) {
140                         // we have an older INFO command with same or lower priority, remove
141                         queuedCommands.remove();
142                     } else {
143                         // existing has higher priority, skip addition
144                         return false;
145                     }
146                 }
147
148                 if (newCmd.getCommandType() != CommandType.INFO && existingCmd.getCommandType() != CommandType.INFO) {
149                     // we have an older command for the same channels, remove
150                     queuedCommands.remove();
151                 }
152             }
153         }
154
155         return true;
156     }
157
158     static class DueCommandSet extends TreeSet<Command> {
159         private static final long serialVersionUID = -3216360253151368826L;
160
161         public DueCommandSet() {
162             super(new Comparator<>() {
163                 /**
164                  * Due commands are sorted by priority first and then by delay.
165                  */
166                 @Override
167                 public int compare(Command o1, Command o2) {
168                     if (o1.equals(o2)) {
169                         return 0;
170                     }
171
172                     int d = o2.getPriority() - o1.getPriority();
173                     if (d < 0) {
174                         return -1;
175                     }
176
177                     if (d == 0 && o1.getDelay(TimeUnit.MILLISECONDS) < o2.getDelay(TimeUnit.MILLISECONDS)) {
178                         return -1;
179                     }
180                     return 1;
181                 }
182             });
183         }
184
185         @Override
186         public boolean add(Command e) {
187             if (TransmitterStick.prepareAddition(e, this)) {
188                 return super.add(e);
189             }
190
191             return false;
192         }
193     }
194
195     class CommandWorker implements Runnable {
196         private ArrayList<Integer> validIds = new ArrayList<>();
197         private final AtomicBoolean terminated = new AtomicBoolean();
198         private final int updateInterval;
199         private final SerialConnection connection;
200
201         private final BlockingQueue<Command> cmdQueue = new DelayQueue<Command>() {
202             @Override
203             public boolean add(Command e) {
204                 if (TransmitterStick.prepareAddition(e, this)) {
205                     return super.add(e);
206                 }
207
208                 return false;
209             }
210         };
211
212         CommandWorker() {
213             connection = new SerialConnection(config.portName, serialPortManager);
214             updateInterval = config.updateInterval;
215         }
216
217         void terminateUpdates() {
218             terminated.set(true);
219
220             // add a NONE command to make the thread exit from the call to take()
221             cmdQueue.add(new Command(CommandType.NONE));
222         }
223
224         void requestUpdates(List<Integer> channelIds) {
225             // this is a workaround for a bug in the stick firmware that does not
226             // handle commands that are sent to multiple channels correctly
227             if (channelIds.size() > 1) {
228                 for (int channelId : channelIds) {
229                     requestUpdates(Collections.singletonList(channelId));
230                 }
231             } else if (!channelIds.isEmpty()) {
232                 final Integer[] ids = channelIds.toArray(new Integer[channelIds.size()]);
233
234                 logger.debug("adding INFO command for channel id {} to queue...", Arrays.toString(ids));
235                 cmdQueue.add(new DelayedCommand(CommandType.INFO, 0, Command.FAST_INFO_PRIORITY, ids));
236             }
237         }
238
239         void executeCommand(CommandType command, List<Integer> channelIds) {
240             // this is a workaround for a bug in the stick firmware that does not
241             // handle commands that are sent to multiple channels correctly
242             if (channelIds.size() > 1) {
243                 for (int channelId : channelIds) {
244                     executeCommand(command, Collections.singletonList(channelId));
245                 }
246             } else if (!channelIds.isEmpty()) {
247                 final Integer[] ids = channelIds.toArray(new Integer[channelIds.size()]);
248
249                 logger.debug("adding command {} for channel ids {} to queue...", command, Arrays.toString(ids));
250                 cmdQueue.add(new Command(command, ids));
251             }
252         }
253
254         @Override
255         public void run() {
256             try {
257                 queryChannels();
258                 doWork();
259             } catch (Throwable t) {
260                 logger.error("Worker stopped by unexpected exception", t);
261             } finally {
262                 connection.close();
263             }
264         }
265
266         private void doWork() {
267             // list of due commands sorted by priority
268             final DueCommandSet dueCommands = new DueCommandSet();
269
270             logger.debug("worker started.");
271             while (!terminated.get()) {
272                 waitConnected();
273
274                 try {
275                     // in case we have no commands that are currently due, wait for a new one
276                     if (dueCommands.isEmpty()) {
277                         logger.trace("No due commands, invoking take on queue...");
278                         dueCommands.add(cmdQueue.take());
279                         logger.trace("take returned {}", dueCommands.first());
280                     }
281
282                     if (!terminated.get()) {
283                         // take all commands that are due from the queue
284                         logger.trace("Draining all available commands...");
285                         Command cmd;
286                         int drainCount = 0;
287                         while ((cmd = cmdQueue.poll()) != null) {
288                             drainCount++;
289                             dueCommands.remove(cmd);
290                             dueCommands.add(cmd);
291                         }
292                         logger.trace("Drained {} commands, active queue size is {}, queue size is {}", drainCount,
293                                 dueCommands.size(), cmdQueue.size());
294
295                         // process the command with the highest priority
296                         cmd = dueCommands.first();
297                         logger.debug("active command is {}", cmd);
298
299                         if (cmd.getCommandType() != CommandType.NONE) {
300                             Response response = connection.sendPacket(CommandUtil.createPacket(cmd));
301                             // remove the command now we know it has been correctly processed
302                             dueCommands.pollFirst();
303
304                             if (response != null && response.hasStatus()) {
305                                 for (int id : response.getChannelIds()) {
306                                     notifyListeners(id, response.getStatus());
307                                 }
308                             }
309
310                             if (cmd instanceof TimedCommand) {
311                                 long delay = 1000 * ((TimedCommand) cmd).getDuration();
312                                 logger.debug("adding timed command STOP for channel ids {} to queue with delay {}...",
313                                         cmd.getChannelIds(), delay);
314
315                                 cmdQueue.add(new DelayedCommand(CommandType.STOP, delay, Command.TIMED_PRIORITY,
316                                         cmd.getChannelIds()));
317                             } else if (response != null && response.isMoving()) {
318                                 logger.debug("adding timed command INFO for channel ids {} to queue with delay 2000...",
319                                         cmd.getChannelIds());
320
321                                 cmdQueue.add(new DelayedCommand(CommandType.INFO, 2000, Command.FAST_INFO_PRIORITY,
322                                         cmd.getChannelIds()));
323                             } else if (cmd.getCommandType() == CommandType.INFO) {
324                                 logger.debug("adding timed command INFO for channel ids {} to queue with delay {}...",
325                                         cmd.getChannelIds(), updateInterval * 1000);
326
327                                 cmdQueue.add(new DelayedCommand(CommandType.INFO, updateInterval * 1000,
328                                         Command.INFO_PRIORITY, cmd.getChannelIds()));
329                             }
330                         } else {
331                             logger.trace("ignoring NONE command.");
332                         }
333                     }
334                 } catch (InterruptedException e) {
335                     logger.error("Got interrupt while waiting for next command time", e);
336                     Thread.currentThread().interrupt();
337                 } catch (IOException e) {
338                     logger.error("Got IOException communicating with the stick", e);
339                     listener.connectionDropped(e);
340                     connection.close();
341                 }
342             }
343             logger.debug("worker finished.");
344         }
345
346         private void queryChannels() {
347             logger.debug("querying available channels...");
348             while (!terminated.get()) {
349                 waitConnected();
350
351                 try {
352                     Response r = null;
353                     while (r == null && !terminated.get() && connection.isOpen()) {
354                         logger.debug("sending CHECK packet...");
355                         r = connection.sendPacket(CommandUtil.createPacket(CommandType.CHECK));
356
357                         if (r == null) {
358                             Thread.sleep(2000);
359                         }
360                     }
361
362                     if (r != null) {
363                         int[] knownIds = r.getChannelIds();
364                         logger.debug("Worker found channels: {} ", Arrays.toString(knownIds));
365
366                         for (int id : knownIds) {
367                             if (!validIds.contains(id)) {
368                                 validIds.add(id);
369                             }
370                         }
371
372                         requestUpdates(validIds);
373                         break;
374                     }
375                 } catch (IOException e) {
376                     logger.error("Got IOException communicating with the stick", e);
377                     listener.connectionDropped(e);
378                     connection.close();
379                 } catch (InterruptedException e) {
380                     logger.error("Got interrupt while waiting for next command time", e);
381                     Thread.currentThread().interrupt();
382                 }
383             }
384         }
385
386         private void waitConnected() {
387             if (!connection.isOpen()) {
388                 while (!connection.isOpen() && !terminated.get()) {
389                     try {
390                         connection.open();
391                         listener.connectionEstablished();
392                     } catch (ConnectException e1) {
393                         listener.connectionDropped(e1);
394                     }
395
396                     if (!connection.isOpen() && !terminated.get()) {
397                         try {
398                             Thread.sleep(2000);
399                         } catch (InterruptedException e) {
400                             logger.error("Got interrupt while waiting for next command time", e);
401                             Thread.currentThread().interrupt();
402                         }
403                     }
404                 }
405             }
406
407             logger.trace("finished waiting. connection open={}, terminated={}", connection.isOpen(), terminated.get());
408         }
409     }
410
411     public interface StickListener {
412         void connectionEstablished();
413
414         void connectionDropped(Exception e);
415     }
416 }