/**
 * Copyright (c) 2010-2021 Contributors to the openHAB project
 *
 * See the NOTICE file(s) distributed with this work for additional
 * information.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0
 *
 * SPDX-License-Identifier: EPL-2.0
 */
13 package org.openhab.binding.regoheatpump.internal.handler;
15 import static org.openhab.binding.regoheatpump.internal.RegoHeatPumpBindingConstants.*;
17 import java.io.IOException;
18 import java.io.InputStream;
19 import java.io.OutputStream;
20 import java.util.Collection;
21 import java.util.Date;
22 import java.util.HashMap;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.TimeUnit;
26 import java.util.function.Consumer;
27 import java.util.function.Function;
28 import java.util.stream.Collectors;
30 import javax.measure.Unit;
32 import org.openhab.binding.regoheatpump.internal.protocol.RegoConnection;
33 import org.openhab.binding.regoheatpump.internal.rego6xx.CommandFactory;
34 import org.openhab.binding.regoheatpump.internal.rego6xx.ErrorLine;
35 import org.openhab.binding.regoheatpump.internal.rego6xx.Rego6xxProtocolException;
36 import org.openhab.binding.regoheatpump.internal.rego6xx.RegoRegisterMapper;
37 import org.openhab.binding.regoheatpump.internal.rego6xx.ResponseParser;
38 import org.openhab.binding.regoheatpump.internal.rego6xx.ResponseParserFactory;
39 import org.openhab.core.library.types.DateTimeType;
40 import org.openhab.core.library.types.DecimalType;
41 import org.openhab.core.library.types.QuantityType;
42 import org.openhab.core.library.types.StringType;
43 import org.openhab.core.thing.Channel;
44 import org.openhab.core.thing.ChannelUID;
45 import org.openhab.core.thing.Thing;
46 import org.openhab.core.thing.ThingStatus;
47 import org.openhab.core.thing.ThingStatusDetail;
48 import org.openhab.core.thing.binding.BaseThingHandler;
49 import org.openhab.core.types.Command;
50 import org.openhab.core.types.RefreshType;
51 import org.openhab.core.types.State;
52 import org.openhab.core.types.UnDefType;
53 import org.openhab.core.util.HexUtils;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
/**
 * The {@link Rego6xxHeatPumpHandler} is responsible for handling commands, which are
 * sent to one of the channels.
 *
 * @author Boris Krivonog - Initial contribution
 */
63 abstract class Rego6xxHeatPumpHandler extends BaseThingHandler {
65 private static final class ChannelDescriptor {
66 private Date lastUpdate;
67 private byte[] cachedValue;
69 public byte[] cachedValueIfNotExpired(int refreshTime) {
70 if (lastUpdate == null || (lastUpdate.getTime() + refreshTime * 900 < new Date().getTime())) {
77 public void setValue(byte[] value) {
78 lastUpdate = new Date();
83 private final Logger logger = LoggerFactory.getLogger(Rego6xxHeatPumpHandler.class);
84 private final Map<String, ChannelDescriptor> channelDescriptors = new HashMap<>();
85 private int refreshInterval;
86 private RegoConnection connection;
87 private RegoRegisterMapper mapper;
88 private ScheduledFuture<?> scheduledRefreshFuture;
/**
 * Creates a handler bound to the given thing.
 *
 * @param thing the thing this handler is responsible for
 */
protected Rego6xxHeatPumpHandler(Thing thing) {
    super(thing);
}
94 protected abstract RegoConnection createConnection();
97 public void initialize() {
98 mapper = RegoRegisterMapper.REGO600;
99 refreshInterval = ((Number) getConfig().get(REFRESH_INTERVAL)).intValue();
101 connection = createConnection();
103 scheduledRefreshFuture = scheduler.scheduleWithFixedDelay(this::refresh, 2, refreshInterval, TimeUnit.SECONDS);
105 updateStatus(ThingStatus.UNKNOWN);
109 public void dispose() {
112 if (connection != null) {
116 if (scheduledRefreshFuture != null) {
117 scheduledRefreshFuture.cancel(true);
118 scheduledRefreshFuture = null;
121 synchronized (channelDescriptors) {
122 channelDescriptors.clear();
130 public void handleCommand(ChannelUID channelUID, Command command) {
131 if (command instanceof RefreshType) {
132 processChannelReadRequest(channelUID.getId());
134 RegoRegisterMapper.Channel channel = mapper.map(channelUID.getId());
135 if (channel != null) {
136 logger.debug("Executing command '{}' for channel '{}'", command, channelUID.getId());
137 processChannelWriteRequest(channel, command);
139 logger.debug("Unsupported channel {}", channelUID.getId());
144 private static double commandToValue(Command command) {
145 if (command instanceof QuantityType<?>) {
146 return ((QuantityType<?>) command).doubleValue();
149 if (command instanceof DecimalType) {
150 return ((DecimalType) command).doubleValue();
153 throw new NumberFormatException("Command '" + command + "' not supported");
156 private void processChannelWriteRequest(RegoRegisterMapper.Channel channel, Command command) {
157 short value = (short) Math.round(commandToValue(command) / channel.scaleFactor());
158 byte[] commandPayload = CommandFactory.createWriteToSystemRegisterCommand(channel.address(), value);
159 executeCommand(null, commandPayload, ResponseParserFactory.WRITE, result -> {
160 // Ignore result since it is a write command.
164 private void processChannelReadRequest(String channelIID) {
165 switch (channelIID) {
166 case CHANNEL_LAST_ERROR_TYPE:
167 readAndUpdateLastErrorType();
169 case CHANNEL_LAST_ERROR_TIMESTAMP:
170 readAndUpdateLastErrorTimestamp();
172 case CHANNEL_FRONT_PANEL_POWER_LAMP:
173 readAndUpdateFrontPanel(channelIID, (short) 0x0012);
175 case CHANNEL_FRONT_PANEL_PUMP_LAMP:
176 readAndUpdateFrontPanel(channelIID, (short) 0x0013);
178 case CHANNEL_FRONT_PANEL_ADDITIONAL_HEAT_LAMP:
179 readAndUpdateFrontPanel(channelIID, (short) 0x0014);
181 case CHANNEL_FRONT_PANEL_WATER_HEATER_LAMP:
182 readAndUpdateFrontPanel(channelIID, (short) 0x0015);
184 case CHANNEL_FRONT_PANEL_ALARM_LAMP:
185 readAndUpdateFrontPanel(channelIID, (short) 0x0016);
188 readAndUpdateSystemRegister(channelIID);
193 private Collection<String> linkedChannels() {
194 return thing.getChannels().stream().map(Channel::getUID).map(ChannelUID::getId).filter(this::isLinked)
195 .collect(Collectors.toList());
198 private void refresh() {
199 for (String channelIID : linkedChannels()) {
200 if (Thread.interrupted()) {
204 processChannelReadRequest(channelIID);
206 if (thing.getStatus() != ThingStatus.ONLINE) {
212 private void readAndUpdateLastErrorType() {
213 readAndUpdateLastError(CHANNEL_LAST_ERROR_TYPE, e -> new StringType(Byte.toString(e.error())));
216 private void readAndUpdateLastErrorTimestamp() {
217 readAndUpdateLastError(CHANNEL_LAST_ERROR_TIMESTAMP, e -> new DateTimeType(e.timestamp()));
220 private void readAndUpdateLastError(String channelIID, Function<ErrorLine, State> converter) {
221 executeCommandAndUpdateState(channelIID, CommandFactory.createReadLastErrorCommand(),
222 ResponseParserFactory.ERROR_LINE, e -> {
223 return e == null ? UnDefType.NULL : converter.apply(e);
227 private void readAndUpdateFrontPanel(String channelIID, short address) {
228 byte[] command = CommandFactory.createReadFromFrontPanelCommand(address);
229 executeCommandAndUpdateState(channelIID, command, ResponseParserFactory.SHORT, DecimalType::new);
232 private void readAndUpdateSystemRegister(String channelIID) {
233 RegoRegisterMapper.Channel channel = mapper.map(channelIID);
234 if (channel != null) {
235 byte[] command = CommandFactory.createReadFromSystemRegisterCommand(channel.address());
236 executeCommandAndUpdateState(channelIID, command, ResponseParserFactory.SHORT, value -> {
237 Unit<?> unit = channel.unit();
238 double result = Math.round(channel.convertValue(value) * channel.scaleFactor() * 10.0) / 10.0;
239 return unit != null ? new QuantityType<>(result, unit) : new DecimalType(result);
242 logger.debug("Unsupported channel {}", channelIID);
246 private <T> void executeCommandAndUpdateState(String channelIID, byte[] command, ResponseParser<T> parser,
247 Function<T, State> converter) {
248 logger.debug("Reading value for channel '{}' ...", channelIID);
249 executeCommand(channelIID, command, parser, result -> {
250 logger.debug("Got value for '{}' = {}", channelIID, result);
251 updateState(channelIID, converter.apply(result));
255 private synchronized <T> void executeCommand(String channelIID, byte[] command, ResponseParser<T> parser,
256 Consumer<T> resultProcessor) {
258 T result = executeCommandWithRetry(channelIID, command, parser, 5);
259 resultProcessor.accept(result);
260 } catch (IOException e) {
261 logger.warn("Executing command for channel '{}' failed.", channelIID, e);
263 synchronized (channelDescriptors) {
264 channelDescriptors.clear();
267 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getMessage());
268 } catch (Rego6xxProtocolException e) {
269 logger.warn("Executing command for channel '{}' failed.", channelIID, e);
270 updateState(channelIID, UnDefType.UNDEF);
271 } catch (InterruptedException e) {
272 logger.debug("Execution interrupted when accessing value for channel '{}'.", channelIID, e);
273 Thread.currentThread().interrupt();
277 private <T> T executeCommandWithRetry(String channelIID, byte[] command, ResponseParser<T> parser, int retry)
278 throws Rego6xxProtocolException, IOException, InterruptedException {
281 return executeCommand(channelIID, command, parser);
282 } catch (IOException | Rego6xxProtocolException e) {
284 logger.debug("Executing command for channel '{}' failed, retry {}.", channelIID, retry, e);
286 return executeCommandWithRetry(channelIID, command, parser, retry - 1);
293 private void checkRegoDevice() throws Rego6xxProtocolException, IOException, InterruptedException {
294 if (thing.getStatus() != ThingStatus.ONLINE) {
295 logger.debug("Reading Rego device version...");
296 Short regoVersion = executeCommand(null, CommandFactory.createReadRegoVersionCommand(),
297 ResponseParserFactory.SHORT);
299 if (regoVersion != 600) {
300 throw new IOException("Invalid rego version received " + regoVersion.toString());
303 updateStatus(ThingStatus.ONLINE);
304 logger.debug("Connected to Rego version {}.", regoVersion);
308 private ChannelDescriptor channelDescriptorForChannel(String channelIID) {
309 synchronized (channelDescriptors) {
310 ChannelDescriptor descriptor = channelDescriptors.get(channelIID);
311 if (descriptor == null) {
312 descriptor = new ChannelDescriptor();
313 channelDescriptors.put(channelIID, descriptor);
319 private <T> T executeCommand(String channelIID, byte[] command, ResponseParser<T> parser)
320 throws Rego6xxProtocolException, IOException, InterruptedException {
322 return executeCommandInternal(channelIID, command, parser);
323 } catch (IOException e) {
324 if (connection != null) {
332 private <T> T executeCommandInternal(String channelIID, byte[] command, ResponseParser<T> parser)
333 throws Rego6xxProtocolException, IOException, InterruptedException {
// CHANNEL_LAST_ERROR_TYPE and CHANNEL_LAST_ERROR_TIMESTAMP are read from the same
// register. To prevent accessing the same register twice when both channels are linked,
// use the same name for both so only a single fetch will be triggered.
337 String mappedChannelIID = (CHANNEL_LAST_ERROR_TYPE.equals(channelIID)
338 || CHANNEL_LAST_ERROR_TIMESTAMP.equals(channelIID)) ? CHANNEL_LAST_ERROR : channelIID;
340 // Use transient channel descriptor for null (not cached) channels.
341 ChannelDescriptor descriptor = channelIID == null ? new ChannelDescriptor()
342 : channelDescriptorForChannel(mappedChannelIID);
344 byte[] cachedValue = descriptor.cachedValueIfNotExpired(refreshInterval);
345 if (cachedValue != null) {
346 logger.debug("Cache did not yet expire, using cached value for {}", mappedChannelIID);
347 return parser.parse(cachedValue);
350 // Send command to device and wait for response.
351 if (!connection.isConnected()) {
352 connection.connect();
355 // Give heat pump some time between commands. Feeding commands too quickly
356 // might cause heat pump not to respond.
359 // Protocol is request driven so there should be no data available before sending
360 // a command to the heat pump.
361 InputStream inputStream = connection.inputStream();
362 int available = inputStream.available();
364 // Limit to max 64 bytes, fuse.
365 byte[] buffer = new byte[Math.min(64, available)];
366 inputStream.read(buffer);
367 logger.debug("There are {} unexpected bytes available. Skipping {}.", available, buffer);
370 if (logger.isDebugEnabled()) {
371 logger.debug("Sending {}", HexUtils.bytesToHex(command));
375 OutputStream outputStream = connection.outputStream();
376 outputStream.write(command);
377 outputStream.flush();
// Read response, wait for max 2 seconds for data to arrive.
380 byte[] response = new byte[parser.responseLength()];
381 long timeout = System.currentTimeMillis() + 2000;
385 int len = inputStream.read(response, pos, response.length - pos);
389 // Give some time for response to arrive...
393 } while (pos < response.length && timeout > System.currentTimeMillis());
395 if (pos < response.length) {
396 logger.debug("Response not received, read {} bytes => {}", pos, response);
398 throw new IOException("Response not received - got " + Integer.toString(pos) + " bytes of "
399 + Integer.toString(response.length));
402 if (logger.isDebugEnabled()) {
403 logger.debug("Received {}", HexUtils.bytesToHex(response));
406 T result = parser.parse(response);
408 // If reading/parsing was done successfully, cache response payload.
409 descriptor.setValue(response);