]> git.basschouten.com Git - openhab-addons.git/blob
07b2eda11658a060d45704fce2a723383561e75d
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.enocean.internal.transceiver;
14
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.io.OutputStream;
18 import java.util.HashMap;
19 import java.util.HashSet;
20 import java.util.Map;
21 import java.util.Queue;
22 import java.util.TooManyListenersException;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.concurrent.TimeUnit;
27
28 import org.eclipse.jdt.annotation.NonNullByDefault;
29 import org.eclipse.jdt.annotation.Nullable;
30 import org.openhab.binding.enocean.internal.EnOceanBindingConstants;
31 import org.openhab.binding.enocean.internal.EnOceanException;
32 import org.openhab.binding.enocean.internal.Helper;
33 import org.openhab.binding.enocean.internal.messages.BasePacket;
34 import org.openhab.binding.enocean.internal.messages.BasePacket.ESPPacketType;
35 import org.openhab.binding.enocean.internal.messages.ERP1Message;
36 import org.openhab.binding.enocean.internal.messages.ERP1Message.RORG;
37 import org.openhab.binding.enocean.internal.messages.EventMessage;
38 import org.openhab.binding.enocean.internal.messages.EventMessage.EventMessageType;
39 import org.openhab.binding.enocean.internal.messages.Response;
40 import org.openhab.core.io.transport.serial.PortInUseException;
41 import org.openhab.core.io.transport.serial.SerialPort;
42 import org.openhab.core.io.transport.serial.SerialPortEvent;
43 import org.openhab.core.io.transport.serial.SerialPortEventListener;
44 import org.openhab.core.io.transport.serial.SerialPortIdentifier;
45 import org.openhab.core.io.transport.serial.SerialPortManager;
46 import org.openhab.core.io.transport.serial.UnsupportedCommOperationException;
47 import org.openhab.core.util.HexUtils;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 /**
52  *
53  * @author Daniel Weber - Initial contribution
54  */
55 @NonNullByDefault
56 public abstract class EnOceanTransceiver implements SerialPortEventListener {
57
58     public static final int ENOCEAN_MAX_DATA = 65790;
59
60     // Thread management
61     protected @Nullable Future<?> readingTask = null;
62     private @Nullable Future<?> timeOutTask = null;
63
64     protected Logger logger = LoggerFactory.getLogger(EnOceanTransceiver.class);
65
66     private @Nullable SerialPortManager serialPortManager;
67     private static final int ENOCEAN_DEFAULT_BAUD = 57600;
68     protected String path;
69     private @Nullable SerialPort serialPort;
70
71     class Request {
72         @Nullable
73         BasePacket requestPacket;
74         @Nullable
75         Response responsePacket;
76         @Nullable
77         ResponseListener<? extends @Nullable Response> responseListener;
78     }
79
80     private class RequestQueue {
81         private Queue<Request> queue = new LinkedBlockingQueue<>();
82         private ScheduledExecutorService scheduler;
83
84         public RequestQueue(ScheduledExecutorService scheduler) {
85             this.scheduler = scheduler;
86         }
87
88         public synchronized void enqueRequest(Request request) throws IOException {
89             boolean wasEmpty = queue.isEmpty();
90
91             if (queue.offer(request)) {
92                 if (wasEmpty) {
93                     send();
94                 }
95             } else {
96                 logger.error("Transmit queue overflow. Lost message: {}", request);
97             }
98         }
99
100         private synchronized void sendNext() throws IOException {
101             queue.poll();
102             send();
103         }
104
105         private synchronized void send() throws IOException {
106             if (!queue.isEmpty()) {
107                 currentRequest = queue.peek();
108                 try {
109                     Request localCurrentRequest = currentRequest;
110                     if (localCurrentRequest != null && localCurrentRequest.requestPacket != null) {
111                         synchronized (localCurrentRequest) {
112                             BasePacket rqPacket = localCurrentRequest.requestPacket;
113                             if (currentRequest != null && rqPacket != null) {
114                                 logger.debug("Sending data, type {}, payload {}{}", rqPacket.getPacketType().name(),
115                                         HexUtils.bytesToHex(rqPacket.getPayload()),
116                                         HexUtils.bytesToHex(rqPacket.getOptionalPayload()));
117                                 byte[] b = serializePacket(rqPacket);
118                                 logger.trace("Sending raw data: {}", HexUtils.bytesToHex(b));
119                                 OutputStream localOutPutStream = outputStream;
120                                 if (localOutPutStream != null) {
121                                     localOutPutStream.write(b);
122                                     localOutPutStream.flush();
123                                 }
124                                 Future<?> localTimeOutTask = timeOutTask;
125                                 if (localTimeOutTask != null) {
126                                     localTimeOutTask.cancel(true);
127                                 }
128
129                                 // slowdown sending of message to avoid hickups at receivers
130                                 // Todo tweak sending intervall (250 ist just a first try)
131                                 timeOutTask = scheduler.schedule(() -> {
132                                     try {
133                                         sendNext();
134                                     } catch (IOException e) {
135                                         logger.trace("Unable to process message", e);
136                                         TransceiverErrorListener localListener = errorListener;
137                                         if (localListener != null) {
138                                             localListener.errorOccured(e);
139                                         }
140                                         return;
141                                     }
142                                 }, 250, TimeUnit.MILLISECONDS);
143                             }
144                         }
145                     } else {
146                         sendNext();
147                     }
148                 } catch (EnOceanException e) {
149                     logger.error("exception while sending data", e);
150                 }
151             }
152         }
153     }
154
155     RequestQueue requestQueue;
156     @Nullable
157     Request currentRequest = null;
158
159     protected Map<Long, HashSet<PacketListener>> listeners;
160     protected HashSet<EventListener> eventListeners;
161     protected @Nullable TeachInListener teachInListener;
162
163     protected @Nullable InputStream inputStream;
164     protected @Nullable OutputStream outputStream;
165
166     private byte[] filteredDeviceId = new byte[0];
167     @Nullable
168     TransceiverErrorListener errorListener;
169
170     public EnOceanTransceiver(String path, TransceiverErrorListener errorListener, ScheduledExecutorService scheduler,
171             @Nullable SerialPortManager serialPortManager) {
172         requestQueue = new RequestQueue(scheduler);
173
174         listeners = new HashMap<>();
175         eventListeners = new HashSet<>();
176         teachInListener = null;
177
178         this.errorListener = errorListener;
179         this.serialPortManager = serialPortManager;
180         this.path = path;
181     }
182
183     public void initialize()
184             throws UnsupportedCommOperationException, PortInUseException, IOException, TooManyListenersException {
185         SerialPortManager localSerialPortManager = serialPortManager;
186         if (localSerialPortManager == null) {
187             throw new IOException("Could access the SerialPortManager, it was null");
188         }
189         SerialPortIdentifier id = localSerialPortManager.getIdentifier(path);
190         if (id == null) {
191             throw new IOException("Could not find a gateway on given path '" + path + "', "
192                     + localSerialPortManager.getIdentifiers().count() + " ports available.");
193         }
194
195         try {
196             serialPort = id.open(EnOceanBindingConstants.BINDING_ID, 1000);
197         } catch (PortInUseException e) {
198             logger.warn("EnOceanSerialTransceiver not initialized, port allready in use", e);
199             return;
200         }
201         SerialPort localSerialPort = serialPort;
202         if (localSerialPort == null) {
203             logger.debug("EnOceanSerialTransceiver not initialized, serialPort was null");
204             return;
205         }
206         localSerialPort.setSerialPortParams(ENOCEAN_DEFAULT_BAUD, SerialPort.DATABITS_8, SerialPort.STOPBITS_1,
207                 SerialPort.PARITY_NONE);
208
209         try {
210             localSerialPort.enableReceiveThreshold(1);
211             localSerialPort.enableReceiveTimeout(100); // In ms. Small values mean faster shutdown but more cpu usage.
212         } catch (UnsupportedCommOperationException e) {
213             // rfc connections do not allow a ReceiveThreshold
214             logger.debug("EnOceanSerialTransceiver encountered an UnsupportedCommOperationException while initilizing",
215                     e);
216         }
217
218         inputStream = localSerialPort.getInputStream();
219         outputStream = localSerialPort.getOutputStream();
220         logger.info("EnOceanSerialTransceiver initialized");
221     }
222
223     public void startReceiving(ScheduledExecutorService scheduler) {
224         @Nullable
225         Future<?> readingTask = this.readingTask;
226         if (readingTask == null || readingTask.isCancelled()) {
227             this.readingTask = scheduler.submit(new Runnable() {
228                 @Override
229                 public void run() {
230                     receivePackets();
231                 }
232             });
233         }
234         logger.info("EnOceanSerialTransceiver RX thread started");
235     }
236
237     public void shutDown() {
238         logger.debug("shutting down transceiver");
239         logger.debug("Interrupt rx Thread");
240
241         Future<?> timeOutTask = this.timeOutTask;
242         if (timeOutTask != null) {
243             timeOutTask.cancel(true);
244             this.timeOutTask = null;
245         }
246
247         Future<?> readingTask = this.readingTask;
248         if (readingTask != null) {
249             readingTask.cancel(true);
250
251             InputStream localInputStream = inputStream;
252             if (localInputStream != null) {
253                 try {
254                     localInputStream.close();
255                 } catch (IOException e) {
256                     logger.debug("IOException occured while closing the stream", e);
257                 }
258             }
259             this.readingTask = null;
260         }
261
262         listeners.clear();
263         eventListeners.clear();
264         teachInListener = null;
265         errorListener = null;
266
267         OutputStream localOutputStream = outputStream;
268         if (localOutputStream != null) {
269             try {
270                 localOutputStream.close();
271             } catch (IOException e) {
272                 logger.debug("IOException occured while closing the output stream", e);
273             }
274         }
275
276         InputStream localInputStream = inputStream;
277         if (localInputStream != null) {
278             try {
279                 localInputStream.close();
280             } catch (IOException e) {
281                 logger.debug("IOException occured while closing the input stream", e);
282             }
283         }
284
285         SerialPort localSerialPort = serialPort;
286         if (localSerialPort != null) {
287             logger.debug("Closing the serial port");
288             localSerialPort.close();
289         }
290
291         serialPort = null;
292         outputStream = null;
293         inputStream = null;
294
295         logger.info("Transceiver shutdown");
296     }
297
298     private void receivePackets() {
299         byte[] buffer = new byte[1];
300
301         Future<?> readingTask = this.readingTask;
302         while (readingTask != null && !readingTask.isCancelled()) {
303             int bytesRead = read(buffer, 1);
304             if (bytesRead > 0) {
305                 processMessage(buffer[0]);
306             }
307         }
308     }
309
310     protected abstract void processMessage(byte firstByte);
311
312     protected int read(byte[] buffer, int length) {
313         InputStream localInputStream = inputStream;
314         if (localInputStream != null) {
315             try {
316                 return localInputStream.read(buffer, 0, length);
317             } catch (IOException e) {
318                 logger.debug("IOException occured while reading the input stream", e);
319                 return 0;
320             }
321         } else {
322             logger.warn("Cannot read from null stream");
323             Future<?> readingTask = this.readingTask;
324             if (readingTask != null) {
325                 readingTask.cancel(true);
326                 this.readingTask = null;
327             }
328             TransceiverErrorListener localListener = errorListener;
329             if (localListener != null) {
330                 localListener.errorOccured(new IOException("Cannot read from null stream"));
331             }
332             return 0;
333         }
334     }
335
336     protected void informListeners(BasePacket packet) {
337         try {
338             if (packet.getPacketType() == ESPPacketType.RADIO_ERP1) {
339                 ERP1Message msg = (ERP1Message) packet;
340                 byte[] senderId = msg.getSenderId();
341                 byte[] d = Helper.concatAll(msg.getPayload(), msg.getOptionalPayload());
342
343                 logger.debug("{} with RORG {} for {} payload {} received", packet.getPacketType().name(),
344                         msg.getRORG().name(), HexUtils.bytesToHex(msg.getSenderId()), HexUtils.bytesToHex(d));
345
346                 if (msg.getRORG() != RORG.Unknown) {
347                     if (senderId.length > 0) {
348                         if (senderId.length > 2 && filteredDeviceId.length > 2 && senderId[0] == filteredDeviceId[0]
349                                 && senderId[1] == filteredDeviceId[1] && senderId[2] == filteredDeviceId[2]) {
350                             // filter away own messages which are received through a repeater
351                             return;
352                         }
353
354                         if (teachInListener != null && (msg.getIsTeachIn() || msg.getRORG() == RORG.RPS)) {
355                             logger.info("Received teach in message from {}", HexUtils.bytesToHex(msg.getSenderId()));
356
357                             TeachInListener localListener = teachInListener;
358                             if (localListener != null) {
359                                 localListener.packetReceived(msg);
360                             }
361                             return;
362                         } else if (teachInListener == null && msg.getIsTeachIn()) {
363                             logger.info("Discard message because this is a teach-in telegram from {}!",
364                                     HexUtils.bytesToHex(msg.getSenderId()));
365                             return;
366                         }
367
368                         long s = Long.parseLong(HexUtils.bytesToHex(senderId), 16);
369                         synchronized (this) {
370                             HashSet<PacketListener> pl = listeners.get(s);
371                             if (pl != null) {
372                                 pl.forEach(l -> l.packetReceived(msg));
373                             }
374                         }
375                     }
376                 } else {
377                     logger.debug("Received unknown RORG");
378                 }
379             } else if (packet.getPacketType() == ESPPacketType.EVENT) {
380                 EventMessage event = (EventMessage) packet;
381
382                 byte[] d = Helper.concatAll(packet.getPayload(), packet.getOptionalPayload());
383                 logger.debug("{} with type {} payload {} received", ESPPacketType.EVENT.name(),
384                         event.getEventMessageType().name(), HexUtils.bytesToHex(d));
385
386                 if (event.getEventMessageType() == EventMessageType.SA_CONFIRM_LEARN) {
387                     byte[] senderId = event.getPayload(EventMessageType.SA_CONFIRM_LEARN.getDataLength() - 5, 4);
388
389                     if (teachInListener != null) {
390                         logger.info("Received smart teach in from {}", HexUtils.bytesToHex(senderId));
391                         TeachInListener localListener = teachInListener;
392                         if (localListener != null) {
393                             localListener.eventReceived(event);
394                         }
395                         return;
396                     } else {
397                         logger.info("Discard message because this is a smart teach-in telegram from {}!",
398                                 HexUtils.bytesToHex(senderId));
399                         return;
400                     }
401                 }
402
403                 synchronized (this) {
404                     eventListeners.forEach(l -> l.eventReceived(event));
405                 }
406             }
407         } catch (Exception e) {
408             logger.error("Exception in informListeners", e);
409         }
410     }
411
412     protected void handleResponse(Response response) throws IOException {
413         Request localCurrentRequest = currentRequest;
414         if (localCurrentRequest != null) {
415             ResponseListener<? extends @Nullable Response> listener = localCurrentRequest.responseListener;
416             if (listener != null) {
417                 localCurrentRequest.responsePacket = response;
418                 try {
419                     listener.handleResponse(response);
420                 } catch (Exception e) {
421                     logger.debug("Exception during response handling");
422                 } finally {
423                     logger.trace("Response handled");
424                 }
425             } else {
426                 logger.trace("Response without listener");
427             }
428         } else {
429             logger.trace("Response without request");
430         }
431     }
432
433     public void sendBasePacket(@Nullable BasePacket packet,
434             @Nullable ResponseListener<? extends @Nullable Response> responseCallback) throws IOException {
435         if (packet == null) {
436             return;
437         }
438
439         logger.debug("Enqueue new send request with ESP3 type {} {} callback", packet.getPacketType().name(),
440                 responseCallback == null ? "without" : "with");
441         Request r = new Request();
442         r.requestPacket = packet;
443         r.responseListener = responseCallback;
444
445         requestQueue.enqueRequest(r);
446     }
447
448     protected abstract byte[] serializePacket(BasePacket packet) throws EnOceanException;
449
450     public synchronized void addPacketListener(PacketListener listener, long senderIdToListenTo) {
451         if (listeners.computeIfAbsent(senderIdToListenTo, k -> new HashSet<>()).add(listener)) {
452             logger.debug("Listener added: {}", senderIdToListenTo);
453         }
454     }
455
456     public synchronized void removePacketListener(PacketListener listener, long senderIdToListenTo) {
457         HashSet<PacketListener> pl = listeners.get(senderIdToListenTo);
458         if (pl != null) {
459             pl.remove(listener);
460             if (pl.isEmpty()) {
461                 listeners.remove(senderIdToListenTo);
462             }
463         }
464     }
465
466     public synchronized void addEventMessageListener(EventListener listener) {
467         eventListeners.add(listener);
468     }
469
470     public synchronized void removeEventMessageListener(EventListener listener) {
471         eventListeners.remove(listener);
472     }
473
474     public void startDiscovery(TeachInListener teachInListener) {
475         this.teachInListener = teachInListener;
476     }
477
478     public void stopDiscovery() {
479         this.teachInListener = null;
480     }
481
482     public void setFilteredDeviceId(byte[] filteredDeviceId) {
483         System.arraycopy(filteredDeviceId, 0, filteredDeviceId, 0, filteredDeviceId.length);
484     }
485
486     @Override
487     public void serialEvent(SerialPortEvent event) {
488         if (event.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
489             synchronized (this) {
490                 this.notify();
491             }
492         }
493     }
494 }