]> git.basschouten.com Git - openhab-addons.git/blob
3f66d753e335c5445c854d85445ae4fae5775255
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.anthem.internal.handler;
14
15 import static org.openhab.binding.anthem.internal.AnthemBindingConstants.*;
16
17 import java.io.BufferedReader;
18 import java.io.BufferedWriter;
19 import java.io.IOException;
20 import java.io.InputStreamReader;
21 import java.io.InterruptedIOException;
22 import java.io.OutputStreamWriter;
23 import java.net.Socket;
24 import java.net.UnknownHostException;
25 import java.nio.charset.StandardCharsets;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.Future;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.ScheduledFuture;
30 import java.util.concurrent.TimeUnit;
31
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.openhab.binding.anthem.internal.AnthemConfiguration;
35 import org.openhab.core.library.types.DecimalType;
36 import org.openhab.core.library.types.IncreaseDecreaseType;
37 import org.openhab.core.library.types.OnOffType;
38 import org.openhab.core.thing.ChannelUID;
39 import org.openhab.core.thing.Thing;
40 import org.openhab.core.thing.ThingStatus;
41 import org.openhab.core.thing.ThingStatusDetail;
42 import org.openhab.core.thing.binding.BaseThingHandler;
43 import org.openhab.core.types.Command;
44 import org.openhab.core.types.State;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 /**
49  * The {@link AnthemHandler} is responsible for handling commands, which are
50  * sent to one of the channels. It also manages the connection to the AV processor.
51  * The reader thread receives solicited and unsolicited commands from the processor.
52  * The sender thread is used to send commands to the processor.
53  *
54  * @author Mark Hilbush - Initial contribution
55  */
56 @NonNullByDefault
57 public class AnthemHandler extends BaseThingHandler {
58     private Logger logger = LoggerFactory.getLogger(AnthemHandler.class);
59
60     private static final long POLLING_INTERVAL_SECONDS = 900L;
61     private static final long POLLING_DELAY_SECONDS = 10L;
62
63     private @Nullable Socket socket;
64     private @Nullable BufferedWriter writer;
65     private @Nullable BufferedReader reader;
66
67     private AnthemCommandParser messageParser;
68
69     private final BlockingQueue<AnthemCommand> sendQueue = new LinkedBlockingQueue<>();
70
71     private @Nullable Future<?> asyncInitializeTask;
72     private @Nullable ScheduledFuture<?> connectRetryJob;
73     private @Nullable ScheduledFuture<?> pollingJob;
74
75     private @Nullable Thread senderThread;
76     private @Nullable Thread readerThread;
77
78     private int reconnectIntervalMinutes;
79     private int commandDelayMsec;
80
81     private boolean zone1PreviousPowerState;
82     private boolean zone2PreviousPowerState;
83
84     public AnthemHandler(Thing thing) {
85         super(thing);
86         messageParser = new AnthemCommandParser(this);
87     }
88
89     @Override
90     public void initialize() {
91         AnthemConfiguration configuration = getConfig().as(AnthemConfiguration.class);
92         logger.debug("AnthemHandler: Configuration of thing {} is {}", thing.getUID().getId(), configuration);
93
94         if (!configuration.isValid()) {
95             logger.debug("AnthemHandler: Config of thing '{}' is invalid", thing.getUID().getId());
96             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR,
97                     "@text/thing-status-detail-invalidconfig");
98             return;
99         }
100         reconnectIntervalMinutes = configuration.reconnectIntervalMinutes;
101         commandDelayMsec = configuration.commandDelayMsec;
102         updateStatus(ThingStatus.UNKNOWN, ThingStatusDetail.NONE, "@text/thing-status-detail-connecting");
103         asyncInitializeTask = scheduler.submit(this::connect);
104     }
105
106     @Override
107     public void dispose() {
108         Future<?> localAsyncInitializeTask = this.asyncInitializeTask;
109         if (localAsyncInitializeTask != null) {
110             localAsyncInitializeTask.cancel(true);
111             this.asyncInitializeTask = null;
112         }
113         disconnect();
114     }
115
116     @Override
117     public void handleCommand(ChannelUID channelUID, Command command) {
118         logger.trace("Command {} received for channel {}", command, channelUID.getId().toString());
119         String groupId = channelUID.getGroupId();
120         if (groupId == null) {
121             return;
122         }
123         Zone zone = Zone.fromValue(groupId);
124
125         switch (channelUID.getIdWithoutGroup()) {
126             case CHANNEL_POWER:
127                 if (command instanceof OnOffType) {
128                     if (command == OnOffType.ON) {
129                         // Power on the device
130                         sendCommand(AnthemCommand.powerOn(zone));
131                     } else if (command == OnOffType.OFF) {
132                         sendCommand(AnthemCommand.powerOff(zone));
133                     }
134                 }
135                 break;
136             case CHANNEL_VOLUME:
137                 if (command instanceof OnOffType || command instanceof IncreaseDecreaseType) {
138                     if (command == OnOffType.ON || command == IncreaseDecreaseType.INCREASE) {
139                         sendCommand(AnthemCommand.volumeUp(zone, 1));
140                     } else if (command == OnOffType.OFF || command == IncreaseDecreaseType.DECREASE) {
141                         sendCommand(AnthemCommand.volumeDown(zone, 1));
142                     }
143                 }
144                 break;
145             case CHANNEL_VOLUME_DB:
146                 if (command instanceof DecimalType) {
147                     sendCommand(AnthemCommand.volume(zone, ((DecimalType) command).intValue()));
148                 }
149                 break;
150             case CHANNEL_MUTE:
151                 if (command instanceof OnOffType) {
152                     if (command == OnOffType.ON) {
153                         sendCommand(AnthemCommand.muteOn(zone));
154                     } else if (command == OnOffType.OFF) {
155                         sendCommand(AnthemCommand.muteOff(zone));
156                     }
157                 }
158                 break;
159             case CHANNEL_ACTIVE_INPUT:
160                 if (command instanceof DecimalType) {
161                     sendCommand(AnthemCommand.activeInput(zone, ((DecimalType) command).intValue()));
162                 }
163                 break;
164             default:
165                 logger.debug("Received command '{}' for unhandled channel '{}'", command, channelUID.getId());
166                 break;
167         }
168     }
169
170     public void setModel(String model) {
171         updateProperty("Model", model);
172     }
173
174     public void setRegion(String region) {
175         updateProperty("Region", region);
176     }
177
178     public void setSoftwareVersion(String version) {
179         updateProperty("Software Version", version);
180     }
181
182     public void setSoftwareBuildDate(String date) {
183         updateProperty("Software Build Date", date);
184     }
185
186     public void setHardwareVersion(String version) {
187         updateProperty("Hardware Version", version);
188     }
189
190     public void setMacAddress(String mac) {
191         updateProperty("Mac Address", mac);
192     }
193
194     public void updateChannelState(String zone, String channelId, State state) {
195         updateState(zone + "#" + channelId, state);
196     }
197
198     public void checkPowerStatusChange(String zone, String power) {
199         // Zone 1
200         if (Zone.MAIN.equals(Zone.fromValue(zone))) {
201             boolean newZone1PowerState = "1".equals(power) ? true : false;
202             if (!zone1PreviousPowerState && newZone1PowerState) {
203                 // Power turned on for main zone.
204                 // This will cause the main zone channel states to be updated
205                 scheduler.submit(() -> queryAdditionalInformation(Zone.MAIN));
206             }
207             zone1PreviousPowerState = newZone1PowerState;
208         }
209         // Zone 2
210         else if (Zone.ZONE2.equals(Zone.fromValue(zone))) {
211             boolean newZone2PowerState = "1".equals(power) ? true : false;
212             if (!zone2PreviousPowerState && newZone2PowerState) {
213                 // Power turned on for zone 2.
214                 // This will cause zone 2 channel states to be updated
215                 scheduler.submit(() -> queryAdditionalInformation(Zone.ZONE2));
216             }
217             zone2PreviousPowerState = newZone2PowerState;
218         }
219     }
220
221     public void setNumAvailableInputs(int numInputs) {
222         // Request the names for all the inputs
223         for (int input = 1; input <= numInputs; input++) {
224             sendCommand(AnthemCommand.queryInputShortName(input));
225             sendCommand(AnthemCommand.queryInputLongName(input));
226         }
227         updateProperty("Number of Inputs", String.valueOf(numInputs));
228     }
229
230     private void queryAdditionalInformation(Zone zone) {
231         // Request information about the device
232         sendCommand(AnthemCommand.queryNumAvailableInputs());
233         sendCommand(AnthemCommand.queryModel());
234         sendCommand(AnthemCommand.queryRegion());
235         sendCommand(AnthemCommand.querySoftwareVersion());
236         sendCommand(AnthemCommand.querySoftwareBuildDate());
237         sendCommand(AnthemCommand.queryHardwareVersion());
238         sendCommand(AnthemCommand.queryMacAddress());
239         sendCommand(AnthemCommand.queryVolume(zone));
240         sendCommand(AnthemCommand.queryMute(zone));
241         // Give some time for the input names to populate before requesting the active input
242         scheduler.schedule(() -> queryActiveInput(zone), 5L, TimeUnit.SECONDS);
243     }
244
245     private void queryActiveInput(Zone zone) {
246         sendCommand(AnthemCommand.queryActiveInput(zone));
247     }
248
249     private void sendCommand(AnthemCommand command) {
250         logger.debug("Adding command to queue: {}", command);
251         sendQueue.add(command);
252     }
253
254     private synchronized void connect() {
255         try {
256             AnthemConfiguration configuration = getConfig().as(AnthemConfiguration.class);
257             logger.debug("Opening connection to Anthem host {} on port {}", configuration.host, configuration.port);
258             Socket socket = new Socket(configuration.host, configuration.port);
259             writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.ISO_8859_1));
260             reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.ISO_8859_1));
261             this.socket = socket;
262         } catch (UnknownHostException e) {
263             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR,
264                     "@text/thing-status-detail-unknownhost");
265             return;
266         } catch (IllegalArgumentException e) {
267             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR,
268                     "@text/thing-status-detail-invalidport");
269             return;
270         } catch (InterruptedIOException e) {
271             logger.debug("Interrupted while establishing Anthem connection");
272             Thread.currentThread().interrupt();
273             return;
274         } catch (IOException e) {
275             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
276                     "@text/thing-status-detail-openerror");
277             logger.debug("Error opening Anthem connection: {}", e.getMessage());
278             disconnect();
279             scheduleConnectRetry(reconnectIntervalMinutes);
280             return;
281         }
282         Thread localReaderThread = new Thread(this::readerThreadJob, "Anthem reader");
283         localReaderThread.setDaemon(true);
284         localReaderThread.start();
285         this.readerThread = localReaderThread;
286
287         Thread localSenderThread = new Thread(this::senderThreadJob, "Anthem sender");
288         localSenderThread.setDaemon(true);
289         localSenderThread.start();
290         this.senderThread = localSenderThread;
291
292         updateStatus(ThingStatus.ONLINE);
293
294         ScheduledFuture<?> localPollingJob = this.pollingJob;
295         if (localPollingJob == null) {
296             this.pollingJob = scheduler.scheduleWithFixedDelay(this::poll, POLLING_DELAY_SECONDS,
297                     POLLING_INTERVAL_SECONDS, TimeUnit.SECONDS);
298         }
299     }
300
301     private void poll() {
302         logger.debug("Polling...");
303         sendCommand(AnthemCommand.queryPower(Zone.MAIN));
304         sendCommand(AnthemCommand.queryPower(Zone.ZONE2));
305     }
306
307     private void scheduleConnectRetry(long waitMinutes) {
308         logger.debug("Scheduling connection retry in {} minutes", waitMinutes);
309         connectRetryJob = scheduler.schedule(this::connect, waitMinutes, TimeUnit.MINUTES);
310     }
311
312     private synchronized void disconnect() {
313         logger.debug("Disconnecting from Anthem");
314
315         ScheduledFuture<?> localPollingJob = this.pollingJob;
316         if (localPollingJob != null) {
317             localPollingJob.cancel(true);
318             this.pollingJob = null;
319         }
320
321         ScheduledFuture<?> localConnectRetryJob = this.connectRetryJob;
322         if (localConnectRetryJob != null) {
323             localConnectRetryJob.cancel(true);
324             this.connectRetryJob = null;
325         }
326
327         Thread localSenderThread = this.senderThread;
328         if (localSenderThread != null && localSenderThread.isAlive()) {
329             localSenderThread.interrupt();
330         }
331
332         Thread localReaderThread = this.readerThread;
333         if (localReaderThread != null && localReaderThread.isAlive()) {
334             localReaderThread.interrupt();
335         }
336         Socket localSocket = this.socket;
337         if (localSocket != null) {
338             try {
339                 localSocket.close();
340             } catch (IOException e) {
341                 logger.debug("Error closing socket: {}", e.getMessage());
342             }
343             this.socket = null;
344         }
345         BufferedReader localReader = this.reader;
346         if (localReader != null) {
347             try {
348                 localReader.close();
349             } catch (IOException e) {
350                 logger.debug("Error closing reader: {}", e.getMessage());
351             }
352             this.reader = null;
353         }
354         BufferedWriter localWriter = this.writer;
355         if (localWriter != null) {
356             try {
357                 localWriter.close();
358             } catch (IOException e) {
359                 logger.debug("Error closing writer: {}", e.getMessage());
360             }
361             this.writer = null;
362         }
363     }
364
365     private synchronized void reconnect() {
366         logger.debug("Attempting to reconnect to the Anthem");
367         disconnect();
368         connect();
369     }
370
371     private void senderThreadJob() {
372         logger.debug("Sender thread started");
373         try {
374             while (!Thread.currentThread().isInterrupted() && writer != null) {
375                 AnthemCommand command = sendQueue.take();
376                 logger.debug("Sender thread writing command: {}", command);
377                 try {
378                     BufferedWriter localWriter = this.writer;
379                     if (localWriter != null) {
380                         localWriter.write(command.toString());
381                         localWriter.flush();
382                     }
383                 } catch (InterruptedIOException e) {
384                     logger.debug("Interrupted while sending command");
385                     updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
386                             "@text/thing-status-detail-interrupted");
387                     break;
388                 } catch (IOException e) {
389                     logger.debug("Communication error, will try to reconnect. Error: {}", e.getMessage());
390                     updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
391                     // Requeue the command and try to reconnect
392                     sendQueue.add(command);
393                     reconnect();
394                     break;
395                 }
396                 // Introduce delay to throttle the send rate
397                 if (commandDelayMsec > 0) {
398                     Thread.sleep(commandDelayMsec);
399                 }
400             }
401         } catch (InterruptedException e) {
402             Thread.currentThread().interrupt();
403         } finally {
404             logger.debug("Sender thread exiting");
405         }
406     }
407
408     private void readerThreadJob() {
409         logger.debug("Reader thread started");
410         StringBuffer sbReader = new StringBuffer();
411         try {
412             char c;
413             String command;
414             BufferedReader localReader = this.reader;
415             while (!Thread.interrupted() && localReader != null) {
416                 c = (char) localReader.read();
417                 sbReader.append(c);
418                 if (c == COMMAND_TERMINATION_CHAR) {
419                     command = sbReader.toString();
420                     logger.debug("Reader thread sending command to parser: {}", command);
421                     messageParser.parseMessage(command);
422                     sbReader.setLength(0);
423                 }
424             }
425         } catch (InterruptedIOException e) {
426             logger.debug("Interrupted while reading");
427             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
428                     "@text/thing-status-detail-interrupted");
429         } catch (IOException e) {
430             logger.debug("I/O error while reading from socket: {}", e.getMessage());
431             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
432                     "@text/thing-status-detail-ioexception");
433         } finally {
434             logger.debug("Reader thread exiting");
435         }
436     }
437 }