2 * Copyright (c) 2010-2022 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.knx.internal.client;
15 import java.time.Duration;
17 import java.util.concurrent.CancellationException;
18 import java.util.concurrent.CopyOnWriteArraySet;
19 import java.util.concurrent.LinkedBlockingQueue;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.concurrent.ScheduledFuture;
22 import java.util.concurrent.TimeUnit;
23 import java.util.function.Consumer;
25 import org.eclipse.jdt.annotation.NonNullByDefault;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.openhab.binding.knx.internal.KNXTypeMapper;
28 import org.openhab.binding.knx.internal.dpt.KNXCoreTypeMapper;
29 import org.openhab.binding.knx.internal.handler.GroupAddressListener;
30 import org.openhab.core.thing.ThingStatus;
31 import org.openhab.core.thing.ThingStatusDetail;
32 import org.openhab.core.thing.ThingUID;
33 import org.openhab.core.types.Type;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 import tuwien.auto.calimero.CloseEvent;
38 import tuwien.auto.calimero.DetachEvent;
39 import tuwien.auto.calimero.FrameEvent;
40 import tuwien.auto.calimero.GroupAddress;
41 import tuwien.auto.calimero.IndividualAddress;
42 import tuwien.auto.calimero.KNXException;
43 import tuwien.auto.calimero.datapoint.CommandDP;
44 import tuwien.auto.calimero.datapoint.Datapoint;
45 import tuwien.auto.calimero.device.ProcessCommunicationResponder;
46 import tuwien.auto.calimero.link.KNXNetworkLink;
47 import tuwien.auto.calimero.link.NetworkLinkListener;
48 import tuwien.auto.calimero.mgmt.Destination;
49 import tuwien.auto.calimero.mgmt.ManagementClient;
50 import tuwien.auto.calimero.mgmt.ManagementClientImpl;
51 import tuwien.auto.calimero.mgmt.ManagementProcedures;
52 import tuwien.auto.calimero.mgmt.ManagementProceduresImpl;
53 import tuwien.auto.calimero.process.ProcessCommunication;
54 import tuwien.auto.calimero.process.ProcessCommunicator;
55 import tuwien.auto.calimero.process.ProcessCommunicatorImpl;
56 import tuwien.auto.calimero.process.ProcessEvent;
57 import tuwien.auto.calimero.process.ProcessListener;
58 import tuwien.auto.calimero.secure.SecureApplicationLayer;
59 import tuwien.auto.calimero.secure.Security;
62 * KNX Client which encapsulates the communication with the KNX bus via the calimero libary.
64 * @author Simon Kaufmann - initial contribution and API.
68 public abstract class AbstractKNXClient implements NetworkLinkListener, KNXClient {
70 private static final int MAX_SEND_ATTEMPTS = 2;
72 private final Logger logger = LoggerFactory.getLogger(AbstractKNXClient.class);
73 private final KNXTypeMapper typeHelper = new KNXCoreTypeMapper();
75 private final ThingUID thingUID;
76 private final int responseTimeout;
77 private final int readingPause;
78 private final int autoReconnectPeriod;
79 private final int readRetriesLimit;
80 private final StatusUpdateCallback statusUpdateCallback;
81 private final ScheduledExecutorService knxScheduler;
83 private @Nullable ProcessCommunicator processCommunicator;
84 private @Nullable ProcessCommunicationResponder responseCommunicator;
85 private @Nullable ManagementProcedures managementProcedures;
86 private @Nullable ManagementClient managementClient;
87 private @Nullable KNXNetworkLink link;
88 private @Nullable DeviceInfoClient deviceInfoClient;
89 private @Nullable ScheduledFuture<?> busJob;
90 private @Nullable ScheduledFuture<?> connectJob;
92 private final Set<GroupAddressListener> groupAddressListeners = new CopyOnWriteArraySet<>();
93 private final LinkedBlockingQueue<ReadDatapoint> readDatapoints = new LinkedBlockingQueue<>();
96 private interface ListenerNotification {
97 void apply(BusMessageListener listener, IndividualAddress source, GroupAddress destination, byte[] asdu);
100 @NonNullByDefault({})
101 private final ProcessListener processListener = new ProcessListener() {
104 public void detached(DetachEvent e) {
105 logger.debug("The KNX network link was detached from the process communicator");
109 public void groupWrite(ProcessEvent e) {
110 processEvent("Group Write", e, (listener, source, destination, asdu) -> {
111 listener.onGroupWrite(AbstractKNXClient.this, source, destination, asdu);
116 public void groupReadRequest(ProcessEvent e) {
117 processEvent("Group Read Request", e, (listener, source, destination, asdu) -> {
118 listener.onGroupRead(AbstractKNXClient.this, source, destination, asdu);
123 public void groupReadResponse(ProcessEvent e) {
124 processEvent("Group Read Response", e, (listener, source, destination, asdu) -> {
125 listener.onGroupReadResponse(AbstractKNXClient.this, source, destination, asdu);
130 public AbstractKNXClient(int autoReconnectPeriod, ThingUID thingUID, int responseTimeout, int readingPause,
131 int readRetriesLimit, ScheduledExecutorService knxScheduler, StatusUpdateCallback statusUpdateCallback) {
132 this.autoReconnectPeriod = autoReconnectPeriod;
133 this.thingUID = thingUID;
134 this.responseTimeout = responseTimeout;
135 this.readingPause = readingPause;
136 this.readRetriesLimit = readRetriesLimit;
137 this.knxScheduler = knxScheduler;
138 this.statusUpdateCallback = statusUpdateCallback;
141 public void initialize() {
142 if (!scheduleReconnectJob()) {
147 private boolean scheduleReconnectJob() {
148 if (autoReconnectPeriod > 0) {
149 connectJob = knxScheduler.schedule(this::connect, autoReconnectPeriod, TimeUnit.SECONDS);
156 private void cancelReconnectJob() {
157 ScheduledFuture<?> currentReconnectJob = connectJob;
158 if (currentReconnectJob != null) {
159 currentReconnectJob.cancel(true);
164 protected abstract KNXNetworkLink establishConnection() throws KNXException, InterruptedException;
166 private synchronized boolean connectIfNotAutomatic() {
167 if (!isConnected()) {
168 return connectJob != null ? false : connect();
173 private synchronized boolean connect() {
180 logger.debug("Bridge {} is connecting to the KNX bus", thingUID);
182 KNXNetworkLink link = establishConnection();
185 managementProcedures = new ManagementProceduresImpl(link);
187 ManagementClient managementClient = new ManagementClientImpl(link);
188 managementClient.responseTimeout(Duration.ofSeconds(responseTimeout));
189 this.managementClient = managementClient;
191 deviceInfoClient = new DeviceInfoClientImpl(managementClient);
193 ProcessCommunicator processCommunicator = new ProcessCommunicatorImpl(link);
194 processCommunicator.responseTimeout(Duration.ofSeconds(responseTimeout));
195 processCommunicator.addProcessListener(processListener);
196 this.processCommunicator = processCommunicator;
198 ProcessCommunicationResponder responseCommunicator = new ProcessCommunicationResponder(link,
199 new SecureApplicationLayer(link, Security.defaultInstallation()));
200 this.responseCommunicator = responseCommunicator;
202 link.addLinkListener(this);
204 busJob = knxScheduler.scheduleWithFixedDelay(() -> readNextQueuedDatapoint(), 0, readingPause,
205 TimeUnit.MILLISECONDS);
207 statusUpdateCallback.updateStatus(ThingStatus.ONLINE);
210 } catch (KNXException | InterruptedException e) {
211 logger.debug("Error connecting to the bus: {}", e.getMessage(), e);
213 scheduleReconnectJob();
218 private void disconnect(@Nullable Exception e) {
221 String message = e.getLocalizedMessage();
222 statusUpdateCallback.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
223 message != null ? message : "");
225 statusUpdateCallback.updateStatus(ThingStatus.OFFLINE);
229 @SuppressWarnings("null")
230 private void releaseConnection() {
231 logger.debug("Bridge {} is disconnecting from the KNX bus", thingUID);
232 readDatapoints.clear();
233 busJob = nullify(busJob, j -> j.cancel(true));
234 deviceInfoClient = null;
235 managementProcedures = nullify(managementProcedures, mp -> mp.detach());
236 managementClient = nullify(managementClient, mc -> mc.detach());
237 link = nullify(link, l -> l.close());
238 processCommunicator = nullify(processCommunicator, pc -> {
239 pc.removeProcessListener(processListener);
242 responseCommunicator = nullify(responseCommunicator, rc -> {
243 rc.removeProcessListener(processListener);
248 private <T> @Nullable T nullify(T target, @Nullable Consumer<T> lastWill) {
249 if (target != null && lastWill != null) {
250 lastWill.accept(target);
255 private void processEvent(String task, ProcessEvent event, ListenerNotification action) {
256 GroupAddress destination = event.getDestination();
257 IndividualAddress source = event.getSourceAddr();
258 byte[] asdu = event.getASDU();
259 logger.trace("Received a {} telegram from '{}' to '{}' with value '{}'", task, source, destination, asdu);
260 for (GroupAddressListener listener : groupAddressListeners) {
261 if (listener.listensTo(destination)) {
262 knxScheduler.schedule(() -> action.apply(listener, source, destination, asdu), 0, TimeUnit.SECONDS);
268 * Transforms a {@link Type} into a datapoint type value for the KNX bus.
270 * @param type the {@link Type} to transform
271 * @param dpt the datapoint type to which should be converted
272 * @return the corresponding KNX datapoint type value as a string
275 private String toDPTValue(Type type, String dpt) {
276 return typeHelper.toDPTValue(type, dpt);
279 @SuppressWarnings("null")
280 private void readNextQueuedDatapoint() {
281 if (!connectIfNotAutomatic()) {
284 ProcessCommunicator processCommunicator = this.processCommunicator;
285 if (processCommunicator == null) {
288 ReadDatapoint datapoint = readDatapoints.poll();
289 if (datapoint != null) {
290 datapoint.incrementRetries();
292 logger.trace("Sending a Group Read Request telegram for {}", datapoint.getDatapoint().getMainAddress());
293 processCommunicator.read(datapoint.getDatapoint());
294 } catch (KNXException e) {
295 // Note: KnxException does not cover KnxRuntimeException and subclasses KnxSecureException,
296 // KnxIllegArgumentException
297 if (datapoint.getRetries() < datapoint.getLimit()) {
298 readDatapoints.add(datapoint);
299 logger.debug("Could not read value for datapoint {}: {}. Going to retry.",
300 datapoint.getDatapoint().getMainAddress(), e.getMessage());
302 logger.warn("Giving up reading datapoint {}, the number of maximum retries ({}) is reached.",
303 datapoint.getDatapoint().getMainAddress(), datapoint.getLimit());
305 } catch (InterruptedException | CancellationException e) {
306 logger.debug("Interrupted sending KNX read request");
308 } catch (Exception e) {
309 // Any other exception: Fail gracefully, i.e. notify user and continue reading next DP.
310 // Not catching this would end the scheduled read for all DPs in case of an error.
311 // Severity is warning as this is likely caused by a configuration error.
312 logger.warn("Error reading datapoint {}: {}", datapoint.getDatapoint().getMainAddress(),
318 public void dispose() {
319 cancelReconnectJob();
324 public void linkClosed(@Nullable CloseEvent closeEvent) {
325 KNXNetworkLink link = this.link;
326 if (link == null || closeEvent == null) {
329 if (!link.isOpen() && CloseEvent.USER_REQUEST != closeEvent.getInitiator()) {
330 statusUpdateCallback.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
331 closeEvent.getReason());
332 logger.debug("KNX link has been lost (reason: {} on object {})", closeEvent.getReason(),
333 closeEvent.getSource().toString());
334 scheduleReconnectJob();
339 public void indication(@Nullable FrameEvent e) {
344 public void confirmation(@Nullable FrameEvent e) {
349 public final synchronized boolean isReachable(@Nullable IndividualAddress address) throws KNXException {
350 ManagementProcedures managementProcedures = this.managementProcedures;
351 if (managementProcedures == null || address == null) {
355 return managementProcedures.isAddressOccupied(address);
356 } catch (InterruptedException e) {
357 logger.debug("Interrupted pinging KNX device '{}'", address);
363 public final synchronized void restartNetworkDevice(@Nullable IndividualAddress address) {
364 ManagementClient managementClient = this.managementClient;
365 if (address == null || managementClient == null) {
368 Destination destination = null;
370 destination = managementClient.createDestination(address, true);
371 managementClient.restart(destination);
372 } catch (KNXException e) {
373 logger.warn("Could not reset device with address '{}': {}", address, e.getMessage());
374 } catch (InterruptedException e) { // ignored as in Calimero pre-2.4.0
376 if (destination != null) {
377 destination.destroy();
383 public void readDatapoint(Datapoint datapoint) {
384 synchronized (this) {
385 ReadDatapoint retryDatapoint = new ReadDatapoint(datapoint, readRetriesLimit);
386 if (!readDatapoints.contains(retryDatapoint)) {
387 readDatapoints.add(retryDatapoint);
393 public final boolean registerGroupAddressListener(GroupAddressListener listener) {
394 return groupAddressListeners.add(listener);
398 public final boolean unregisterGroupAddressListener(GroupAddressListener listener) {
399 return groupAddressListeners.remove(listener);
403 public boolean isConnected() {
404 final var tmpLink = link;
405 return tmpLink != null && tmpLink.isOpen();
409 public DeviceInfoClient getDeviceInfoClient() {
410 DeviceInfoClient deviceInfoClient = this.deviceInfoClient;
411 if (deviceInfoClient != null) {
412 return deviceInfoClient;
414 throw new IllegalStateException();
419 public void writeToKNX(OutboundSpec commandSpec) throws KNXException {
420 ProcessCommunicator processCommunicator = this.processCommunicator;
421 KNXNetworkLink link = this.link;
422 if (processCommunicator == null || link == null) {
423 logger.debug("Cannot write to the KNX bus (processCommuicator: {}, link: {})",
424 processCommunicator == null ? "Not OK" : "OK",
425 link == null ? "Not OK" : (link.isOpen() ? "Open" : "Closed"));
428 GroupAddress groupAddress = commandSpec.getGroupAddress();
430 logger.trace("writeToKNX groupAddress '{}', commandSpec '{}'", groupAddress, commandSpec);
432 if (groupAddress != null) {
433 sendToKNX(processCommunicator, link, groupAddress, commandSpec.getDPT(), commandSpec.getType());
438 public void respondToKNX(OutboundSpec responseSpec) throws KNXException {
439 ProcessCommunicationResponder responseCommunicator = this.responseCommunicator;
440 KNXNetworkLink link = this.link;
441 if (responseCommunicator == null || link == null) {
442 logger.debug("Cannot write to the KNX bus (responseCommunicator: {}, link: {})",
443 responseCommunicator == null ? "Not OK" : "OK",
444 link == null ? "Not OK" : (link.isOpen() ? "Open" : "Closed"));
447 GroupAddress groupAddress = responseSpec.getGroupAddress();
449 logger.trace("respondToKNX groupAddress '{}', responseSpec '{}'", groupAddress, responseSpec);
451 if (groupAddress != null) {
452 sendToKNX(responseCommunicator, link, groupAddress, responseSpec.getDPT(), responseSpec.getType());
456 private void sendToKNX(ProcessCommunication communicator, KNXNetworkLink link, GroupAddress groupAddress,
457 String dpt, Type type) throws KNXException {
458 if (!connectIfNotAutomatic()) {
462 Datapoint datapoint = new CommandDP(groupAddress, thingUID.toString(), 0, dpt);
463 String mappedValue = toDPTValue(type, dpt);
465 logger.trace("sendToKNX mappedValue: '{}' groupAddress: '{}'", mappedValue, groupAddress);
467 if (mappedValue == null) {
468 logger.debug("Value '{}' cannot be mapped to datapoint '{}'", type, datapoint);
471 for (int i = 0; i < MAX_SEND_ATTEMPTS; i++) {
473 communicator.write(datapoint, mappedValue);
474 logger.debug("Wrote value '{}' to datapoint '{}' ({}. attempt).", type, datapoint, i);
476 } catch (KNXException e) {
477 if (i < MAX_SEND_ATTEMPTS - 1) {
478 logger.debug("Value '{}' could not be sent to the KNX bus using datapoint '{}': {}. Will retry.",
479 type, datapoint, e.getLocalizedMessage());
481 logger.warn("Value '{}' could not be sent to the KNX bus using datapoint '{}': {}. Giving up now.",
482 type, datapoint, e.getLocalizedMessage());