2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.knx.internal.client;
15 import java.time.Duration;
16 import java.util.Optional;
18 import java.util.concurrent.CancellationException;
19 import java.util.concurrent.CopyOnWriteArraySet;
20 import java.util.concurrent.LinkedBlockingQueue;
21 import java.util.concurrent.ScheduledExecutorService;
22 import java.util.concurrent.ScheduledFuture;
23 import java.util.concurrent.TimeUnit;
24 import java.util.function.Consumer;
26 import org.eclipse.jdt.annotation.NonNullByDefault;
27 import org.eclipse.jdt.annotation.Nullable;
28 import org.openhab.binding.knx.internal.KNXTypeMapper;
29 import org.openhab.binding.knx.internal.dpt.KNXCoreTypeMapper;
30 import org.openhab.binding.knx.internal.handler.GroupAddressListener;
31 import org.openhab.binding.knx.internal.i18n.KNXTranslationProvider;
32 import org.openhab.core.thing.ThingStatus;
33 import org.openhab.core.thing.ThingStatusDetail;
34 import org.openhab.core.thing.ThingUID;
35 import org.openhab.core.types.Type;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
39 import tuwien.auto.calimero.CloseEvent;
40 import tuwien.auto.calimero.DetachEvent;
41 import tuwien.auto.calimero.FrameEvent;
42 import tuwien.auto.calimero.GroupAddress;
43 import tuwien.auto.calimero.IndividualAddress;
44 import tuwien.auto.calimero.KNXException;
45 import tuwien.auto.calimero.KNXIllegalArgumentException;
46 import tuwien.auto.calimero.datapoint.CommandDP;
47 import tuwien.auto.calimero.datapoint.Datapoint;
48 import tuwien.auto.calimero.device.ProcessCommunicationResponder;
49 import tuwien.auto.calimero.link.KNXNetworkLink;
50 import tuwien.auto.calimero.link.NetworkLinkListener;
51 import tuwien.auto.calimero.mgmt.Destination;
52 import tuwien.auto.calimero.mgmt.ManagementClient;
53 import tuwien.auto.calimero.mgmt.ManagementClientImpl;
54 import tuwien.auto.calimero.mgmt.ManagementProcedures;
55 import tuwien.auto.calimero.mgmt.ManagementProceduresImpl;
56 import tuwien.auto.calimero.process.ProcessCommunication;
57 import tuwien.auto.calimero.process.ProcessCommunicator;
58 import tuwien.auto.calimero.process.ProcessCommunicatorImpl;
59 import tuwien.auto.calimero.process.ProcessEvent;
60 import tuwien.auto.calimero.process.ProcessListener;
61 import tuwien.auto.calimero.secure.KnxSecureException;
62 import tuwien.auto.calimero.secure.SecureApplicationLayer;
63 import tuwien.auto.calimero.secure.Security;
66 * KNX Client which encapsulates the communication with the KNX bus via the calimero libary.
68 * @author Simon Kaufmann - initial contribution and API.
72 public abstract class AbstractKNXClient implements NetworkLinkListener, KNXClient {
73 public enum ClientState {
80 private ClientState state = ClientState.INIT;
82 private static final int MAX_SEND_ATTEMPTS = 2;
84 private final Logger logger = LoggerFactory.getLogger(AbstractKNXClient.class);
85 private final KNXTypeMapper typeHelper = new KNXCoreTypeMapper();
87 private final ThingUID thingUID;
88 private final int responseTimeout;
89 private final int readingPause;
90 private final int autoReconnectPeriod;
91 private final int readRetriesLimit;
92 private final StatusUpdateCallback statusUpdateCallback;
93 private final ScheduledExecutorService knxScheduler;
95 private @Nullable ProcessCommunicator processCommunicator;
96 private @Nullable ProcessCommunicationResponder responseCommunicator;
97 private @Nullable ManagementProcedures managementProcedures;
98 private @Nullable ManagementClient managementClient;
99 private @Nullable KNXNetworkLink link;
100 private @Nullable DeviceInfoClient deviceInfoClient;
101 private @Nullable ScheduledFuture<?> busJob;
102 private @Nullable ScheduledFuture<?> connectJob;
104 private final Set<GroupAddressListener> groupAddressListeners = new CopyOnWriteArraySet<>();
105 private final LinkedBlockingQueue<ReadDatapoint> readDatapoints = new LinkedBlockingQueue<>();
108 private interface ListenerNotification {
109 void apply(BusMessageListener listener, IndividualAddress source, GroupAddress destination, byte[] asdu);
112 @NonNullByDefault({})
113 private final ProcessListener processListener = new ProcessListener() {
116 public void detached(DetachEvent e) {
117 logger.debug("The KNX network link was detached from the process communicator");
121 public void groupWrite(ProcessEvent e) {
122 processEvent("Group Write", e, (listener, source, destination, asdu) -> {
123 listener.onGroupWrite(AbstractKNXClient.this, source, destination, asdu);
128 public void groupReadRequest(ProcessEvent e) {
129 processEvent("Group Read Request", e, (listener, source, destination, asdu) -> {
130 listener.onGroupRead(AbstractKNXClient.this, source, destination, asdu);
135 public void groupReadResponse(ProcessEvent e) {
136 processEvent("Group Read Response", e, (listener, source, destination, asdu) -> {
137 listener.onGroupReadResponse(AbstractKNXClient.this, source, destination, asdu);
142 public AbstractKNXClient(int autoReconnectPeriod, ThingUID thingUID, int responseTimeout, int readingPause,
143 int readRetriesLimit, ScheduledExecutorService knxScheduler, StatusUpdateCallback statusUpdateCallback) {
144 this.autoReconnectPeriod = autoReconnectPeriod;
145 this.thingUID = thingUID;
146 this.responseTimeout = responseTimeout;
147 this.readingPause = readingPause;
148 this.readRetriesLimit = readRetriesLimit;
149 this.knxScheduler = knxScheduler;
150 this.statusUpdateCallback = statusUpdateCallback;
153 public void initialize() {
154 if (!scheduleReconnectJob()) {
159 private boolean scheduleReconnectJob() {
160 if (autoReconnectPeriod > 0) {
161 // schedule connect job, for the first connection ignore autoReconnectPeriod and use 1 sec
162 final long reconnectDelayS = (state == ClientState.INIT) ? 1 : autoReconnectPeriod;
163 final String prefix = (state == ClientState.INIT) ? "re" : "";
164 logger.debug("Bridge {} scheduling {}connect in {}s", thingUID, prefix, reconnectDelayS);
165 connectJob = knxScheduler.schedule(this::connect, reconnectDelayS, TimeUnit.SECONDS);
172 private void cancelReconnectJob() {
173 final ScheduledFuture<?> currentReconnectJob = connectJob;
174 if (currentReconnectJob != null) {
175 currentReconnectJob.cancel(true);
180 protected abstract KNXNetworkLink establishConnection() throws KNXException, InterruptedException;
182 private synchronized boolean connectIfNotAutomatic() {
183 if (!isConnected()) {
184 return connectJob != null ? false : connect();
189 private synchronized boolean connect() {
190 if (state == ClientState.INIT) {
191 state = ClientState.RUNNING;
192 } else if (state == ClientState.DISPOSE) {
193 logger.trace("connect() ignored, closing down");
201 // We have a valid "connection" object, this is ensured by IPClient.java.
202 // "releaseConnection" is actually removing all registered users of this connection and stopping
204 // Note that this will also kill this function in the following call to sleep in case of a
205 // connection loss -> restart is via triggered via scheduledReconnect in handler for InterruptedException.
208 logger.debug("Bridge {} is connecting to KNX bus", thingUID);
210 // now establish (possibly encrypted) connection, according to settings (tunnel, routing, secure...)
211 KNXNetworkLink link = establishConnection();
214 // ManagementProcedures provided by Calimero: allow managing other KNX devices, e.g. check if an address is
216 // Note for KNX Secure: ManagmentProcedueresImpl currently does not provide a ctor with external SAL,
217 // it internally creates an instance of ManagementClientImpl, which uses
218 // Security.defaultInstallation().deviceToolKeys()
219 // Protected ctor using given ManagementClientImpl is avalable (custom class to be inherited)
220 managementProcedures = new ManagementProceduresImpl(link);
222 // ManagementClient provided by Calimero: allow reading device info, etc.
223 // Note for KNX Secure: ManagementClientImpl does not provide a ctor with external SAL in Calimero 2.5,
224 // is uses global Security.defaultInstalltion().deviceToolKeys()
225 // Current main branch includes a protected ctor (custom class to be inherited)
226 // TODO Calimero>2.5: check if there is a new way to provide security info, there is a new protected ctor
227 // TODO check if we can avoid creating another ManagementClient and re-use this from ManagemntProcedures
228 ManagementClient managementClient = new ManagementClientImpl(link);
229 managementClient.responseTimeout(Duration.ofSeconds(responseTimeout));
230 this.managementClient = managementClient;
232 // OH helper for reading device info, based on managementClient above
233 deviceInfoClient = new DeviceInfoClientImpl(managementClient);
235 // ProcessCommunicator provides main KNX communication (Calimero).
236 // Note for KNX Secure: SAL to be provided
237 ProcessCommunicator processCommunicator = new ProcessCommunicatorImpl(link);
238 processCommunicator.responseTimeout(Duration.ofSeconds(responseTimeout));
239 processCommunicator.addProcessListener(processListener);
240 this.processCommunicator = processCommunicator;
242 // ProcessCommunicationResponder provides responses to requests from KNX bus (Calimero).
243 // Note for KNX Secure: SAL to be provided
244 ProcessCommunicationResponder responseCommunicator = new ProcessCommunicationResponder(link,
245 new SecureApplicationLayer(link, Security.defaultInstallation()));
246 this.responseCommunicator = responseCommunicator;
248 // register this class, callbacks will be triggered
249 link.addLinkListener(this);
251 // create a job carrying out read requests
252 busJob = knxScheduler.scheduleWithFixedDelay(() -> readNextQueuedDatapoint(), 0, readingPause,
253 TimeUnit.MILLISECONDS);
255 statusUpdateCallback.updateStatus(ThingStatus.ONLINE);
258 logger.info("Bridge {} connected to KNX bus", thingUID);
260 state = ClientState.RUNNING;
262 } catch (InterruptedException e) {
263 final var lastState = state;
264 state = ClientState.INTERRUPTED;
266 logger.trace("Bridge {}, connection interrupted", thingUID);
269 if (lastState != ClientState.DISPOSE) {
270 scheduleReconnectJob();
274 } catch (KNXException | KnxSecureException e) {
275 logger.debug("Bridge {} cannot connect: {}", thingUID, e.getMessage());
277 scheduleReconnectJob();
279 } catch (KNXIllegalArgumentException e) {
280 logger.debug("Bridge {} cannot connect: {}", thingUID, e.getMessage());
281 disconnect(e, Optional.of(ThingStatusDetail.CONFIGURATION_ERROR));
286 private void disconnect(@Nullable Exception e) {
287 disconnect(e, Optional.empty());
290 private synchronized void disconnect(@Nullable Exception e, Optional<ThingStatusDetail> detail) {
293 statusUpdateCallback.updateStatus(ThingStatus.OFFLINE, detail.orElse(ThingStatusDetail.COMMUNICATION_ERROR),
294 KNXTranslationProvider.I18N.getLocalizedException(e));
296 statusUpdateCallback.updateStatus(ThingStatus.OFFLINE);
300 @SuppressWarnings("null")
301 protected void releaseConnection() {
302 logger.debug("Bridge {} is disconnecting from KNX bus", thingUID);
304 if (tmplink != null) {
305 link.removeLinkListener(this);
307 busJob = nullify(busJob, j -> j.cancel(true));
308 readDatapoints.clear();
309 responseCommunicator = nullify(responseCommunicator, rc -> {
310 rc.removeProcessListener(processListener);
313 processCommunicator = nullify(processCommunicator, pc -> {
314 pc.removeProcessListener(processListener);
317 deviceInfoClient = null;
318 managementClient = nullify(managementClient, mc -> mc.detach());
319 managementProcedures = nullify(managementProcedures, mp -> mp.detach());
320 link = nullify(link, l -> l.close());
321 logger.trace("Bridge {} disconnected from KNX bus", thingUID);
324 private <T> @Nullable T nullify(T target, @Nullable Consumer<T> lastWill) {
325 if (target != null && lastWill != null) {
326 lastWill.accept(target);
331 private void processEvent(String task, ProcessEvent event, ListenerNotification action) {
332 GroupAddress destination = event.getDestination();
333 IndividualAddress source = event.getSourceAddr();
334 byte[] asdu = event.getASDU();
335 logger.trace("Received a {} telegram from '{}' to '{}' with value '{}'", task, source, destination, asdu);
336 for (GroupAddressListener listener : groupAddressListeners) {
337 if (listener.listensTo(destination)) {
338 knxScheduler.schedule(() -> action.apply(listener, source, destination, asdu), 0, TimeUnit.SECONDS);
344 * Transforms a {@link Type} into a datapoint type value for the KNX bus.
346 * @param type the {@link Type} to transform
347 * @param dpt the datapoint type to which should be converted
348 * @return the corresponding KNX datapoint type value as a string
351 private String toDPTValue(Type type, String dpt) {
352 return typeHelper.toDPTValue(type, dpt);
355 // datapoint is null at end of the list, warning is misleading
356 @SuppressWarnings("null")
357 private void readNextQueuedDatapoint() {
358 if (!connectIfNotAutomatic()) {
361 ProcessCommunicator processCommunicator = this.processCommunicator;
362 if (processCommunicator == null) {
365 ReadDatapoint datapoint = readDatapoints.poll();
366 if (datapoint != null) {
367 datapoint.incrementRetries();
369 logger.trace("Sending a Group Read Request telegram for {}", datapoint.getDatapoint().getMainAddress());
370 processCommunicator.read(datapoint.getDatapoint());
371 } catch (KNXException e) {
372 // Note: KnxException does not cover KnxRuntimeException and subclasses KnxSecureException,
373 // KnxIllegArgumentException
374 if (datapoint.getRetries() < datapoint.getLimit()) {
375 readDatapoints.add(datapoint);
376 logger.debug("Could not read value for datapoint {}: {}. Going to retry.",
377 datapoint.getDatapoint().getMainAddress(), e.getMessage());
379 logger.warn("Giving up reading datapoint {}, the number of maximum retries ({}) is reached.",
380 datapoint.getDatapoint().getMainAddress(), datapoint.getLimit());
382 } catch (InterruptedException | CancellationException e) {
383 logger.debug("Interrupted sending KNX read request");
385 } catch (Exception e) {
386 // Any other exception: Fail gracefully, i.e. notify user and continue reading next DP.
387 // Not catching this would end the scheduled read for all DPs in case of an error.
388 // Severity is warning as this is likely caused by a configuration error.
389 logger.warn("Error reading datapoint {}: {}", datapoint.getDatapoint().getMainAddress(),
395 public void dispose() {
396 state = ClientState.DISPOSE;
398 cancelReconnectJob();
403 public void linkClosed(@Nullable CloseEvent closeEvent) {
404 KNXNetworkLink link = this.link;
405 if (link == null || closeEvent == null) {
408 if (!link.isOpen() && CloseEvent.USER_REQUEST != closeEvent.getInitiator()) {
409 final String reason = closeEvent.getReason();
410 statusUpdateCallback.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
411 KNXTranslationProvider.I18N.get(reason));
412 logger.debug("KNX link has been lost (reason: {} on object {})", closeEvent.getReason(),
413 closeEvent.getSource().toString());
414 scheduleReconnectJob();
419 public void indication(@Nullable FrameEvent e) {
424 public void confirmation(@Nullable FrameEvent e) {
429 public final synchronized boolean isReachable(@Nullable IndividualAddress address) throws KNXException {
430 ManagementProcedures managementProcedures = this.managementProcedures;
431 if (managementProcedures == null || address == null) {
435 return managementProcedures.isAddressOccupied(address);
436 } catch (InterruptedException e) {
437 logger.debug("Interrupted pinging KNX device '{}'", address);
443 public final synchronized void restartNetworkDevice(@Nullable IndividualAddress address) {
444 ManagementClient managementClient = this.managementClient;
445 if (address == null || managementClient == null) {
448 Destination destination = null;
450 destination = managementClient.createDestination(address, true);
451 managementClient.restart(destination);
452 } catch (KNXException e) {
453 logger.warn("Could not reset device with address '{}': {}", address, e.getMessage());
454 } catch (InterruptedException e) { // ignored as in Calimero pre-2.4.0
456 if (destination != null) {
457 destination.destroy();
463 public void readDatapoint(Datapoint datapoint) {
464 synchronized (this) {
465 ReadDatapoint retryDatapoint = new ReadDatapoint(datapoint, readRetriesLimit);
466 if (!readDatapoints.contains(retryDatapoint)) {
467 readDatapoints.add(retryDatapoint);
473 public final boolean registerGroupAddressListener(GroupAddressListener listener) {
474 return groupAddressListeners.add(listener);
478 public final boolean unregisterGroupAddressListener(GroupAddressListener listener) {
479 return groupAddressListeners.remove(listener);
483 public boolean isConnected() {
484 final var tmpLink = link;
485 return tmpLink != null && tmpLink.isOpen();
489 public DeviceInfoClient getDeviceInfoClient() {
490 DeviceInfoClient deviceInfoClient = this.deviceInfoClient;
491 if (deviceInfoClient != null) {
492 return deviceInfoClient;
494 throw new IllegalStateException();
499 public void writeToKNX(OutboundSpec commandSpec) throws KNXException {
500 ProcessCommunicator processCommunicator = this.processCommunicator;
501 KNXNetworkLink link = this.link;
502 if (processCommunicator == null || link == null) {
503 logger.debug("Cannot write to KNX bus (processCommuicator: {}, link: {})",
504 processCommunicator == null ? "Not OK" : "OK",
505 link == null ? "Not OK" : (link.isOpen() ? "Open" : "Closed"));
508 GroupAddress groupAddress = commandSpec.getGroupAddress();
510 logger.trace("writeToKNX groupAddress '{}', commandSpec '{}'", groupAddress, commandSpec);
512 if (groupAddress != null) {
513 sendToKNX(processCommunicator, link, groupAddress, commandSpec.getDPT(), commandSpec.getType());
518 public void respondToKNX(OutboundSpec responseSpec) throws KNXException {
519 ProcessCommunicationResponder responseCommunicator = this.responseCommunicator;
520 KNXNetworkLink link = this.link;
521 if (responseCommunicator == null || link == null) {
522 logger.debug("Cannot write to KNX bus (responseCommunicator: {}, link: {})",
523 responseCommunicator == null ? "Not OK" : "OK",
524 link == null ? "Not OK" : (link.isOpen() ? "Open" : "Closed"));
527 GroupAddress groupAddress = responseSpec.getGroupAddress();
529 logger.trace("respondToKNX groupAddress '{}', responseSpec '{}'", groupAddress, responseSpec);
531 if (groupAddress != null) {
532 sendToKNX(responseCommunicator, link, groupAddress, responseSpec.getDPT(), responseSpec.getType());
536 private void sendToKNX(ProcessCommunication communicator, KNXNetworkLink link, GroupAddress groupAddress,
537 String dpt, Type type) throws KNXException {
538 if (!connectIfNotAutomatic()) {
542 Datapoint datapoint = new CommandDP(groupAddress, thingUID.toString(), 0, dpt);
543 String mappedValue = toDPTValue(type, dpt);
545 logger.trace("sendToKNX mappedValue: '{}' groupAddress: '{}'", mappedValue, groupAddress);
547 if (mappedValue == null) {
548 logger.debug("Value '{}' cannot be mapped to datapoint '{}'", type, datapoint);
551 for (int i = 0; i < MAX_SEND_ATTEMPTS; i++) {
553 communicator.write(datapoint, mappedValue);
554 logger.debug("Wrote value '{}' to datapoint '{}' ({}. attempt).", type, datapoint, i);
556 } catch (KNXException e) {
557 if (i < MAX_SEND_ATTEMPTS - 1) {
558 logger.debug("Value '{}' could not be sent to KNX bus using datapoint '{}': {}. Will retry.", type,
559 datapoint, e.getLocalizedMessage());
561 logger.warn("Value '{}' could not be sent to KNX bus using datapoint '{}': {}. Giving up now.",
562 type, datapoint, e.getLocalizedMessage());