]> git.basschouten.com Git - openhab-addons.git/blob
2e7448c93b0c0ea1196f32f4b3bcd16736f07dab
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.anthem.internal.handler;
14
15 import static org.openhab.binding.anthem.internal.AnthemBindingConstants.*;
16
17 import java.io.BufferedReader;
18 import java.io.BufferedWriter;
19 import java.io.IOException;
20 import java.io.InputStreamReader;
21 import java.io.InterruptedIOException;
22 import java.io.OutputStreamWriter;
23 import java.net.Socket;
24 import java.net.UnknownHostException;
25 import java.nio.charset.StandardCharsets;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.Future;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.ScheduledFuture;
30 import java.util.concurrent.TimeUnit;
31
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.openhab.binding.anthem.internal.AnthemConfiguration;
35 import org.openhab.core.library.types.DecimalType;
36 import org.openhab.core.library.types.IncreaseDecreaseType;
37 import org.openhab.core.library.types.OnOffType;
38 import org.openhab.core.library.types.StringType;
39 import org.openhab.core.thing.ChannelUID;
40 import org.openhab.core.thing.Thing;
41 import org.openhab.core.thing.ThingStatus;
42 import org.openhab.core.thing.ThingStatusDetail;
43 import org.openhab.core.thing.binding.BaseThingHandler;
44 import org.openhab.core.types.Command;
45 import org.openhab.core.types.State;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 /**
50  * The {@link AnthemHandler} is responsible for handling commands, which are
51  * sent to one of the channels. It also manages the connection to the AV processor.
52  * The reader thread receives solicited and unsolicited commands from the processor.
53  * The sender thread is used to send commands to the processor.
54  *
55  * @author Mark Hilbush - Initial contribution
56  */
57 @NonNullByDefault
58 public class AnthemHandler extends BaseThingHandler {
59     private Logger logger = LoggerFactory.getLogger(AnthemHandler.class);
60
61     private static final long POLLING_INTERVAL_SECONDS = 900L;
62     private static final long POLLING_DELAY_SECONDS = 10L;
63
64     private @Nullable Socket socket;
65     private @Nullable BufferedWriter writer;
66     private @Nullable BufferedReader reader;
67
68     private AnthemCommandParser commandParser;
69
70     private final BlockingQueue<AnthemCommand> sendQueue = new LinkedBlockingQueue<>();
71
72     private @Nullable Future<?> asyncInitializeTask;
73     private @Nullable ScheduledFuture<?> connectRetryJob;
74     private @Nullable ScheduledFuture<?> pollingJob;
75
76     private @Nullable Thread senderThread;
77     private @Nullable Thread readerThread;
78
79     private int reconnectIntervalMinutes;
80     private int commandDelayMsec;
81
82     private boolean zone1PreviousPowerState;
83     private boolean zone2PreviousPowerState;
84
85     public AnthemHandler(Thing thing) {
86         super(thing);
87         commandParser = new AnthemCommandParser();
88     }
89
90     @Override
91     public void initialize() {
92         AnthemConfiguration configuration = getConfig().as(AnthemConfiguration.class);
93         logger.debug("AnthemHandler: Configuration of thing {} is {}", thing.getUID().getId(), configuration);
94
95         if (!configuration.isValid()) {
96             logger.debug("AnthemHandler: Config of thing '{}' is invalid", thing.getUID().getId());
97             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR,
98                     "@text/thing-status-detail-invalidconfig");
99             return;
100         }
101         reconnectIntervalMinutes = configuration.reconnectIntervalMinutes;
102         commandDelayMsec = configuration.commandDelayMsec;
103         updateStatus(ThingStatus.UNKNOWN, ThingStatusDetail.NONE, "@text/thing-status-detail-connecting");
104         asyncInitializeTask = scheduler.submit(this::connect);
105     }
106
107     @Override
108     public void dispose() {
109         Future<?> localAsyncInitializeTask = this.asyncInitializeTask;
110         if (localAsyncInitializeTask != null) {
111             localAsyncInitializeTask.cancel(true);
112             this.asyncInitializeTask = null;
113         }
114         disconnect();
115     }
116
117     @Override
118     public void handleCommand(ChannelUID channelUID, Command command) {
119         logger.trace("Command {} received for channel {}", command, channelUID.getId().toString());
120         String groupId = channelUID.getGroupId();
121         if (groupId == null) {
122             return;
123         }
124
125         if (CHANNEL_GROUP_GENERAL.equals(groupId)) {
126             handleGeneralCommand(channelUID, command);
127         } else {
128             handleZoneCommand(groupId, channelUID, command);
129         }
130     }
131
132     private void handleGeneralCommand(ChannelUID channelUID, Command command) {
133         switch (channelUID.getIdWithoutGroup()) {
134             case CHANNEL_COMMAND:
135                 if (command instanceof StringType) {
136                     sendCommand(AnthemCommand.customCommand(command.toString()));
137                 }
138                 break;
139             default:
140                 logger.debug("Received general command '{}' for unhandled channel '{}'", command, channelUID.getId());
141                 break;
142         }
143     }
144
145     private void handleZoneCommand(String groupId, ChannelUID channelUID, Command command) {
146         Zone zone = Zone.fromValue(groupId);
147
148         switch (channelUID.getIdWithoutGroup()) {
149             case CHANNEL_POWER:
150                 if (command instanceof OnOffType) {
151                     if (command == OnOffType.ON) {
152                         // Power on the device
153                         sendCommand(AnthemCommand.powerOn(zone));
154                     } else if (command == OnOffType.OFF) {
155                         sendCommand(AnthemCommand.powerOff(zone));
156                     }
157                 }
158                 break;
159             case CHANNEL_VOLUME:
160                 if (command instanceof OnOffType || command instanceof IncreaseDecreaseType) {
161                     if (command == OnOffType.ON || command == IncreaseDecreaseType.INCREASE) {
162                         sendCommand(AnthemCommand.volumeUp(zone, 1));
163                     } else if (command == OnOffType.OFF || command == IncreaseDecreaseType.DECREASE) {
164                         sendCommand(AnthemCommand.volumeDown(zone, 1));
165                     }
166                 }
167                 break;
168             case CHANNEL_VOLUME_DB:
169                 if (command instanceof DecimalType) {
170                     sendCommand(AnthemCommand.volume(zone, ((DecimalType) command).intValue()));
171                 }
172                 break;
173             case CHANNEL_MUTE:
174                 if (command instanceof OnOffType) {
175                     if (command == OnOffType.ON) {
176                         sendCommand(AnthemCommand.muteOn(zone));
177                     } else if (command == OnOffType.OFF) {
178                         sendCommand(AnthemCommand.muteOff(zone));
179                     }
180                 }
181                 break;
182             case CHANNEL_ACTIVE_INPUT:
183                 if (command instanceof DecimalType) {
184                     sendCommand(AnthemCommand.activeInput(zone, ((DecimalType) command).intValue()));
185                 }
186                 break;
187             default:
188                 logger.debug("Received zone command '{}' for unhandled channel '{}'", command, channelUID.getId());
189                 break;
190         }
191     }
192
193     private void queryAdditionalInformation(Zone zone) {
194         // Request information about the device
195         sendCommand(AnthemCommand.queryNumAvailableInputs());
196         sendCommand(AnthemCommand.queryModel());
197         sendCommand(AnthemCommand.queryRegion());
198         sendCommand(AnthemCommand.querySoftwareVersion());
199         sendCommand(AnthemCommand.querySoftwareBuildDate());
200         sendCommand(AnthemCommand.queryHardwareVersion());
201         sendCommand(AnthemCommand.queryMacAddress());
202         sendCommand(AnthemCommand.queryVolume(zone));
203         sendCommand(AnthemCommand.queryMute(zone));
204         // Give some time for the input names to populate before requesting the active input
205         scheduler.schedule(() -> queryActiveInput(zone), 5L, TimeUnit.SECONDS);
206     }
207
208     private void queryActiveInput(Zone zone) {
209         sendCommand(AnthemCommand.queryActiveInput(zone));
210     }
211
212     private void sendCommand(AnthemCommand command) {
213         logger.debug("Adding command to queue: {}", command);
214         sendQueue.add(command);
215     }
216
217     private synchronized void connect() {
218         try {
219             AnthemConfiguration configuration = getConfig().as(AnthemConfiguration.class);
220             logger.debug("Opening connection to Anthem host {} on port {}", configuration.host, configuration.port);
221             Socket socket = new Socket(configuration.host, configuration.port);
222             writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.ISO_8859_1));
223             reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.ISO_8859_1));
224             this.socket = socket;
225         } catch (UnknownHostException e) {
226             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR,
227                     "@text/thing-status-detail-unknownhost");
228             return;
229         } catch (IllegalArgumentException e) {
230             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR,
231                     "@text/thing-status-detail-invalidport");
232             return;
233         } catch (InterruptedIOException e) {
234             logger.debug("Interrupted while establishing Anthem connection");
235             Thread.currentThread().interrupt();
236             return;
237         } catch (IOException e) {
238             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
239                     "@text/thing-status-detail-openerror");
240             logger.debug("Error opening Anthem connection: {}", e.getMessage());
241             disconnect();
242             scheduleConnectRetry(reconnectIntervalMinutes);
243             return;
244         }
245         Thread localReaderThread = new Thread(this::readerThreadJob, "Anthem reader");
246         localReaderThread.setDaemon(true);
247         localReaderThread.start();
248         this.readerThread = localReaderThread;
249
250         Thread localSenderThread = new Thread(this::senderThreadJob, "Anthem sender");
251         localSenderThread.setDaemon(true);
252         localSenderThread.start();
253         this.senderThread = localSenderThread;
254
255         updateStatus(ThingStatus.ONLINE);
256
257         ScheduledFuture<?> localPollingJob = this.pollingJob;
258         if (localPollingJob == null) {
259             this.pollingJob = scheduler.scheduleWithFixedDelay(this::poll, POLLING_DELAY_SECONDS,
260                     POLLING_INTERVAL_SECONDS, TimeUnit.SECONDS);
261         }
262     }
263
264     private void poll() {
265         logger.debug("Polling...");
266         sendCommand(AnthemCommand.queryPower(Zone.MAIN));
267         sendCommand(AnthemCommand.queryPower(Zone.ZONE2));
268     }
269
270     private void scheduleConnectRetry(long waitMinutes) {
271         logger.debug("Scheduling connection retry in {} minutes", waitMinutes);
272         connectRetryJob = scheduler.schedule(this::connect, waitMinutes, TimeUnit.MINUTES);
273     }
274
275     private synchronized void disconnect() {
276         logger.debug("Disconnecting from Anthem");
277
278         ScheduledFuture<?> localPollingJob = this.pollingJob;
279         if (localPollingJob != null) {
280             localPollingJob.cancel(true);
281             this.pollingJob = null;
282         }
283
284         ScheduledFuture<?> localConnectRetryJob = this.connectRetryJob;
285         if (localConnectRetryJob != null) {
286             localConnectRetryJob.cancel(true);
287             this.connectRetryJob = null;
288         }
289
290         Thread localSenderThread = this.senderThread;
291         if (localSenderThread != null && localSenderThread.isAlive()) {
292             localSenderThread.interrupt();
293         }
294
295         Thread localReaderThread = this.readerThread;
296         if (localReaderThread != null && localReaderThread.isAlive()) {
297             localReaderThread.interrupt();
298         }
299         Socket localSocket = this.socket;
300         if (localSocket != null) {
301             try {
302                 localSocket.close();
303             } catch (IOException e) {
304                 logger.debug("Error closing socket: {}", e.getMessage());
305             }
306             this.socket = null;
307         }
308         BufferedReader localReader = this.reader;
309         if (localReader != null) {
310             try {
311                 localReader.close();
312             } catch (IOException e) {
313                 logger.debug("Error closing reader: {}", e.getMessage());
314             }
315             this.reader = null;
316         }
317         BufferedWriter localWriter = this.writer;
318         if (localWriter != null) {
319             try {
320                 localWriter.close();
321             } catch (IOException e) {
322                 logger.debug("Error closing writer: {}", e.getMessage());
323             }
324             this.writer = null;
325         }
326     }
327
328     private synchronized void reconnect() {
329         logger.debug("Attempting to reconnect to the Anthem");
330         disconnect();
331         connect();
332     }
333
334     private void senderThreadJob() {
335         logger.debug("Sender thread started");
336         try {
337             while (!Thread.currentThread().isInterrupted() && writer != null) {
338                 AnthemCommand command = sendQueue.take();
339                 logger.debug("Sender thread writing command: {}", command);
340                 try {
341                     BufferedWriter localWriter = this.writer;
342                     if (localWriter != null) {
343                         localWriter.write(command.toString());
344                         localWriter.flush();
345                     }
346                 } catch (InterruptedIOException e) {
347                     logger.debug("Interrupted while sending command");
348                     updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
349                             "@text/thing-status-detail-interrupted");
350                     break;
351                 } catch (IOException e) {
352                     logger.debug("Communication error, will try to reconnect. Error: {}", e.getMessage());
353                     updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
354                     // Requeue the command and try to reconnect
355                     sendQueue.add(command);
356                     reconnect();
357                     break;
358                 }
359                 // Introduce delay to throttle the send rate
360                 if (commandDelayMsec > 0) {
361                     Thread.sleep(commandDelayMsec);
362                 }
363             }
364         } catch (InterruptedException e) {
365             Thread.currentThread().interrupt();
366         } finally {
367             logger.debug("Sender thread exiting");
368         }
369     }
370
371     private void readerThreadJob() {
372         logger.debug("Reader thread started");
373         StringBuffer sbReader = new StringBuffer();
374         try {
375             char c;
376             String command;
377             BufferedReader localReader = this.reader;
378             while (!Thread.interrupted() && localReader != null) {
379                 c = (char) localReader.read();
380                 sbReader.append(c);
381                 if (c == COMMAND_TERMINATION_CHAR) {
382                     command = sbReader.toString();
383                     logger.debug("Reader thread sending command to parser: {}", command);
384                     AnthemUpdate update = commandParser.parseCommand(command);
385                     if (update != null) {
386                         processUpdate(update);
387                     }
388                     sbReader.setLength(0);
389                 }
390             }
391         } catch (InterruptedIOException e) {
392             logger.debug("Interrupted while reading");
393             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
394                     "@text/thing-status-detail-interrupted");
395         } catch (IOException e) {
396             logger.debug("I/O error while reading from socket: {}", e.getMessage());
397             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
398                     "@text/thing-status-detail-ioexception");
399         } finally {
400             logger.debug("Reader thread exiting");
401         }
402     }
403
404     private void processUpdate(AnthemUpdate update) {
405         // State update
406         if (update.isStateUpdate()) {
407             StateUpdate stateUpdate = update.getStateUpdate();
408             updateState(stateUpdate.getGroupId() + ChannelUID.CHANNEL_GROUP_SEPARATOR + stateUpdate.getChannelId(),
409                     stateUpdate.getState());
410             postProcess(stateUpdate);
411         }
412         // Property update
413         else if (update.isPropertyUpdate()) {
414             PropertyUpdate propertyUpdate = update.getPropertyUpdate();
415             updateProperty(propertyUpdate.getName(), propertyUpdate.getValue());
416             postProcess(propertyUpdate);
417         }
418     }
419
420     private void postProcess(StateUpdate stateUpdate) {
421         switch (stateUpdate.getChannelId()) {
422             case CHANNEL_POWER:
423                 checkPowerStatusChange(stateUpdate);
424                 break;
425             case CHANNEL_ACTIVE_INPUT:
426                 updateInputNameChannels(stateUpdate);
427                 break;
428         }
429     }
430
431     private void checkPowerStatusChange(StateUpdate stateUpdate) {
432         String zone = stateUpdate.getGroupId();
433         State power = stateUpdate.getState();
434         // Zone 1
435         if (Zone.MAIN.equals(Zone.fromValue(zone))) {
436             boolean newZone1PowerState = (power instanceof OnOffType && power == OnOffType.ON) ? true : false;
437             if (!zone1PreviousPowerState && newZone1PowerState) {
438                 // Power turned on for main zone.
439                 // This will cause the main zone channel states to be updated
440                 scheduler.submit(() -> queryAdditionalInformation(Zone.MAIN));
441             }
442             zone1PreviousPowerState = newZone1PowerState;
443         }
444         // Zone 2
445         else if (Zone.ZONE2.equals(Zone.fromValue(zone))) {
446             boolean newZone2PowerState = (power instanceof OnOffType && power == OnOffType.ON) ? true : false;
447             if (!zone2PreviousPowerState && newZone2PowerState) {
448                 // Power turned on for zone 2.
449                 // This will cause zone 2 channel states to be updated
450                 scheduler.submit(() -> queryAdditionalInformation(Zone.ZONE2));
451             }
452             zone2PreviousPowerState = newZone2PowerState;
453         }
454     }
455
456     private void updateInputNameChannels(StateUpdate stateUpdate) {
457         State state = stateUpdate.getState();
458         String groupId = stateUpdate.getGroupId();
459         if (state instanceof StringType) {
460             updateState(groupId + ChannelUID.CHANNEL_GROUP_SEPARATOR + CHANNEL_ACTIVE_INPUT_SHORT_NAME,
461                     new StringType(commandParser.getInputShortName(state.toString())));
462             updateState(groupId + ChannelUID.CHANNEL_GROUP_SEPARATOR + CHANNEL_ACTIVE_INPUT_LONG_NAME,
463                     new StringType(commandParser.getInputLongName(state.toString())));
464         }
465     }
466
467     private void postProcess(PropertyUpdate propertyUpdate) {
468         switch (propertyUpdate.getName()) {
469             case PROPERTY_NUM_AVAILABLE_INPUTS:
470                 queryAllInputNames(propertyUpdate);
471                 break;
472         }
473     }
474
475     private void queryAllInputNames(PropertyUpdate propertyUpdate) {
476         try {
477             int numInputs = Integer.parseInt(propertyUpdate.getValue());
478             for (int input = 1; input <= numInputs; input++) {
479                 sendCommand(AnthemCommand.queryInputShortName(input));
480                 sendCommand(AnthemCommand.queryInputLongName(input));
481             }
482         } catch (NumberFormatException e) {
483             logger.debug("Unable to convert property '{}' to integer: {}", propertyUpdate.getName(),
484                     propertyUpdate.getValue());
485         }
486     }
487 }