2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.knx.internal.handler;
15 import static org.openhab.binding.knx.internal.KNXBindingConstants.*;
17 import java.math.BigDecimal;
18 import java.time.Duration;
19 import java.util.List;
21 import java.util.Objects;
22 import java.util.Random;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.Future;
26 import java.util.concurrent.ScheduledExecutorService;
27 import java.util.concurrent.ScheduledFuture;
28 import java.util.concurrent.TimeUnit;
30 import org.eclipse.jdt.annotation.NonNullByDefault;
31 import org.eclipse.jdt.annotation.Nullable;
32 import org.openhab.binding.knx.internal.KNXBindingConstants;
33 import org.openhab.binding.knx.internal.channel.KNXChannel;
34 import org.openhab.binding.knx.internal.channel.KNXChannelFactory;
35 import org.openhab.binding.knx.internal.client.AbstractKNXClient;
36 import org.openhab.binding.knx.internal.client.DeviceInspector;
37 import org.openhab.binding.knx.internal.client.InboundSpec;
38 import org.openhab.binding.knx.internal.client.KNXClient;
39 import org.openhab.binding.knx.internal.client.OutboundSpec;
40 import org.openhab.binding.knx.internal.config.DeviceConfig;
41 import org.openhab.binding.knx.internal.dpt.DPTUtil;
42 import org.openhab.binding.knx.internal.dpt.ValueDecoder;
43 import org.openhab.binding.knx.internal.i18n.KNXTranslationProvider;
44 import org.openhab.core.cache.ExpiringCacheMap;
45 import org.openhab.core.library.types.IncreaseDecreaseType;
46 import org.openhab.core.thing.Bridge;
47 import org.openhab.core.thing.Channel;
48 import org.openhab.core.thing.ChannelUID;
49 import org.openhab.core.thing.Thing;
50 import org.openhab.core.thing.ThingStatus;
51 import org.openhab.core.thing.ThingStatusDetail;
52 import org.openhab.core.thing.ThingStatusInfo;
53 import org.openhab.core.thing.binding.BaseThingHandler;
54 import org.openhab.core.types.Command;
55 import org.openhab.core.types.RefreshType;
56 import org.openhab.core.types.State;
57 import org.openhab.core.types.Type;
58 import org.openhab.core.types.UnDefType;
59 import org.openhab.core.util.HexUtils;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
63 import tuwien.auto.calimero.GroupAddress;
64 import tuwien.auto.calimero.IndividualAddress;
65 import tuwien.auto.calimero.KNXException;
66 import tuwien.auto.calimero.KNXFormatException;
67 import tuwien.auto.calimero.datapoint.CommandDP;
68 import tuwien.auto.calimero.datapoint.Datapoint;
71 * The {@link DeviceThingHandler} is responsible for handling commands and state updates sent to and received from the
72 * bus and updating the channels correspondingly.
74 * @author Simon Kaufmann - Initial contribution and API
75 * @author Jan N. Klug - Refactored for performance
// Thing handler for a single KNX device. It extends BaseThingHandler and is registered as a
// GroupAddressListener so that it receives the group telegrams it listens to.
78 public class DeviceThingHandler extends BaseThingHandler implements GroupAddressListener {
// Scale for the randomised initial delay of the reachability poll (see attachToClient()).
79 private static final int INITIAL_PING_DELAY = 5;
80 private final Logger logger = LoggerFactory.getLogger(DeviceThingHandler.class);
// Every group address used by any channel of this thing; listensTo() answers from this set.
82 private final Set<GroupAddress> groupAddresses = ConcurrentHashMap.newKeySet();
// Short-lived (1000 ms) markers: while an entry exists, handleCommand() skips one write to that GA.
83 private final ExpiringCacheMap<GroupAddress, @Nullable Boolean> groupAddressesWriteBlocked = new ExpiringCacheMap<>(
84 Duration.ofMillis(1000));
// Most recent outbound spec per GA for control channels; used to answer GroupValueRead requests.
85 private final Map<GroupAddress, OutboundSpec> groupAddressesRespondingSpec = new ConcurrentHashMap<>();
// Read jobs created by scheduleReadJob(), keyed by the GA they read.
86 private final Map<GroupAddress, ScheduledFuture<?>> readFutures = new ConcurrentHashMap<>();
// Repeating dimming jobs created in processDataReceived(), keyed by channel.
87 private final Map<ChannelUID, ScheduledFuture<?>> channelFutures = new ConcurrentHashMap<>();
// KNXChannel wrappers created in initialize(), keyed by channel UID.
88 private final Map<ChannelUID, KNXChannel> knxChannels = new ConcurrentHashMap<>();
// Randomises the delays of the polling and description jobs.
89 private final Random random = new Random();
// Individual address of the device; assigned in attachToClient() when the configuration provides one.
90 protected @Nullable IndividualAddress address;
// Read interval taken from DeviceConfig; a value > 0 turns reads into repeating jobs.
91 private int readInterval;
// One-shot job that fetches the device description (scheduled from pollDeviceStatus()).
92 private @Nullable ScheduledFuture<?> descriptionJob;
// Result of the last describeDevice() call made by the description job.
93 private boolean filledDescription = false;
// Repeating reachability poll (scheduled from attachToClient()).
94 private @Nullable ScheduledFuture<?> pollingJob;
// Creates the handler for the given thing.
96 public DeviceThingHandler(Thing thing) {
// Reads the device configuration and builds one KNXChannel wrapper per channel of the thing.
101 public void initialize() {
103 DeviceConfig config = getConfigAs(DeviceConfig.class);
// Cached because scheduleReadJob() consults it for every read.
104 readInterval = config.getReadInterval();
105 // gather all GAs from channel configurations and create channels
106 getThing().getChannels().forEach(channel -> {
107 KNXChannel knxChannel = KNXChannelFactory.createKnxChannel(channel);
108 knxChannels.put(channel.getUID(), knxChannel);
// Collect the channel's GAs so listensTo() can match incoming telegrams.
109 groupAddresses.addAll(knxChannel.getAllGroupAddresses());
// Releases handler state: visits the pending channel jobs and clears the group address bookkeeping.
114 public void dispose() {
115 for (ChannelUID channelUID : channelFutures.keySet()) {
// NOTE(review): the remapping lambda body is not visible here; presumably it cancels the
// future and drops the entry - confirm.
116 channelFutures.computeIfPresent(channelUID, (k, v) -> {
122 groupAddresses.clear();
123 groupAddressesWriteBlocked.clear();
124 groupAddressesRespondingSpec.clear();
// Visits every entry of readFutures via computeIfPresent.
// NOTE(review): the lambda body is not visible here; going by the method name it presumably
// cancels each read job and removes it - confirm.
130 protected void cancelReadFutures() {
131 for (GroupAddress groupAddress : readFutures.keySet()) {
132 readFutures.computeIfPresent(groupAddress, (k, v) -> {
// Reacts to a channel being linked: schedules a read of the channel's group addresses,
// except for control channels.
140 public void channelLinked(ChannelUID channelUID) {
141 KNXChannel knxChannel = knxChannels.get(channelUID);
// Channels without a KNXChannel wrapper are only reported.
142 if (knxChannel == null) {
143 logger.warn("Channel '{}' received a channel linked event, but no KNXChannel found", channelUID);
146 if (!knxChannel.isControl()) {
147 scheduleRead(knxChannel);
// Schedules reads for every known channel that is linked and is not a control channel.
151 protected void scheduleReadJobs() {
153 for (KNXChannel knxChannel : knxChannels.values()) {
154 if (isLinked(knxChannel.getChannelUID()) && !knxChannel.isControl()) {
155 scheduleRead(knxChannel);
// Schedules one read job per group address of each read spec of the channel,
// using the DPT of the spec the address belongs to.
160 private void scheduleRead(KNXChannel knxChannel) {
161 List<InboundSpec> readSpecs = knxChannel.getReadSpec();
162 for (InboundSpec readSpec : readSpecs) {
163 readSpec.getGroupAddresses().forEach(ga -> scheduleReadJob(ga, readSpec.getDPT()));
// Schedules reading of a single group address, as a repeating job when a read interval is configured.
167 private void scheduleReadJob(GroupAddress groupAddress, String dpt) {
168 if (readInterval > 0) {
// Keep an existing job that is still active; only create a new one if there is none
// or the previous one has finished or was cancelled.
169 ScheduledFuture<?> future = readFutures.get(groupAddress);
170 if (future == null || future.isDone() || future.isCancelled()) {
// First read runs immediately (initial delay 0), then repeats with readInterval as the delay.
// NOTE(review): the time unit argument is on a line not visible here - confirm.
171 future = getScheduler().scheduleWithFixedDelay(() -> readDatapoint(groupAddress, dpt), 0, readInterval,
173 readFutures.put(groupAddress, future);
// NOTE(review): presumably the branch taken when readInterval <= 0 (single read) - confirm.
176 getScheduler().submit(() -> readDatapoint(groupAddress, dpt));
// Asks the client to read the given group address; does nothing unless the client is connected.
180 private void readDatapoint(GroupAddress groupAddress, String dpt) {
181 if (getClient().isConnected()) {
// A DPT without any allowed types is reported as unsupported.
182 if (DPTUtil.getAllowedTypes(dpt).isEmpty()) {
183 logger.warn("DPT '{}' is not supported by the KNX binding", dpt);
// The datapoint is named after the thing UID.
186 Datapoint datapoint = new CommandDP(groupAddress, getThing().getUID().toString(), 0, dpt);
187 getClient().readDatapoint(datapoint);
// Returns true if the destination is one of the group addresses collected from this thing's channels.
192 public boolean listensTo(GroupAddress destination) {
193 return groupAddresses.contains(destination);
196 /** Handling commands triggered from openHAB */
// Dispatches a command from openHAB: REFRESH triggers a bus read, the reset channel is handled
// separately, and every other command is translated into an OutboundSpec and written to KNX.
198 public void handleCommand(ChannelUID channelUID, Command command) {
199 logger.trace("Handling command '{}' for channel '{}'", command, channelUID);
200 KNXChannel knxChannel = knxChannels.get(channelUID);
201 if (knxChannel == null) {
202 logger.warn("Channel '{}' received command, but no KNXChannel found", channelUID);
// REFRESH on a non-control channel is answered by reading the channel's GAs from the bus.
205 if (command instanceof RefreshType && !knxChannel.isControl()) {
206 logger.debug("Refreshing channel '{}'", channelUID);
207 scheduleRead(knxChannel);
// NOTE(review): the body of the reset branch is not visible here; presumably it calls
// restart() when an individual address is known - confirm.
209 if (CHANNEL_RESET.equals(channelUID.getId())) {
210 if (address != null) {
215 OutboundSpec commandSpec = knxChannel.getCommandSpec(command);
216 // only send GroupValueWrite to KNX if GA is not blocked once
217 if (commandSpec != null) {
218 GroupAddress destination = commandSpec.getGroupAddress();
219 if (knxChannel.isControl()) {
220 // always remember, otherwise we might send an old state
221 groupAddressesRespondingSpec.put(destination, commandSpec);
// A block entry is set by onGroupRead(); it suppresses one write and is then removed.
223 if (groupAddressesWriteBlocked.get(destination) != null) {
224 logger.debug("Write to {} blocked for 1s/one call after read.", destination);
225 groupAddressesWriteBlocked.invalidate(destination);
227 getClient().writeToKNX(commandSpec);
231 "None of the configured GAs on channel '{}' could handle the command '{}' of type '{}'",
232 channelUID, command, command.getClass().getSimpleName());
// Bus errors are logged and not propagated to the caller.
234 } catch (KNXException e) {
235 logger.warn("An error occurred while handling command '{}' on channel '{}': {}", command,
236 channelUID, e.getMessage());
// Answers a GroupValueRead for the destination with the outbound spec remembered for that GA.
243 private void sendGroupValueResponse(ChannelUID channelUID, GroupAddress destination) {
244 KNXChannel knxChannel = knxChannels.get(channelUID);
245 if (knxChannel == null) {
// Only channels that have write addresses can respond.
248 Set<GroupAddress> rsa = knxChannel.getWriteAddresses();
249 if (!rsa.isEmpty()) {
250 logger.trace("onGroupRead size '{}'", rsa.size());
// The spec was stored by handleCommand() or onGroupWrite(); it may be absent.
251 OutboundSpec os = groupAddressesRespondingSpec.get(destination);
253 logger.trace("onGroupRead respondToKNX '{}'",
254 os.getGroupAddress()); /* KNXIO: sending real "GroupValueResponse" to the KNX bus. */
256 getClient().respondToKNX(os);
// Failures are logged with the exception attached and not propagated.
257 } catch (KNXException e) {
258 logger.warn("An error occurred on channel {}: {}", channelUID, e.getMessage(), e);
265 * KNXIO, extended with the ability to respond to "GroupValueRead" telegrams with a "GroupValueResponse" telegram
// Handles a GroupValueRead telegram: control channels that have a response spec for the
// destination answer on the bus, block the next write to that GA and post REFRESH to openHAB.
268 public void onGroupRead(AbstractKNXClient client, IndividualAddress source, GroupAddress destination, byte[] asdu) {
269 logger.trace("onGroupRead Thing '{}' received a GroupValueRead telegram from '{}' for destination '{}'",
270 getThing().getUID(), source, destination);
271 for (KNXChannel knxChannel : knxChannels.values()) {
272 if (knxChannel.isControl()) {
273 OutboundSpec responseSpec = knxChannel.getResponseSpec(destination, RefreshType.REFRESH);
274 if (responseSpec != null) {
275 logger.trace("onGroupRead isControl -> postCommand");
276 // This event should be sent to KNX as GroupValueResponse immediately.
277 sendGroupValueResponse(knxChannel.getChannelUID(), destination);
279 // block write attempts for 1s or 1 request to prevent loops
// Register the key first if it is unknown, then store the marker value that
// handleCommand() checks for.
280 if (!groupAddressesWriteBlocked.containsKey(destination)) {
281 groupAddressesWriteBlocked.put(destination, () -> null);
283 groupAddressesWriteBlocked.putValue(destination, true);
285 // Send REFRESH to openHAB to get this event for scripting with postCommand
286 // and remember to ignore/block this REFRESH to be sent back to KNX as GroupValueWrite after
287 // postCommand is done!
288 postCommand(knxChannel.getChannelUID(), RefreshType.REFRESH);
// Handles a GroupValueResponse telegram by delegating to onGroupWrite() with the same arguments.
295 public void onGroupReadResponse(AbstractKNXClient client, IndividualAddress source, GroupAddress destination,
297 // GroupValueResponses are treated the same as GroupValueWrite telegrams
298 logger.trace("onGroupReadResponse Thing '{}' processes a GroupValueResponse telegram for destination '{}'",
299 getThing().getUID(), destination);
300 onGroupWrite(client, source, destination, asdu);
304 * KNXIO: value changes coming from KNX or openHAB are applied here.
// Handles a GroupValueWrite telegram: every channel with a listen spec for the destination
// processes the payload; control channels additionally remember the matching outbound spec.
307 public void onGroupWrite(AbstractKNXClient client, IndividualAddress source, GroupAddress destination,
309 logger.debug("onGroupWrite Thing '{}' received a GroupValueWrite telegram from '{}' for destination '{}'",
310 getThing().getUID(), source, destination);
312 for (KNXChannel knxChannel : knxChannels.values()) {
313 InboundSpec listenSpec = knxChannel.getListenSpec(destination);
314 if (listenSpec != null) {
316 "onGroupWrite Thing '{}' processes a GroupValueWrite telegram for destination '{}' for channel '{}'",
317 getThing().getUID(), destination, knxChannel.getChannelUID());
319 * Remember current KNXIO outboundSpec only if it is a control channel.
321 if (knxChannel.isControl()) {
322 logger.trace("onGroupWrite isControl");
// Decode the payload so the value can be turned into the spec used for later responses.
323 Type value = ValueDecoder.decode(listenSpec.getDPT(), asdu, knxChannel.preferredType());
325 OutboundSpec commandSpec = knxChannel.getCommandSpec(value);
326 if (commandSpec != null) {
327 groupAddressesRespondingSpec.put(destination, commandSpec);
331 processDataReceived(destination, asdu, listenSpec, knxChannel);
// Decodes the payload received for a group address and forwards it to openHAB: control channels
// get a command (optionally repeated for continuous dimming), other channels get a state update.
// Parameters: destination is the GA the telegram was sent to, asdu the raw payload, listenSpec
// the inbound spec that matched the GA, knxChannel the channel owning that spec.
336 private void processDataReceived(GroupAddress destination, byte[] asdu, InboundSpec listenSpec,
337 KNXChannel knxChannel) {
// A DPT without allowed types cannot be decoded; it is only reported.
338 if (DPTUtil.getAllowedTypes(listenSpec.getDPT()).isEmpty()) {
339 logger.warn("DPT '{}' is not supported by the KNX binding.", listenSpec.getDPT());
343 Type value = ValueDecoder.decode(listenSpec.getDPT(), asdu, knxChannel.preferredType());
345 if (knxChannel.isControl()) {
346 ChannelUID channelUID = knxChannel.getChannelUID();
348 if (KNXBindingConstants.CHANNEL_DIMMER_CONTROL.equals(knxChannel.getChannelType())) {
349 // if we have a dimmer control channel, check if a frequency is defined
350 Channel channel = getThing().getChannel(channelUID);
351 if (channel == null) {
352 logger.warn("Failed to find channel for ChannelUID '{}'", channelUID);
// A missing repeat frequency in the channel configuration is treated as zero.
355 frequency = ((BigDecimal) Objects.requireNonNullElse(
356 channel.getConfiguration().get(KNXBindingConstants.REPEAT_FREQUENCY), BigDecimal.ZERO))
359 // disable dimming by binding
362 if ((value instanceof UnDefType || value instanceof IncreaseDecreaseType) && frequency > 0) {
363 // continuous dimming by the binding
364 // cancel a running scheduler before adding a new (and only add if not UnDefType)
365 ScheduledFuture<?> oldFuture = channelFutures.remove(channelUID);
366 if (oldFuture != null) {
367 oldFuture.cancel(true);
// Repeat the increase/decrease command every 'frequency' milliseconds, starting immediately.
369 if (value instanceof IncreaseDecreaseType type) {
370 channelFutures.put(channelUID, scheduler.scheduleWithFixedDelay(
371 () -> postCommand(channelUID, type), 0, frequency, TimeUnit.MILLISECONDS));
374 if (value instanceof Command command) {
// Log the group address the telegram was sent to; the 'address' field is the thing's
// individual address (possibly null), not a GA.
375 logger.trace("processDataReceived postCommand new value '{}' for GA '{}'", asdu, destination);
376 postCommand(channelUID, command);
// Non-control channels only accept defined states.
380 if (value instanceof State state && !(value instanceof UnDefType)) {
381 updateState(knxChannel.getChannelUID(), state);
386 "Ignoring KNX bus data for channel '{}': couldn't transform to any Type (GA='{}', DPT='{}', data='{}')",
387 knxChannel.getChannelUID(), destination, listenSpec.getDPT(), HexUtils.bytesToHex(asdu));
// Returns the scheduler provided by the bridge handler.
391 protected final ScheduledExecutorService getScheduler() {
392 return getBridgeHandler().getScheduler();
// Returns the background scheduler provided by the bridge handler; this class uses it for
// the polling and description jobs.
395 protected final ScheduledExecutorService getBackgroundScheduler() {
396 return getBridgeHandler().getBackgroundScheduler();
// Resolves the handler of the bridge this thing is attached to.
// Throws IllegalStateException when the lookup does not succeed.
399 protected final KNXBridgeBaseThingHandler getBridgeHandler() {
400 Bridge bridge = getBridge();
401 if (bridge != null) {
402 KNXBridgeBaseThingHandler handler = (KNXBridgeBaseThingHandler) bridge.getHandler();
403 if (handler != null) {
407 throw new IllegalStateException("The bridge must not be null and must be initialized");
// Returns the KNX client of the bridge handler; fails like getBridgeHandler() when no bridge handler is available.
410 protected final KNXClient getClient() {
411 return getBridgeHandler().getClient();
// Reads device information for the given individual address and merges it into the thing properties.
// NOTE(review): the return statements are not visible here; presumably false for a null address
// or missing result and true after the properties were updated - confirm.
414 protected final boolean describeDevice(@Nullable IndividualAddress address) {
415 if (address == null) {
418 DeviceInspector inspector = new DeviceInspector(getClient().getDeviceInfoClient(), address);
419 DeviceInspector.Result result = inspector.readDeviceInfo();
420 if (result != null) {
// Start from the existing properties so entries not returned by the inspector are kept.
421 Map<String, String> properties = editProperties();
422 properties.putAll(result.getProperties());
423 updateProperties(properties);
// Asks the client to restart the device; does nothing while no individual address is known.
429 protected final void restart() {
430 if (address != null) {
431 getClient().restartNetworkDevice(address);
// Follows bridge status changes.
// NOTE(review): the statements run for ONLINE and before the OFFLINE status update are not
// visible here; presumably attachToClient() / detachFromClient() - confirm.
436 public void bridgeStatusChanged(ThingStatusInfo bridgeStatusInfo) {
437 if (bridgeStatusInfo.getStatus() == ThingStatus.ONLINE) {
439 } else if (bridgeStatusInfo.getStatus() == ThingStatus.OFFLINE) {
// Mirror the bridge state on this thing.
441 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_OFFLINE);
// Polling job body: checks whether the device's individual address is reachable, updates the
// thing status accordingly and, if configured, schedules a one-shot fetch of the device description.
445 private void pollDeviceStatus() {
// Nothing is polled without an individual address or while the client is disconnected.
447 if (address != null && getClient().isConnected()) {
448 logger.debug("Polling individual address '{}'", address);
449 boolean isReachable = getClient().isReachable(address);
451 updateStatus(ThingStatus.ONLINE);
452 DeviceConfig config = getConfigAs(DeviceConfig.class);
453 if (!filledDescription && config.getFetch()) {
454 Future<?> descriptionJob = this.descriptionJob;
// NOTE(review): a description job that completed without filling the description is neither
// null nor cancelled, so it is not scheduled again until detachFromClient() resets the
// field - confirm this is intended.
455 if (descriptionJob == null || descriptionJob.isCancelled()) {
// Random delay of up to one ping interval (seconds).
456 long initialDelay = Math.round(config.getPingInterval() * random.nextFloat());
457 this.descriptionJob = getBackgroundScheduler().schedule(() -> {
458 filledDescription = describeDevice(address);
459 }, initialDelay, TimeUnit.SECONDS);
463 updateStatus(ThingStatus.OFFLINE);
// Errors during the reachability test take the thing offline with a localized message.
466 } catch (KNXException e) {
467 logger.debug("An error occurred while testing the reachability of a thing '{}': {}", getThing().getUID(),
469 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
470 KNXTranslationProvider.I18N.getLocalizedException(e));
// Connects this thing to the bridge's client: sets up reachability polling when an individual
// address is configured and registers the handler as group address listener.
474 protected void attachToClient() {
// Without a connected client the thing is reported as offline because of the bridge.
475 if (!getClient().isConnected()) {
476 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_OFFLINE);
479 DeviceConfig config = getConfigAs(DeviceConfig.class);
481 if (!config.getAddress().isEmpty()) {
// Status stays UNKNOWN until the polling job updates it.
482 updateStatus(ThingStatus.UNKNOWN);
483 address = new IndividualAddress(config.getAddress());
485 long pingInterval = config.getPingInterval();
// Random initial delay of at most INITIAL_PING_DELAY seconds.
486 long initialPingDelay = Math.round(INITIAL_PING_DELAY * random.nextFloat());
// Only start a polling job if none exists or the previous one was cancelled.
488 ScheduledFuture<?> pollingJob = this.pollingJob;
489 if ((pollingJob == null || pollingJob.isCancelled())) {
490 logger.debug("'{}' will be polled every {}s", getThing().getUID(), pingInterval);
491 this.pollingJob = getBackgroundScheduler().scheduleWithFixedDelay(this::pollDeviceStatus,
492 initialPingDelay, pingInterval, TimeUnit.SECONDS);
// NOTE(review): presumably the branch for things without a configured address - confirm.
495 updateStatus(ThingStatus.ONLINE);
// An address that cannot be parsed is a configuration error.
497 } catch (KNXFormatException e) {
498 logger.debug("An exception occurred while setting the individual address '{}': {}", config.getAddress(),
500 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR,
501 KNXTranslationProvider.I18N.getLocalizedException(e));
503 getClient().registerGroupAddressListener(this);
// Disconnects this thing from the client: cancels the polling and description jobs and
// unregisters the handler as group address listener.
507 protected void detachFromClient() {
// Work on local copies of the nullable job fields.
508 final var pollingJobSynced = pollingJob;
509 if (pollingJobSynced != null) {
510 pollingJobSynced.cancel(true);
513 final var descriptionJobSynced = descriptionJob;
514 if (descriptionJobSynced != null) {
515 descriptionJobSynced.cancel(true);
516 descriptionJob = null;
// The bridge is looked up directly instead of via getBridgeHandler(), which would throw
// when the bridge or its handler is missing.
519 Bridge bridge = getBridge();
520 if (bridge != null) {
521 KNXBridgeBaseThingHandler handler = (KNXBridgeBaseThingHandler) bridge.getHandler();
522 if (handler != null) {
523 handler.getClient().unregisterGroupAddressListener(this);