]> git.basschouten.com Git - openhab-addons.git/blob
00bce0df74e79c51af83ada3fb86db0e74a9ba20
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.knx.internal.client;
14
15 import java.util.Set;
16 import java.util.concurrent.CopyOnWriteArraySet;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.concurrent.ScheduledFuture;
20 import java.util.concurrent.TimeUnit;
21 import java.util.function.Consumer;
22
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.openhab.binding.knx.internal.KNXTypeMapper;
26 import org.openhab.binding.knx.internal.dpt.KNXCoreTypeMapper;
27 import org.openhab.binding.knx.internal.handler.GroupAddressListener;
28 import org.openhab.core.thing.ThingStatus;
29 import org.openhab.core.thing.ThingStatusDetail;
30 import org.openhab.core.thing.ThingUID;
31 import org.openhab.core.types.Type;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 import tuwien.auto.calimero.CloseEvent;
36 import tuwien.auto.calimero.DetachEvent;
37 import tuwien.auto.calimero.FrameEvent;
38 import tuwien.auto.calimero.GroupAddress;
39 import tuwien.auto.calimero.IndividualAddress;
40 import tuwien.auto.calimero.KNXException;
41 import tuwien.auto.calimero.datapoint.CommandDP;
42 import tuwien.auto.calimero.datapoint.Datapoint;
43 import tuwien.auto.calimero.device.ProcessCommunicationResponder;
44 import tuwien.auto.calimero.link.KNXNetworkLink;
45 import tuwien.auto.calimero.link.NetworkLinkListener;
46 import tuwien.auto.calimero.mgmt.Destination;
47 import tuwien.auto.calimero.mgmt.ManagementClient;
48 import tuwien.auto.calimero.mgmt.ManagementClientImpl;
49 import tuwien.auto.calimero.mgmt.ManagementProcedures;
50 import tuwien.auto.calimero.mgmt.ManagementProceduresImpl;
51 import tuwien.auto.calimero.process.ProcessCommunicationBase;
52 import tuwien.auto.calimero.process.ProcessCommunicator;
53 import tuwien.auto.calimero.process.ProcessCommunicatorImpl;
54 import tuwien.auto.calimero.process.ProcessEvent;
55 import tuwien.auto.calimero.process.ProcessListener;
56
57 /**
58  * KNX Client which encapsulates the communication with the KNX bus via the calimero libary.
59  *
60  * @author Simon Kaufmann - initial contribution and API.
61  *
62  */
63 @NonNullByDefault
64 public abstract class AbstractKNXClient implements NetworkLinkListener, KNXClient {
65
66     private static final int MAX_SEND_ATTEMPTS = 2;
67
68     private final Logger logger = LoggerFactory.getLogger(AbstractKNXClient.class);
69     private final KNXTypeMapper typeHelper = new KNXCoreTypeMapper();
70
71     private final ThingUID thingUID;
72     private final int responseTimeout;
73     private final int readingPause;
74     private final int autoReconnectPeriod;
75     private final int readRetriesLimit;
76     private final StatusUpdateCallback statusUpdateCallback;
77     private final ScheduledExecutorService knxScheduler;
78
79     private @Nullable ProcessCommunicator processCommunicator;
80     private @Nullable ProcessCommunicationResponder responseCommunicator;
81     private @Nullable ManagementProcedures managementProcedures;
82     private @Nullable ManagementClient managementClient;
83     private @Nullable KNXNetworkLink link;
84     private @Nullable DeviceInfoClient deviceInfoClient;
85     private @Nullable ScheduledFuture<?> busJob;
86     private @Nullable ScheduledFuture<?> connectJob;
87
88     private final Set<GroupAddressListener> groupAddressListeners = new CopyOnWriteArraySet<>();
89     private final LinkedBlockingQueue<ReadDatapoint> readDatapoints = new LinkedBlockingQueue<>();
90
91     @FunctionalInterface
92     private interface ListenerNotification {
93         void apply(BusMessageListener listener, IndividualAddress source, GroupAddress destination, byte[] asdu);
94     }
95
96     @NonNullByDefault({})
97     private final ProcessListener processListener = new ProcessListener() {
98
99         @Override
100         public void detached(DetachEvent e) {
101             logger.debug("The KNX network link was detached from the process communicator");
102         }
103
104         @Override
105         public void groupWrite(ProcessEvent e) {
106             processEvent("Group Write", e, (listener, source, destination, asdu) -> {
107                 listener.onGroupWrite(AbstractKNXClient.this, source, destination, asdu);
108             });
109         }
110
111         @Override
112         public void groupReadRequest(ProcessEvent e) {
113             processEvent("Group Read Request", e, (listener, source, destination, asdu) -> {
114                 listener.onGroupRead(AbstractKNXClient.this, source, destination, asdu);
115             });
116         }
117
118         @Override
119         public void groupReadResponse(ProcessEvent e) {
120             processEvent("Group Read Response", e, (listener, source, destination, asdu) -> {
121                 listener.onGroupReadResponse(AbstractKNXClient.this, source, destination, asdu);
122             });
123         }
124     };
125
126     public AbstractKNXClient(int autoReconnectPeriod, ThingUID thingUID, int responseTimeout, int readingPause,
127             int readRetriesLimit, ScheduledExecutorService knxScheduler, StatusUpdateCallback statusUpdateCallback) {
128         this.autoReconnectPeriod = autoReconnectPeriod;
129         this.thingUID = thingUID;
130         this.responseTimeout = responseTimeout;
131         this.readingPause = readingPause;
132         this.readRetriesLimit = readRetriesLimit;
133         this.knxScheduler = knxScheduler;
134         this.statusUpdateCallback = statusUpdateCallback;
135     }
136
137     public void initialize() {
138         if (!scheduleReconnectJob()) {
139             connect();
140         }
141     }
142
143     private boolean scheduleReconnectJob() {
144         if (autoReconnectPeriod > 0) {
145             connectJob = knxScheduler.schedule(this::connect, autoReconnectPeriod, TimeUnit.SECONDS);
146             return true;
147         } else {
148             return false;
149         }
150     }
151
152     private void cancelReconnectJob() {
153         ScheduledFuture<?> currentReconnectJob = connectJob;
154         if (currentReconnectJob != null) {
155             currentReconnectJob.cancel(true);
156             connectJob = null;
157         }
158     }
159
160     protected abstract KNXNetworkLink establishConnection() throws KNXException, InterruptedException;
161
162     private synchronized boolean connectIfNotAutomatic() {
163         if (!isConnected()) {
164             return connectJob != null ? false : connect();
165         }
166         return true;
167     }
168
169     private synchronized boolean connect() {
170         if (isConnected()) {
171             return true;
172         }
173         try {
174             releaseConnection();
175
176             logger.debug("Bridge {} is connecting to the KNX bus", thingUID);
177
178             KNXNetworkLink link = establishConnection();
179             this.link = link;
180
181             managementProcedures = new ManagementProceduresImpl(link);
182
183             ManagementClient managementClient = new ManagementClientImpl(link);
184             managementClient.setResponseTimeout(responseTimeout);
185             this.managementClient = managementClient;
186
187             deviceInfoClient = new DeviceInfoClientImpl(managementClient);
188
189             ProcessCommunicator processCommunicator = new ProcessCommunicatorImpl(link);
190             processCommunicator.setResponseTimeout(responseTimeout);
191             processCommunicator.addProcessListener(processListener);
192             this.processCommunicator = processCommunicator;
193
194             ProcessCommunicationResponder responseCommunicator = new ProcessCommunicationResponder(link);
195             this.responseCommunicator = responseCommunicator;
196
197             link.addLinkListener(this);
198
199             busJob = knxScheduler.scheduleWithFixedDelay(() -> readNextQueuedDatapoint(), 0, readingPause,
200                     TimeUnit.MILLISECONDS);
201
202             statusUpdateCallback.updateStatus(ThingStatus.ONLINE);
203             connectJob = null;
204             return true;
205         } catch (KNXException | InterruptedException e) {
206             logger.debug("Error connecting to the bus: {}", e.getMessage(), e);
207             disconnect(e);
208             scheduleReconnectJob();
209             return false;
210         }
211     }
212
213     private void disconnect(@Nullable Exception e) {
214         releaseConnection();
215         if (e != null) {
216             String message = e.getLocalizedMessage();
217             statusUpdateCallback.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
218                     message != null ? message : "");
219         } else {
220             statusUpdateCallback.updateStatus(ThingStatus.OFFLINE);
221         }
222     }
223
224     @SuppressWarnings("null")
225     private void releaseConnection() {
226         logger.debug("Bridge {} is disconnecting from the KNX bus", thingUID);
227         readDatapoints.clear();
228         busJob = nullify(busJob, j -> j.cancel(true));
229         deviceInfoClient = null;
230         managementProcedures = nullify(managementProcedures, mp -> mp.detach());
231         managementClient = nullify(managementClient, mc -> mc.detach());
232         link = nullify(link, l -> l.close());
233         processCommunicator = nullify(processCommunicator, pc -> {
234             pc.removeProcessListener(processListener);
235             pc.detach();
236         });
237         responseCommunicator = nullify(responseCommunicator, rc -> {
238             rc.removeProcessListener(processListener);
239             rc.detach();
240         });
241     }
242
243     private <T> T nullify(T target, @Nullable Consumer<T> lastWill) {
244         if (target != null && lastWill != null) {
245             lastWill.accept(target);
246         }
247         return null;
248     }
249
250     private void processEvent(String task, ProcessEvent event, ListenerNotification action) {
251         GroupAddress destination = event.getDestination();
252         IndividualAddress source = event.getSourceAddr();
253         byte[] asdu = event.getASDU();
254         logger.trace("Received a {} telegram from '{}' to '{}' with value '{}'", task, source, destination, asdu);
255         for (GroupAddressListener listener : groupAddressListeners) {
256             if (listener.listensTo(destination)) {
257                 knxScheduler.schedule(() -> action.apply(listener, source, destination, asdu), 0, TimeUnit.SECONDS);
258             }
259         }
260     }
261
262     /**
263      * Transforms a {@link Type} into a datapoint type value for the KNX bus.
264      *
265      * @param type the {@link Type} to transform
266      * @param dpt the datapoint type to which should be converted
267      * @return the corresponding KNX datapoint type value as a string
268      */
269     @Nullable
270     private String toDPTValue(Type type, String dpt) {
271         return typeHelper.toDPTValue(type, dpt);
272     }
273
274     @SuppressWarnings("null")
275     private void readNextQueuedDatapoint() {
276         if (!connectIfNotAutomatic()) {
277             return;
278         }
279         ProcessCommunicator processCommunicator = this.processCommunicator;
280         if (processCommunicator == null) {
281             return;
282         }
283         ReadDatapoint datapoint = readDatapoints.poll();
284         if (datapoint != null) {
285             datapoint.incrementRetries();
286             try {
287                 logger.trace("Sending a Group Read Request telegram for {}", datapoint.getDatapoint().getMainAddress());
288                 processCommunicator.read(datapoint.getDatapoint());
289             } catch (KNXException e) {
290                 if (datapoint.getRetries() < datapoint.getLimit()) {
291                     readDatapoints.add(datapoint);
292                     logger.debug("Could not read value for datapoint {}: {}. Going to retry.",
293                             datapoint.getDatapoint().getMainAddress(), e.getMessage());
294                 } else {
295                     logger.warn("Giving up reading datapoint {}, the number of maximum retries ({}) is reached.",
296                             datapoint.getDatapoint().getMainAddress(), datapoint.getLimit());
297                 }
298             } catch (InterruptedException e) {
299                 logger.debug("Interrupted sending KNX read request");
300                 return;
301             }
302         }
303     }
304
305     public void dispose() {
306         cancelReconnectJob();
307         disconnect(null);
308     }
309
310     @Override
311     public void linkClosed(@Nullable CloseEvent closeEvent) {
312         KNXNetworkLink link = this.link;
313         if (link == null || closeEvent == null) {
314             return;
315         }
316         if (!link.isOpen() && CloseEvent.USER_REQUEST != closeEvent.getInitiator()) {
317             statusUpdateCallback.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
318                     closeEvent.getReason());
319             logger.debug("KNX link has been lost (reason: {} on object {})", closeEvent.getReason(),
320                     closeEvent.getSource().toString());
321             scheduleReconnectJob();
322         }
323     }
324
325     @Override
326     public void indication(@Nullable FrameEvent e) {
327         // no-op
328     }
329
330     @Override
331     public void confirmation(@Nullable FrameEvent e) {
332         // no-op
333     }
334
335     @Override
336     public final synchronized boolean isReachable(@Nullable IndividualAddress address) throws KNXException {
337         ManagementProcedures managementProcedures = this.managementProcedures;
338         if (managementProcedures == null || address == null) {
339             return false;
340         }
341         try {
342             return managementProcedures.isAddressOccupied(address);
343         } catch (InterruptedException e) {
344             logger.debug("Interrupted pinging KNX device '{}'", address);
345         }
346         return false;
347     }
348
349     @Override
350     public final synchronized void restartNetworkDevice(@Nullable IndividualAddress address) {
351         ManagementClient managementClient = this.managementClient;
352         if (address == null || managementClient == null) {
353             return;
354         }
355         Destination destination = null;
356         try {
357             destination = managementClient.createDestination(address, true);
358             managementClient.restart(destination);
359         } catch (KNXException e) {
360             logger.warn("Could not reset device with address '{}': {}", address, e.getMessage());
361         } catch (InterruptedException e) { // ignored as in Calimero pre-2.4.0
362         } finally {
363             if (destination != null) {
364                 destination.destroy();
365             }
366         }
367     }
368
369     @Override
370     public void readDatapoint(Datapoint datapoint) {
371         synchronized (this) {
372             ReadDatapoint retryDatapoint = new ReadDatapoint(datapoint, readRetriesLimit);
373             if (!readDatapoints.contains(retryDatapoint)) {
374                 readDatapoints.add(retryDatapoint);
375             }
376         }
377     }
378
379     @Override
380     public final boolean registerGroupAddressListener(GroupAddressListener listener) {
381         return groupAddressListeners.add(listener);
382     }
383
384     @Override
385     public final boolean unregisterGroupAddressListener(GroupAddressListener listener) {
386         return groupAddressListeners.remove(listener);
387     }
388
389     @Override
390     public boolean isConnected() {
391         return link != null && link.isOpen();
392     }
393
394     @Override
395     public DeviceInfoClient getDeviceInfoClient() {
396         DeviceInfoClient deviceInfoClient = this.deviceInfoClient;
397         if (deviceInfoClient != null) {
398             return deviceInfoClient;
399         } else {
400             throw new IllegalStateException();
401         }
402     }
403
404     @Override
405     public void writeToKNX(OutboundSpec commandSpec) throws KNXException {
406         ProcessCommunicator processCommunicator = this.processCommunicator;
407         KNXNetworkLink link = this.link;
408         if (processCommunicator == null || link == null) {
409             logger.debug("Cannot write to the KNX bus (processCommuicator: {}, link: {})",
410                     processCommunicator == null ? "Not OK" : "OK",
411                     link == null ? "Not OK" : (link.isOpen() ? "Open" : "Closed"));
412             return;
413         }
414         GroupAddress groupAddress = commandSpec.getGroupAddress();
415
416         logger.trace("writeToKNX groupAddress '{}', commandSpec '{}'", groupAddress, commandSpec);
417
418         if (groupAddress != null) {
419             sendToKNX(processCommunicator, link, groupAddress, commandSpec.getDPT(), commandSpec.getType());
420         }
421     }
422
423     @Override
424     public void respondToKNX(OutboundSpec responseSpec) throws KNXException {
425         ProcessCommunicationResponder responseCommunicator = this.responseCommunicator;
426         KNXNetworkLink link = this.link;
427         if (responseCommunicator == null || link == null) {
428             logger.debug("Cannot write to the KNX bus (responseCommunicator: {}, link: {})",
429                     responseCommunicator == null ? "Not OK" : "OK",
430                     link == null ? "Not OK" : (link.isOpen() ? "Open" : "Closed"));
431             return;
432         }
433         GroupAddress groupAddress = responseSpec.getGroupAddress();
434
435         logger.trace("respondToKNX groupAddress '{}', responseSpec '{}'", groupAddress, responseSpec);
436
437         if (groupAddress != null) {
438             sendToKNX(responseCommunicator, link, groupAddress, responseSpec.getDPT(), responseSpec.getType());
439         }
440     }
441
442     private void sendToKNX(ProcessCommunicationBase communicator, KNXNetworkLink link, GroupAddress groupAddress,
443             String dpt, Type type) throws KNXException {
444         if (!connectIfNotAutomatic()) {
445             return;
446         }
447
448         Datapoint datapoint = new CommandDP(groupAddress, thingUID.toString(), 0, dpt);
449         String mappedValue = toDPTValue(type, dpt);
450
451         logger.trace("sendToKNX mappedValue: '{}' groupAddress: '{}'", mappedValue, groupAddress);
452
453         if (mappedValue == null) {
454             logger.debug("Value '{}' cannot be mapped to datapoint '{}'", type, datapoint);
455             return;
456         }
457         for (int i = 0; i < MAX_SEND_ATTEMPTS; i++) {
458             try {
459                 communicator.write(datapoint, mappedValue);
460                 logger.debug("Wrote value '{}' to datapoint '{}' ({}. attempt).", type, datapoint, i);
461                 break;
462             } catch (KNXException e) {
463                 if (i < MAX_SEND_ATTEMPTS - 1) {
464                     logger.debug("Value '{}' could not be sent to the KNX bus using datapoint '{}': {}. Will retry.",
465                             type, datapoint, e.getLocalizedMessage());
466                 } else {
467                     logger.warn("Value '{}' could not be sent to the KNX bus using datapoint '{}': {}. Giving up now.",
468                             type, datapoint, e.getLocalizedMessage());
469                     throw e;
470                 }
471             }
472         }
473     }
474 }