]> git.basschouten.com Git - openhab-addons.git/blob
87ef9245163e50d944233d1669091caee928f778
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2024 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.knx.internal.client;
14
15 import static org.openhab.binding.knx.internal.dpt.DPTUtil.NORMALIZED_DPT;
16
17 import java.time.Duration;
18 import java.util.Set;
19 import java.util.concurrent.CancellationException;
20 import java.util.concurrent.CopyOnWriteArraySet;
21 import java.util.concurrent.LinkedBlockingQueue;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.ScheduledFuture;
24 import java.util.concurrent.TimeUnit;
25 import java.util.function.Consumer;
26
27 import org.eclipse.jdt.annotation.NonNullByDefault;
28 import org.eclipse.jdt.annotation.Nullable;
29 import org.openhab.binding.knx.internal.dpt.ValueEncoder;
30 import org.openhab.binding.knx.internal.handler.GroupAddressListener;
31 import org.openhab.binding.knx.internal.handler.KNXBridgeBaseThingHandler.CommandExtensionData;
32 import org.openhab.binding.knx.internal.i18n.KNXTranslationProvider;
33 import org.openhab.core.thing.ThingStatus;
34 import org.openhab.core.thing.ThingStatusDetail;
35 import org.openhab.core.thing.ThingUID;
36 import org.openhab.core.types.Type;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 import tuwien.auto.calimero.CloseEvent;
41 import tuwien.auto.calimero.DetachEvent;
42 import tuwien.auto.calimero.FrameEvent;
43 import tuwien.auto.calimero.GroupAddress;
44 import tuwien.auto.calimero.IndividualAddress;
45 import tuwien.auto.calimero.KNXException;
46 import tuwien.auto.calimero.KNXIllegalArgumentException;
47 import tuwien.auto.calimero.datapoint.CommandDP;
48 import tuwien.auto.calimero.datapoint.Datapoint;
49 import tuwien.auto.calimero.device.ProcessCommunicationResponder;
50 import tuwien.auto.calimero.link.KNXNetworkLink;
51 import tuwien.auto.calimero.link.NetworkLinkListener;
52 import tuwien.auto.calimero.mgmt.Destination;
53 import tuwien.auto.calimero.mgmt.ManagementClient;
54 import tuwien.auto.calimero.mgmt.ManagementClientImpl;
55 import tuwien.auto.calimero.mgmt.ManagementProcedures;
56 import tuwien.auto.calimero.mgmt.ManagementProceduresImpl;
57 import tuwien.auto.calimero.process.ProcessCommunication;
58 import tuwien.auto.calimero.process.ProcessCommunicator;
59 import tuwien.auto.calimero.process.ProcessCommunicatorImpl;
60 import tuwien.auto.calimero.process.ProcessEvent;
61 import tuwien.auto.calimero.process.ProcessListener;
62 import tuwien.auto.calimero.secure.KnxSecureException;
63 import tuwien.auto.calimero.secure.SecureApplicationLayer;
64 import tuwien.auto.calimero.secure.Security;
65
66 /**
67  * KNX Client which encapsulates the communication with the KNX bus via the calimero libary.
68  *
69  * @author Simon Kaufmann - initial contribution and API.
70  *
71  */
72 @NonNullByDefault
73 public abstract class AbstractKNXClient implements NetworkLinkListener, KNXClient {
74     public enum ClientState {
75         INIT,
76         RUNNING,
77         INTERRUPTED,
78         DISPOSE
79     }
80
81     private ClientState state = ClientState.INIT;
82
83     private static final int MAX_SEND_ATTEMPTS = 2;
84
85     private final Logger logger = LoggerFactory.getLogger(AbstractKNXClient.class);
86
87     private final ThingUID thingUID;
88     private final int responseTimeout;
89     private final int readingPause;
90     private final int autoReconnectPeriod;
91     private final int readRetriesLimit;
92     private final StatusUpdateCallback statusUpdateCallback;
93     private final ScheduledExecutorService knxScheduler;
94     private final CommandExtensionData commandExtensionData;
95
96     private @Nullable ProcessCommunicator processCommunicator;
97     private @Nullable ProcessCommunicationResponder responseCommunicator;
98     private @Nullable ManagementProcedures managementProcedures;
99     private @Nullable ManagementClient managementClient;
100     private @Nullable KNXNetworkLink link;
101     private @Nullable DeviceInfoClient deviceInfoClient;
102     private @Nullable ScheduledFuture<?> busJob;
103     private @Nullable ScheduledFuture<?> connectJob;
104
105     private final Set<GroupAddressListener> groupAddressListeners = new CopyOnWriteArraySet<>();
106     private final LinkedBlockingQueue<ReadDatapoint> readDatapoints = new LinkedBlockingQueue<>();
107
108     @FunctionalInterface
109     private interface ListenerNotification {
110         void apply(BusMessageListener listener, IndividualAddress source, GroupAddress destination, byte[] asdu);
111     }
112
113     @NonNullByDefault({})
114     private final ProcessListener processListener = new ProcessListener() {
115
116         @Override
117         public void detached(DetachEvent e) {
118             logger.debug("The KNX network link was detached from the process communicator");
119         }
120
121         @Override
122         public void groupWrite(ProcessEvent e) {
123             processEvent("Group Write", e, (listener, source, destination, asdu) -> listener
124                     .onGroupWrite(AbstractKNXClient.this, source, destination, asdu));
125         }
126
127         @Override
128         public void groupReadRequest(ProcessEvent e) {
129             processEvent("Group Read Request", e, (listener, source, destination, asdu) -> listener
130                     .onGroupRead(AbstractKNXClient.this, source, destination, asdu));
131         }
132
133         @Override
134         public void groupReadResponse(ProcessEvent e) {
135             processEvent("Group Read Response", e, (listener, source, destination, asdu) -> listener
136                     .onGroupReadResponse(AbstractKNXClient.this, source, destination, asdu));
137         }
138     };
139
140     public AbstractKNXClient(int autoReconnectPeriod, ThingUID thingUID, int responseTimeout, int readingPause,
141             int readRetriesLimit, ScheduledExecutorService knxScheduler, CommandExtensionData commandExtensionData,
142             StatusUpdateCallback statusUpdateCallback) {
143         this.autoReconnectPeriod = autoReconnectPeriod;
144         this.thingUID = thingUID;
145         this.responseTimeout = responseTimeout;
146         this.readingPause = readingPause;
147         this.readRetriesLimit = readRetriesLimit;
148         this.knxScheduler = knxScheduler;
149         this.statusUpdateCallback = statusUpdateCallback;
150         this.commandExtensionData = commandExtensionData;
151     }
152
153     public void initialize() {
154         connect();
155     }
156
157     private void scheduleReconnectJob() {
158         if (autoReconnectPeriod > 0) {
159             // schedule connect job, for the first connection ignore autoReconnectPeriod and use 1 sec
160             final long reconnectDelayS = (state == ClientState.INIT) ? 1 : autoReconnectPeriod;
161             final String prefix = (state == ClientState.INIT) ? "re" : "";
162             logger.debug("Bridge {} scheduling {}connect in {}s", thingUID, prefix, reconnectDelayS);
163             connectJob = knxScheduler.schedule(this::connect, reconnectDelayS, TimeUnit.SECONDS);
164         }
165     }
166
167     private void cancelReconnectJob() {
168         final ScheduledFuture<?> currentReconnectJob = connectJob;
169         if (currentReconnectJob != null) {
170             currentReconnectJob.cancel(true);
171             connectJob = null;
172         }
173     }
174
175     protected abstract KNXNetworkLink establishConnection() throws KNXException, InterruptedException;
176
177     private synchronized boolean connectIfNotAutomatic() {
178         if (!isConnected()) {
179             return connectJob == null && connect();
180         }
181         return true;
182     }
183
184     private synchronized boolean connect() {
185         if (state == ClientState.INIT) {
186             state = ClientState.RUNNING;
187         } else if (state == ClientState.DISPOSE) {
188             logger.trace("connect() ignored, closing down");
189             return false;
190         }
191
192         if (isConnected()) {
193             return true;
194         }
195         try {
196             // We have a valid "connection" object, this is ensured by IPClient.java.
197             // "releaseConnection" is actually removing all registered users of this connection and stopping
198             // all threads.
199             // Note that this will also kill this function in the following call to sleep in case of a
200             // connection loss -> restart is via triggered via scheduledReconnect in handler for InterruptedException.
201             releaseConnection();
202             Thread.sleep(1000);
203             logger.debug("Bridge {} is connecting to KNX bus", thingUID);
204
205             // now establish (possibly encrypted) connection, according to settings (tunnel, routing, secure...)
206             KNXNetworkLink link = establishConnection();
207             this.link = link;
208
209             // ManagementProcedures provided by Calimero: allow managing other KNX devices, e.g. check if an address is
210             // reachable.
211             // Note for KNX Secure: ManagmentProcedueresImpl currently does not provide a ctor with external SAL,
212             // it internally creates an instance of ManagementClientImpl, which uses
213             // Security.defaultInstallation().deviceToolKeys()
214             // Protected ctor using given ManagementClientImpl is avalable (custom class to be inherited)
215             managementProcedures = new ManagementProceduresImpl(link);
216
217             // ManagementClient provided by Calimero: allow reading device info, etc.
218             // Note for KNX Secure: ManagementClientImpl does not provide a ctor with external SAL in Calimero 2.5,
219             // is uses global Security.defaultInstalltion().deviceToolKeys()
220             // Current main branch includes a protected ctor (custom class to be inherited)
221             // TODO Calimero>2.5: check if there is a new way to provide security info, there is a new protected ctor
222             // TODO check if we can avoid creating another ManagementClient and re-use this from ManagemntProcedures
223             ManagementClient managementClient = new ManagementClientImpl(link);
224             managementClient.responseTimeout(Duration.ofSeconds(responseTimeout));
225             this.managementClient = managementClient;
226
227             // OH helper for reading device info, based on managementClient above
228             deviceInfoClient = new DeviceInfoClientImpl(managementClient);
229
230             // ProcessCommunicator provides main KNX communication (Calimero).
231             // Note for KNX Secure: SAL to be provided
232             ProcessCommunicator processCommunicator = new ProcessCommunicatorImpl(link);
233             processCommunicator.responseTimeout(Duration.ofSeconds(responseTimeout));
234             processCommunicator.addProcessListener(processListener);
235             this.processCommunicator = processCommunicator;
236
237             // ProcessCommunicationResponder provides responses to requests from KNX bus (Calimero).
238             // Note for KNX Secure: SAL to be provided
239             this.responseCommunicator = new ProcessCommunicationResponder(link,
240                     new SecureApplicationLayer(link, Security.defaultInstallation()));
241
242             // register this class, callbacks will be triggered
243             link.addLinkListener(this);
244
245             // create a job carrying out read requests
246             busJob = knxScheduler.scheduleWithFixedDelay(this::readNextQueuedDatapoint, 0, readingPause,
247                     TimeUnit.MILLISECONDS);
248
249             statusUpdateCallback.updateStatus(ThingStatus.ONLINE);
250             connectJob = null;
251
252             logger.info("Bridge {} connected to KNX bus", thingUID);
253
254             state = ClientState.RUNNING;
255             return true;
256         } catch (InterruptedException e) {
257             ClientState lastState = state;
258             state = ClientState.INTERRUPTED;
259
260             logger.trace("Bridge {}, connection interrupted", thingUID);
261
262             disconnect(e);
263             if (lastState != ClientState.DISPOSE) {
264                 scheduleReconnectJob();
265             }
266
267             return false;
268         } catch (KNXException | KnxSecureException e) {
269             logger.debug("Bridge {} cannot connect: {}", thingUID, e.getMessage());
270             disconnect(e);
271             scheduleReconnectJob();
272             return false;
273         } catch (KNXIllegalArgumentException e) {
274             logger.debug("Bridge {} cannot connect: {}", thingUID, e.getMessage());
275             disconnect(e, ThingStatusDetail.CONFIGURATION_ERROR);
276             return false;
277         }
278     }
279
280     private void disconnect(@Nullable Exception e) {
281         disconnect(e, null);
282     }
283
284     private synchronized void disconnect(@Nullable Exception e, @Nullable ThingStatusDetail detail) {
285         releaseConnection();
286         if (e != null) {
287             statusUpdateCallback.updateStatus(ThingStatus.OFFLINE,
288                     detail != null ? detail : ThingStatusDetail.COMMUNICATION_ERROR,
289                     KNXTranslationProvider.I18N.getLocalizedException(e));
290         } else {
291             statusUpdateCallback.updateStatus(ThingStatus.OFFLINE);
292         }
293     }
294
295     protected void releaseConnection() {
296         logger.debug("Bridge {} is disconnecting from KNX bus", thingUID);
297         var tmplink = link;
298         if (tmplink != null) {
299             tmplink.removeLinkListener(this);
300         }
301         busJob = nullify(busJob, j -> j.cancel(true));
302         readDatapoints.clear();
303         responseCommunicator = nullify(responseCommunicator, rc -> {
304             rc.removeProcessListener(processListener);
305             rc.detach();
306         });
307         processCommunicator = nullify(processCommunicator, pc -> {
308             pc.removeProcessListener(processListener);
309             pc.detach();
310         });
311         deviceInfoClient = null;
312         managementClient = nullify(managementClient, ManagementClient::detach);
313         managementProcedures = nullify(managementProcedures, ManagementProcedures::detach);
314         link = nullify(link, KNXNetworkLink::close);
315         logger.trace("Bridge {} disconnected from KNX bus", thingUID);
316     }
317
318     private <T> @Nullable T nullify(@Nullable T target, @Nullable Consumer<T> lastWill) {
319         if (target != null && lastWill != null) {
320             lastWill.accept(target);
321         }
322         return null;
323     }
324
325     private void processEvent(String task, ProcessEvent event, ListenerNotification action) {
326         GroupAddress destination = event.getDestination();
327         IndividualAddress source = event.getSourceAddr();
328         byte[] asdu = event.getASDU();
329         logger.trace("Received a {} telegram from '{}' to '{}' with value '{}'", task, source, destination, asdu);
330         boolean isHandled = false;
331         for (GroupAddressListener listener : groupAddressListeners) {
332             if (listener.listensTo(destination)) {
333                 isHandled = true;
334                 knxScheduler.schedule(() -> action.apply(listener, source, destination, asdu), 0, TimeUnit.SECONDS);
335             }
336         }
337         // Store information about unhandled GAs, can be shown on console using knx:list-unknown-ga.
338         // The idea is to store GA, message type, and size as key. The value counts the number of packets.
339         if (!isHandled) {
340             logger.trace("Address '{}' is not configured in openHAB", destination);
341             final String type = switch (event.getServiceCode()) {
342                 case 0x80 -> " GROUP_WRITE(";
343                 case 0x40 -> " GROUP_RESPONSE(";
344                 case 0x00 -> " GROUP_READ(";
345                 default -> " ?(";
346             };
347             final String key = destination.toString() + type + event.getASDU().length + ")";
348             commandExtensionData.unknownGA().compute(key, (k, v) -> v == null ? 1 : v + 1);
349         }
350     }
351
352     // datapoint is null at end of the list, warning is misleading
353     @SuppressWarnings("null")
354     private void readNextQueuedDatapoint() {
355         if (!connectIfNotAutomatic()) {
356             return;
357         }
358         ProcessCommunicator processCommunicator = this.processCommunicator;
359         if (processCommunicator == null) {
360             return;
361         }
362         ReadDatapoint datapoint = readDatapoints.poll();
363         if (datapoint != null) {
364             datapoint.incrementRetries();
365             try {
366                 logger.trace("Sending a Group Read Request telegram for {}", datapoint.getDatapoint().getMainAddress());
367                 processCommunicator.read(datapoint.getDatapoint());
368             } catch (KNXException e) {
369                 // Note: KnxException does not cover KnxRuntimeException and subclasses KnxSecureException,
370                 // KnxIllegArgumentException
371                 if (datapoint.getRetries() < datapoint.getLimit()) {
372                     readDatapoints.add(datapoint);
373                     logger.debug("Could not read value for datapoint {}: {}. Going to retry.",
374                             datapoint.getDatapoint().getMainAddress(), e.getMessage());
375                 } else {
376                     logger.warn("Giving up reading datapoint {}, the number of maximum retries ({}) is reached.",
377                             datapoint.getDatapoint().getMainAddress(), datapoint.getLimit());
378                 }
379             } catch (InterruptedException | CancellationException e) {
380                 logger.debug("Interrupted sending KNX read request");
381             } catch (Exception e) {
382                 // Any other exception: Fail gracefully, i.e. notify user and continue reading next DP.
383                 // Not catching this would end the scheduled read for all DPs in case of an error.
384                 // Severity is warning as this is likely caused by a configuration error.
385                 logger.warn("Error reading datapoint {}: {}", datapoint.getDatapoint().getMainAddress(),
386                         e.getMessage());
387             }
388         }
389     }
390
391     public void dispose() {
392         state = ClientState.DISPOSE;
393
394         cancelReconnectJob();
395         disconnect(null);
396     }
397
398     @Override
399     public void linkClosed(@Nullable CloseEvent closeEvent) {
400         KNXNetworkLink link = this.link;
401         if (link == null || closeEvent == null) {
402             return;
403         }
404         if (!link.isOpen() && CloseEvent.USER_REQUEST != closeEvent.getInitiator()) {
405             final String reason = closeEvent.getReason();
406             statusUpdateCallback.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
407                     KNXTranslationProvider.I18N.get(reason));
408             logger.debug("KNX link has been lost (reason: {} on object {})", closeEvent.getReason(),
409                     closeEvent.getSource().toString());
410             scheduleReconnectJob();
411         }
412     }
413
414     @Override
415     public void indication(@Nullable FrameEvent e) {
416         // no-op
417     }
418
419     @Override
420     public void confirmation(@Nullable FrameEvent e) {
421         // no-op
422     }
423
424     @Override
425     public final synchronized boolean isReachable(@Nullable IndividualAddress address) throws KNXException {
426         ManagementProcedures managementProcedures = this.managementProcedures;
427         if (managementProcedures == null || address == null) {
428             return false;
429         }
430         try {
431             return managementProcedures.isAddressOccupied(address);
432         } catch (InterruptedException e) {
433             logger.debug("Interrupted pinging KNX device '{}'", address);
434         }
435         return false;
436     }
437
438     @Override
439     public final synchronized void restartNetworkDevice(@Nullable IndividualAddress address) {
440         ManagementClient managementClient = this.managementClient;
441         if (address == null || managementClient == null) {
442             return;
443         }
444         Destination destination = null;
445         try {
446             destination = managementClient.createDestination(address, true);
447             managementClient.restart(destination);
448         } catch (KNXException e) {
449             logger.warn("Could not reset device with address '{}': {}", address, e.getMessage());
450         } catch (InterruptedException e) { // ignored as in Calimero pre-2.4.0
451         } finally {
452             if (destination != null) {
453                 destination.destroy();
454             }
455         }
456     }
457
458     @Override
459     public void readDatapoint(Datapoint datapoint) {
460         synchronized (this) {
461             ReadDatapoint retryDatapoint = new ReadDatapoint(datapoint, readRetriesLimit);
462             if (!readDatapoints.contains(retryDatapoint)) {
463                 readDatapoints.add(retryDatapoint);
464             }
465         }
466     }
467
468     @Override
469     public final void registerGroupAddressListener(GroupAddressListener listener) {
470         groupAddressListeners.add(listener);
471     }
472
473     @Override
474     public final void unregisterGroupAddressListener(GroupAddressListener listener) {
475         groupAddressListeners.remove(listener);
476     }
477
478     @Override
479     public boolean isConnected() {
480         KNXNetworkLink tmpLink = link;
481         return tmpLink != null && tmpLink.isOpen();
482     }
483
484     @Override
485     public DeviceInfoClient getDeviceInfoClient() {
486         DeviceInfoClient deviceInfoClient = this.deviceInfoClient;
487         if (deviceInfoClient != null) {
488             return deviceInfoClient;
489         } else {
490             throw new IllegalStateException();
491         }
492     }
493
494     @Override
495     public void writeToKNX(OutboundSpec commandSpec) throws KNXException {
496         ProcessCommunicator processCommunicator = this.processCommunicator;
497         KNXNetworkLink link = this.link;
498         if (processCommunicator == null || link == null) {
499             logger.debug("Cannot write to KNX bus (processCommunicator: {}, link: {})",
500                     processCommunicator == null ? "Not OK" : "OK",
501                     link == null ? "Not OK" : (link.isOpen() ? "Open" : "Closed"));
502             return;
503         }
504         GroupAddress groupAddress = commandSpec.getGroupAddress();
505
506         logger.trace("writeToKNX groupAddress '{}', commandSpec '{}:{} {}'", groupAddress, groupAddress,
507                 commandSpec.getDPT(), commandSpec.getValue());
508
509         sendToKNX(processCommunicator, groupAddress, commandSpec.getDPT(), commandSpec.getValue());
510     }
511
512     @Override
513     public void respondToKNX(OutboundSpec responseSpec) throws KNXException {
514         ProcessCommunicationResponder responseCommunicator = this.responseCommunicator;
515         KNXNetworkLink link = this.link;
516         if (responseCommunicator == null || link == null) {
517             logger.debug("Cannot write to KNX bus (responseCommunicator: {}, link: {})",
518                     responseCommunicator == null ? "Not OK" : "OK",
519                     link == null ? "Not OK" : (link.isOpen() ? "Open" : "Closed"));
520             return;
521         }
522         GroupAddress groupAddress = responseSpec.getGroupAddress();
523
524         logger.trace("respondToKNX groupAddress '{}', responseSpec '{}'", groupAddress, responseSpec);
525
526         sendToKNX(responseCommunicator, groupAddress, responseSpec.getDPT(), responseSpec.getValue());
527     }
528
529     private void sendToKNX(ProcessCommunication communicator, GroupAddress groupAddress, String dpt, Type type)
530             throws KNXException {
531         if (!connectIfNotAutomatic()) {
532             return;
533         }
534
535         Datapoint datapoint = new CommandDP(groupAddress, thingUID.toString(), 0,
536                 NORMALIZED_DPT.getOrDefault(dpt, dpt));
537         String mappedValue = ValueEncoder.encode(type, dpt);
538         if (mappedValue == null) {
539             logger.debug("Value '{}' of type '{}' cannot be mapped to datapoint '{}'", type, type.getClass(),
540                     datapoint);
541             return;
542         }
543         logger.trace("sendToKNX mappedValue: '{}' groupAddress: '{}'", mappedValue, groupAddress);
544
545         for (int i = 0;; i++) {
546             try {
547                 communicator.write(datapoint, mappedValue);
548                 logger.debug("Wrote value '{}' to datapoint '{}' ({}. attempt).", type, datapoint, i);
549                 break;
550             } catch (KNXException e) {
551                 if (i < MAX_SEND_ATTEMPTS - 1) {
552                     logger.debug("Value '{}' could not be sent to KNX bus using datapoint '{}': {}. Will retry.", type,
553                             datapoint, e.getLocalizedMessage());
554                 } else {
555                     logger.warn("Value '{}' could not be sent to KNX bus using datapoint '{}': {}. Giving up now.",
556                             type, datapoint, e.getLocalizedMessage());
557                     throw e;
558                 }
559             }
560         }
561     }
562 }