]> git.basschouten.com Git - openhab-addons.git/blob
4027c6ba9bef01e7e8038b2dd1a0e707d5026805
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2022 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.knx.internal.client;
14
15 import java.time.Duration;
16 import java.util.Set;
17 import java.util.concurrent.CopyOnWriteArraySet;
18 import java.util.concurrent.LinkedBlockingQueue;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.ScheduledFuture;
21 import java.util.concurrent.TimeUnit;
22 import java.util.function.Consumer;
23
24 import org.eclipse.jdt.annotation.NonNullByDefault;
25 import org.eclipse.jdt.annotation.Nullable;
26 import org.openhab.binding.knx.internal.KNXTypeMapper;
27 import org.openhab.binding.knx.internal.dpt.KNXCoreTypeMapper;
28 import org.openhab.binding.knx.internal.handler.GroupAddressListener;
29 import org.openhab.core.thing.ThingStatus;
30 import org.openhab.core.thing.ThingStatusDetail;
31 import org.openhab.core.thing.ThingUID;
32 import org.openhab.core.types.Type;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 import tuwien.auto.calimero.CloseEvent;
37 import tuwien.auto.calimero.DetachEvent;
38 import tuwien.auto.calimero.FrameEvent;
39 import tuwien.auto.calimero.GroupAddress;
40 import tuwien.auto.calimero.IndividualAddress;
41 import tuwien.auto.calimero.KNXException;
42 import tuwien.auto.calimero.datapoint.CommandDP;
43 import tuwien.auto.calimero.datapoint.Datapoint;
44 import tuwien.auto.calimero.device.ProcessCommunicationResponder;
45 import tuwien.auto.calimero.link.KNXNetworkLink;
46 import tuwien.auto.calimero.link.NetworkLinkListener;
47 import tuwien.auto.calimero.mgmt.Destination;
48 import tuwien.auto.calimero.mgmt.ManagementClient;
49 import tuwien.auto.calimero.mgmt.ManagementClientImpl;
50 import tuwien.auto.calimero.mgmt.ManagementProcedures;
51 import tuwien.auto.calimero.mgmt.ManagementProceduresImpl;
52 import tuwien.auto.calimero.process.ProcessCommunication;
53 import tuwien.auto.calimero.process.ProcessCommunicator;
54 import tuwien.auto.calimero.process.ProcessCommunicatorImpl;
55 import tuwien.auto.calimero.process.ProcessEvent;
56 import tuwien.auto.calimero.process.ProcessListener;
57
58 /**
59  * KNX Client which encapsulates the communication with the KNX bus via the calimero libary.
60  *
61  * @author Simon Kaufmann - initial contribution and API.
62  *
63  */
64 @NonNullByDefault
65 public abstract class AbstractKNXClient implements NetworkLinkListener, KNXClient {
66
67     private static final int MAX_SEND_ATTEMPTS = 2;
68
69     private final Logger logger = LoggerFactory.getLogger(AbstractKNXClient.class);
70     private final KNXTypeMapper typeHelper = new KNXCoreTypeMapper();
71
72     private final ThingUID thingUID;
73     private final int responseTimeout;
74     private final int readingPause;
75     private final int autoReconnectPeriod;
76     private final int readRetriesLimit;
77     private final StatusUpdateCallback statusUpdateCallback;
78     private final ScheduledExecutorService knxScheduler;
79
80     private @Nullable ProcessCommunicator processCommunicator;
81     private @Nullable ProcessCommunicationResponder responseCommunicator;
82     private @Nullable ManagementProcedures managementProcedures;
83     private @Nullable ManagementClient managementClient;
84     private @Nullable KNXNetworkLink link;
85     private @Nullable DeviceInfoClient deviceInfoClient;
86     private @Nullable ScheduledFuture<?> busJob;
87     private @Nullable ScheduledFuture<?> connectJob;
88
89     private final Set<GroupAddressListener> groupAddressListeners = new CopyOnWriteArraySet<>();
90     private final LinkedBlockingQueue<ReadDatapoint> readDatapoints = new LinkedBlockingQueue<>();
91
92     @FunctionalInterface
93     private interface ListenerNotification {
94         void apply(BusMessageListener listener, IndividualAddress source, GroupAddress destination, byte[] asdu);
95     }
96
97     @NonNullByDefault({})
98     private final ProcessListener processListener = new ProcessListener() {
99
100         @Override
101         public void detached(DetachEvent e) {
102             logger.debug("The KNX network link was detached from the process communicator");
103         }
104
105         @Override
106         public void groupWrite(ProcessEvent e) {
107             processEvent("Group Write", e, (listener, source, destination, asdu) -> {
108                 listener.onGroupWrite(AbstractKNXClient.this, source, destination, asdu);
109             });
110         }
111
112         @Override
113         public void groupReadRequest(ProcessEvent e) {
114             processEvent("Group Read Request", e, (listener, source, destination, asdu) -> {
115                 listener.onGroupRead(AbstractKNXClient.this, source, destination, asdu);
116             });
117         }
118
119         @Override
120         public void groupReadResponse(ProcessEvent e) {
121             processEvent("Group Read Response", e, (listener, source, destination, asdu) -> {
122                 listener.onGroupReadResponse(AbstractKNXClient.this, source, destination, asdu);
123             });
124         }
125     };
126
127     public AbstractKNXClient(int autoReconnectPeriod, ThingUID thingUID, int responseTimeout, int readingPause,
128             int readRetriesLimit, ScheduledExecutorService knxScheduler, StatusUpdateCallback statusUpdateCallback) {
129         this.autoReconnectPeriod = autoReconnectPeriod;
130         this.thingUID = thingUID;
131         this.responseTimeout = responseTimeout;
132         this.readingPause = readingPause;
133         this.readRetriesLimit = readRetriesLimit;
134         this.knxScheduler = knxScheduler;
135         this.statusUpdateCallback = statusUpdateCallback;
136     }
137
138     public void initialize() {
139         if (!scheduleReconnectJob()) {
140             connect();
141         }
142     }
143
144     private boolean scheduleReconnectJob() {
145         if (autoReconnectPeriod > 0) {
146             connectJob = knxScheduler.schedule(this::connect, autoReconnectPeriod, TimeUnit.SECONDS);
147             return true;
148         } else {
149             return false;
150         }
151     }
152
153     private void cancelReconnectJob() {
154         ScheduledFuture<?> currentReconnectJob = connectJob;
155         if (currentReconnectJob != null) {
156             currentReconnectJob.cancel(true);
157             connectJob = null;
158         }
159     }
160
161     protected abstract KNXNetworkLink establishConnection() throws KNXException, InterruptedException;
162
163     private synchronized boolean connectIfNotAutomatic() {
164         if (!isConnected()) {
165             return connectJob != null ? false : connect();
166         }
167         return true;
168     }
169
170     private synchronized boolean connect() {
171         if (isConnected()) {
172             return true;
173         }
174         try {
175             releaseConnection();
176
177             logger.debug("Bridge {} is connecting to the KNX bus", thingUID);
178
179             KNXNetworkLink link = establishConnection();
180             this.link = link;
181
182             managementProcedures = new ManagementProceduresImpl(link);
183
184             ManagementClient managementClient = new ManagementClientImpl(link);
185             managementClient.responseTimeout(Duration.ofSeconds(responseTimeout));
186             this.managementClient = managementClient;
187
188             deviceInfoClient = new DeviceInfoClientImpl(managementClient);
189
190             ProcessCommunicator processCommunicator = new ProcessCommunicatorImpl(link);
191             processCommunicator.responseTimeout(Duration.ofSeconds(responseTimeout));
192             processCommunicator.addProcessListener(processListener);
193             this.processCommunicator = processCommunicator;
194
195             ProcessCommunicationResponder responseCommunicator = new ProcessCommunicationResponder(link, null);
196             this.responseCommunicator = responseCommunicator;
197
198             link.addLinkListener(this);
199
200             busJob = knxScheduler.scheduleWithFixedDelay(() -> readNextQueuedDatapoint(), 0, readingPause,
201                     TimeUnit.MILLISECONDS);
202
203             statusUpdateCallback.updateStatus(ThingStatus.ONLINE);
204             connectJob = null;
205             return true;
206         } catch (KNXException | InterruptedException e) {
207             logger.debug("Error connecting to the bus: {}", e.getMessage(), e);
208             disconnect(e);
209             scheduleReconnectJob();
210             return false;
211         }
212     }
213
214     private void disconnect(@Nullable Exception e) {
215         releaseConnection();
216         if (e != null) {
217             String message = e.getLocalizedMessage();
218             statusUpdateCallback.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
219                     message != null ? message : "");
220         } else {
221             statusUpdateCallback.updateStatus(ThingStatus.OFFLINE);
222         }
223     }
224
225     @SuppressWarnings("null")
226     private void releaseConnection() {
227         logger.debug("Bridge {} is disconnecting from the KNX bus", thingUID);
228         readDatapoints.clear();
229         busJob = nullify(busJob, j -> j.cancel(true));
230         deviceInfoClient = null;
231         managementProcedures = nullify(managementProcedures, mp -> mp.detach());
232         managementClient = nullify(managementClient, mc -> mc.detach());
233         link = nullify(link, l -> l.close());
234         processCommunicator = nullify(processCommunicator, pc -> {
235             pc.removeProcessListener(processListener);
236             pc.detach();
237         });
238         responseCommunicator = nullify(responseCommunicator, rc -> {
239             rc.removeProcessListener(processListener);
240             rc.detach();
241         });
242     }
243
244     private <T> T nullify(T target, @Nullable Consumer<T> lastWill) {
245         if (target != null && lastWill != null) {
246             lastWill.accept(target);
247         }
248         return null;
249     }
250
251     private void processEvent(String task, ProcessEvent event, ListenerNotification action) {
252         GroupAddress destination = event.getDestination();
253         IndividualAddress source = event.getSourceAddr();
254         byte[] asdu = event.getASDU();
255         logger.trace("Received a {} telegram from '{}' to '{}' with value '{}'", task, source, destination, asdu);
256         for (GroupAddressListener listener : groupAddressListeners) {
257             if (listener.listensTo(destination)) {
258                 knxScheduler.schedule(() -> action.apply(listener, source, destination, asdu), 0, TimeUnit.SECONDS);
259             }
260         }
261     }
262
263     /**
264      * Transforms a {@link Type} into a datapoint type value for the KNX bus.
265      *
266      * @param type the {@link Type} to transform
267      * @param dpt the datapoint type to which should be converted
268      * @return the corresponding KNX datapoint type value as a string
269      */
270     @Nullable
271     private String toDPTValue(Type type, String dpt) {
272         return typeHelper.toDPTValue(type, dpt);
273     }
274
275     @SuppressWarnings("null")
276     private void readNextQueuedDatapoint() {
277         if (!connectIfNotAutomatic()) {
278             return;
279         }
280         ProcessCommunicator processCommunicator = this.processCommunicator;
281         if (processCommunicator == null) {
282             return;
283         }
284         ReadDatapoint datapoint = readDatapoints.poll();
285         if (datapoint != null) {
286             datapoint.incrementRetries();
287             try {
288                 logger.trace("Sending a Group Read Request telegram for {}", datapoint.getDatapoint().getMainAddress());
289                 processCommunicator.read(datapoint.getDatapoint());
290             } catch (KNXException e) {
291                 if (datapoint.getRetries() < datapoint.getLimit()) {
292                     readDatapoints.add(datapoint);
293                     logger.debug("Could not read value for datapoint {}: {}. Going to retry.",
294                             datapoint.getDatapoint().getMainAddress(), e.getMessage());
295                 } else {
296                     logger.warn("Giving up reading datapoint {}, the number of maximum retries ({}) is reached.",
297                             datapoint.getDatapoint().getMainAddress(), datapoint.getLimit());
298                 }
299             } catch (InterruptedException e) {
300                 logger.debug("Interrupted sending KNX read request");
301                 return;
302             }
303         }
304     }
305
306     public void dispose() {
307         cancelReconnectJob();
308         disconnect(null);
309     }
310
311     @Override
312     public void linkClosed(@Nullable CloseEvent closeEvent) {
313         KNXNetworkLink link = this.link;
314         if (link == null || closeEvent == null) {
315             return;
316         }
317         if (!link.isOpen() && CloseEvent.USER_REQUEST != closeEvent.getInitiator()) {
318             statusUpdateCallback.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
319                     closeEvent.getReason());
320             logger.debug("KNX link has been lost (reason: {} on object {})", closeEvent.getReason(),
321                     closeEvent.getSource().toString());
322             scheduleReconnectJob();
323         }
324     }
325
326     @Override
327     public void indication(@Nullable FrameEvent e) {
328         // no-op
329     }
330
331     @Override
332     public void confirmation(@Nullable FrameEvent e) {
333         // no-op
334     }
335
336     @Override
337     public final synchronized boolean isReachable(@Nullable IndividualAddress address) throws KNXException {
338         ManagementProcedures managementProcedures = this.managementProcedures;
339         if (managementProcedures == null || address == null) {
340             return false;
341         }
342         try {
343             return managementProcedures.isAddressOccupied(address);
344         } catch (InterruptedException e) {
345             logger.debug("Interrupted pinging KNX device '{}'", address);
346         }
347         return false;
348     }
349
350     @Override
351     public final synchronized void restartNetworkDevice(@Nullable IndividualAddress address) {
352         ManagementClient managementClient = this.managementClient;
353         if (address == null || managementClient == null) {
354             return;
355         }
356         Destination destination = null;
357         try {
358             destination = managementClient.createDestination(address, true);
359             managementClient.restart(destination);
360         } catch (KNXException e) {
361             logger.warn("Could not reset device with address '{}': {}", address, e.getMessage());
362         } catch (InterruptedException e) { // ignored as in Calimero pre-2.4.0
363         } finally {
364             if (destination != null) {
365                 destination.destroy();
366             }
367         }
368     }
369
370     @Override
371     public void readDatapoint(Datapoint datapoint) {
372         synchronized (this) {
373             ReadDatapoint retryDatapoint = new ReadDatapoint(datapoint, readRetriesLimit);
374             if (!readDatapoints.contains(retryDatapoint)) {
375                 readDatapoints.add(retryDatapoint);
376             }
377         }
378     }
379
380     @Override
381     public final boolean registerGroupAddressListener(GroupAddressListener listener) {
382         return groupAddressListeners.add(listener);
383     }
384
385     @Override
386     public final boolean unregisterGroupAddressListener(GroupAddressListener listener) {
387         return groupAddressListeners.remove(listener);
388     }
389
390     @Override
391     public boolean isConnected() {
392         return link != null && link.isOpen();
393     }
394
395     @Override
396     public DeviceInfoClient getDeviceInfoClient() {
397         DeviceInfoClient deviceInfoClient = this.deviceInfoClient;
398         if (deviceInfoClient != null) {
399             return deviceInfoClient;
400         } else {
401             throw new IllegalStateException();
402         }
403     }
404
405     @Override
406     public void writeToKNX(OutboundSpec commandSpec) throws KNXException {
407         ProcessCommunicator processCommunicator = this.processCommunicator;
408         KNXNetworkLink link = this.link;
409         if (processCommunicator == null || link == null) {
410             logger.debug("Cannot write to the KNX bus (processCommuicator: {}, link: {})",
411                     processCommunicator == null ? "Not OK" : "OK",
412                     link == null ? "Not OK" : (link.isOpen() ? "Open" : "Closed"));
413             return;
414         }
415         GroupAddress groupAddress = commandSpec.getGroupAddress();
416
417         logger.trace("writeToKNX groupAddress '{}', commandSpec '{}'", groupAddress, commandSpec);
418
419         if (groupAddress != null) {
420             sendToKNX(processCommunicator, link, groupAddress, commandSpec.getDPT(), commandSpec.getType());
421         }
422     }
423
424     @Override
425     public void respondToKNX(OutboundSpec responseSpec) throws KNXException {
426         ProcessCommunicationResponder responseCommunicator = this.responseCommunicator;
427         KNXNetworkLink link = this.link;
428         if (responseCommunicator == null || link == null) {
429             logger.debug("Cannot write to the KNX bus (responseCommunicator: {}, link: {})",
430                     responseCommunicator == null ? "Not OK" : "OK",
431                     link == null ? "Not OK" : (link.isOpen() ? "Open" : "Closed"));
432             return;
433         }
434         GroupAddress groupAddress = responseSpec.getGroupAddress();
435
436         logger.trace("respondToKNX groupAddress '{}', responseSpec '{}'", groupAddress, responseSpec);
437
438         if (groupAddress != null) {
439             sendToKNX(responseCommunicator, link, groupAddress, responseSpec.getDPT(), responseSpec.getType());
440         }
441     }
442
443     private void sendToKNX(ProcessCommunication communicator, KNXNetworkLink link, GroupAddress groupAddress,
444             String dpt, Type type) throws KNXException {
445         if (!connectIfNotAutomatic()) {
446             return;
447         }
448
449         Datapoint datapoint = new CommandDP(groupAddress, thingUID.toString(), 0, dpt);
450         String mappedValue = toDPTValue(type, dpt);
451
452         logger.trace("sendToKNX mappedValue: '{}' groupAddress: '{}'", mappedValue, groupAddress);
453
454         if (mappedValue == null) {
455             logger.debug("Value '{}' cannot be mapped to datapoint '{}'", type, datapoint);
456             return;
457         }
458         for (int i = 0; i < MAX_SEND_ATTEMPTS; i++) {
459             try {
460                 communicator.write(datapoint, mappedValue);
461                 logger.debug("Wrote value '{}' to datapoint '{}' ({}. attempt).", type, datapoint, i);
462                 break;
463             } catch (KNXException e) {
464                 if (i < MAX_SEND_ATTEMPTS - 1) {
465                     logger.debug("Value '{}' could not be sent to the KNX bus using datapoint '{}': {}. Will retry.",
466                             type, datapoint, e.getLocalizedMessage());
467                 } else {
468                     logger.warn("Value '{}' could not be sent to the KNX bus using datapoint '{}': {}. Giving up now.",
469                             type, datapoint, e.getLocalizedMessage());
470                     throw e;
471                 }
472             }
473         }
474     }
475 }