]> git.basschouten.com Git - openhab-addons.git/blob
df3d42e34a58c48b21fdaefec7bd621719db413e
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.knx.internal.client;
14
15 import static org.openhab.binding.knx.internal.dpt.DPTUtil.NORMALIZED_DPT;
16
17 import java.time.Duration;
18 import java.util.Optional;
19 import java.util.Set;
20 import java.util.concurrent.CancellationException;
21 import java.util.concurrent.CopyOnWriteArraySet;
22 import java.util.concurrent.LinkedBlockingQueue;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.TimeUnit;
26 import java.util.function.Consumer;
27
28 import org.eclipse.jdt.annotation.NonNullByDefault;
29 import org.eclipse.jdt.annotation.Nullable;
30 import org.openhab.binding.knx.internal.dpt.ValueEncoder;
31 import org.openhab.binding.knx.internal.handler.GroupAddressListener;
32 import org.openhab.binding.knx.internal.i18n.KNXTranslationProvider;
33 import org.openhab.core.thing.ThingStatus;
34 import org.openhab.core.thing.ThingStatusDetail;
35 import org.openhab.core.thing.ThingUID;
36 import org.openhab.core.types.Type;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 import tuwien.auto.calimero.CloseEvent;
41 import tuwien.auto.calimero.DetachEvent;
42 import tuwien.auto.calimero.FrameEvent;
43 import tuwien.auto.calimero.GroupAddress;
44 import tuwien.auto.calimero.IndividualAddress;
45 import tuwien.auto.calimero.KNXException;
46 import tuwien.auto.calimero.KNXIllegalArgumentException;
47 import tuwien.auto.calimero.datapoint.CommandDP;
48 import tuwien.auto.calimero.datapoint.Datapoint;
49 import tuwien.auto.calimero.device.ProcessCommunicationResponder;
50 import tuwien.auto.calimero.link.KNXNetworkLink;
51 import tuwien.auto.calimero.link.NetworkLinkListener;
52 import tuwien.auto.calimero.mgmt.Destination;
53 import tuwien.auto.calimero.mgmt.ManagementClient;
54 import tuwien.auto.calimero.mgmt.ManagementClientImpl;
55 import tuwien.auto.calimero.mgmt.ManagementProcedures;
56 import tuwien.auto.calimero.mgmt.ManagementProceduresImpl;
57 import tuwien.auto.calimero.process.ProcessCommunication;
58 import tuwien.auto.calimero.process.ProcessCommunicator;
59 import tuwien.auto.calimero.process.ProcessCommunicatorImpl;
60 import tuwien.auto.calimero.process.ProcessEvent;
61 import tuwien.auto.calimero.process.ProcessListener;
62 import tuwien.auto.calimero.secure.KnxSecureException;
63 import tuwien.auto.calimero.secure.SecureApplicationLayer;
64 import tuwien.auto.calimero.secure.Security;
65
66 /**
67  * KNX Client which encapsulates the communication with the KNX bus via the calimero libary.
68  *
69  * @author Simon Kaufmann - initial contribution and API.
70  *
71  */
72 @NonNullByDefault
73 public abstract class AbstractKNXClient implements NetworkLinkListener, KNXClient {
74     public enum ClientState {
75         INIT,
76         RUNNING,
77         INTERRUPTED,
78         DISPOSE
79     }
80
81     private ClientState state = ClientState.INIT;
82
83     private static final int MAX_SEND_ATTEMPTS = 2;
84
85     private final Logger logger = LoggerFactory.getLogger(AbstractKNXClient.class);
86
87     private final ThingUID thingUID;
88     private final int responseTimeout;
89     private final int readingPause;
90     private final int autoReconnectPeriod;
91     private final int readRetriesLimit;
92     private final StatusUpdateCallback statusUpdateCallback;
93     private final ScheduledExecutorService knxScheduler;
94
95     private @Nullable ProcessCommunicator processCommunicator;
96     private @Nullable ProcessCommunicationResponder responseCommunicator;
97     private @Nullable ManagementProcedures managementProcedures;
98     private @Nullable ManagementClient managementClient;
99     private @Nullable KNXNetworkLink link;
100     private @Nullable DeviceInfoClient deviceInfoClient;
101     private @Nullable ScheduledFuture<?> busJob;
102     private @Nullable ScheduledFuture<?> connectJob;
103
104     private final Set<GroupAddressListener> groupAddressListeners = new CopyOnWriteArraySet<>();
105     private final LinkedBlockingQueue<ReadDatapoint> readDatapoints = new LinkedBlockingQueue<>();
106
107     @FunctionalInterface
108     private interface ListenerNotification {
109         void apply(BusMessageListener listener, IndividualAddress source, GroupAddress destination, byte[] asdu);
110     }
111
112     @NonNullByDefault({})
113     private final ProcessListener processListener = new ProcessListener() {
114
115         @Override
116         public void detached(DetachEvent e) {
117             logger.debug("The KNX network link was detached from the process communicator");
118         }
119
120         @Override
121         public void groupWrite(ProcessEvent e) {
122             processEvent("Group Write", e, (listener, source, destination, asdu) -> listener
123                     .onGroupWrite(AbstractKNXClient.this, source, destination, asdu));
124         }
125
126         @Override
127         public void groupReadRequest(ProcessEvent e) {
128             processEvent("Group Read Request", e, (listener, source, destination, asdu) -> listener
129                     .onGroupRead(AbstractKNXClient.this, source, destination, asdu));
130         }
131
132         @Override
133         public void groupReadResponse(ProcessEvent e) {
134             processEvent("Group Read Response", e, (listener, source, destination, asdu) -> listener
135                     .onGroupReadResponse(AbstractKNXClient.this, source, destination, asdu));
136         }
137     };
138
139     public AbstractKNXClient(int autoReconnectPeriod, ThingUID thingUID, int responseTimeout, int readingPause,
140             int readRetriesLimit, ScheduledExecutorService knxScheduler, StatusUpdateCallback statusUpdateCallback) {
141         this.autoReconnectPeriod = autoReconnectPeriod;
142         this.thingUID = thingUID;
143         this.responseTimeout = responseTimeout;
144         this.readingPause = readingPause;
145         this.readRetriesLimit = readRetriesLimit;
146         this.knxScheduler = knxScheduler;
147         this.statusUpdateCallback = statusUpdateCallback;
148     }
149
150     public void initialize() {
151         connect();
152     }
153
154     private void scheduleReconnectJob() {
155         if (autoReconnectPeriod > 0) {
156             // schedule connect job, for the first connection ignore autoReconnectPeriod and use 1 sec
157             final long reconnectDelayS = (state == ClientState.INIT) ? 1 : autoReconnectPeriod;
158             final String prefix = (state == ClientState.INIT) ? "re" : "";
159             logger.debug("Bridge {} scheduling {}connect in {}s", thingUID, prefix, reconnectDelayS);
160             connectJob = knxScheduler.schedule(this::connect, reconnectDelayS, TimeUnit.SECONDS);
161         }
162     }
163
164     private void cancelReconnectJob() {
165         final ScheduledFuture<?> currentReconnectJob = connectJob;
166         if (currentReconnectJob != null) {
167             currentReconnectJob.cancel(true);
168             connectJob = null;
169         }
170     }
171
172     protected abstract KNXNetworkLink establishConnection() throws KNXException, InterruptedException;
173
174     private synchronized boolean connectIfNotAutomatic() {
175         if (!isConnected()) {
176             return connectJob == null && connect();
177         }
178         return true;
179     }
180
181     private synchronized boolean connect() {
182         if (state == ClientState.INIT) {
183             state = ClientState.RUNNING;
184         } else if (state == ClientState.DISPOSE) {
185             logger.trace("connect() ignored, closing down");
186             return false;
187         }
188
189         if (isConnected()) {
190             return true;
191         }
192         try {
193             // We have a valid "connection" object, this is ensured by IPClient.java.
194             // "releaseConnection" is actually removing all registered users of this connection and stopping
195             // all threads.
196             // Note that this will also kill this function in the following call to sleep in case of a
197             // connection loss -> restart is via triggered via scheduledReconnect in handler for InterruptedException.
198             releaseConnection();
199             Thread.sleep(1000);
200             logger.debug("Bridge {} is connecting to KNX bus", thingUID);
201
202             // now establish (possibly encrypted) connection, according to settings (tunnel, routing, secure...)
203             KNXNetworkLink link = establishConnection();
204             this.link = link;
205
206             // ManagementProcedures provided by Calimero: allow managing other KNX devices, e.g. check if an address is
207             // reachable.
208             // Note for KNX Secure: ManagmentProcedueresImpl currently does not provide a ctor with external SAL,
209             // it internally creates an instance of ManagementClientImpl, which uses
210             // Security.defaultInstallation().deviceToolKeys()
211             // Protected ctor using given ManagementClientImpl is avalable (custom class to be inherited)
212             managementProcedures = new ManagementProceduresImpl(link);
213
214             // ManagementClient provided by Calimero: allow reading device info, etc.
215             // Note for KNX Secure: ManagementClientImpl does not provide a ctor with external SAL in Calimero 2.5,
216             // is uses global Security.defaultInstalltion().deviceToolKeys()
217             // Current main branch includes a protected ctor (custom class to be inherited)
218             // TODO Calimero>2.5: check if there is a new way to provide security info, there is a new protected ctor
219             // TODO check if we can avoid creating another ManagementClient and re-use this from ManagemntProcedures
220             ManagementClient managementClient = new ManagementClientImpl(link);
221             managementClient.responseTimeout(Duration.ofSeconds(responseTimeout));
222             this.managementClient = managementClient;
223
224             // OH helper for reading device info, based on managementClient above
225             deviceInfoClient = new DeviceInfoClientImpl(managementClient);
226
227             // ProcessCommunicator provides main KNX communication (Calimero).
228             // Note for KNX Secure: SAL to be provided
229             ProcessCommunicator processCommunicator = new ProcessCommunicatorImpl(link);
230             processCommunicator.responseTimeout(Duration.ofSeconds(responseTimeout));
231             processCommunicator.addProcessListener(processListener);
232             this.processCommunicator = processCommunicator;
233
234             // ProcessCommunicationResponder provides responses to requests from KNX bus (Calimero).
235             // Note for KNX Secure: SAL to be provided
236             this.responseCommunicator = new ProcessCommunicationResponder(link,
237                     new SecureApplicationLayer(link, Security.defaultInstallation()));
238
239             // register this class, callbacks will be triggered
240             link.addLinkListener(this);
241
242             // create a job carrying out read requests
243             busJob = knxScheduler.scheduleWithFixedDelay(this::readNextQueuedDatapoint, 0, readingPause,
244                     TimeUnit.MILLISECONDS);
245
246             statusUpdateCallback.updateStatus(ThingStatus.ONLINE);
247             connectJob = null;
248
249             logger.info("Bridge {} connected to KNX bus", thingUID);
250
251             state = ClientState.RUNNING;
252             return true;
253         } catch (InterruptedException e) {
254             final var lastState = state;
255             state = ClientState.INTERRUPTED;
256
257             logger.trace("Bridge {}, connection interrupted", thingUID);
258
259             disconnect(e);
260             if (lastState != ClientState.DISPOSE) {
261                 scheduleReconnectJob();
262             }
263
264             return false;
265         } catch (KNXException | KnxSecureException e) {
266             logger.debug("Bridge {} cannot connect: {}", thingUID, e.getMessage());
267             disconnect(e);
268             scheduleReconnectJob();
269             return false;
270         } catch (KNXIllegalArgumentException e) {
271             logger.debug("Bridge {} cannot connect: {}", thingUID, e.getMessage());
272             disconnect(e, Optional.of(ThingStatusDetail.CONFIGURATION_ERROR));
273             return false;
274         }
275     }
276
277     private void disconnect(@Nullable Exception e) {
278         disconnect(e, Optional.empty());
279     }
280
281     private synchronized void disconnect(@Nullable Exception e, Optional<ThingStatusDetail> detail) {
282         releaseConnection();
283         if (e != null) {
284             statusUpdateCallback.updateStatus(ThingStatus.OFFLINE, detail.orElse(ThingStatusDetail.COMMUNICATION_ERROR),
285                     KNXTranslationProvider.I18N.getLocalizedException(e));
286         } else {
287             statusUpdateCallback.updateStatus(ThingStatus.OFFLINE);
288         }
289     }
290
291     protected void releaseConnection() {
292         logger.debug("Bridge {} is disconnecting from KNX bus", thingUID);
293         var tmplink = link;
294         if (tmplink != null) {
295             tmplink.removeLinkListener(this);
296         }
297         busJob = nullify(busJob, j -> j.cancel(true));
298         readDatapoints.clear();
299         responseCommunicator = nullify(responseCommunicator, rc -> {
300             rc.removeProcessListener(processListener);
301             rc.detach();
302         });
303         processCommunicator = nullify(processCommunicator, pc -> {
304             pc.removeProcessListener(processListener);
305             pc.detach();
306         });
307         deviceInfoClient = null;
308         managementClient = nullify(managementClient, ManagementClient::detach);
309         managementProcedures = nullify(managementProcedures, ManagementProcedures::detach);
310         link = nullify(link, KNXNetworkLink::close);
311         logger.trace("Bridge {} disconnected from KNX bus", thingUID);
312     }
313
314     private <T> @Nullable T nullify(@Nullable T target, @Nullable Consumer<T> lastWill) {
315         if (target != null && lastWill != null) {
316             lastWill.accept(target);
317         }
318         return null;
319     }
320
321     private void processEvent(String task, ProcessEvent event, ListenerNotification action) {
322         GroupAddress destination = event.getDestination();
323         IndividualAddress source = event.getSourceAddr();
324         byte[] asdu = event.getASDU();
325         logger.trace("Received a {} telegram from '{}' to '{}' with value '{}'", task, source, destination, asdu);
326         for (GroupAddressListener listener : groupAddressListeners) {
327             if (listener.listensTo(destination)) {
328                 knxScheduler.schedule(() -> action.apply(listener, source, destination, asdu), 0, TimeUnit.SECONDS);
329             }
330         }
331     }
332
333     // datapoint is null at end of the list, warning is misleading
334     @SuppressWarnings("null")
335     private void readNextQueuedDatapoint() {
336         if (!connectIfNotAutomatic()) {
337             return;
338         }
339         ProcessCommunicator processCommunicator = this.processCommunicator;
340         if (processCommunicator == null) {
341             return;
342         }
343         ReadDatapoint datapoint = readDatapoints.poll();
344         if (datapoint != null) {
345             datapoint.incrementRetries();
346             try {
347                 logger.trace("Sending a Group Read Request telegram for {}", datapoint.getDatapoint().getMainAddress());
348                 processCommunicator.read(datapoint.getDatapoint());
349             } catch (KNXException e) {
350                 // Note: KnxException does not cover KnxRuntimeException and subclasses KnxSecureException,
351                 // KnxIllegArgumentException
352                 if (datapoint.getRetries() < datapoint.getLimit()) {
353                     readDatapoints.add(datapoint);
354                     logger.debug("Could not read value for datapoint {}: {}. Going to retry.",
355                             datapoint.getDatapoint().getMainAddress(), e.getMessage());
356                 } else {
357                     logger.warn("Giving up reading datapoint {}, the number of maximum retries ({}) is reached.",
358                             datapoint.getDatapoint().getMainAddress(), datapoint.getLimit());
359                 }
360             } catch (InterruptedException | CancellationException e) {
361                 logger.debug("Interrupted sending KNX read request");
362             } catch (Exception e) {
363                 // Any other exception: Fail gracefully, i.e. notify user and continue reading next DP.
364                 // Not catching this would end the scheduled read for all DPs in case of an error.
365                 // Severity is warning as this is likely caused by a configuration error.
366                 logger.warn("Error reading datapoint {}: {}", datapoint.getDatapoint().getMainAddress(),
367                         e.getMessage());
368             }
369         }
370     }
371
372     public void dispose() {
373         state = ClientState.DISPOSE;
374
375         cancelReconnectJob();
376         disconnect(null);
377     }
378
379     @Override
380     public void linkClosed(@Nullable CloseEvent closeEvent) {
381         KNXNetworkLink link = this.link;
382         if (link == null || closeEvent == null) {
383             return;
384         }
385         if (!link.isOpen() && CloseEvent.USER_REQUEST != closeEvent.getInitiator()) {
386             final String reason = closeEvent.getReason();
387             statusUpdateCallback.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
388                     KNXTranslationProvider.I18N.get(reason));
389             logger.debug("KNX link has been lost (reason: {} on object {})", closeEvent.getReason(),
390                     closeEvent.getSource().toString());
391             scheduleReconnectJob();
392         }
393     }
394
395     @Override
396     public void indication(@Nullable FrameEvent e) {
397         // no-op
398     }
399
400     @Override
401     public void confirmation(@Nullable FrameEvent e) {
402         // no-op
403     }
404
405     @Override
406     public final synchronized boolean isReachable(@Nullable IndividualAddress address) throws KNXException {
407         ManagementProcedures managementProcedures = this.managementProcedures;
408         if (managementProcedures == null || address == null) {
409             return false;
410         }
411         try {
412             return managementProcedures.isAddressOccupied(address);
413         } catch (InterruptedException e) {
414             logger.debug("Interrupted pinging KNX device '{}'", address);
415         }
416         return false;
417     }
418
419     @Override
420     public final synchronized void restartNetworkDevice(@Nullable IndividualAddress address) {
421         ManagementClient managementClient = this.managementClient;
422         if (address == null || managementClient == null) {
423             return;
424         }
425         Destination destination = null;
426         try {
427             destination = managementClient.createDestination(address, true);
428             managementClient.restart(destination);
429         } catch (KNXException e) {
430             logger.warn("Could not reset device with address '{}': {}", address, e.getMessage());
431         } catch (InterruptedException e) { // ignored as in Calimero pre-2.4.0
432         } finally {
433             if (destination != null) {
434                 destination.destroy();
435             }
436         }
437     }
438
439     @Override
440     public void readDatapoint(Datapoint datapoint) {
441         synchronized (this) {
442             ReadDatapoint retryDatapoint = new ReadDatapoint(datapoint, readRetriesLimit);
443             if (!readDatapoints.contains(retryDatapoint)) {
444                 readDatapoints.add(retryDatapoint);
445             }
446         }
447     }
448
449     @Override
450     public final void registerGroupAddressListener(GroupAddressListener listener) {
451         groupAddressListeners.add(listener);
452     }
453
454     @Override
455     public final void unregisterGroupAddressListener(GroupAddressListener listener) {
456         groupAddressListeners.remove(listener);
457     }
458
459     @Override
460     public boolean isConnected() {
461         final var tmpLink = link;
462         return tmpLink != null && tmpLink.isOpen();
463     }
464
465     @Override
466     public DeviceInfoClient getDeviceInfoClient() {
467         DeviceInfoClient deviceInfoClient = this.deviceInfoClient;
468         if (deviceInfoClient != null) {
469             return deviceInfoClient;
470         } else {
471             throw new IllegalStateException();
472         }
473     }
474
475     @Override
476     public void writeToKNX(OutboundSpec commandSpec) throws KNXException {
477         ProcessCommunicator processCommunicator = this.processCommunicator;
478         KNXNetworkLink link = this.link;
479         if (processCommunicator == null || link == null) {
480             logger.debug("Cannot write to KNX bus (processCommunicator: {}, link: {})",
481                     processCommunicator == null ? "Not OK" : "OK",
482                     link == null ? "Not OK" : (link.isOpen() ? "Open" : "Closed"));
483             return;
484         }
485         GroupAddress groupAddress = commandSpec.getGroupAddress();
486
487         logger.trace("writeToKNX groupAddress '{}', commandSpec '{}'", groupAddress, commandSpec);
488
489         sendToKNX(processCommunicator, groupAddress, commandSpec.getDPT(), commandSpec.getValue());
490     }
491
492     @Override
493     public void respondToKNX(OutboundSpec responseSpec) throws KNXException {
494         ProcessCommunicationResponder responseCommunicator = this.responseCommunicator;
495         KNXNetworkLink link = this.link;
496         if (responseCommunicator == null || link == null) {
497             logger.debug("Cannot write to KNX bus (responseCommunicator: {}, link: {})",
498                     responseCommunicator == null ? "Not OK" : "OK",
499                     link == null ? "Not OK" : (link.isOpen() ? "Open" : "Closed"));
500             return;
501         }
502         GroupAddress groupAddress = responseSpec.getGroupAddress();
503
504         logger.trace("respondToKNX groupAddress '{}', responseSpec '{}'", groupAddress, responseSpec);
505
506         sendToKNX(responseCommunicator, groupAddress, responseSpec.getDPT(), responseSpec.getValue());
507     }
508
509     private void sendToKNX(ProcessCommunication communicator, GroupAddress groupAddress, String dpt, Type type)
510             throws KNXException {
511         if (!connectIfNotAutomatic()) {
512             return;
513         }
514
515         Datapoint datapoint = new CommandDP(groupAddress, thingUID.toString(), 0,
516                 NORMALIZED_DPT.getOrDefault(dpt, dpt));
517         String mappedValue = ValueEncoder.encode(type, dpt);
518         if (mappedValue == null) {
519             logger.debug("Value '{}' of type '{}' cannot be mapped to datapoint '{}'", type, type.getClass(),
520                     datapoint);
521             return;
522         }
523         logger.trace("sendToKNX mappedValue: '{}' groupAddress: '{}'", mappedValue, groupAddress);
524
525         for (int i = 0;; i++) {
526             try {
527                 communicator.write(datapoint, mappedValue);
528                 logger.debug("Wrote value '{}' to datapoint '{}' ({}. attempt).", type, datapoint, i);
529                 break;
530             } catch (KNXException e) {
531                 if (i < MAX_SEND_ATTEMPTS - 1) {
532                     logger.debug("Value '{}' could not be sent to KNX bus using datapoint '{}': {}. Will retry.", type,
533                             datapoint, e.getLocalizedMessage());
534                 } else {
535                     logger.warn("Value '{}' could not be sent to KNX bus using datapoint '{}': {}. Giving up now.",
536                             type, datapoint, e.getLocalizedMessage());
537                     throw e;
538                 }
539             }
540         }
541     }
542 }