]> git.basschouten.com Git - openhab-addons.git/blob
298d1b1f06122baf295f4ac693f45fdd195f7641
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.enocean.internal.transceiver;
14
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.io.OutputStream;
18 import java.util.HashMap;
19 import java.util.HashSet;
20 import java.util.Map;
21 import java.util.Queue;
22 import java.util.TooManyListenersException;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.concurrent.TimeUnit;
27
28 import org.eclipse.jdt.annotation.NonNullByDefault;
29 import org.eclipse.jdt.annotation.Nullable;
30 import org.openhab.binding.enocean.internal.EnOceanBindingConstants;
31 import org.openhab.binding.enocean.internal.EnOceanException;
32 import org.openhab.binding.enocean.internal.Helper;
33 import org.openhab.binding.enocean.internal.messages.BasePacket;
34 import org.openhab.binding.enocean.internal.messages.BasePacket.ESPPacketType;
35 import org.openhab.binding.enocean.internal.messages.ERP1Message;
36 import org.openhab.binding.enocean.internal.messages.ERP1Message.RORG;
37 import org.openhab.binding.enocean.internal.messages.EventMessage;
38 import org.openhab.binding.enocean.internal.messages.EventMessage.EventMessageType;
39 import org.openhab.binding.enocean.internal.messages.Response;
40 import org.openhab.core.io.transport.serial.PortInUseException;
41 import org.openhab.core.io.transport.serial.SerialPort;
42 import org.openhab.core.io.transport.serial.SerialPortEvent;
43 import org.openhab.core.io.transport.serial.SerialPortEventListener;
44 import org.openhab.core.io.transport.serial.SerialPortIdentifier;
45 import org.openhab.core.io.transport.serial.SerialPortManager;
46 import org.openhab.core.io.transport.serial.UnsupportedCommOperationException;
47 import org.openhab.core.util.HexUtils;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
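/**
 * Base class for EnOcean gateway transceivers attached via a serial port. It opens the port, queues and
 * sends outgoing packets, runs the receive loop and dispatches received packets, events and responses to
 * the registered listeners. Subclasses supply the packet parsing ({@code processMessage}) and the
 * serialization ({@code serializePacket}).
 *
 * @author Daniel Weber - Initial contribution
 */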
55 @NonNullByDefault
56 public abstract class EnOceanTransceiver implements SerialPortEventListener {
57
58     public static final int ENOCEAN_MAX_DATA = 65790;
59
60     // Thread management
61     protected @Nullable Future<?> readingTask = null;
62     private @Nullable Future<?> timeOut = null;
63
64     protected Logger logger = LoggerFactory.getLogger(EnOceanTransceiver.class);
65
66     private @Nullable SerialPortManager serialPortManager;
67     private static final int ENOCEAN_DEFAULT_BAUD = 57600;
68     protected String path;
69     private @Nullable SerialPort serialPort;
70
71     class Request {
72         @Nullable
73         BasePacket requestPacket;
74         @Nullable
75         Response responsePacket;
76         @Nullable
77         ResponseListener<? extends @Nullable Response> responseListener;
78     }
79
80     private class RequestQueue {
81         private Queue<Request> queue = new LinkedBlockingQueue<>();
82         private ScheduledExecutorService scheduler;
83
84         public RequestQueue(ScheduledExecutorService scheduler) {
85             this.scheduler = scheduler;
86         }
87
88         public synchronized void enqueRequest(Request request) throws IOException {
89             boolean wasEmpty = queue.isEmpty();
90
91             if (queue.offer(request)) {
92                 if (wasEmpty) {
93                     send();
94                 }
95             } else {
96                 logger.error("Transmit queue overflow. Lost message: {}", request);
97             }
98         }
99
100         private synchronized void sendNext() throws IOException {
101             queue.poll();
102             send();
103         }
104
105         private synchronized void send() throws IOException {
106             if (!queue.isEmpty()) {
107                 currentRequest = queue.peek();
108                 try {
109                     Request localCurrentRequest = currentRequest;
110                     if (localCurrentRequest != null && localCurrentRequest.requestPacket != null) {
111                         synchronized (localCurrentRequest) {
112                             BasePacket rqPacket = localCurrentRequest.requestPacket;
113                             if (currentRequest != null && rqPacket != null) {
114                                 logger.debug("Sending data, type {}, payload {}{}", rqPacket.getPacketType().name(),
115                                         HexUtils.bytesToHex(rqPacket.getPayload()),
116                                         HexUtils.bytesToHex(rqPacket.getOptionalPayload()));
117                                 byte[] b = serializePacket(rqPacket);
118                                 logger.trace("Sending raw data: {}", HexUtils.bytesToHex(b));
119                                 OutputStream localOutPutStream = outputStream;
120                                 if (localOutPutStream != null) {
121                                     localOutPutStream.write(b);
122                                     localOutPutStream.flush();
123                                 }
124                                 Future<?> localTimeOut = timeOut;
125                                 if (localTimeOut != null) {
126                                     localTimeOut.cancel(true);
127                                 }
128
129                                 // slowdown sending of message to avoid hickups at receivers
130                                 // Todo tweak sending intervall (250 ist just a first try)
131                                 timeOut = scheduler.schedule(() -> {
132                                     try {
133                                         sendNext();
134                                     } catch (IOException e) {
135                                         logger.trace("Unable to process message", e);
136                                         TransceiverErrorListener localListener = errorListener;
137                                         if (localListener != null) {
138                                             localListener.errorOccured(e);
139                                         }
140                                         return;
141                                     }
142                                 }, 250, TimeUnit.MILLISECONDS);
143                             }
144                         }
145                     } else {
146                         sendNext();
147                     }
148                 } catch (EnOceanException e) {
149                     logger.error("exception while sending data", e);
150                 }
151             }
152         }
153     }
154
155     RequestQueue requestQueue;
156     @Nullable
157     Request currentRequest = null;
158
159     protected Map<Long, HashSet<PacketListener>> listeners;
160     protected HashSet<EventListener> eventListeners;
161     protected @Nullable TeachInListener teachInListener;
162
163     protected @Nullable InputStream inputStream;
164     protected @Nullable OutputStream outputStream;
165
166     private byte[] filteredDeviceId = new byte[0];
167     @Nullable
168     TransceiverErrorListener errorListener;
169
170     public EnOceanTransceiver(String path, TransceiverErrorListener errorListener, ScheduledExecutorService scheduler,
171             @Nullable SerialPortManager serialPortManager) {
172         requestQueue = new RequestQueue(scheduler);
173
174         listeners = new HashMap<>();
175         eventListeners = new HashSet<>();
176         teachInListener = null;
177
178         this.errorListener = errorListener;
179         this.serialPortManager = serialPortManager;
180         this.path = path;
181     }
182
183     public void initilize()
184             throws UnsupportedCommOperationException, PortInUseException, IOException, TooManyListenersException {
185         SerialPortManager localSerialPortManager = serialPortManager;
186         if (localSerialPortManager == null) {
187             throw new IOException("Could access the SerialPortManager, it was null");
188         }
189         SerialPortIdentifier id = localSerialPortManager.getIdentifier(path);
190         if (id == null) {
191             throw new IOException("Could not find a gateway on given path '" + path + "', "
192                     + localSerialPortManager.getIdentifiers().count() + " ports available.");
193         }
194
195         try {
196             serialPort = id.open(EnOceanBindingConstants.BINDING_ID, 1000);
197         } catch (PortInUseException e) {
198             logger.warn("EnOceanSerialTransceiver not initialized, port allready in use", e);
199             return;
200         }
201         SerialPort localSerialPort = serialPort;
202         if (localSerialPort == null) {
203             logger.debug("EnOceanSerialTransceiver not initialized, serialPort was null");
204             return;
205         }
206         localSerialPort.setSerialPortParams(ENOCEAN_DEFAULT_BAUD, SerialPort.DATABITS_8, SerialPort.STOPBITS_1,
207                 SerialPort.PARITY_NONE);
208
209         try {
210             localSerialPort.enableReceiveThreshold(1);
211             localSerialPort.enableReceiveTimeout(100); // In ms. Small values mean faster shutdown but more cpu usage.
212         } catch (UnsupportedCommOperationException e) {
213             // rfc connections do not allow a ReceiveThreshold
214             logger.debug("EnOceanSerialTransceiver encountered an UnsupportedCommOperationException while initilizing",
215                     e);
216         }
217
218         inputStream = localSerialPort.getInputStream();
219         outputStream = localSerialPort.getOutputStream();
220         logger.info("EnOceanSerialTransceiver initialized");
221     }
222
223     public void startReceiving(ScheduledExecutorService scheduler) {
224         @Nullable
225         Future<?> localReadingTask = readingTask;
226         if (localReadingTask == null || localReadingTask.isCancelled()) {
227             localReadingTask = scheduler.submit(new Runnable() {
228                 @Override
229                 public void run() {
230                     receivePackets();
231                 }
232             });
233         }
234         logger.info("EnOceanSerialTransceiver RX thread started");
235     }
236
237     public void shutDown() {
238         logger.debug("shutting down transceiver");
239         logger.debug("Interrupt rx Thread");
240
241         Future<?> localTimeOut = timeOut;
242         if (localTimeOut != null) {
243             localTimeOut.cancel(true);
244         }
245
246         Future<?> localReadingTask = readingTask;
247         if (localReadingTask != null) {
248             localReadingTask.cancel(true);
249
250             InputStream localInputStream = inputStream;
251             if (localInputStream != null) {
252                 try {
253                     localInputStream.close();
254                 } catch (IOException e) {
255                     logger.debug("IOException occured while closing the stream", e);
256                 }
257             }
258         }
259
260         readingTask = null;
261         timeOut = null;
262         listeners.clear();
263         eventListeners.clear();
264         teachInListener = null;
265         errorListener = null;
266
267         OutputStream localOutputStream = outputStream;
268         if (localOutputStream != null) {
269             try {
270                 localOutputStream.close();
271             } catch (IOException e) {
272                 logger.debug("IOException occured while closing the output stream", e);
273             }
274         }
275
276         InputStream localInputStream = inputStream;
277         if (localInputStream != null) {
278             try {
279                 localInputStream.close();
280             } catch (IOException e) {
281                 logger.debug("IOException occured while closing the input stream", e);
282             }
283         }
284
285         SerialPort localSerialPort = serialPort;
286         if (localSerialPort != null) {
287             logger.debug("Closing the serial port");
288             localSerialPort.close();
289         }
290
291         serialPort = null;
292         outputStream = null;
293         inputStream = null;
294
295         logger.info("Transceiver shutdown");
296     }
297
298     private void receivePackets() {
299         byte[] buffer = new byte[1];
300
301         Future<?> localReadingTask = readingTask;
302         while (localReadingTask != null && !localReadingTask.isCancelled()) {
303             int bytesRead = read(buffer, 1);
304             if (bytesRead > 0) {
305                 processMessage(buffer[0]);
306             }
307         }
308     }
309
310     protected abstract void processMessage(byte firstByte);
311
312     protected int read(byte[] buffer, int length) {
313         InputStream localInputStream = inputStream;
314         if (localInputStream != null) {
315             try {
316                 localInputStream.read(buffer, 0, length);
317             } catch (IOException e) {
318                 logger.debug("IOException occured while reading the input stream", e);
319                 return 0;
320             }
321         }
322         return 0;
323     }
324
325     protected void informListeners(BasePacket packet) {
326         try {
327             if (packet.getPacketType() == ESPPacketType.RADIO_ERP1) {
328                 ERP1Message msg = (ERP1Message) packet;
329                 byte[] senderId = msg.getSenderId();
330                 byte[] d = Helper.concatAll(msg.getPayload(), msg.getOptionalPayload());
331
332                 logger.debug("{} with RORG {} for {} payload {} received", packet.getPacketType().name(),
333                         msg.getRORG().name(), HexUtils.bytesToHex(msg.getSenderId()), HexUtils.bytesToHex(d));
334
335                 if (msg.getRORG() != RORG.Unknown) {
336                     if (senderId.length > 0) {
337                         if (senderId.length > 2 && filteredDeviceId.length > 2 && senderId[0] == filteredDeviceId[0]
338                                 && senderId[1] == filteredDeviceId[1] && senderId[2] == filteredDeviceId[2]) {
339                             // filter away own messages which are received through a repeater
340                             return;
341                         }
342
343                         if (teachInListener != null && (msg.getIsTeachIn() || msg.getRORG() == RORG.RPS)) {
344                             logger.info("Received teach in message from {}", HexUtils.bytesToHex(msg.getSenderId()));
345
346                             TeachInListener localListener = teachInListener;
347                             if (localListener != null) {
348                                 localListener.packetReceived(msg);
349                             }
350                             return;
351                         } else if (teachInListener == null && msg.getIsTeachIn()) {
352                             logger.info("Discard message because this is a teach-in telegram from {}!",
353                                     HexUtils.bytesToHex(msg.getSenderId()));
354                             return;
355                         }
356
357                         long s = Long.parseLong(HexUtils.bytesToHex(senderId), 16);
358                         synchronized (this) {
359                             HashSet<PacketListener> pl = listeners.get(s);
360                             if (pl != null) {
361                                 pl.forEach(l -> l.packetReceived(msg));
362                             }
363                         }
364                     }
365                 } else {
366                     logger.debug("Received unknown RORG");
367                 }
368             } else if (packet.getPacketType() == ESPPacketType.EVENT) {
369                 EventMessage event = (EventMessage) packet;
370
371                 byte[] d = Helper.concatAll(packet.getPayload(), packet.getOptionalPayload());
372                 logger.debug("{} with type {} payload {} received", ESPPacketType.EVENT.name(),
373                         event.getEventMessageType().name(), HexUtils.bytesToHex(d));
374
375                 if (event.getEventMessageType() == EventMessageType.SA_CONFIRM_LEARN) {
376                     byte[] senderId = event.getPayload(EventMessageType.SA_CONFIRM_LEARN.getDataLength() - 5, 4);
377
378                     if (teachInListener != null) {
379                         logger.info("Received smart teach in from {}", HexUtils.bytesToHex(senderId));
380                         TeachInListener localListener = teachInListener;
381                         if (localListener != null) {
382                             localListener.eventReceived(event);
383                         }
384                         return;
385                     } else {
386                         logger.info("Discard message because this is a smart teach-in telegram from {}!",
387                                 HexUtils.bytesToHex(senderId));
388                         return;
389                     }
390                 }
391
392                 synchronized (this) {
393                     eventListeners.forEach(l -> l.eventReceived(event));
394                 }
395             }
396         } catch (Exception e) {
397             logger.error("Exception in informListeners", e);
398         }
399     }
400
401     protected void handleResponse(Response response) throws IOException {
402         Request localCurrentRequest = currentRequest;
403         if (localCurrentRequest != null) {
404             ResponseListener<? extends @Nullable Response> listener = localCurrentRequest.responseListener;
405             if (listener != null) {
406                 localCurrentRequest.responsePacket = response;
407                 try {
408                     listener.handleResponse(response);
409                 } catch (Exception e) {
410                     logger.debug("Exception during response handling");
411                 } finally {
412                     logger.trace("Response handled");
413                 }
414             } else {
415                 logger.trace("Response without listener");
416             }
417         } else {
418             logger.trace("Response without request");
419         }
420     }
421
422     public void sendBasePacket(@Nullable BasePacket packet,
423             @Nullable ResponseListener<? extends @Nullable Response> responseCallback) throws IOException {
424         if (packet == null) {
425             return;
426         }
427
428         logger.debug("Enqueue new send request with ESP3 type {} {} callback", packet.getPacketType().name(),
429                 responseCallback == null ? "without" : "with");
430         Request r = new Request();
431         r.requestPacket = packet;
432         r.responseListener = responseCallback;
433
434         requestQueue.enqueRequest(r);
435     }
436
437     protected abstract byte[] serializePacket(BasePacket packet) throws EnOceanException;
438
439     public synchronized void addPacketListener(PacketListener listener, long senderIdToListenTo) {
440         if (listeners.computeIfAbsent(senderIdToListenTo, k -> new HashSet<>()).add(listener)) {
441             logger.debug("Listener added: {}", senderIdToListenTo);
442         }
443     }
444
445     public synchronized void removePacketListener(PacketListener listener, long senderIdToListenTo) {
446         HashSet<PacketListener> pl = listeners.get(senderIdToListenTo);
447         if (pl != null) {
448             pl.remove(listener);
449             if (pl.isEmpty()) {
450                 listeners.remove(senderIdToListenTo);
451             }
452         }
453     }
454
455     public synchronized void addEventMessageListener(EventListener listener) {
456         eventListeners.add(listener);
457     }
458
459     public synchronized void removeEventMessageListener(EventListener listener) {
460         eventListeners.remove(listener);
461     }
462
463     public void startDiscovery(TeachInListener teachInListener) {
464         this.teachInListener = teachInListener;
465     }
466
467     public void stopDiscovery() {
468         this.teachInListener = null;
469     }
470
471     public void setFilteredDeviceId(byte[] filteredDeviceId) {
472         System.arraycopy(filteredDeviceId, 0, filteredDeviceId, 0, filteredDeviceId.length);
473     }
474
475     @Override
476     public void serialEvent(SerialPortEvent event) {
477         if (event.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
478             synchronized (this) {
479                 this.notify();
480             }
481         }
482     }
483 }