]> git.basschouten.com Git - openhab-addons.git/blob
7740173200af8ae31e339c8acd01554df0085467
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.knx.internal.client;
14
15 import java.time.Duration;
16 import java.util.Optional;
17 import java.util.Set;
18 import java.util.concurrent.CancellationException;
19 import java.util.concurrent.CopyOnWriteArraySet;
20 import java.util.concurrent.LinkedBlockingQueue;
21 import java.util.concurrent.ScheduledExecutorService;
22 import java.util.concurrent.ScheduledFuture;
23 import java.util.concurrent.TimeUnit;
24 import java.util.function.Consumer;
25
26 import org.eclipse.jdt.annotation.NonNullByDefault;
27 import org.eclipse.jdt.annotation.Nullable;
28 import org.openhab.binding.knx.internal.KNXTypeMapper;
29 import org.openhab.binding.knx.internal.dpt.KNXCoreTypeMapper;
30 import org.openhab.binding.knx.internal.handler.GroupAddressListener;
31 import org.openhab.binding.knx.internal.i18n.KNXTranslationProvider;
32 import org.openhab.core.thing.ThingStatus;
33 import org.openhab.core.thing.ThingStatusDetail;
34 import org.openhab.core.thing.ThingUID;
35 import org.openhab.core.types.Type;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 import tuwien.auto.calimero.CloseEvent;
40 import tuwien.auto.calimero.DetachEvent;
41 import tuwien.auto.calimero.FrameEvent;
42 import tuwien.auto.calimero.GroupAddress;
43 import tuwien.auto.calimero.IndividualAddress;
44 import tuwien.auto.calimero.KNXException;
45 import tuwien.auto.calimero.KNXIllegalArgumentException;
46 import tuwien.auto.calimero.datapoint.CommandDP;
47 import tuwien.auto.calimero.datapoint.Datapoint;
48 import tuwien.auto.calimero.device.ProcessCommunicationResponder;
49 import tuwien.auto.calimero.link.KNXNetworkLink;
50 import tuwien.auto.calimero.link.NetworkLinkListener;
51 import tuwien.auto.calimero.mgmt.Destination;
52 import tuwien.auto.calimero.mgmt.ManagementClient;
53 import tuwien.auto.calimero.mgmt.ManagementClientImpl;
54 import tuwien.auto.calimero.mgmt.ManagementProcedures;
55 import tuwien.auto.calimero.mgmt.ManagementProceduresImpl;
56 import tuwien.auto.calimero.process.ProcessCommunication;
57 import tuwien.auto.calimero.process.ProcessCommunicator;
58 import tuwien.auto.calimero.process.ProcessCommunicatorImpl;
59 import tuwien.auto.calimero.process.ProcessEvent;
60 import tuwien.auto.calimero.process.ProcessListener;
61 import tuwien.auto.calimero.secure.KnxSecureException;
62 import tuwien.auto.calimero.secure.SecureApplicationLayer;
63 import tuwien.auto.calimero.secure.Security;
64
65 /**
66  * KNX Client which encapsulates the communication with the KNX bus via the calimero libary.
67  *
68  * @author Simon Kaufmann - initial contribution and API.
69  *
70  */
71 @NonNullByDefault
72 public abstract class AbstractKNXClient implements NetworkLinkListener, KNXClient {
73     public enum ClientState {
74         INIT,
75         RUNNING,
76         INTERRUPTED,
77         DISPOSE
78     }
79
80     private ClientState state = ClientState.INIT;
81
82     private static final int MAX_SEND_ATTEMPTS = 2;
83
84     private final Logger logger = LoggerFactory.getLogger(AbstractKNXClient.class);
85     private final KNXTypeMapper typeHelper = new KNXCoreTypeMapper();
86
87     private final ThingUID thingUID;
88     private final int responseTimeout;
89     private final int readingPause;
90     private final int autoReconnectPeriod;
91     private final int readRetriesLimit;
92     private final StatusUpdateCallback statusUpdateCallback;
93     private final ScheduledExecutorService knxScheduler;
94
95     private @Nullable ProcessCommunicator processCommunicator;
96     private @Nullable ProcessCommunicationResponder responseCommunicator;
97     private @Nullable ManagementProcedures managementProcedures;
98     private @Nullable ManagementClient managementClient;
99     private @Nullable KNXNetworkLink link;
100     private @Nullable DeviceInfoClient deviceInfoClient;
101     private @Nullable ScheduledFuture<?> busJob;
102     private @Nullable ScheduledFuture<?> connectJob;
103
104     private final Set<GroupAddressListener> groupAddressListeners = new CopyOnWriteArraySet<>();
105     private final LinkedBlockingQueue<ReadDatapoint> readDatapoints = new LinkedBlockingQueue<>();
106
107     @FunctionalInterface
108     private interface ListenerNotification {
109         void apply(BusMessageListener listener, IndividualAddress source, GroupAddress destination, byte[] asdu);
110     }
111
112     @NonNullByDefault({})
113     private final ProcessListener processListener = new ProcessListener() {
114
115         @Override
116         public void detached(DetachEvent e) {
117             logger.debug("The KNX network link was detached from the process communicator");
118         }
119
120         @Override
121         public void groupWrite(ProcessEvent e) {
122             processEvent("Group Write", e, (listener, source, destination, asdu) -> {
123                 listener.onGroupWrite(AbstractKNXClient.this, source, destination, asdu);
124             });
125         }
126
127         @Override
128         public void groupReadRequest(ProcessEvent e) {
129             processEvent("Group Read Request", e, (listener, source, destination, asdu) -> {
130                 listener.onGroupRead(AbstractKNXClient.this, source, destination, asdu);
131             });
132         }
133
134         @Override
135         public void groupReadResponse(ProcessEvent e) {
136             processEvent("Group Read Response", e, (listener, source, destination, asdu) -> {
137                 listener.onGroupReadResponse(AbstractKNXClient.this, source, destination, asdu);
138             });
139         }
140     };
141
142     public AbstractKNXClient(int autoReconnectPeriod, ThingUID thingUID, int responseTimeout, int readingPause,
143             int readRetriesLimit, ScheduledExecutorService knxScheduler, StatusUpdateCallback statusUpdateCallback) {
144         this.autoReconnectPeriod = autoReconnectPeriod;
145         this.thingUID = thingUID;
146         this.responseTimeout = responseTimeout;
147         this.readingPause = readingPause;
148         this.readRetriesLimit = readRetriesLimit;
149         this.knxScheduler = knxScheduler;
150         this.statusUpdateCallback = statusUpdateCallback;
151     }
152
153     public void initialize() {
154         if (!scheduleReconnectJob()) {
155             connect();
156         }
157     }
158
159     private boolean scheduleReconnectJob() {
160         if (autoReconnectPeriod > 0) {
161             // schedule connect job, for the first connection ignore autoReconnectPeriod and use 1 sec
162             final long reconnectDelayS = (state == ClientState.INIT) ? 1 : autoReconnectPeriod;
163             final String prefix = (state == ClientState.INIT) ? "re" : "";
164             logger.debug("Bridge {} scheduling {}connect in {}s", thingUID, prefix, reconnectDelayS);
165             connectJob = knxScheduler.schedule(this::connect, reconnectDelayS, TimeUnit.SECONDS);
166             return true;
167         } else {
168             return false;
169         }
170     }
171
172     private void cancelReconnectJob() {
173         final ScheduledFuture<?> currentReconnectJob = connectJob;
174         if (currentReconnectJob != null) {
175             currentReconnectJob.cancel(true);
176             connectJob = null;
177         }
178     }
179
180     protected abstract KNXNetworkLink establishConnection() throws KNXException, InterruptedException;
181
182     private synchronized boolean connectIfNotAutomatic() {
183         if (!isConnected()) {
184             return connectJob != null ? false : connect();
185         }
186         return true;
187     }
188
189     private synchronized boolean connect() {
190         if (state == ClientState.INIT) {
191             state = ClientState.RUNNING;
192         } else if (state == ClientState.DISPOSE) {
193             logger.trace("connect() ignored, closing down");
194             return false;
195         }
196
197         if (isConnected()) {
198             return true;
199         }
200         try {
201             // We have a valid "connection" object, this is ensured by IPClient.java.
202             // "releaseConnection" is actually removing all registered users of this connection and stopping
203             // all threads.
204             // Note that this will also kill this function in the following call to sleep in case of a
205             // connection loss -> restart is via triggered via scheduledReconnect in handler for InterruptedException.
206             releaseConnection();
207             Thread.sleep(1000);
208             logger.debug("Bridge {} is connecting to KNX bus", thingUID);
209
210             // now establish (possibly encrypted) connection, according to settings (tunnel, routing, secure...)
211             KNXNetworkLink link = establishConnection();
212             this.link = link;
213
214             // ManagementProcedures provided by Calimero: allow managing other KNX devices, e.g. check if an address is
215             // reachable.
216             // Note for KNX Secure: ManagmentProcedueresImpl currently does not provide a ctor with external SAL,
217             // it internally creates an instance of ManagementClientImpl, which uses
218             // Security.defaultInstallation().deviceToolKeys()
219             // Protected ctor using given ManagementClientImpl is avalable (custom class to be inherited)
220             managementProcedures = new ManagementProceduresImpl(link);
221
222             // ManagementClient provided by Calimero: allow reading device info, etc.
223             // Note for KNX Secure: ManagementClientImpl does not provide a ctor with external SAL in Calimero 2.5,
224             // is uses global Security.defaultInstalltion().deviceToolKeys()
225             // Current main branch includes a protected ctor (custom class to be inherited)
226             // TODO Calimero>2.5: check if there is a new way to provide security info, there is a new protected ctor
227             // TODO check if we can avoid creating another ManagementClient and re-use this from ManagemntProcedures
228             ManagementClient managementClient = new ManagementClientImpl(link);
229             managementClient.responseTimeout(Duration.ofSeconds(responseTimeout));
230             this.managementClient = managementClient;
231
232             // OH helper for reading device info, based on managementClient above
233             deviceInfoClient = new DeviceInfoClientImpl(managementClient);
234
235             // ProcessCommunicator provides main KNX communication (Calimero).
236             // Note for KNX Secure: SAL to be provided
237             ProcessCommunicator processCommunicator = new ProcessCommunicatorImpl(link);
238             processCommunicator.responseTimeout(Duration.ofSeconds(responseTimeout));
239             processCommunicator.addProcessListener(processListener);
240             this.processCommunicator = processCommunicator;
241
242             // ProcessCommunicationResponder provides responses to requests from KNX bus (Calimero).
243             // Note for KNX Secure: SAL to be provided
244             ProcessCommunicationResponder responseCommunicator = new ProcessCommunicationResponder(link,
245                     new SecureApplicationLayer(link, Security.defaultInstallation()));
246             this.responseCommunicator = responseCommunicator;
247
248             // register this class, callbacks will be triggered
249             link.addLinkListener(this);
250
251             // create a job carrying out read requests
252             busJob = knxScheduler.scheduleWithFixedDelay(() -> readNextQueuedDatapoint(), 0, readingPause,
253                     TimeUnit.MILLISECONDS);
254
255             statusUpdateCallback.updateStatus(ThingStatus.ONLINE);
256             connectJob = null;
257
258             logger.info("Bridge {} connected to KNX bus", thingUID);
259
260             state = ClientState.RUNNING;
261             return true;
262         } catch (InterruptedException e) {
263             final var lastState = state;
264             state = ClientState.INTERRUPTED;
265
266             logger.trace("Bridge {}, connection interrupted", thingUID);
267
268             disconnect(e);
269             if (lastState != ClientState.DISPOSE) {
270                 scheduleReconnectJob();
271             }
272
273             return false;
274         } catch (KNXException | KnxSecureException e) {
275             logger.debug("Bridge {} cannot connect: {}", thingUID, e.getMessage());
276             disconnect(e);
277             scheduleReconnectJob();
278             return false;
279         } catch (KNXIllegalArgumentException e) {
280             logger.debug("Bridge {} cannot connect: {}", thingUID, e.getMessage());
281             disconnect(e, Optional.of(ThingStatusDetail.CONFIGURATION_ERROR));
282             return false;
283         }
284     }
285
286     private void disconnect(@Nullable Exception e) {
287         disconnect(e, Optional.empty());
288     }
289
290     private synchronized void disconnect(@Nullable Exception e, Optional<ThingStatusDetail> detail) {
291         releaseConnection();
292         if (e != null) {
293             statusUpdateCallback.updateStatus(ThingStatus.OFFLINE, detail.orElse(ThingStatusDetail.COMMUNICATION_ERROR),
294                     KNXTranslationProvider.I18N.getLocalizedException(e));
295         } else {
296             statusUpdateCallback.updateStatus(ThingStatus.OFFLINE);
297         }
298     }
299
300     @SuppressWarnings("null")
301     protected void releaseConnection() {
302         logger.debug("Bridge {} is disconnecting from KNX bus", thingUID);
303         var tmplink = link;
304         if (tmplink != null) {
305             link.removeLinkListener(this);
306         }
307         busJob = nullify(busJob, j -> j.cancel(true));
308         readDatapoints.clear();
309         responseCommunicator = nullify(responseCommunicator, rc -> {
310             rc.removeProcessListener(processListener);
311             rc.detach();
312         });
313         processCommunicator = nullify(processCommunicator, pc -> {
314             pc.removeProcessListener(processListener);
315             pc.detach();
316         });
317         deviceInfoClient = null;
318         managementClient = nullify(managementClient, mc -> mc.detach());
319         managementProcedures = nullify(managementProcedures, mp -> mp.detach());
320         link = nullify(link, l -> l.close());
321         logger.trace("Bridge {} disconnected from KNX bus", thingUID);
322     }
323
324     private <T> @Nullable T nullify(T target, @Nullable Consumer<T> lastWill) {
325         if (target != null && lastWill != null) {
326             lastWill.accept(target);
327         }
328         return null;
329     }
330
331     private void processEvent(String task, ProcessEvent event, ListenerNotification action) {
332         GroupAddress destination = event.getDestination();
333         IndividualAddress source = event.getSourceAddr();
334         byte[] asdu = event.getASDU();
335         logger.trace("Received a {} telegram from '{}' to '{}' with value '{}'", task, source, destination, asdu);
336         for (GroupAddressListener listener : groupAddressListeners) {
337             if (listener.listensTo(destination)) {
338                 knxScheduler.schedule(() -> action.apply(listener, source, destination, asdu), 0, TimeUnit.SECONDS);
339             }
340         }
341     }
342
343     /**
344      * Transforms a {@link Type} into a datapoint type value for the KNX bus.
345      *
346      * @param type the {@link Type} to transform
347      * @param dpt the datapoint type to which should be converted
348      * @return the corresponding KNX datapoint type value as a string
349      */
350     @Nullable
351     private String toDPTValue(Type type, String dpt) {
352         return typeHelper.toDPTValue(type, dpt);
353     }
354
355     // datapoint is null at end of the list, warning is misleading
356     @SuppressWarnings("null")
357     private void readNextQueuedDatapoint() {
358         if (!connectIfNotAutomatic()) {
359             return;
360         }
361         ProcessCommunicator processCommunicator = this.processCommunicator;
362         if (processCommunicator == null) {
363             return;
364         }
365         ReadDatapoint datapoint = readDatapoints.poll();
366         if (datapoint != null) {
367             datapoint.incrementRetries();
368             try {
369                 logger.trace("Sending a Group Read Request telegram for {}", datapoint.getDatapoint().getMainAddress());
370                 processCommunicator.read(datapoint.getDatapoint());
371             } catch (KNXException e) {
372                 // Note: KnxException does not cover KnxRuntimeException and subclasses KnxSecureException,
373                 // KnxIllegArgumentException
374                 if (datapoint.getRetries() < datapoint.getLimit()) {
375                     readDatapoints.add(datapoint);
376                     logger.debug("Could not read value for datapoint {}: {}. Going to retry.",
377                             datapoint.getDatapoint().getMainAddress(), e.getMessage());
378                 } else {
379                     logger.warn("Giving up reading datapoint {}, the number of maximum retries ({}) is reached.",
380                             datapoint.getDatapoint().getMainAddress(), datapoint.getLimit());
381                 }
382             } catch (InterruptedException | CancellationException e) {
383                 logger.debug("Interrupted sending KNX read request");
384                 return;
385             } catch (Exception e) {
386                 // Any other exception: Fail gracefully, i.e. notify user and continue reading next DP.
387                 // Not catching this would end the scheduled read for all DPs in case of an error.
388                 // Severity is warning as this is likely caused by a configuration error.
389                 logger.warn("Error reading datapoint {}: {}", datapoint.getDatapoint().getMainAddress(),
390                         e.getMessage());
391             }
392         }
393     }
394
395     public void dispose() {
396         state = ClientState.DISPOSE;
397
398         cancelReconnectJob();
399         disconnect(null);
400     }
401
402     @Override
403     public void linkClosed(@Nullable CloseEvent closeEvent) {
404         KNXNetworkLink link = this.link;
405         if (link == null || closeEvent == null) {
406             return;
407         }
408         if (!link.isOpen() && CloseEvent.USER_REQUEST != closeEvent.getInitiator()) {
409             final String reason = closeEvent.getReason();
410             statusUpdateCallback.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
411                     KNXTranslationProvider.I18N.get(reason));
412             logger.debug("KNX link has been lost (reason: {} on object {})", closeEvent.getReason(),
413                     closeEvent.getSource().toString());
414             scheduleReconnectJob();
415         }
416     }
417
418     @Override
419     public void indication(@Nullable FrameEvent e) {
420         // no-op
421     }
422
423     @Override
424     public void confirmation(@Nullable FrameEvent e) {
425         // no-op
426     }
427
428     @Override
429     public final synchronized boolean isReachable(@Nullable IndividualAddress address) throws KNXException {
430         ManagementProcedures managementProcedures = this.managementProcedures;
431         if (managementProcedures == null || address == null) {
432             return false;
433         }
434         try {
435             return managementProcedures.isAddressOccupied(address);
436         } catch (InterruptedException e) {
437             logger.debug("Interrupted pinging KNX device '{}'", address);
438         }
439         return false;
440     }
441
442     @Override
443     public final synchronized void restartNetworkDevice(@Nullable IndividualAddress address) {
444         ManagementClient managementClient = this.managementClient;
445         if (address == null || managementClient == null) {
446             return;
447         }
448         Destination destination = null;
449         try {
450             destination = managementClient.createDestination(address, true);
451             managementClient.restart(destination);
452         } catch (KNXException e) {
453             logger.warn("Could not reset device with address '{}': {}", address, e.getMessage());
454         } catch (InterruptedException e) { // ignored as in Calimero pre-2.4.0
455         } finally {
456             if (destination != null) {
457                 destination.destroy();
458             }
459         }
460     }
461
462     @Override
463     public void readDatapoint(Datapoint datapoint) {
464         synchronized (this) {
465             ReadDatapoint retryDatapoint = new ReadDatapoint(datapoint, readRetriesLimit);
466             if (!readDatapoints.contains(retryDatapoint)) {
467                 readDatapoints.add(retryDatapoint);
468             }
469         }
470     }
471
472     @Override
473     public final boolean registerGroupAddressListener(GroupAddressListener listener) {
474         return groupAddressListeners.add(listener);
475     }
476
477     @Override
478     public final boolean unregisterGroupAddressListener(GroupAddressListener listener) {
479         return groupAddressListeners.remove(listener);
480     }
481
482     @Override
483     public boolean isConnected() {
484         final var tmpLink = link;
485         return tmpLink != null && tmpLink.isOpen();
486     }
487
488     @Override
489     public DeviceInfoClient getDeviceInfoClient() {
490         DeviceInfoClient deviceInfoClient = this.deviceInfoClient;
491         if (deviceInfoClient != null) {
492             return deviceInfoClient;
493         } else {
494             throw new IllegalStateException();
495         }
496     }
497
498     @Override
499     public void writeToKNX(OutboundSpec commandSpec) throws KNXException {
500         ProcessCommunicator processCommunicator = this.processCommunicator;
501         KNXNetworkLink link = this.link;
502         if (processCommunicator == null || link == null) {
503             logger.debug("Cannot write to KNX bus (processCommuicator: {}, link: {})",
504                     processCommunicator == null ? "Not OK" : "OK",
505                     link == null ? "Not OK" : (link.isOpen() ? "Open" : "Closed"));
506             return;
507         }
508         GroupAddress groupAddress = commandSpec.getGroupAddress();
509
510         logger.trace("writeToKNX groupAddress '{}', commandSpec '{}'", groupAddress, commandSpec);
511
512         if (groupAddress != null) {
513             sendToKNX(processCommunicator, link, groupAddress, commandSpec.getDPT(), commandSpec.getType());
514         }
515     }
516
517     @Override
518     public void respondToKNX(OutboundSpec responseSpec) throws KNXException {
519         ProcessCommunicationResponder responseCommunicator = this.responseCommunicator;
520         KNXNetworkLink link = this.link;
521         if (responseCommunicator == null || link == null) {
522             logger.debug("Cannot write to KNX bus (responseCommunicator: {}, link: {})",
523                     responseCommunicator == null ? "Not OK" : "OK",
524                     link == null ? "Not OK" : (link.isOpen() ? "Open" : "Closed"));
525             return;
526         }
527         GroupAddress groupAddress = responseSpec.getGroupAddress();
528
529         logger.trace("respondToKNX groupAddress '{}', responseSpec '{}'", groupAddress, responseSpec);
530
531         if (groupAddress != null) {
532             sendToKNX(responseCommunicator, link, groupAddress, responseSpec.getDPT(), responseSpec.getType());
533         }
534     }
535
536     private void sendToKNX(ProcessCommunication communicator, KNXNetworkLink link, GroupAddress groupAddress,
537             String dpt, Type type) throws KNXException {
538         if (!connectIfNotAutomatic()) {
539             return;
540         }
541
542         Datapoint datapoint = new CommandDP(groupAddress, thingUID.toString(), 0, dpt);
543         String mappedValue = toDPTValue(type, dpt);
544
545         logger.trace("sendToKNX mappedValue: '{}' groupAddress: '{}'", mappedValue, groupAddress);
546
547         if (mappedValue == null) {
548             logger.debug("Value '{}' cannot be mapped to datapoint '{}'", type, datapoint);
549             return;
550         }
551         for (int i = 0; i < MAX_SEND_ATTEMPTS; i++) {
552             try {
553                 communicator.write(datapoint, mappedValue);
554                 logger.debug("Wrote value '{}' to datapoint '{}' ({}. attempt).", type, datapoint, i);
555                 break;
556             } catch (KNXException e) {
557                 if (i < MAX_SEND_ATTEMPTS - 1) {
558                     logger.debug("Value '{}' could not be sent to KNX bus using datapoint '{}': {}. Will retry.", type,
559                             datapoint, e.getLocalizedMessage());
560                 } else {
561                     logger.warn("Value '{}' could not be sent to KNX bus using datapoint '{}': {}. Giving up now.",
562                             type, datapoint, e.getLocalizedMessage());
563                     throw e;
564                 }
565             }
566         }
567     }
568 }