2 * Copyright (c) 2010-2024 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.knx.internal.client;
15 import static org.openhab.binding.knx.internal.dpt.DPTUtil.NORMALIZED_DPT;
17 import java.time.Duration;
19 import java.util.concurrent.CancellationException;
20 import java.util.concurrent.CopyOnWriteArraySet;
21 import java.util.concurrent.LinkedBlockingQueue;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.ScheduledFuture;
24 import java.util.concurrent.TimeUnit;
25 import java.util.function.Consumer;
27 import org.eclipse.jdt.annotation.NonNullByDefault;
28 import org.eclipse.jdt.annotation.Nullable;
29 import org.openhab.binding.knx.internal.dpt.ValueEncoder;
30 import org.openhab.binding.knx.internal.handler.GroupAddressListener;
31 import org.openhab.binding.knx.internal.handler.KNXBridgeBaseThingHandler;
32 import org.openhab.binding.knx.internal.handler.KNXBridgeBaseThingHandler.CommandExtensionData;
33 import org.openhab.binding.knx.internal.i18n.KNXTranslationProvider;
34 import org.openhab.core.thing.ThingStatus;
35 import org.openhab.core.thing.ThingStatusDetail;
36 import org.openhab.core.thing.ThingUID;
37 import org.openhab.core.types.Type;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
41 import tuwien.auto.calimero.CloseEvent;
42 import tuwien.auto.calimero.DetachEvent;
43 import tuwien.auto.calimero.FrameEvent;
44 import tuwien.auto.calimero.GroupAddress;
45 import tuwien.auto.calimero.IndividualAddress;
46 import tuwien.auto.calimero.KNXException;
47 import tuwien.auto.calimero.KNXIllegalArgumentException;
48 import tuwien.auto.calimero.datapoint.CommandDP;
49 import tuwien.auto.calimero.datapoint.Datapoint;
50 import tuwien.auto.calimero.device.ProcessCommunicationResponder;
51 import tuwien.auto.calimero.link.KNXNetworkLink;
52 import tuwien.auto.calimero.link.NetworkLinkListener;
53 import tuwien.auto.calimero.mgmt.Destination;
54 import tuwien.auto.calimero.mgmt.ManagementClient;
55 import tuwien.auto.calimero.mgmt.ManagementProcedures;
56 import tuwien.auto.calimero.mgmt.TransportLayerImpl;
57 import tuwien.auto.calimero.process.ProcessCommunication;
58 import tuwien.auto.calimero.process.ProcessCommunicator;
59 import tuwien.auto.calimero.process.ProcessCommunicatorImpl;
60 import tuwien.auto.calimero.process.ProcessEvent;
61 import tuwien.auto.calimero.process.ProcessListener;
62 import tuwien.auto.calimero.secure.KnxSecureException;
63 import tuwien.auto.calimero.secure.Security;
66 * KNX Client which encapsulates the communication with the KNX bus via the calimero library.
68 * @author Simon Kaufmann - initial contribution and API.
72 public abstract class AbstractKNXClient implements NetworkLinkListener, KNXClient {
73 public enum ClientState {
80 private ClientState state = ClientState.INIT;
82 private static final int MAX_SEND_ATTEMPTS = 2;
84 private final Logger logger = LoggerFactory.getLogger(AbstractKNXClient.class);
86 private final ThingUID thingUID;
87 private final int responseTimeout;
88 private final int readingPause;
89 private final int autoReconnectPeriod;
90 private final int readRetriesLimit;
91 private final StatusUpdateCallback statusUpdateCallback;
92 private final ScheduledExecutorService knxScheduler;
93 private final CommandExtensionData commandExtensionData;
94 protected final Security openhabSecurity;
96 private @Nullable ProcessCommunicator processCommunicator;
97 private @Nullable ProcessCommunicationResponder responseCommunicator;
98 private @Nullable ManagementProcedures managementProcedures;
99 private @Nullable ManagementClient managementClient;
100 private @Nullable KNXNetworkLink link;
101 private @Nullable DeviceInfoClient deviceInfoClient;
102 private @Nullable ScheduledFuture<?> busJob;
103 private @Nullable ScheduledFuture<?> connectJob;
105 private final Set<GroupAddressListener> groupAddressListeners = new CopyOnWriteArraySet<>();
106 private final LinkedBlockingQueue<ReadDatapoint> readDatapoints = new LinkedBlockingQueue<>();
109 private interface ListenerNotification {
110 void apply(BusMessageListener listener, IndividualAddress source, GroupAddress destination, byte[] asdu);
113 @NonNullByDefault({})
114 private final ProcessListener processListener = new ProcessListener() {
117 public void detached(DetachEvent e) {
118 logger.debug("The KNX network link was detached from the process communicator");
122 public void groupWrite(ProcessEvent e) {
123 processEvent("Group Write", e, (listener, source, destination, asdu) -> listener
124 .onGroupWrite(AbstractKNXClient.this, source, destination, asdu));
128 public void groupReadRequest(ProcessEvent e) {
129 processEvent("Group Read Request", e, (listener, source, destination, asdu) -> listener
130 .onGroupRead(AbstractKNXClient.this, source, destination, asdu));
134 public void groupReadResponse(ProcessEvent e) {
135 processEvent("Group Read Response", e, (listener, source, destination, asdu) -> listener
136 .onGroupReadResponse(AbstractKNXClient.this, source, destination, asdu));
/**
 * Creates a client; no connection is opened here, the arguments are only stored.
 *
 * @param autoReconnectPeriod delay in seconds before a reconnect attempt; values {@code <= 0} disable the
 *            scheduled (re)connect job
 * @param thingUID UID of the bridge thing, used in log messages and as datapoint name
 * @param responseTimeout timeout in seconds applied to the management client and the process communicator
 * @param readingPause delay in milliseconds between two runs of the queued-read job
 * @param readRetriesLimit retry limit handed to every queued read request
 * @param knxScheduler scheduler running the connect job, the read job and listener notifications
 * @param commandExtensionData holder in which telegrams for unknown group addresses are counted
 * @param openhabSecurity security (key) information used for the secure application layer
 * @param statusUpdateCallback receiver of ONLINE/OFFLINE status changes
 */
public AbstractKNXClient(int autoReconnectPeriod, ThingUID thingUID, int responseTimeout, int readingPause,
        int readRetriesLimit, ScheduledExecutorService knxScheduler, CommandExtensionData commandExtensionData,
        Security openhabSecurity, StatusUpdateCallback statusUpdateCallback) {
    // identity and collaborators
    this.thingUID = thingUID;
    this.knxScheduler = knxScheduler;
    this.statusUpdateCallback = statusUpdateCallback;
    this.commandExtensionData = commandExtensionData;
    this.openhabSecurity = openhabSecurity;

    // timing and retry configuration
    this.autoReconnectPeriod = autoReconnectPeriod;
    this.responseTimeout = responseTimeout;
    this.readingPause = readingPause;
    this.readRetriesLimit = readRetriesLimit;
}
154 public void initialize() {
158 private void scheduleReconnectJob() {
159 if (autoReconnectPeriod > 0) {
160 // schedule connect job, for the first connection ignore autoReconnectPeriod and use 1 sec
161 final long reconnectDelayS = (state == ClientState.INIT) ? 1 : autoReconnectPeriod;
162 final String prefix = (state == ClientState.INIT) ? "re" : "";
163 logger.debug("Bridge {} scheduling {}connect in {}s", thingUID, prefix, reconnectDelayS);
164 connectJob = knxScheduler.schedule(this::connect, reconnectDelayS, TimeUnit.SECONDS);
168 private void cancelReconnectJob() {
169 final ScheduledFuture<?> currentReconnectJob = connectJob;
170 if (currentReconnectJob != null) {
171 currentReconnectJob.cancel(true);
/**
 * Opens the network link to the KNX bus. Called by {@code connect()}, which builds the transport layer,
 * management client, process communicator and responder on top of the returned link.
 *
 * @return the link to use for all further communication
 * @throws KNXException if the link cannot be established; {@code connect()} reacts by scheduling a reconnect
 * @throws InterruptedException if the calling thread is interrupted while connecting
 */
176 protected abstract KNXNetworkLink establishConnection() throws KNXException, InterruptedException;
178 private synchronized boolean connectIfNotAutomatic() {
179 if (!isConnected()) {
180 return connectJob == null && connect();
/**
 * Connects this client to the KNX bus and creates the Calimero objects that sit on top of the link.
 * <p>
 * In the visible success path the link returned by {@code establishConnection()} is wrapped in one shared
 * transport layer and a secure application layer; management client, management procedures, device info client,
 * process communicator and process communication responder are created, this object is registered as link
 * listener, the queued-read job is scheduled and the bridge is reported ONLINE.
 * <p>
 * In the visible error handling, {@code InterruptedException} (unless the client was being disposed),
 * {@code KNXException} and {@code KnxSecureException} lead to a scheduled reconnect, while
 * {@code KNXIllegalArgumentException} is reported with {@code CONFIGURATION_ERROR}.
 * <p>
 * NOTE(review): several statements of this method (e.g. the opening of the try block and the return statements)
 * are not visible in this view; the comments describe the visible lines only.
 */
185 private synchronized boolean connect() {
// the first call moves the client from INIT to RUNNING
186 if (state == ClientState.INIT) {
187 state = ClientState.RUNNING;
188 } else if (state == ClientState.DISPOSE) {
// dispose() has been called, do not open a new connection
189 logger.trace("connect() ignored, closing down");
197 // We have a valid "connection" object, this is ensured by IPClient.java.
198 // "releaseConnection" is actually removing all registered users of this connection and stopping
200 // Note that this will also kill this function in the following call to sleep in case of a
201 // connection loss -> the restart is triggered via scheduleReconnectJob() in the handler for InterruptedException.
204 logger.debug("Bridge {} is connecting to KNX bus", thingUID);
206 // now establish (possibly encrypted) connection, according to settings (tunnel, routing, secure...)
207 KNXNetworkLink link = establishConnection();
210 // one transport layer implementation, to be shared by all following classes
211 TransportLayerImpl tl = new TransportLayerImpl(link);
213 // new SecureManagement / SecureApplicationLayer, based on the keyring (if any)
214 // SecureManagement does not offer a public ctor which can use a given TL.
215 // Protected ctor using given TransportLayerImpl is available (custom class to be inherited)
216 // which also copies the relevant content of the supplied SAL to a new SAL instance created
217 // by SecureManagement ctor.
218 CustomSecureManagement sal = new CustomSecureManagement(tl, openhabSecurity);
// log the number of group keys and group senders plus the secure group addresses
220 logger.debug("GAs: {} Send: {}, S={}", sal.security().groupKeys().size(),
221 sal.security().groupSenders().size(),
222 KNXBridgeBaseThingHandler.secHelperGetSecureGroupAddresses(sal.security()));
224 // ManagementClient provided by Calimero: allow reading device info, etc.
225 // Note for KNX Secure: ManagementClientImpl does not provide a ctor with external SAL in Calimero 2.5.
226 // Protected ctor using given ManagementClientImpl is available in >2.5 (custom class to be inherited)
227 ManagementClient managementClient = new CustomManagementClientImpl(link, sal);
// responseTimeout is configured in seconds
228 managementClient.responseTimeout(Duration.ofSeconds(responseTimeout));
229 this.managementClient = managementClient;
231 // ManagementProcedures provided by Calimero: allow managing other KNX devices, e.g. check if an address is
233 // Note for KNX Secure: ManagementProceduresImpl currently does not provide a public ctor with external SAL.
234 // Protected ctor using given ManagementClientImpl is available (custom class to be inherited)
235 managementProcedures = new CustomManagementProceduresImpl(managementClient, tl);
237 // OH helper for reading device info, based on managementClient above
238 deviceInfoClient = new DeviceInfoClientImpl(managementClient);
240 // ProcessCommunicator provides main KNX communication (Calimero).
241 final boolean useGoDiagnostics = true;
242 ProcessCommunicator processCommunicator = new ProcessCommunicatorImpl(link, sal, useGoDiagnostics);
243 processCommunicator.responseTimeout(Duration.ofSeconds(responseTimeout));
// incoming group telegrams are dispatched through processListener
244 processCommunicator.addProcessListener(processListener);
245 this.processCommunicator = processCommunicator;
247 // ProcessCommunicationResponder provides responses to requests from KNX bus (Calimero).
248 ProcessCommunicationResponder responseCommunicator = new ProcessCommunicationResponder(link, sal);
249 this.responseCommunicator = responseCommunicator;
251 // register this class, callbacks will be triggered
252 link.addLinkListener(this);
254 // create a job carrying out read requests
// starts immediately, readingPause is the delay in milliseconds between two runs
255 busJob = knxScheduler.scheduleWithFixedDelay(this::readNextQueuedDatapoint, 0, readingPause,
256 TimeUnit.MILLISECONDS);
258 statusUpdateCallback.updateStatus(ThingStatus.ONLINE);
261 logger.info("Bridge {} connected to KNX bus", thingUID);
263 state = ClientState.RUNNING;
265 } catch (InterruptedException e) {
// remember the previous state: no reconnect is scheduled if the client was being disposed
266 ClientState lastState = state;
267 state = ClientState.INTERRUPTED;
269 logger.trace("Bridge {}, connection interrupted", thingUID);
272 if (lastState != ClientState.DISPOSE) {
273 scheduleReconnectJob();
277 } catch (KNXException | KnxSecureException e) {
278 logger.debug("Bridge {} cannot connect: {}", thingUID, e.getMessage());
280 scheduleReconnectJob();
282 } catch (KNXIllegalArgumentException e) {
// reported as configuration error; no reconnect is scheduled in the visible lines
283 logger.debug("Bridge {} cannot connect: {}", thingUID, e.getMessage());
284 disconnect(e, ThingStatusDetail.CONFIGURATION_ERROR);
289 private synchronized void disconnect(@Nullable Exception e) {
/**
 * Reports the bridge as OFFLINE through the status callback.
 * Two variants are visible: one passing a status detail ({@code detail}, or {@code COMMUNICATION_ERROR} when
 * {@code detail} is {@code null}) together with the localized exception text, and one passing only OFFLINE.
 * <p>
 * NOTE(review): the condition selecting between the two variants and any further statements are not visible in
 * this view; presumably the detailed variant is used when an exception is given - confirm.
 *
 * @param e the exception that caused the disconnect, may be {@code null}
 * @param detail the status detail to report, {@code null} selects {@code COMMUNICATION_ERROR}
 */
293 private synchronized void disconnect(@Nullable Exception e, @Nullable ThingStatusDetail detail) {
296 statusUpdateCallback.updateStatus(ThingStatus.OFFLINE,
297 detail != null ? detail : ThingStatusDetail.COMMUNICATION_ERROR,
298 KNXTranslationProvider.I18N.getLocalizedException(e));
300 statusUpdateCallback.updateStatus(ThingStatus.OFFLINE);
/**
 * Tears down everything {@code connect()} has set up: removes this object as link listener, drops all queued
 * read requests, cancels the read job, detaches the management objects, removes the process listener from both
 * communicators and closes the link. Each field is overwritten with the result of {@code nullify} (or with
 * {@code null} directly), so the method can be called when nothing is connected.
 * <p>
 * NOTE(review): the declaration of {@code tmpLink} and the remaining statements of the two lambda bodies are not
 * visible in this view.
 */
304 protected void releaseConnection() {
305 logger.debug("Bridge {} is disconnecting from KNX bus", thingUID);
// stop receiving link callbacks (e.g. linkClosed) before the link is closed below
307 if (tmpLink != null) {
308 tmpLink.removeLinkListener(this);
// pending read requests are discarded
310 readDatapoints.clear();
// cancel(true): interrupt the read job if it is currently running
311 busJob = nullify(busJob, j -> j.cancel(true));
312 deviceInfoClient = null;
313 managementProcedures = nullify(managementProcedures, ManagementProcedures::detach);
314 managementClient = nullify(managementClient, ManagementClient::detach);
315 processCommunicator = nullify(processCommunicator, pc -> {
316 pc.removeProcessListener(processListener);
319 responseCommunicator = nullify(responseCommunicator, rc -> {
320 rc.removeProcessListener(processListener);
// the link is closed last, after its users above have been released
323 link = nullify(link, KNXNetworkLink::close);
324 logger.trace("Bridge {} disconnected from KNX bus", thingUID);
327 private <T> @Nullable T nullify(@Nullable T target, @Nullable Consumer<T> lastWill) {
328 if (target != null && lastWill != null) {
329 lastWill.accept(target);
/**
 * Dispatches one received group telegram to the registered group address listeners.
 * For every listener that listens to the destination address, {@code action} is scheduled on the KNX scheduler
 * instead of being invoked on the calling thread. The visible bookkeeping code counts telegrams per key
 * "group address + telegram type + ASDU length" in {@code commandExtensionData.unknownGA()}.
 * <p>
 * NOTE(review): the statements that update and evaluate {@code isHandled} and the remaining switch arm(s) are
 * not visible in this view; presumably the bookkeeping only runs for telegrams no listener handled - confirm.
 *
 * @param task human readable telegram kind, used for logging only
 * @param event the process event received from Calimero
 * @param action the listener callback to invoke for every interested listener
 */
334 private void processEvent(String task, ProcessEvent event, ListenerNotification action) {
335 GroupAddress destination = event.getDestination();
336 IndividualAddress source = event.getSourceAddr();
337 byte[] asdu = event.getASDU();
338 logger.trace("Received a {} telegram from '{}' to '{}' with value '{}'", task, source, destination, asdu);
339 boolean isHandled = false;
340 for (GroupAddressListener listener : groupAddressListeners) {
341 if (listener.listensTo(destination)) {
// delay 0: run as soon as possible, but on a scheduler thread
343 knxScheduler.schedule(() -> action.apply(listener, source, destination, asdu), 0, TimeUnit.SECONDS);
346 // Store information about unhandled GAs, can be shown on console using knx:list-unknown-ga.
347 // The idea is to store GA, message type, and size as key. The value counts the number of packets.
349 logger.trace("Address '{}' is not configured in openHAB", destination);
// map the service code of the telegram to the label used in the key
350 final String type = switch (event.getServiceCode()) {
351 case 0x80 -> " GROUP_WRITE(";
352 case 0x40 -> " GROUP_RESPONSE(";
353 case 0x00 -> " GROUP_READ(";
356 final String key = destination.toString() + type + event.getASDU().length + ")";
// first occurrence starts at 1, later occurrences increment the stored count
357 commandExtensionData.unknownGA().compute(key, (k, v) -> v == null ? 1 : v + 1);
/**
 * Job body of the periodic read task scheduled in {@code connect()}: takes at most one read request from the
 * queue and sends a group read for it.
 * <p>
 * Visible behavior: reads for group addresses that have a group key in {@code openhabSecurity} are only logged
 * as skipped; otherwise the retry counter is incremented and the read is sent. On {@code KNXException} the
 * request is queued again while its retry count is below its limit, otherwise a warning is logged. Interruption
 * and cancellation are logged at debug level, any other exception at warn level.
 * <p>
 * NOTE(review): the early returns after the two guards, the opening of the try block and the end of the last
 * log statement are not visible in this view.
 */
361 // datapoint is null at end of the list, warning is misleading
362 @SuppressWarnings("null")
363 private void readNextQueuedDatapoint() {
// nothing can be read without a connection
364 if (!connectIfNotAutomatic()) {
367 ProcessCommunicator processCommunicator = this.processCommunicator;
368 if (processCommunicator == null) {
// poll() does not block, it yields null when the queue is empty
371 ReadDatapoint datapoint = readDatapoints.poll();
372 if (datapoint != null) {
373 // TODO #8872: allow write access, currently only listening mode
374 if (openhabSecurity.groupKeys().containsKey(datapoint.getDatapoint().getMainAddress())) {
375 logger.debug("outgoing secure communication not implemented, explicit read from GA '{}' skipped",
376 datapoint.getDatapoint().getMainAddress());
// count this attempt before sending, the counter is compared against the limit on failure
380 datapoint.incrementRetries();
382 logger.trace("Sending a Group Read Request telegram for {}", datapoint.getDatapoint().getMainAddress());
383 processCommunicator.read(datapoint.getDatapoint());
384 } catch (KNXException e) {
385 // Note: KNXException does not cover KnxRuntimeException and its subclasses KnxSecureException,
386 // KNXIllegalArgumentException
387 if (datapoint.getRetries() < datapoint.getLimit()) {
// put the request back into the queue for another attempt
388 readDatapoints.add(datapoint);
389 logger.debug("Could not read value for datapoint {}: {}. Going to retry.",
390 datapoint.getDatapoint().getMainAddress(), e.getMessage());
392 logger.warn("Giving up reading datapoint {}, the number of maximum retries ({}) is reached.",
393 datapoint.getDatapoint().getMainAddress(), datapoint.getLimit());
395 } catch (InterruptedException | CancellationException e) {
396 logger.debug("Interrupted sending KNX read request");
397 } catch (Exception e) {
398 // Any other exception: Fail gracefully, i.e. notify user and continue reading next DP.
399 // Not catching this would end the scheduled read for all DPs in case of an error.
400 // Severity is warning as this is likely caused by a configuration error.
401 logger.warn("Error reading datapoint {}: {}", datapoint.getDatapoint().getMainAddress(),
/**
 * Shuts the client down: marks it as {@code DISPOSE} (after which {@code connect()} ignores further calls) and
 * cancels a pending connect job.
 * <p>
 * NOTE(review): further statements of this method may not be visible in this view.
 */
407 public void dispose() {
// set the state first so that a concurrently running connect() does not schedule a new reconnect
408 state = ClientState.DISPOSE;
410 cancelReconnectJob();
415 public void linkClosed(@Nullable CloseEvent closeEvent) {
416 KNXNetworkLink link = this.link;
417 if (link == null || closeEvent == null) {
420 if (!link.isOpen() && CloseEvent.USER_REQUEST != closeEvent.getInitiator()) {
421 final String reason = closeEvent.getReason();
422 statusUpdateCallback.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
423 KNXTranslationProvider.I18N.get(reason));
424 logger.debug("KNX link has been lost (reason: {} on object {})", closeEvent.getReason(),
425 closeEvent.getSource().toString());
426 scheduleReconnectJob();
431 public void indication(@Nullable FrameEvent e) {
436 public void confirmation(@Nullable FrameEvent e) {
441 public final synchronized boolean isReachable(@Nullable IndividualAddress address) throws KNXException {
442 ManagementProcedures managementProcedures = this.managementProcedures;
443 if (managementProcedures == null || address == null) {
447 return managementProcedures.isAddressOccupied(address);
448 } catch (InterruptedException e) {
449 logger.debug("Interrupted pinging KNX device '{}'", address);
455 public final synchronized void restartNetworkDevice(@Nullable IndividualAddress address) {
456 ManagementClient managementClient = this.managementClient;
457 if (address == null || managementClient == null) {
460 Destination destination = null;
462 destination = managementClient.createDestination(address, true);
463 managementClient.restart(destination);
464 } catch (KNXException e) {
465 logger.warn("Could not reset device with address '{}': {}", address, e.getMessage());
466 } catch (InterruptedException e) { // ignored as in Calimero pre-2.4.0
468 if (destination != null) {
469 destination.destroy();
475 public void readDatapoint(Datapoint datapoint) {
476 synchronized (this) {
477 ReadDatapoint retryDatapoint = new ReadDatapoint(datapoint, readRetriesLimit);
478 if (!readDatapoints.contains(retryDatapoint)) {
479 readDatapoints.add(retryDatapoint);
485 public final void registerGroupAddressListener(GroupAddressListener listener) {
486 groupAddressListeners.add(listener);
490 public final void unregisterGroupAddressListener(GroupAddressListener listener) {
491 groupAddressListeners.remove(listener);
495 public boolean isConnected() {
496 KNXNetworkLink tmpLink = link;
497 return tmpLink != null && tmpLink.isOpen();
501 public DeviceInfoClient getDeviceInfoClient() {
502 DeviceInfoClient deviceInfoClient = this.deviceInfoClient;
503 if (deviceInfoClient != null) {
504 return deviceInfoClient;
506 throw new IllegalStateException();
511 public void writeToKNX(OutboundSpec commandSpec) throws KNXException {
512 ProcessCommunicator processCommunicator = this.processCommunicator;
513 KNXNetworkLink link = this.link;
514 if (processCommunicator == null || link == null) {
515 logger.debug("Cannot write to KNX bus (processCommunicator: {}, link: {})",
516 processCommunicator == null ? "Not OK" : "OK",
517 link == null ? "Not OK" : (link.isOpen() ? "Open" : "Closed"));
520 GroupAddress groupAddress = commandSpec.getGroupAddress();
522 logger.trace("writeToKNX groupAddress '{}', commandSpec '{}:{} {}'", groupAddress, groupAddress,
523 commandSpec.getDPT(), commandSpec.getValue());
525 sendToKNX(processCommunicator, groupAddress, commandSpec.getDPT(), commandSpec.getValue());
529 public void respondToKNX(OutboundSpec responseSpec) throws KNXException {
530 ProcessCommunicationResponder responseCommunicator = this.responseCommunicator;
531 KNXNetworkLink link = this.link;
532 if (responseCommunicator == null || link == null) {
533 logger.debug("Cannot write to KNX bus (responseCommunicator: {}, link: {})",
534 responseCommunicator == null ? "Not OK" : "OK",
535 link == null ? "Not OK" : (link.isOpen() ? "Open" : "Closed"));
538 GroupAddress groupAddress = responseSpec.getGroupAddress();
540 logger.trace("respondToKNX groupAddress '{}', responseSpec '{}'", groupAddress, responseSpec);
542 sendToKNX(responseCommunicator, groupAddress, responseSpec.getDPT(), responseSpec.getValue());
545 private void sendToKNX(ProcessCommunication communicator, GroupAddress groupAddress, String dpt, Type type)
546 throws KNXException {
547 if (!connectIfNotAutomatic()) {
551 // TODO #8872: allow write access, currently only listening mode
552 if (openhabSecurity.groupKeys().containsKey(groupAddress)) {
553 logger.debug("outgoing secure communication not implemented, write to GA '{}' skipped", groupAddress);
557 Datapoint datapoint = new CommandDP(groupAddress, thingUID.toString(), 0,
558 NORMALIZED_DPT.getOrDefault(dpt, dpt));
559 String mappedValue = ValueEncoder.encode(type, dpt);
560 if (mappedValue == null) {
561 logger.debug("Value '{}' of type '{}' cannot be mapped to datapoint '{}'", type, type.getClass(),
565 logger.trace("sendToKNX mappedValue: '{}' groupAddress: '{}'", mappedValue, groupAddress);
567 for (int i = 0;; i++) {
569 communicator.write(datapoint, mappedValue);
570 logger.debug("Wrote value '{}' to datapoint '{}' ({}. attempt).", type, datapoint, i);
572 } catch (KNXException e) {
573 if (i < MAX_SEND_ATTEMPTS - 1) {
574 logger.debug("Value '{}' could not be sent to KNX bus using datapoint '{}': {}. Will retry.", type,
575 datapoint, e.getLocalizedMessage());
577 logger.warn("Value '{}' could not be sent to KNX bus using datapoint '{}': {}. Giving up now.",
578 type, datapoint, e.getLocalizedMessage());