2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.anthem.internal.handler;
15 import static org.openhab.binding.anthem.internal.AnthemBindingConstants.*;
17 import java.io.BufferedReader;
18 import java.io.BufferedWriter;
19 import java.io.IOException;
20 import java.io.InputStreamReader;
21 import java.io.InterruptedIOException;
22 import java.io.OutputStreamWriter;
23 import java.net.Socket;
24 import java.net.UnknownHostException;
25 import java.nio.charset.StandardCharsets;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.Future;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.ScheduledFuture;
30 import java.util.concurrent.TimeUnit;
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.openhab.binding.anthem.internal.AnthemConfiguration;
35 import org.openhab.core.library.types.DecimalType;
36 import org.openhab.core.library.types.IncreaseDecreaseType;
37 import org.openhab.core.library.types.OnOffType;
38 import org.openhab.core.library.types.StringType;
39 import org.openhab.core.thing.ChannelUID;
40 import org.openhab.core.thing.Thing;
41 import org.openhab.core.thing.ThingStatus;
42 import org.openhab.core.thing.ThingStatusDetail;
43 import org.openhab.core.thing.binding.BaseThingHandler;
44 import org.openhab.core.types.Command;
45 import org.openhab.core.types.State;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
50 * The {@link AnthemHandler} is responsible for handling commands, which are
51 * sent to one of the channels. It also manages the connection to the AV processor.
52 * The reader thread receives solicited and unsolicited commands from the processor.
53 * The sender thread is used to send commands to the processor.
55 * @author Mark Hilbush - Initial contribution
58 public class AnthemHandler extends BaseThingHandler {
59 private Logger logger = LoggerFactory.getLogger(AnthemHandler.class);
61 private static final long POLLING_INTERVAL_SECONDS = 900L;
62 private static final long POLLING_DELAY_SECONDS = 10L;
64 private @Nullable Socket socket;
65 private @Nullable BufferedWriter writer;
66 private @Nullable BufferedReader reader;
68 private AnthemCommandParser commandParser;
70 private final BlockingQueue<AnthemCommand> sendQueue = new LinkedBlockingQueue<>();
72 private @Nullable Future<?> asyncInitializeTask;
73 private @Nullable ScheduledFuture<?> connectRetryJob;
74 private @Nullable ScheduledFuture<?> pollingJob;
76 private @Nullable Thread senderThread;
77 private @Nullable Thread readerThread;
79 private int reconnectIntervalMinutes;
80 private int commandDelayMsec;
82 private boolean zone1PreviousPowerState;
83 private boolean zone2PreviousPowerState;
85 public AnthemHandler(Thing thing) {
87 commandParser = new AnthemCommandParser();
91 public void initialize() {
92 AnthemConfiguration configuration = getConfig().as(AnthemConfiguration.class);
93 logger.debug("AnthemHandler: Configuration of thing {} is {}", thing.getUID().getId(), configuration);
95 if (!configuration.isValid()) {
96 logger.debug("AnthemHandler: Config of thing '{}' is invalid", thing.getUID().getId());
97 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR,
98 "@text/thing-status-detail-invalidconfig");
101 reconnectIntervalMinutes = configuration.reconnectIntervalMinutes;
102 commandDelayMsec = configuration.commandDelayMsec;
103 updateStatus(ThingStatus.UNKNOWN, ThingStatusDetail.NONE, "@text/thing-status-detail-connecting");
104 asyncInitializeTask = scheduler.submit(this::connect);
108 public void dispose() {
109 Future<?> localAsyncInitializeTask = this.asyncInitializeTask;
110 if (localAsyncInitializeTask != null) {
111 localAsyncInitializeTask.cancel(true);
112 this.asyncInitializeTask = null;
118 public void handleCommand(ChannelUID channelUID, Command command) {
119 logger.trace("Command {} received for channel {}", command, channelUID.getId().toString());
120 String groupId = channelUID.getGroupId();
121 if (groupId == null) {
125 if (CHANNEL_GROUP_GENERAL.equals(groupId)) {
126 handleGeneralCommand(channelUID, command);
128 handleZoneCommand(groupId, channelUID, command);
132 private void handleGeneralCommand(ChannelUID channelUID, Command command) {
133 switch (channelUID.getIdWithoutGroup()) {
134 case CHANNEL_COMMAND:
135 if (command instanceof StringType) {
136 sendCommand(AnthemCommand.customCommand(command.toString()));
140 logger.debug("Received general command '{}' for unhandled channel '{}'", command, channelUID.getId());
145 private void handleZoneCommand(String groupId, ChannelUID channelUID, Command command) {
146 Zone zone = Zone.fromValue(groupId);
148 switch (channelUID.getIdWithoutGroup()) {
150 if (command instanceof OnOffType) {
151 if (command == OnOffType.ON) {
152 // Power on the device
153 sendCommand(AnthemCommand.powerOn(zone));
154 } else if (command == OnOffType.OFF) {
155 sendCommand(AnthemCommand.powerOff(zone));
160 if (command instanceof OnOffType || command instanceof IncreaseDecreaseType) {
161 if (command == OnOffType.ON || command == IncreaseDecreaseType.INCREASE) {
162 sendCommand(AnthemCommand.volumeUp(zone, 1));
163 } else if (command == OnOffType.OFF || command == IncreaseDecreaseType.DECREASE) {
164 sendCommand(AnthemCommand.volumeDown(zone, 1));
168 case CHANNEL_VOLUME_DB:
169 if (command instanceof DecimalType) {
170 sendCommand(AnthemCommand.volume(zone, ((DecimalType) command).intValue()));
174 if (command instanceof OnOffType) {
175 if (command == OnOffType.ON) {
176 sendCommand(AnthemCommand.muteOn(zone));
177 } else if (command == OnOffType.OFF) {
178 sendCommand(AnthemCommand.muteOff(zone));
182 case CHANNEL_ACTIVE_INPUT:
183 if (command instanceof DecimalType) {
184 sendCommand(AnthemCommand.activeInput(zone, ((DecimalType) command).intValue()));
188 logger.debug("Received zone command '{}' for unhandled channel '{}'", command, channelUID.getId());
193 private void queryAdditionalInformation(Zone zone) {
194 // Request information about the device
195 sendCommand(AnthemCommand.queryNumAvailableInputs());
196 sendCommand(AnthemCommand.queryModel());
197 sendCommand(AnthemCommand.queryRegion());
198 sendCommand(AnthemCommand.querySoftwareVersion());
199 sendCommand(AnthemCommand.querySoftwareBuildDate());
200 sendCommand(AnthemCommand.queryHardwareVersion());
201 sendCommand(AnthemCommand.queryMacAddress());
202 sendCommand(AnthemCommand.queryVolume(zone));
203 sendCommand(AnthemCommand.queryMute(zone));
204 // Give some time for the input names to populate before requesting the active input
205 scheduler.schedule(() -> queryActiveInput(zone), 5L, TimeUnit.SECONDS);
208 private void queryActiveInput(Zone zone) {
209 sendCommand(AnthemCommand.queryActiveInput(zone));
212 private void sendCommand(AnthemCommand command) {
213 logger.debug("Adding command to queue: {}", command);
214 sendQueue.add(command);
217 private synchronized void connect() {
219 AnthemConfiguration configuration = getConfig().as(AnthemConfiguration.class);
220 logger.debug("Opening connection to Anthem host {} on port {}", configuration.host, configuration.port);
221 Socket socket = new Socket(configuration.host, configuration.port);
222 writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.ISO_8859_1));
223 reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.ISO_8859_1));
224 this.socket = socket;
225 } catch (UnknownHostException e) {
226 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR,
227 "@text/thing-status-detail-unknownhost");
229 } catch (IllegalArgumentException e) {
230 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR,
231 "@text/thing-status-detail-invalidport");
233 } catch (InterruptedIOException e) {
234 logger.debug("Interrupted while establishing Anthem connection");
235 Thread.currentThread().interrupt();
237 } catch (IOException e) {
238 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
239 "@text/thing-status-detail-openerror");
240 logger.debug("Error opening Anthem connection: {}", e.getMessage());
242 scheduleConnectRetry(reconnectIntervalMinutes);
245 Thread localReaderThread = new Thread(this::readerThreadJob, "Anthem reader");
246 localReaderThread.setDaemon(true);
247 localReaderThread.start();
248 this.readerThread = localReaderThread;
250 Thread localSenderThread = new Thread(this::senderThreadJob, "Anthem sender");
251 localSenderThread.setDaemon(true);
252 localSenderThread.start();
253 this.senderThread = localSenderThread;
255 updateStatus(ThingStatus.ONLINE);
257 ScheduledFuture<?> localPollingJob = this.pollingJob;
258 if (localPollingJob == null) {
259 this.pollingJob = scheduler.scheduleWithFixedDelay(this::poll, POLLING_DELAY_SECONDS,
260 POLLING_INTERVAL_SECONDS, TimeUnit.SECONDS);
264 private void poll() {
265 logger.debug("Polling...");
266 sendCommand(AnthemCommand.queryPower(Zone.MAIN));
267 sendCommand(AnthemCommand.queryPower(Zone.ZONE2));
270 private void scheduleConnectRetry(long waitMinutes) {
271 logger.debug("Scheduling connection retry in {} minutes", waitMinutes);
272 connectRetryJob = scheduler.schedule(this::connect, waitMinutes, TimeUnit.MINUTES);
275 private synchronized void disconnect() {
276 logger.debug("Disconnecting from Anthem");
278 ScheduledFuture<?> localPollingJob = this.pollingJob;
279 if (localPollingJob != null) {
280 localPollingJob.cancel(true);
281 this.pollingJob = null;
284 ScheduledFuture<?> localConnectRetryJob = this.connectRetryJob;
285 if (localConnectRetryJob != null) {
286 localConnectRetryJob.cancel(true);
287 this.connectRetryJob = null;
290 Thread localSenderThread = this.senderThread;
291 if (localSenderThread != null && localSenderThread.isAlive()) {
292 localSenderThread.interrupt();
295 Thread localReaderThread = this.readerThread;
296 if (localReaderThread != null && localReaderThread.isAlive()) {
297 localReaderThread.interrupt();
299 Socket localSocket = this.socket;
300 if (localSocket != null) {
303 } catch (IOException e) {
304 logger.debug("Error closing socket: {}", e.getMessage());
308 BufferedReader localReader = this.reader;
309 if (localReader != null) {
312 } catch (IOException e) {
313 logger.debug("Error closing reader: {}", e.getMessage());
317 BufferedWriter localWriter = this.writer;
318 if (localWriter != null) {
321 } catch (IOException e) {
322 logger.debug("Error closing writer: {}", e.getMessage());
328 private synchronized void reconnect() {
329 logger.debug("Attempting to reconnect to the Anthem");
334 private void senderThreadJob() {
335 logger.debug("Sender thread started");
337 while (!Thread.currentThread().isInterrupted() && writer != null) {
338 AnthemCommand command = sendQueue.take();
339 logger.debug("Sender thread writing command: {}", command);
341 BufferedWriter localWriter = this.writer;
342 if (localWriter != null) {
343 localWriter.write(command.toString());
346 } catch (InterruptedIOException e) {
347 logger.debug("Interrupted while sending command");
348 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
349 "@text/thing-status-detail-interrupted");
351 } catch (IOException e) {
352 logger.debug("Communication error, will try to reconnect. Error: {}", e.getMessage());
353 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
354 // Requeue the command and try to reconnect
355 sendQueue.add(command);
359 // Introduce delay to throttle the send rate
360 if (commandDelayMsec > 0) {
361 Thread.sleep(commandDelayMsec);
364 } catch (InterruptedException e) {
365 Thread.currentThread().interrupt();
367 logger.debug("Sender thread exiting");
371 private void readerThreadJob() {
372 logger.debug("Reader thread started");
373 StringBuffer sbReader = new StringBuffer();
377 BufferedReader localReader = this.reader;
378 while (!Thread.interrupted() && localReader != null) {
379 c = (char) localReader.read();
381 if (c == COMMAND_TERMINATION_CHAR) {
382 command = sbReader.toString();
383 logger.debug("Reader thread sending command to parser: {}", command);
384 AnthemUpdate update = commandParser.parseCommand(command);
385 if (update != null) {
386 processUpdate(update);
388 sbReader.setLength(0);
391 } catch (InterruptedIOException e) {
392 logger.debug("Interrupted while reading");
393 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
394 "@text/thing-status-detail-interrupted");
395 } catch (IOException e) {
396 logger.debug("I/O error while reading from socket: {}", e.getMessage());
397 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
398 "@text/thing-status-detail-ioexception");
400 logger.debug("Reader thread exiting");
404 private void processUpdate(AnthemUpdate update) {
406 if (update.isStateUpdate()) {
407 StateUpdate stateUpdate = update.getStateUpdate();
408 updateState(stateUpdate.getGroupId() + ChannelUID.CHANNEL_GROUP_SEPARATOR + stateUpdate.getChannelId(),
409 stateUpdate.getState());
410 postProcess(stateUpdate);
413 else if (update.isPropertyUpdate()) {
414 PropertyUpdate propertyUpdate = update.getPropertyUpdate();
415 updateProperty(propertyUpdate.getName(), propertyUpdate.getValue());
416 postProcess(propertyUpdate);
420 private void postProcess(StateUpdate stateUpdate) {
421 switch (stateUpdate.getChannelId()) {
423 checkPowerStatusChange(stateUpdate);
425 case CHANNEL_ACTIVE_INPUT:
426 updateInputNameChannels(stateUpdate);
431 private void checkPowerStatusChange(StateUpdate stateUpdate) {
432 String zone = stateUpdate.getGroupId();
433 State power = stateUpdate.getState();
435 if (Zone.MAIN.equals(Zone.fromValue(zone))) {
436 boolean newZone1PowerState = (power instanceof OnOffType && power == OnOffType.ON) ? true : false;
437 if (!zone1PreviousPowerState && newZone1PowerState) {
438 // Power turned on for main zone.
439 // This will cause the main zone channel states to be updated
440 scheduler.submit(() -> queryAdditionalInformation(Zone.MAIN));
442 zone1PreviousPowerState = newZone1PowerState;
445 else if (Zone.ZONE2.equals(Zone.fromValue(zone))) {
446 boolean newZone2PowerState = (power instanceof OnOffType && power == OnOffType.ON) ? true : false;
447 if (!zone2PreviousPowerState && newZone2PowerState) {
448 // Power turned on for zone 2.
449 // This will cause zone 2 channel states to be updated
450 scheduler.submit(() -> queryAdditionalInformation(Zone.ZONE2));
452 zone2PreviousPowerState = newZone2PowerState;
456 private void updateInputNameChannels(StateUpdate stateUpdate) {
457 State state = stateUpdate.getState();
458 String groupId = stateUpdate.getGroupId();
459 if (state instanceof StringType) {
460 updateState(groupId + ChannelUID.CHANNEL_GROUP_SEPARATOR + CHANNEL_ACTIVE_INPUT_SHORT_NAME,
461 new StringType(commandParser.getInputShortName(state.toString())));
462 updateState(groupId + ChannelUID.CHANNEL_GROUP_SEPARATOR + CHANNEL_ACTIVE_INPUT_LONG_NAME,
463 new StringType(commandParser.getInputLongName(state.toString())));
467 private void postProcess(PropertyUpdate propertyUpdate) {
468 switch (propertyUpdate.getName()) {
469 case PROPERTY_NUM_AVAILABLE_INPUTS:
470 queryAllInputNames(propertyUpdate);
475 private void queryAllInputNames(PropertyUpdate propertyUpdate) {
477 int numInputs = Integer.parseInt(propertyUpdate.getValue());
478 for (int input = 1; input <= numInputs; input++) {
479 sendCommand(AnthemCommand.queryInputShortName(input));
480 sendCommand(AnthemCommand.queryInputLongName(input));
482 } catch (NumberFormatException e) {
483 logger.debug("Unable to convert property '{}' to integer: {}", propertyUpdate.getName(),
484 propertyUpdate.getValue());