]> git.basschouten.com Git - openhab-addons.git/blob
695edb277edfea3ced5148c377fbc3528cfbe272
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2022 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.knx.internal.client;
14
15 import java.time.Duration;
16 import java.util.Set;
17 import java.util.concurrent.CopyOnWriteArraySet;
18 import java.util.concurrent.LinkedBlockingQueue;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.ScheduledFuture;
21 import java.util.concurrent.TimeUnit;
22 import java.util.function.Consumer;
23
24 import org.eclipse.jdt.annotation.NonNullByDefault;
25 import org.eclipse.jdt.annotation.Nullable;
26 import org.openhab.binding.knx.internal.KNXTypeMapper;
27 import org.openhab.binding.knx.internal.dpt.KNXCoreTypeMapper;
28 import org.openhab.binding.knx.internal.handler.GroupAddressListener;
29 import org.openhab.core.thing.ThingStatus;
30 import org.openhab.core.thing.ThingStatusDetail;
31 import org.openhab.core.thing.ThingUID;
32 import org.openhab.core.types.Type;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 import tuwien.auto.calimero.CloseEvent;
37 import tuwien.auto.calimero.DetachEvent;
38 import tuwien.auto.calimero.FrameEvent;
39 import tuwien.auto.calimero.GroupAddress;
40 import tuwien.auto.calimero.IndividualAddress;
41 import tuwien.auto.calimero.KNXException;
42 import tuwien.auto.calimero.datapoint.CommandDP;
43 import tuwien.auto.calimero.datapoint.Datapoint;
44 import tuwien.auto.calimero.device.ProcessCommunicationResponder;
45 import tuwien.auto.calimero.link.KNXNetworkLink;
46 import tuwien.auto.calimero.link.NetworkLinkListener;
47 import tuwien.auto.calimero.mgmt.Destination;
48 import tuwien.auto.calimero.mgmt.ManagementClient;
49 import tuwien.auto.calimero.mgmt.ManagementClientImpl;
50 import tuwien.auto.calimero.mgmt.ManagementProcedures;
51 import tuwien.auto.calimero.mgmt.ManagementProceduresImpl;
52 import tuwien.auto.calimero.process.ProcessCommunication;
53 import tuwien.auto.calimero.process.ProcessCommunicator;
54 import tuwien.auto.calimero.process.ProcessCommunicatorImpl;
55 import tuwien.auto.calimero.process.ProcessEvent;
56 import tuwien.auto.calimero.process.ProcessListener;
57 import tuwien.auto.calimero.secure.SecureApplicationLayer;
58 import tuwien.auto.calimero.secure.Security;
59
60 /**
61  * KNX Client which encapsulates the communication with the KNX bus via the calimero libary.
62  *
63  * @author Simon Kaufmann - initial contribution and API.
64  *
65  */
66 @NonNullByDefault
67 public abstract class AbstractKNXClient implements NetworkLinkListener, KNXClient {
68
69     private static final int MAX_SEND_ATTEMPTS = 2;
70
71     private final Logger logger = LoggerFactory.getLogger(AbstractKNXClient.class);
72     private final KNXTypeMapper typeHelper = new KNXCoreTypeMapper();
73
74     private final ThingUID thingUID;
75     private final int responseTimeout;
76     private final int readingPause;
77     private final int autoReconnectPeriod;
78     private final int readRetriesLimit;
79     private final StatusUpdateCallback statusUpdateCallback;
80     private final ScheduledExecutorService knxScheduler;
81
82     private @Nullable ProcessCommunicator processCommunicator;
83     private @Nullable ProcessCommunicationResponder responseCommunicator;
84     private @Nullable ManagementProcedures managementProcedures;
85     private @Nullable ManagementClient managementClient;
86     private @Nullable KNXNetworkLink link;
87     private @Nullable DeviceInfoClient deviceInfoClient;
88     private @Nullable ScheduledFuture<?> busJob;
89     private @Nullable ScheduledFuture<?> connectJob;
90
91     private final Set<GroupAddressListener> groupAddressListeners = new CopyOnWriteArraySet<>();
92     private final LinkedBlockingQueue<ReadDatapoint> readDatapoints = new LinkedBlockingQueue<>();
93
94     @FunctionalInterface
95     private interface ListenerNotification {
96         void apply(BusMessageListener listener, IndividualAddress source, GroupAddress destination, byte[] asdu);
97     }
98
99     @NonNullByDefault({})
100     private final ProcessListener processListener = new ProcessListener() {
101
102         @Override
103         public void detached(DetachEvent e) {
104             logger.debug("The KNX network link was detached from the process communicator");
105         }
106
107         @Override
108         public void groupWrite(ProcessEvent e) {
109             processEvent("Group Write", e, (listener, source, destination, asdu) -> {
110                 listener.onGroupWrite(AbstractKNXClient.this, source, destination, asdu);
111             });
112         }
113
114         @Override
115         public void groupReadRequest(ProcessEvent e) {
116             processEvent("Group Read Request", e, (listener, source, destination, asdu) -> {
117                 listener.onGroupRead(AbstractKNXClient.this, source, destination, asdu);
118             });
119         }
120
121         @Override
122         public void groupReadResponse(ProcessEvent e) {
123             processEvent("Group Read Response", e, (listener, source, destination, asdu) -> {
124                 listener.onGroupReadResponse(AbstractKNXClient.this, source, destination, asdu);
125             });
126         }
127     };
128
129     public AbstractKNXClient(int autoReconnectPeriod, ThingUID thingUID, int responseTimeout, int readingPause,
130             int readRetriesLimit, ScheduledExecutorService knxScheduler, StatusUpdateCallback statusUpdateCallback) {
131         this.autoReconnectPeriod = autoReconnectPeriod;
132         this.thingUID = thingUID;
133         this.responseTimeout = responseTimeout;
134         this.readingPause = readingPause;
135         this.readRetriesLimit = readRetriesLimit;
136         this.knxScheduler = knxScheduler;
137         this.statusUpdateCallback = statusUpdateCallback;
138     }
139
140     public void initialize() {
141         if (!scheduleReconnectJob()) {
142             connect();
143         }
144     }
145
146     private boolean scheduleReconnectJob() {
147         if (autoReconnectPeriod > 0) {
148             connectJob = knxScheduler.schedule(this::connect, autoReconnectPeriod, TimeUnit.SECONDS);
149             return true;
150         } else {
151             return false;
152         }
153     }
154
155     private void cancelReconnectJob() {
156         ScheduledFuture<?> currentReconnectJob = connectJob;
157         if (currentReconnectJob != null) {
158             currentReconnectJob.cancel(true);
159             connectJob = null;
160         }
161     }
162
163     protected abstract KNXNetworkLink establishConnection() throws KNXException, InterruptedException;
164
165     private synchronized boolean connectIfNotAutomatic() {
166         if (!isConnected()) {
167             return connectJob != null ? false : connect();
168         }
169         return true;
170     }
171
172     private synchronized boolean connect() {
173         if (isConnected()) {
174             return true;
175         }
176         try {
177             releaseConnection();
178
179             logger.debug("Bridge {} is connecting to the KNX bus", thingUID);
180
181             KNXNetworkLink link = establishConnection();
182             this.link = link;
183
184             managementProcedures = new ManagementProceduresImpl(link);
185
186             ManagementClient managementClient = new ManagementClientImpl(link);
187             managementClient.responseTimeout(Duration.ofSeconds(responseTimeout));
188             this.managementClient = managementClient;
189
190             deviceInfoClient = new DeviceInfoClientImpl(managementClient);
191
192             ProcessCommunicator processCommunicator = new ProcessCommunicatorImpl(link);
193             processCommunicator.responseTimeout(Duration.ofSeconds(responseTimeout));
194             processCommunicator.addProcessListener(processListener);
195             this.processCommunicator = processCommunicator;
196
197             ProcessCommunicationResponder responseCommunicator = new ProcessCommunicationResponder(link,
198                     new SecureApplicationLayer(link, Security.defaultInstallation()));
199             this.responseCommunicator = responseCommunicator;
200
201             link.addLinkListener(this);
202
203             busJob = knxScheduler.scheduleWithFixedDelay(() -> readNextQueuedDatapoint(), 0, readingPause,
204                     TimeUnit.MILLISECONDS);
205
206             statusUpdateCallback.updateStatus(ThingStatus.ONLINE);
207             connectJob = null;
208             return true;
209         } catch (KNXException | InterruptedException e) {
210             logger.debug("Error connecting to the bus: {}", e.getMessage(), e);
211             disconnect(e);
212             scheduleReconnectJob();
213             return false;
214         }
215     }
216
217     private void disconnect(@Nullable Exception e) {
218         releaseConnection();
219         if (e != null) {
220             String message = e.getLocalizedMessage();
221             statusUpdateCallback.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
222                     message != null ? message : "");
223         } else {
224             statusUpdateCallback.updateStatus(ThingStatus.OFFLINE);
225         }
226     }
227
228     @SuppressWarnings("null")
229     private void releaseConnection() {
230         logger.debug("Bridge {} is disconnecting from the KNX bus", thingUID);
231         readDatapoints.clear();
232         busJob = nullify(busJob, j -> j.cancel(true));
233         deviceInfoClient = null;
234         managementProcedures = nullify(managementProcedures, mp -> mp.detach());
235         managementClient = nullify(managementClient, mc -> mc.detach());
236         link = nullify(link, l -> l.close());
237         processCommunicator = nullify(processCommunicator, pc -> {
238             pc.removeProcessListener(processListener);
239             pc.detach();
240         });
241         responseCommunicator = nullify(responseCommunicator, rc -> {
242             rc.removeProcessListener(processListener);
243             rc.detach();
244         });
245     }
246
247     private <T> T nullify(T target, @Nullable Consumer<T> lastWill) {
248         if (target != null && lastWill != null) {
249             lastWill.accept(target);
250         }
251         return null;
252     }
253
254     private void processEvent(String task, ProcessEvent event, ListenerNotification action) {
255         GroupAddress destination = event.getDestination();
256         IndividualAddress source = event.getSourceAddr();
257         byte[] asdu = event.getASDU();
258         logger.trace("Received a {} telegram from '{}' to '{}' with value '{}'", task, source, destination, asdu);
259         for (GroupAddressListener listener : groupAddressListeners) {
260             if (listener.listensTo(destination)) {
261                 knxScheduler.schedule(() -> action.apply(listener, source, destination, asdu), 0, TimeUnit.SECONDS);
262             }
263         }
264     }
265
266     /**
267      * Transforms a {@link Type} into a datapoint type value for the KNX bus.
268      *
269      * @param type the {@link Type} to transform
270      * @param dpt the datapoint type to which should be converted
271      * @return the corresponding KNX datapoint type value as a string
272      */
273     @Nullable
274     private String toDPTValue(Type type, String dpt) {
275         return typeHelper.toDPTValue(type, dpt);
276     }
277
278     @SuppressWarnings("null")
279     private void readNextQueuedDatapoint() {
280         if (!connectIfNotAutomatic()) {
281             return;
282         }
283         ProcessCommunicator processCommunicator = this.processCommunicator;
284         if (processCommunicator == null) {
285             return;
286         }
287         ReadDatapoint datapoint = readDatapoints.poll();
288         if (datapoint != null) {
289             datapoint.incrementRetries();
290             try {
291                 logger.trace("Sending a Group Read Request telegram for {}", datapoint.getDatapoint().getMainAddress());
292                 processCommunicator.read(datapoint.getDatapoint());
293             } catch (KNXException e) {
294                 if (datapoint.getRetries() < datapoint.getLimit()) {
295                     readDatapoints.add(datapoint);
296                     logger.debug("Could not read value for datapoint {}: {}. Going to retry.",
297                             datapoint.getDatapoint().getMainAddress(), e.getMessage());
298                 } else {
299                     logger.warn("Giving up reading datapoint {}, the number of maximum retries ({}) is reached.",
300                             datapoint.getDatapoint().getMainAddress(), datapoint.getLimit());
301                 }
302             } catch (InterruptedException e) {
303                 logger.debug("Interrupted sending KNX read request");
304                 return;
305             }
306         }
307     }
308
309     public void dispose() {
310         cancelReconnectJob();
311         disconnect(null);
312     }
313
314     @Override
315     public void linkClosed(@Nullable CloseEvent closeEvent) {
316         KNXNetworkLink link = this.link;
317         if (link == null || closeEvent == null) {
318             return;
319         }
320         if (!link.isOpen() && CloseEvent.USER_REQUEST != closeEvent.getInitiator()) {
321             statusUpdateCallback.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
322                     closeEvent.getReason());
323             logger.debug("KNX link has been lost (reason: {} on object {})", closeEvent.getReason(),
324                     closeEvent.getSource().toString());
325             scheduleReconnectJob();
326         }
327     }
328
329     @Override
330     public void indication(@Nullable FrameEvent e) {
331         // no-op
332     }
333
334     @Override
335     public void confirmation(@Nullable FrameEvent e) {
336         // no-op
337     }
338
339     @Override
340     public final synchronized boolean isReachable(@Nullable IndividualAddress address) throws KNXException {
341         ManagementProcedures managementProcedures = this.managementProcedures;
342         if (managementProcedures == null || address == null) {
343             return false;
344         }
345         try {
346             return managementProcedures.isAddressOccupied(address);
347         } catch (InterruptedException e) {
348             logger.debug("Interrupted pinging KNX device '{}'", address);
349         }
350         return false;
351     }
352
353     @Override
354     public final synchronized void restartNetworkDevice(@Nullable IndividualAddress address) {
355         ManagementClient managementClient = this.managementClient;
356         if (address == null || managementClient == null) {
357             return;
358         }
359         Destination destination = null;
360         try {
361             destination = managementClient.createDestination(address, true);
362             managementClient.restart(destination);
363         } catch (KNXException e) {
364             logger.warn("Could not reset device with address '{}': {}", address, e.getMessage());
365         } catch (InterruptedException e) { // ignored as in Calimero pre-2.4.0
366         } finally {
367             if (destination != null) {
368                 destination.destroy();
369             }
370         }
371     }
372
373     @Override
374     public void readDatapoint(Datapoint datapoint) {
375         synchronized (this) {
376             ReadDatapoint retryDatapoint = new ReadDatapoint(datapoint, readRetriesLimit);
377             if (!readDatapoints.contains(retryDatapoint)) {
378                 readDatapoints.add(retryDatapoint);
379             }
380         }
381     }
382
383     @Override
384     public final boolean registerGroupAddressListener(GroupAddressListener listener) {
385         return groupAddressListeners.add(listener);
386     }
387
388     @Override
389     public final boolean unregisterGroupAddressListener(GroupAddressListener listener) {
390         return groupAddressListeners.remove(listener);
391     }
392
393     @Override
394     public boolean isConnected() {
395         return link != null && link.isOpen();
396     }
397
398     @Override
399     public DeviceInfoClient getDeviceInfoClient() {
400         DeviceInfoClient deviceInfoClient = this.deviceInfoClient;
401         if (deviceInfoClient != null) {
402             return deviceInfoClient;
403         } else {
404             throw new IllegalStateException();
405         }
406     }
407
408     @Override
409     public void writeToKNX(OutboundSpec commandSpec) throws KNXException {
410         ProcessCommunicator processCommunicator = this.processCommunicator;
411         KNXNetworkLink link = this.link;
412         if (processCommunicator == null || link == null) {
413             logger.debug("Cannot write to the KNX bus (processCommuicator: {}, link: {})",
414                     processCommunicator == null ? "Not OK" : "OK",
415                     link == null ? "Not OK" : (link.isOpen() ? "Open" : "Closed"));
416             return;
417         }
418         GroupAddress groupAddress = commandSpec.getGroupAddress();
419
420         logger.trace("writeToKNX groupAddress '{}', commandSpec '{}'", groupAddress, commandSpec);
421
422         if (groupAddress != null) {
423             sendToKNX(processCommunicator, link, groupAddress, commandSpec.getDPT(), commandSpec.getType());
424         }
425     }
426
427     @Override
428     public void respondToKNX(OutboundSpec responseSpec) throws KNXException {
429         ProcessCommunicationResponder responseCommunicator = this.responseCommunicator;
430         KNXNetworkLink link = this.link;
431         if (responseCommunicator == null || link == null) {
432             logger.debug("Cannot write to the KNX bus (responseCommunicator: {}, link: {})",
433                     responseCommunicator == null ? "Not OK" : "OK",
434                     link == null ? "Not OK" : (link.isOpen() ? "Open" : "Closed"));
435             return;
436         }
437         GroupAddress groupAddress = responseSpec.getGroupAddress();
438
439         logger.trace("respondToKNX groupAddress '{}', responseSpec '{}'", groupAddress, responseSpec);
440
441         if (groupAddress != null) {
442             sendToKNX(responseCommunicator, link, groupAddress, responseSpec.getDPT(), responseSpec.getType());
443         }
444     }
445
446     private void sendToKNX(ProcessCommunication communicator, KNXNetworkLink link, GroupAddress groupAddress,
447             String dpt, Type type) throws KNXException {
448         if (!connectIfNotAutomatic()) {
449             return;
450         }
451
452         Datapoint datapoint = new CommandDP(groupAddress, thingUID.toString(), 0, dpt);
453         String mappedValue = toDPTValue(type, dpt);
454
455         logger.trace("sendToKNX mappedValue: '{}' groupAddress: '{}'", mappedValue, groupAddress);
456
457         if (mappedValue == null) {
458             logger.debug("Value '{}' cannot be mapped to datapoint '{}'", type, datapoint);
459             return;
460         }
461         for (int i = 0; i < MAX_SEND_ATTEMPTS; i++) {
462             try {
463                 communicator.write(datapoint, mappedValue);
464                 logger.debug("Wrote value '{}' to datapoint '{}' ({}. attempt).", type, datapoint, i);
465                 break;
466             } catch (KNXException e) {
467                 if (i < MAX_SEND_ATTEMPTS - 1) {
468                     logger.debug("Value '{}' could not be sent to the KNX bus using datapoint '{}': {}. Will retry.",
469                             type, datapoint, e.getLocalizedMessage());
470                 } else {
471                     logger.warn("Value '{}' could not be sent to the KNX bus using datapoint '{}': {}. Giving up now.",
472                             type, datapoint, e.getLocalizedMessage());
473                     throw e;
474                 }
475             }
476         }
477     }
478 }