]> git.basschouten.com Git - openhab-addons.git/blob
ecc905fd68e0561de3442e6b8e019617cfb24217
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2022 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.knx.internal.client;
14
15 import java.time.Duration;
16 import java.util.Set;
17 import java.util.concurrent.CancellationException;
18 import java.util.concurrent.CopyOnWriteArraySet;
19 import java.util.concurrent.LinkedBlockingQueue;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.concurrent.ScheduledFuture;
22 import java.util.concurrent.TimeUnit;
23 import java.util.function.Consumer;
24
25 import org.eclipse.jdt.annotation.NonNullByDefault;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.openhab.binding.knx.internal.KNXTypeMapper;
28 import org.openhab.binding.knx.internal.dpt.KNXCoreTypeMapper;
29 import org.openhab.binding.knx.internal.handler.GroupAddressListener;
30 import org.openhab.core.thing.ThingStatus;
31 import org.openhab.core.thing.ThingStatusDetail;
32 import org.openhab.core.thing.ThingUID;
33 import org.openhab.core.types.Type;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import tuwien.auto.calimero.CloseEvent;
38 import tuwien.auto.calimero.DetachEvent;
39 import tuwien.auto.calimero.FrameEvent;
40 import tuwien.auto.calimero.GroupAddress;
41 import tuwien.auto.calimero.IndividualAddress;
42 import tuwien.auto.calimero.KNXException;
43 import tuwien.auto.calimero.datapoint.CommandDP;
44 import tuwien.auto.calimero.datapoint.Datapoint;
45 import tuwien.auto.calimero.device.ProcessCommunicationResponder;
46 import tuwien.auto.calimero.link.KNXNetworkLink;
47 import tuwien.auto.calimero.link.NetworkLinkListener;
48 import tuwien.auto.calimero.mgmt.Destination;
49 import tuwien.auto.calimero.mgmt.ManagementClient;
50 import tuwien.auto.calimero.mgmt.ManagementClientImpl;
51 import tuwien.auto.calimero.mgmt.ManagementProcedures;
52 import tuwien.auto.calimero.mgmt.ManagementProceduresImpl;
53 import tuwien.auto.calimero.process.ProcessCommunication;
54 import tuwien.auto.calimero.process.ProcessCommunicator;
55 import tuwien.auto.calimero.process.ProcessCommunicatorImpl;
56 import tuwien.auto.calimero.process.ProcessEvent;
57 import tuwien.auto.calimero.process.ProcessListener;
58 import tuwien.auto.calimero.secure.SecureApplicationLayer;
59 import tuwien.auto.calimero.secure.Security;
60
61 /**
62  * KNX Client which encapsulates the communication with the KNX bus via the calimero libary.
63  *
64  * @author Simon Kaufmann - initial contribution and API.
65  *
66  */
67 @NonNullByDefault
68 public abstract class AbstractKNXClient implements NetworkLinkListener, KNXClient {
69
70     private static final int MAX_SEND_ATTEMPTS = 2;
71
72     private final Logger logger = LoggerFactory.getLogger(AbstractKNXClient.class);
73     private final KNXTypeMapper typeHelper = new KNXCoreTypeMapper();
74
75     private final ThingUID thingUID;
76     private final int responseTimeout;
77     private final int readingPause;
78     private final int autoReconnectPeriod;
79     private final int readRetriesLimit;
80     private final StatusUpdateCallback statusUpdateCallback;
81     private final ScheduledExecutorService knxScheduler;
82
83     private @Nullable ProcessCommunicator processCommunicator;
84     private @Nullable ProcessCommunicationResponder responseCommunicator;
85     private @Nullable ManagementProcedures managementProcedures;
86     private @Nullable ManagementClient managementClient;
87     private @Nullable KNXNetworkLink link;
88     private @Nullable DeviceInfoClient deviceInfoClient;
89     private @Nullable ScheduledFuture<?> busJob;
90     private @Nullable ScheduledFuture<?> connectJob;
91
92     private final Set<GroupAddressListener> groupAddressListeners = new CopyOnWriteArraySet<>();
93     private final LinkedBlockingQueue<ReadDatapoint> readDatapoints = new LinkedBlockingQueue<>();
94
95     @FunctionalInterface
96     private interface ListenerNotification {
97         void apply(BusMessageListener listener, IndividualAddress source, GroupAddress destination, byte[] asdu);
98     }
99
100     @NonNullByDefault({})
101     private final ProcessListener processListener = new ProcessListener() {
102
103         @Override
104         public void detached(DetachEvent e) {
105             logger.debug("The KNX network link was detached from the process communicator");
106         }
107
108         @Override
109         public void groupWrite(ProcessEvent e) {
110             processEvent("Group Write", e, (listener, source, destination, asdu) -> {
111                 listener.onGroupWrite(AbstractKNXClient.this, source, destination, asdu);
112             });
113         }
114
115         @Override
116         public void groupReadRequest(ProcessEvent e) {
117             processEvent("Group Read Request", e, (listener, source, destination, asdu) -> {
118                 listener.onGroupRead(AbstractKNXClient.this, source, destination, asdu);
119             });
120         }
121
122         @Override
123         public void groupReadResponse(ProcessEvent e) {
124             processEvent("Group Read Response", e, (listener, source, destination, asdu) -> {
125                 listener.onGroupReadResponse(AbstractKNXClient.this, source, destination, asdu);
126             });
127         }
128     };
129
130     public AbstractKNXClient(int autoReconnectPeriod, ThingUID thingUID, int responseTimeout, int readingPause,
131             int readRetriesLimit, ScheduledExecutorService knxScheduler, StatusUpdateCallback statusUpdateCallback) {
132         this.autoReconnectPeriod = autoReconnectPeriod;
133         this.thingUID = thingUID;
134         this.responseTimeout = responseTimeout;
135         this.readingPause = readingPause;
136         this.readRetriesLimit = readRetriesLimit;
137         this.knxScheduler = knxScheduler;
138         this.statusUpdateCallback = statusUpdateCallback;
139     }
140
141     public void initialize() {
142         if (!scheduleReconnectJob()) {
143             connect();
144         }
145     }
146
147     private boolean scheduleReconnectJob() {
148         if (autoReconnectPeriod > 0) {
149             connectJob = knxScheduler.schedule(this::connect, autoReconnectPeriod, TimeUnit.SECONDS);
150             return true;
151         } else {
152             return false;
153         }
154     }
155
156     private void cancelReconnectJob() {
157         ScheduledFuture<?> currentReconnectJob = connectJob;
158         if (currentReconnectJob != null) {
159             currentReconnectJob.cancel(true);
160             connectJob = null;
161         }
162     }
163
164     protected abstract KNXNetworkLink establishConnection() throws KNXException, InterruptedException;
165
166     private synchronized boolean connectIfNotAutomatic() {
167         if (!isConnected()) {
168             return connectJob != null ? false : connect();
169         }
170         return true;
171     }
172
173     private synchronized boolean connect() {
174         if (isConnected()) {
175             return true;
176         }
177         try {
178             releaseConnection();
179
180             logger.debug("Bridge {} is connecting to the KNX bus", thingUID);
181
182             KNXNetworkLink link = establishConnection();
183             this.link = link;
184
185             managementProcedures = new ManagementProceduresImpl(link);
186
187             ManagementClient managementClient = new ManagementClientImpl(link);
188             managementClient.responseTimeout(Duration.ofSeconds(responseTimeout));
189             this.managementClient = managementClient;
190
191             deviceInfoClient = new DeviceInfoClientImpl(managementClient);
192
193             ProcessCommunicator processCommunicator = new ProcessCommunicatorImpl(link);
194             processCommunicator.responseTimeout(Duration.ofSeconds(responseTimeout));
195             processCommunicator.addProcessListener(processListener);
196             this.processCommunicator = processCommunicator;
197
198             ProcessCommunicationResponder responseCommunicator = new ProcessCommunicationResponder(link,
199                     new SecureApplicationLayer(link, Security.defaultInstallation()));
200             this.responseCommunicator = responseCommunicator;
201
202             link.addLinkListener(this);
203
204             busJob = knxScheduler.scheduleWithFixedDelay(() -> readNextQueuedDatapoint(), 0, readingPause,
205                     TimeUnit.MILLISECONDS);
206
207             statusUpdateCallback.updateStatus(ThingStatus.ONLINE);
208             connectJob = null;
209             return true;
210         } catch (KNXException | InterruptedException e) {
211             logger.debug("Error connecting to the bus: {}", e.getMessage(), e);
212             disconnect(e);
213             scheduleReconnectJob();
214             return false;
215         }
216     }
217
218     private void disconnect(@Nullable Exception e) {
219         releaseConnection();
220         if (e != null) {
221             String message = e.getLocalizedMessage();
222             statusUpdateCallback.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
223                     message != null ? message : "");
224         } else {
225             statusUpdateCallback.updateStatus(ThingStatus.OFFLINE);
226         }
227     }
228
229     @SuppressWarnings("null")
230     private void releaseConnection() {
231         logger.debug("Bridge {} is disconnecting from the KNX bus", thingUID);
232         readDatapoints.clear();
233         busJob = nullify(busJob, j -> j.cancel(true));
234         deviceInfoClient = null;
235         managementProcedures = nullify(managementProcedures, mp -> mp.detach());
236         managementClient = nullify(managementClient, mc -> mc.detach());
237         link = nullify(link, l -> l.close());
238         processCommunicator = nullify(processCommunicator, pc -> {
239             pc.removeProcessListener(processListener);
240             pc.detach();
241         });
242         responseCommunicator = nullify(responseCommunicator, rc -> {
243             rc.removeProcessListener(processListener);
244             rc.detach();
245         });
246     }
247
248     private <T> @Nullable T nullify(T target, @Nullable Consumer<T> lastWill) {
249         if (target != null && lastWill != null) {
250             lastWill.accept(target);
251         }
252         return null;
253     }
254
255     private void processEvent(String task, ProcessEvent event, ListenerNotification action) {
256         GroupAddress destination = event.getDestination();
257         IndividualAddress source = event.getSourceAddr();
258         byte[] asdu = event.getASDU();
259         logger.trace("Received a {} telegram from '{}' to '{}' with value '{}'", task, source, destination, asdu);
260         for (GroupAddressListener listener : groupAddressListeners) {
261             if (listener.listensTo(destination)) {
262                 knxScheduler.schedule(() -> action.apply(listener, source, destination, asdu), 0, TimeUnit.SECONDS);
263             }
264         }
265     }
266
267     /**
268      * Transforms a {@link Type} into a datapoint type value for the KNX bus.
269      *
270      * @param type the {@link Type} to transform
271      * @param dpt the datapoint type to which should be converted
272      * @return the corresponding KNX datapoint type value as a string
273      */
274     @Nullable
275     private String toDPTValue(Type type, String dpt) {
276         return typeHelper.toDPTValue(type, dpt);
277     }
278
279     @SuppressWarnings("null")
280     private void readNextQueuedDatapoint() {
281         if (!connectIfNotAutomatic()) {
282             return;
283         }
284         ProcessCommunicator processCommunicator = this.processCommunicator;
285         if (processCommunicator == null) {
286             return;
287         }
288         ReadDatapoint datapoint = readDatapoints.poll();
289         if (datapoint != null) {
290             datapoint.incrementRetries();
291             try {
292                 logger.trace("Sending a Group Read Request telegram for {}", datapoint.getDatapoint().getMainAddress());
293                 processCommunicator.read(datapoint.getDatapoint());
294             } catch (KNXException e) {
295                 // Note: KnxException does not cover KnxRuntimeException and subclasses KnxSecureException,
296                 // KnxIllegArgumentException
297                 if (datapoint.getRetries() < datapoint.getLimit()) {
298                     readDatapoints.add(datapoint);
299                     logger.debug("Could not read value for datapoint {}: {}. Going to retry.",
300                             datapoint.getDatapoint().getMainAddress(), e.getMessage());
301                 } else {
302                     logger.warn("Giving up reading datapoint {}, the number of maximum retries ({}) is reached.",
303                             datapoint.getDatapoint().getMainAddress(), datapoint.getLimit());
304                 }
305             } catch (InterruptedException | CancellationException e) {
306                 logger.debug("Interrupted sending KNX read request");
307                 return;
308             } catch (Exception e) {
309                 // Any other exception: Fail gracefully, i.e. notify user and continue reading next DP.
310                 // Not catching this would end the scheduled read for all DPs in case of an error.
311                 // Severity is warning as this is likely caused by a configuration error.
312                 logger.warn("Error reading datapoint {}: {}", datapoint.getDatapoint().getMainAddress(),
313                         e.getMessage());
314             }
315         }
316     }
317
318     public void dispose() {
319         cancelReconnectJob();
320         disconnect(null);
321     }
322
323     @Override
324     public void linkClosed(@Nullable CloseEvent closeEvent) {
325         KNXNetworkLink link = this.link;
326         if (link == null || closeEvent == null) {
327             return;
328         }
329         if (!link.isOpen() && CloseEvent.USER_REQUEST != closeEvent.getInitiator()) {
330             statusUpdateCallback.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
331                     closeEvent.getReason());
332             logger.debug("KNX link has been lost (reason: {} on object {})", closeEvent.getReason(),
333                     closeEvent.getSource().toString());
334             scheduleReconnectJob();
335         }
336     }
337
338     @Override
339     public void indication(@Nullable FrameEvent e) {
340         // no-op
341     }
342
343     @Override
344     public void confirmation(@Nullable FrameEvent e) {
345         // no-op
346     }
347
348     @Override
349     public final synchronized boolean isReachable(@Nullable IndividualAddress address) throws KNXException {
350         ManagementProcedures managementProcedures = this.managementProcedures;
351         if (managementProcedures == null || address == null) {
352             return false;
353         }
354         try {
355             return managementProcedures.isAddressOccupied(address);
356         } catch (InterruptedException e) {
357             logger.debug("Interrupted pinging KNX device '{}'", address);
358         }
359         return false;
360     }
361
362     @Override
363     public final synchronized void restartNetworkDevice(@Nullable IndividualAddress address) {
364         ManagementClient managementClient = this.managementClient;
365         if (address == null || managementClient == null) {
366             return;
367         }
368         Destination destination = null;
369         try {
370             destination = managementClient.createDestination(address, true);
371             managementClient.restart(destination);
372         } catch (KNXException e) {
373             logger.warn("Could not reset device with address '{}': {}", address, e.getMessage());
374         } catch (InterruptedException e) { // ignored as in Calimero pre-2.4.0
375         } finally {
376             if (destination != null) {
377                 destination.destroy();
378             }
379         }
380     }
381
382     @Override
383     public void readDatapoint(Datapoint datapoint) {
384         synchronized (this) {
385             ReadDatapoint retryDatapoint = new ReadDatapoint(datapoint, readRetriesLimit);
386             if (!readDatapoints.contains(retryDatapoint)) {
387                 readDatapoints.add(retryDatapoint);
388             }
389         }
390     }
391
392     @Override
393     public final boolean registerGroupAddressListener(GroupAddressListener listener) {
394         return groupAddressListeners.add(listener);
395     }
396
397     @Override
398     public final boolean unregisterGroupAddressListener(GroupAddressListener listener) {
399         return groupAddressListeners.remove(listener);
400     }
401
402     @Override
403     public boolean isConnected() {
404         final var tmpLink = link;
405         return tmpLink != null && tmpLink.isOpen();
406     }
407
408     @Override
409     public DeviceInfoClient getDeviceInfoClient() {
410         DeviceInfoClient deviceInfoClient = this.deviceInfoClient;
411         if (deviceInfoClient != null) {
412             return deviceInfoClient;
413         } else {
414             throw new IllegalStateException();
415         }
416     }
417
418     @Override
419     public void writeToKNX(OutboundSpec commandSpec) throws KNXException {
420         ProcessCommunicator processCommunicator = this.processCommunicator;
421         KNXNetworkLink link = this.link;
422         if (processCommunicator == null || link == null) {
423             logger.debug("Cannot write to the KNX bus (processCommuicator: {}, link: {})",
424                     processCommunicator == null ? "Not OK" : "OK",
425                     link == null ? "Not OK" : (link.isOpen() ? "Open" : "Closed"));
426             return;
427         }
428         GroupAddress groupAddress = commandSpec.getGroupAddress();
429
430         logger.trace("writeToKNX groupAddress '{}', commandSpec '{}'", groupAddress, commandSpec);
431
432         if (groupAddress != null) {
433             sendToKNX(processCommunicator, link, groupAddress, commandSpec.getDPT(), commandSpec.getType());
434         }
435     }
436
437     @Override
438     public void respondToKNX(OutboundSpec responseSpec) throws KNXException {
439         ProcessCommunicationResponder responseCommunicator = this.responseCommunicator;
440         KNXNetworkLink link = this.link;
441         if (responseCommunicator == null || link == null) {
442             logger.debug("Cannot write to the KNX bus (responseCommunicator: {}, link: {})",
443                     responseCommunicator == null ? "Not OK" : "OK",
444                     link == null ? "Not OK" : (link.isOpen() ? "Open" : "Closed"));
445             return;
446         }
447         GroupAddress groupAddress = responseSpec.getGroupAddress();
448
449         logger.trace("respondToKNX groupAddress '{}', responseSpec '{}'", groupAddress, responseSpec);
450
451         if (groupAddress != null) {
452             sendToKNX(responseCommunicator, link, groupAddress, responseSpec.getDPT(), responseSpec.getType());
453         }
454     }
455
456     private void sendToKNX(ProcessCommunication communicator, KNXNetworkLink link, GroupAddress groupAddress,
457             String dpt, Type type) throws KNXException {
458         if (!connectIfNotAutomatic()) {
459             return;
460         }
461
462         Datapoint datapoint = new CommandDP(groupAddress, thingUID.toString(), 0, dpt);
463         String mappedValue = toDPTValue(type, dpt);
464
465         logger.trace("sendToKNX mappedValue: '{}' groupAddress: '{}'", mappedValue, groupAddress);
466
467         if (mappedValue == null) {
468             logger.debug("Value '{}' cannot be mapped to datapoint '{}'", type, datapoint);
469             return;
470         }
471         for (int i = 0; i < MAX_SEND_ATTEMPTS; i++) {
472             try {
473                 communicator.write(datapoint, mappedValue);
474                 logger.debug("Wrote value '{}' to datapoint '{}' ({}. attempt).", type, datapoint, i);
475                 break;
476             } catch (KNXException e) {
477                 if (i < MAX_SEND_ATTEMPTS - 1) {
478                     logger.debug("Value '{}' could not be sent to the KNX bus using datapoint '{}': {}. Will retry.",
479                             type, datapoint, e.getLocalizedMessage());
480                 } else {
481                     logger.warn("Value '{}' could not be sent to the KNX bus using datapoint '{}': {}. Giving up now.",
482                             type, datapoint, e.getLocalizedMessage());
483                     throw e;
484                 }
485             }
486         }
487     }
488 }