2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.anthem.internal.handler;
15 import static org.openhab.binding.anthem.internal.AnthemBindingConstants.*;
17 import java.io.BufferedReader;
18 import java.io.BufferedWriter;
19 import java.io.IOException;
20 import java.io.InputStreamReader;
21 import java.io.InterruptedIOException;
22 import java.io.OutputStreamWriter;
23 import java.net.Socket;
24 import java.net.UnknownHostException;
25 import java.nio.charset.StandardCharsets;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.Future;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.ScheduledFuture;
30 import java.util.concurrent.TimeUnit;
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.openhab.binding.anthem.internal.AnthemConfiguration;
35 import org.openhab.core.library.types.DecimalType;
36 import org.openhab.core.library.types.IncreaseDecreaseType;
37 import org.openhab.core.library.types.OnOffType;
38 import org.openhab.core.thing.ChannelUID;
39 import org.openhab.core.thing.Thing;
40 import org.openhab.core.thing.ThingStatus;
41 import org.openhab.core.thing.ThingStatusDetail;
42 import org.openhab.core.thing.binding.BaseThingHandler;
43 import org.openhab.core.types.Command;
44 import org.openhab.core.types.State;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
49 * The {@link AnthemHandler} is responsible for handling commands, which are
50 * sent to one of the channels. It also manages the connection to the AV processor.
51 * The reader thread receives solicited and unsolicited commands from the processor.
52 * The sender thread is used to send commands to the processor.
54 * @author Mark Hilbush - Initial contribution
57 public class AnthemHandler extends BaseThingHandler {
58 private Logger logger = LoggerFactory.getLogger(AnthemHandler.class);
60 private static final long POLLING_INTERVAL_SECONDS = 900L;
61 private static final long POLLING_DELAY_SECONDS = 10L;
63 private @Nullable Socket socket;
64 private @Nullable BufferedWriter writer;
65 private @Nullable BufferedReader reader;
67 private AnthemCommandParser messageParser;
69 private final BlockingQueue<AnthemCommand> sendQueue = new LinkedBlockingQueue<>();
71 private @Nullable Future<?> asyncInitializeTask;
72 private @Nullable ScheduledFuture<?> connectRetryJob;
73 private @Nullable ScheduledFuture<?> pollingJob;
75 private @Nullable Thread senderThread;
76 private @Nullable Thread readerThread;
78 private int reconnectIntervalMinutes;
79 private int commandDelayMsec;
81 private boolean zone1PreviousPowerState;
82 private boolean zone2PreviousPowerState;
84 public AnthemHandler(Thing thing) {
86 messageParser = new AnthemCommandParser(this);
90 public void initialize() {
91 AnthemConfiguration configuration = getConfig().as(AnthemConfiguration.class);
92 logger.debug("AnthemHandler: Configuration of thing {} is {}", thing.getUID().getId(), configuration);
94 if (!configuration.isValid()) {
95 logger.debug("AnthemHandler: Config of thing '{}' is invalid", thing.getUID().getId());
96 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR,
97 "@text/thing-status-detail-invalidconfig");
100 reconnectIntervalMinutes = configuration.reconnectIntervalMinutes;
101 commandDelayMsec = configuration.commandDelayMsec;
102 updateStatus(ThingStatus.UNKNOWN, ThingStatusDetail.NONE, "@text/thing-status-detail-connecting");
103 asyncInitializeTask = scheduler.submit(this::connect);
107 public void dispose() {
108 Future<?> localAsyncInitializeTask = this.asyncInitializeTask;
109 if (localAsyncInitializeTask != null) {
110 localAsyncInitializeTask.cancel(true);
111 this.asyncInitializeTask = null;
117 public void handleCommand(ChannelUID channelUID, Command command) {
118 logger.trace("Command {} received for channel {}", command, channelUID.getId().toString());
119 String groupId = channelUID.getGroupId();
120 if (groupId == null) {
123 Zone zone = Zone.fromValue(groupId);
125 switch (channelUID.getIdWithoutGroup()) {
127 if (command instanceof OnOffType) {
128 if (command == OnOffType.ON) {
129 // Power on the device
130 sendCommand(AnthemCommand.powerOn(zone));
131 } else if (command == OnOffType.OFF) {
132 sendCommand(AnthemCommand.powerOff(zone));
137 if (command instanceof OnOffType || command instanceof IncreaseDecreaseType) {
138 if (command == OnOffType.ON || command == IncreaseDecreaseType.INCREASE) {
139 sendCommand(AnthemCommand.volumeUp(zone, 1));
140 } else if (command == OnOffType.OFF || command == IncreaseDecreaseType.DECREASE) {
141 sendCommand(AnthemCommand.volumeDown(zone, 1));
145 case CHANNEL_VOLUME_DB:
146 if (command instanceof DecimalType) {
147 sendCommand(AnthemCommand.volume(zone, ((DecimalType) command).intValue()));
151 if (command instanceof OnOffType) {
152 if (command == OnOffType.ON) {
153 sendCommand(AnthemCommand.muteOn(zone));
154 } else if (command == OnOffType.OFF) {
155 sendCommand(AnthemCommand.muteOff(zone));
159 case CHANNEL_ACTIVE_INPUT:
160 if (command instanceof DecimalType) {
161 sendCommand(AnthemCommand.activeInput(zone, ((DecimalType) command).intValue()));
165 logger.debug("Received command '{}' for unhandled channel '{}'", command, channelUID.getId());
170 public void setModel(String model) {
171 updateProperty("Model", model);
174 public void setRegion(String region) {
175 updateProperty("Region", region);
178 public void setSoftwareVersion(String version) {
179 updateProperty("Software Version", version);
182 public void setSoftwareBuildDate(String date) {
183 updateProperty("Software Build Date", date);
186 public void setHardwareVersion(String version) {
187 updateProperty("Hardware Version", version);
190 public void setMacAddress(String mac) {
191 updateProperty("Mac Address", mac);
194 public void updateChannelState(String zone, String channelId, State state) {
195 updateState(zone + "#" + channelId, state);
198 public void checkPowerStatusChange(String zone, String power) {
200 if (Zone.MAIN.equals(Zone.fromValue(zone))) {
201 boolean newZone1PowerState = "1".equals(power) ? true : false;
202 if (!zone1PreviousPowerState && newZone1PowerState) {
203 // Power turned on for main zone.
204 // This will cause the main zone channel states to be updated
205 scheduler.submit(() -> queryAdditionalInformation(Zone.MAIN));
207 zone1PreviousPowerState = newZone1PowerState;
210 else if (Zone.ZONE2.equals(Zone.fromValue(zone))) {
211 boolean newZone2PowerState = "1".equals(power) ? true : false;
212 if (!zone2PreviousPowerState && newZone2PowerState) {
213 // Power turned on for zone 2.
214 // This will cause zone 2 channel states to be updated
215 scheduler.submit(() -> queryAdditionalInformation(Zone.ZONE2));
217 zone2PreviousPowerState = newZone2PowerState;
221 public void setNumAvailableInputs(int numInputs) {
222 // Request the names for all the inputs
223 for (int input = 1; input <= numInputs; input++) {
224 sendCommand(AnthemCommand.queryInputShortName(input));
225 sendCommand(AnthemCommand.queryInputLongName(input));
227 updateProperty("Number of Inputs", String.valueOf(numInputs));
230 private void queryAdditionalInformation(Zone zone) {
231 // Request information about the device
232 sendCommand(AnthemCommand.queryNumAvailableInputs());
233 sendCommand(AnthemCommand.queryModel());
234 sendCommand(AnthemCommand.queryRegion());
235 sendCommand(AnthemCommand.querySoftwareVersion());
236 sendCommand(AnthemCommand.querySoftwareBuildDate());
237 sendCommand(AnthemCommand.queryHardwareVersion());
238 sendCommand(AnthemCommand.queryMacAddress());
239 sendCommand(AnthemCommand.queryVolume(zone));
240 sendCommand(AnthemCommand.queryMute(zone));
241 // Give some time for the input names to populate before requesting the active input
242 scheduler.schedule(() -> queryActiveInput(zone), 5L, TimeUnit.SECONDS);
245 private void queryActiveInput(Zone zone) {
246 sendCommand(AnthemCommand.queryActiveInput(zone));
249 private void sendCommand(AnthemCommand command) {
250 logger.debug("Adding command to queue: {}", command);
251 sendQueue.add(command);
254 private synchronized void connect() {
256 AnthemConfiguration configuration = getConfig().as(AnthemConfiguration.class);
257 logger.debug("Opening connection to Anthem host {} on port {}", configuration.host, configuration.port);
258 Socket socket = new Socket(configuration.host, configuration.port);
259 writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.ISO_8859_1));
260 reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.ISO_8859_1));
261 this.socket = socket;
262 } catch (UnknownHostException e) {
263 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR,
264 "@text/thing-status-detail-unknownhost");
266 } catch (IllegalArgumentException e) {
267 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR,
268 "@text/thing-status-detail-invalidport");
270 } catch (InterruptedIOException e) {
271 logger.debug("Interrupted while establishing Anthem connection");
272 Thread.currentThread().interrupt();
274 } catch (IOException e) {
275 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
276 "@text/thing-status-detail-openerror");
277 logger.debug("Error opening Anthem connection: {}", e.getMessage());
279 scheduleConnectRetry(reconnectIntervalMinutes);
282 Thread localReaderThread = new Thread(this::readerThreadJob, "Anthem reader");
283 localReaderThread.setDaemon(true);
284 localReaderThread.start();
285 this.readerThread = localReaderThread;
287 Thread localSenderThread = new Thread(this::senderThreadJob, "Anthem sender");
288 localSenderThread.setDaemon(true);
289 localSenderThread.start();
290 this.senderThread = localSenderThread;
292 updateStatus(ThingStatus.ONLINE);
294 ScheduledFuture<?> localPollingJob = this.pollingJob;
295 if (localPollingJob == null) {
296 this.pollingJob = scheduler.scheduleWithFixedDelay(this::poll, POLLING_DELAY_SECONDS,
297 POLLING_INTERVAL_SECONDS, TimeUnit.SECONDS);
301 private void poll() {
302 logger.debug("Polling...");
303 sendCommand(AnthemCommand.queryPower(Zone.MAIN));
304 sendCommand(AnthemCommand.queryPower(Zone.ZONE2));
307 private void scheduleConnectRetry(long waitMinutes) {
308 logger.debug("Scheduling connection retry in {} minutes", waitMinutes);
309 connectRetryJob = scheduler.schedule(this::connect, waitMinutes, TimeUnit.MINUTES);
312 private synchronized void disconnect() {
313 logger.debug("Disconnecting from Anthem");
315 ScheduledFuture<?> localPollingJob = this.pollingJob;
316 if (localPollingJob != null) {
317 localPollingJob.cancel(true);
318 this.pollingJob = null;
321 ScheduledFuture<?> localConnectRetryJob = this.connectRetryJob;
322 if (localConnectRetryJob != null) {
323 localConnectRetryJob.cancel(true);
324 this.connectRetryJob = null;
327 Thread localSenderThread = this.senderThread;
328 if (localSenderThread != null && localSenderThread.isAlive()) {
329 localSenderThread.interrupt();
332 Thread localReaderThread = this.readerThread;
333 if (localReaderThread != null && localReaderThread.isAlive()) {
334 localReaderThread.interrupt();
336 Socket localSocket = this.socket;
337 if (localSocket != null) {
340 } catch (IOException e) {
341 logger.debug("Error closing socket: {}", e.getMessage());
345 BufferedReader localReader = this.reader;
346 if (localReader != null) {
349 } catch (IOException e) {
350 logger.debug("Error closing reader: {}", e.getMessage());
354 BufferedWriter localWriter = this.writer;
355 if (localWriter != null) {
358 } catch (IOException e) {
359 logger.debug("Error closing writer: {}", e.getMessage());
365 private synchronized void reconnect() {
366 logger.debug("Attempting to reconnect to the Anthem");
371 private void senderThreadJob() {
372 logger.debug("Sender thread started");
374 while (!Thread.currentThread().isInterrupted() && writer != null) {
375 AnthemCommand command = sendQueue.take();
376 logger.debug("Sender thread writing command: {}", command);
378 BufferedWriter localWriter = this.writer;
379 if (localWriter != null) {
380 localWriter.write(command.toString());
383 } catch (InterruptedIOException e) {
384 logger.debug("Interrupted while sending command");
385 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
386 "@text/thing-status-detail-interrupted");
388 } catch (IOException e) {
389 logger.debug("Communication error, will try to reconnect. Error: {}", e.getMessage());
390 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
391 // Requeue the command and try to reconnect
392 sendQueue.add(command);
396 // Introduce delay to throttle the send rate
397 if (commandDelayMsec > 0) {
398 Thread.sleep(commandDelayMsec);
401 } catch (InterruptedException e) {
402 Thread.currentThread().interrupt();
404 logger.debug("Sender thread exiting");
408 private void readerThreadJob() {
409 logger.debug("Reader thread started");
410 StringBuffer sbReader = new StringBuffer();
414 BufferedReader localReader = this.reader;
415 while (!Thread.interrupted() && localReader != null) {
416 c = (char) localReader.read();
418 if (c == COMMAND_TERMINATION_CHAR) {
419 command = sbReader.toString();
420 logger.debug("Reader thread sending command to parser: {}", command);
421 messageParser.parseMessage(command);
422 sbReader.setLength(0);
425 } catch (InterruptedIOException e) {
426 logger.debug("Interrupted while reading");
427 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
428 "@text/thing-status-detail-interrupted");
429 } catch (IOException e) {
430 logger.debug("I/O error while reading from socket: {}", e.getMessage());
431 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
432 "@text/thing-status-detail-ioexception");
434 logger.debug("Reader thread exiting");