]> git.basschouten.com Git - openhab-addons.git/blob
55be4988b02598bd094cae4e0fe60e2493cae465
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2021 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.enocean.internal.transceiver;
14
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.io.OutputStream;
18 import java.util.HashMap;
19 import java.util.HashSet;
20 import java.util.Map;
21 import java.util.Queue;
22 import java.util.TooManyListenersException;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.concurrent.TimeUnit;
27
28 import org.openhab.binding.enocean.internal.EnOceanBindingConstants;
29 import org.openhab.binding.enocean.internal.EnOceanException;
30 import org.openhab.binding.enocean.internal.Helper;
31 import org.openhab.binding.enocean.internal.messages.BasePacket;
32 import org.openhab.binding.enocean.internal.messages.BasePacket.ESPPacketType;
33 import org.openhab.binding.enocean.internal.messages.ERP1Message;
34 import org.openhab.binding.enocean.internal.messages.ERP1Message.RORG;
35 import org.openhab.binding.enocean.internal.messages.EventMessage;
36 import org.openhab.binding.enocean.internal.messages.EventMessage.EventMessageType;
37 import org.openhab.binding.enocean.internal.messages.Response;
38 import org.openhab.core.io.transport.serial.PortInUseException;
39 import org.openhab.core.io.transport.serial.SerialPort;
40 import org.openhab.core.io.transport.serial.SerialPortEvent;
41 import org.openhab.core.io.transport.serial.SerialPortEventListener;
42 import org.openhab.core.io.transport.serial.SerialPortIdentifier;
43 import org.openhab.core.io.transport.serial.SerialPortManager;
44 import org.openhab.core.io.transport.serial.UnsupportedCommOperationException;
45 import org.openhab.core.util.HexUtils;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 /**
50  *
51  * @author Daniel Weber - Initial contribution
52  */
53 public abstract class EnOceanTransceiver implements SerialPortEventListener {
54
55     public static final int ENOCEAN_MAX_DATA = 65790;
56
57     // Thread management
58     protected Future<?> readingTask = null;
59     private Future<?> timeOut = null;
60
61     protected Logger logger = LoggerFactory.getLogger(EnOceanTransceiver.class);
62
63     private SerialPortManager serialPortManager;
64     private static final int ENOCEAN_DEFAULT_BAUD = 57600;
65     protected String path;
66     SerialPort serialPort;
67
68     class Request {
69         BasePacket RequestPacket;
70
71         Response ResponsePacket;
72         ResponseListener<? extends Response> ResponseListener;
73     }
74
75     private class RequestQueue {
76         private Queue<Request> queue = new LinkedBlockingQueue<>();
77         private ScheduledExecutorService scheduler;
78
79         public RequestQueue(ScheduledExecutorService scheduler) {
80             this.scheduler = scheduler;
81         }
82
83         public synchronized void enqueRequest(Request request) throws IOException {
84             boolean wasEmpty = queue.isEmpty();
85
86             if (queue.offer(request)) {
87                 if (wasEmpty) {
88                     send();
89                 }
90             } else {
91                 logger.error("Transmit queue overflow. Lost message: {}", request);
92             }
93         }
94
95         private synchronized void sendNext() throws IOException {
96             queue.poll();
97             send();
98         }
99
100         private synchronized void send() throws IOException {
101             if (!queue.isEmpty()) {
102                 currentRequest = queue.peek();
103                 try {
104                     if (currentRequest != null && currentRequest.RequestPacket != null) {
105                         synchronized (currentRequest) {
106                             logger.debug("Sending data, type {}, payload {}{}",
107                                     currentRequest.RequestPacket.getPacketType().name(),
108                                     HexUtils.bytesToHex(currentRequest.RequestPacket.getPayload()),
109                                     HexUtils.bytesToHex(currentRequest.RequestPacket.getOptionalPayload()));
110
111                             byte[] b = serializePacket(currentRequest.RequestPacket);
112                             logger.trace("Sending raw data: {}", HexUtils.bytesToHex(b));
113                             outputStream.write(b);
114                             outputStream.flush();
115
116                             if (timeOut != null) {
117                                 timeOut.cancel(true);
118                             }
119
120                             // slowdown sending of message to avoid hickups at receivers
121                             // Todo tweak sending intervall (250 ist just a first try)
122                             timeOut = scheduler.schedule(() -> {
123                                 try {
124                                     sendNext();
125                                 } catch (IOException e) {
126                                     errorListener.ErrorOccured(e);
127                                     return;
128                                 }
129                             }, 250, TimeUnit.MILLISECONDS);
130                         }
131                     } else {
132                         sendNext();
133                     }
134                 } catch (EnOceanException e) {
135                     logger.error("exception while sending data", e);
136                 }
137             }
138         }
139     }
140
141     RequestQueue requestQueue;
142     Request currentRequest = null;
143
144     protected Map<Long, HashSet<PacketListener>> listeners;
145     protected HashSet<EventListener> eventListeners;
146     protected TeachInListener teachInListener;
147
148     protected InputStream inputStream;
149     protected OutputStream outputStream;
150
151     private byte[] filteredDeviceId;
152     TransceiverErrorListener errorListener;
153
154     public EnOceanTransceiver(String path, TransceiverErrorListener errorListener, ScheduledExecutorService scheduler,
155             SerialPortManager serialPortManager) {
156         requestQueue = new RequestQueue(scheduler);
157
158         listeners = new HashMap<>();
159         eventListeners = new HashSet<>();
160         teachInListener = null;
161
162         this.errorListener = errorListener;
163         this.serialPortManager = serialPortManager;
164         this.path = path;
165     }
166
167     public void Initialize()
168             throws UnsupportedCommOperationException, PortInUseException, IOException, TooManyListenersException {
169         SerialPortIdentifier id = serialPortManager.getIdentifier(path);
170         if (id == null) {
171             throw new IOException("Could not find a gateway on given path '" + path + "', "
172                     + serialPortManager.getIdentifiers().count() + " ports available.");
173         }
174
175         serialPort = id.open(EnOceanBindingConstants.BINDING_ID, 1000);
176         serialPort.setSerialPortParams(ENOCEAN_DEFAULT_BAUD, SerialPort.DATABITS_8, SerialPort.STOPBITS_1,
177                 SerialPort.PARITY_NONE);
178
179         try {
180             serialPort.enableReceiveThreshold(1);
181             serialPort.enableReceiveTimeout(100); // In ms. Small values mean faster shutdown but more cpu usage.
182         } catch (UnsupportedCommOperationException e) {
183             // rfc connections do not allow a ReceiveThreshold
184         }
185
186         inputStream = serialPort.getInputStream();
187         outputStream = serialPort.getOutputStream();
188
189         logger.info("EnOceanSerialTransceiver initialized");
190     }
191
192     public void StartReceiving(ScheduledExecutorService scheduler) {
193         if (readingTask == null || readingTask.isCancelled()) {
194             readingTask = scheduler.submit(new Runnable() {
195                 @Override
196                 public void run() {
197                     receivePackets();
198                 }
199             });
200         }
201         logger.info("EnOceanSerialTransceiver RX thread started");
202     }
203
204     public void ShutDown() {
205         logger.debug("shutting down transceiver");
206         logger.debug("Interrupt rx Thread");
207
208         if (timeOut != null) {
209             timeOut.cancel(true);
210         }
211
212         if (readingTask != null) {
213             readingTask.cancel(true);
214             try {
215                 inputStream.close();
216             } catch (Exception e) {
217             }
218         }
219
220         readingTask = null;
221         timeOut = null;
222         listeners.clear();
223         teachInListener = null;
224         errorListener = null;
225
226         if (outputStream != null) {
227             logger.debug("Closing serial output stream");
228             try {
229                 outputStream.close();
230             } catch (IOException e) {
231                 logger.debug("Error while closing the output stream: {}", e.getMessage());
232             }
233         }
234         if (inputStream != null) {
235             logger.debug("Closeing serial input stream");
236             try {
237                 inputStream.close();
238             } catch (IOException e) {
239                 logger.debug("Error while closing the input stream: {}", e.getMessage());
240             }
241         }
242
243         if (serialPort != null) {
244             logger.debug("Closing serial port");
245             serialPort.close();
246         }
247
248         serialPort = null;
249         outputStream = null;
250         inputStream = null;
251
252         logger.info("Transceiver shutdown");
253     }
254
255     private void receivePackets() {
256         byte[] buffer = new byte[1];
257
258         while (readingTask != null && !readingTask.isCancelled()) {
259             int bytesRead = read(buffer, 1);
260             if (bytesRead > 0) {
261                 processMessage(buffer[0]);
262             }
263         }
264     }
265
266     protected abstract void processMessage(byte firstByte);
267
268     protected int read(byte[] buffer, int length) {
269         try {
270             return this.inputStream.read(buffer, 0, length);
271         } catch (IOException e) {
272             return 0;
273         }
274     }
275
276     protected void informListeners(BasePacket packet) {
277         try {
278             if (packet.getPacketType() == ESPPacketType.RADIO_ERP1) {
279                 ERP1Message msg = (ERP1Message) packet;
280                 byte[] senderId = msg.getSenderId();
281                 byte[] d = Helper.concatAll(msg.getPayload(), msg.getOptionalPayload());
282
283                 logger.debug("{} with RORG {} for {} payload {} received", packet.getPacketType().name(),
284                         msg.getRORG().name(), HexUtils.bytesToHex(msg.getSenderId()), HexUtils.bytesToHex(d));
285
286                 if (msg.getRORG() != RORG.Unknown) {
287                     if (senderId != null) {
288                         if (filteredDeviceId != null && senderId[0] == filteredDeviceId[0]
289                                 && senderId[1] == filteredDeviceId[1] && senderId[2] == filteredDeviceId[2]) {
290                             // filter away own messages which are received through a repeater
291                             return;
292                         }
293
294                         if (teachInListener != null && (msg.getIsTeachIn() || msg.getRORG() == RORG.RPS)) {
295                             logger.info("Received teach in message from {}", HexUtils.bytesToHex(msg.getSenderId()));
296                             teachInListener.packetReceived(msg);
297                             return;
298                         } else if (teachInListener == null && msg.getIsTeachIn()) {
299                             logger.info("Discard message because this is a teach-in telegram from {}!",
300                                     HexUtils.bytesToHex(msg.getSenderId()));
301                             return;
302                         }
303
304                         long s = Long.parseLong(HexUtils.bytesToHex(senderId), 16);
305                         HashSet<PacketListener> pl = listeners.get(s);
306                         if (pl != null) {
307                             pl.forEach(l -> l.packetReceived(msg));
308                         }
309                     }
310                 } else {
311                     logger.debug("Received unknown RORG");
312                 }
313             } else if (packet.getPacketType() == ESPPacketType.EVENT) {
314                 EventMessage event = (EventMessage) packet;
315
316                 byte[] d = Helper.concatAll(packet.getPayload(), packet.getOptionalPayload());
317                 logger.debug("{} with type {} payload {} received", ESPPacketType.EVENT.name(),
318                         event.getEventMessageType().name(), HexUtils.bytesToHex(d));
319
320                 if (event.getEventMessageType() == EventMessageType.SA_CONFIRM_LEARN) {
321                     byte[] senderId = event.getPayload(EventMessageType.SA_CONFIRM_LEARN.getDataLength() - 5, 4);
322
323                     if (teachInListener != null) {
324                         logger.info("Received smart teach in from {}", HexUtils.bytesToHex(senderId));
325                         teachInListener.eventReceived(event);
326                         return;
327                     } else {
328                         logger.info("Discard message because this is a smart teach-in telegram from {}!",
329                                 HexUtils.bytesToHex(senderId));
330                         return;
331                     }
332                 }
333
334                 eventListeners.forEach(l -> l.eventReceived(event));
335             }
336         } catch (Exception e) {
337             logger.error("Exception in informListeners", e);
338         }
339     }
340
341     protected void handleResponse(Response response) throws IOException {
342         if (currentRequest != null) {
343             if (currentRequest.ResponseListener != null) {
344                 currentRequest.ResponsePacket = response;
345                 try {
346                     currentRequest.ResponseListener.handleResponse(response);
347                 } catch (Exception e) {
348                     logger.debug("Exception during response handling");
349                 } finally {
350                     logger.trace("Response handled");
351                 }
352             } else {
353                 logger.trace("Response without listener");
354             }
355         } else {
356             logger.trace("Response without request");
357         }
358     }
359
360     public void sendBasePacket(BasePacket packet, ResponseListener<? extends Response> responseCallback)
361             throws IOException {
362         if (packet == null) {
363             return;
364         }
365
366         logger.debug("Enqueue new send request with ESP3 type {} {} callback", packet.getPacketType().name(),
367                 responseCallback == null ? "without" : "with");
368         Request r = new Request();
369         r.RequestPacket = packet;
370         r.ResponseListener = responseCallback;
371
372         requestQueue.enqueRequest(r);
373     }
374
375     protected abstract byte[] serializePacket(BasePacket packet) throws EnOceanException;
376
377     public void addPacketListener(PacketListener listener, long senderIdToListenTo) {
378         if (listeners.computeIfAbsent(senderIdToListenTo, k -> new HashSet<>()).add(listener)) {
379             logger.debug("Listener added: {}", senderIdToListenTo);
380         }
381     }
382
383     public void removePacketListener(PacketListener listener, long senderIdToListenTo) {
384         HashSet<PacketListener> pl = listeners.get(senderIdToListenTo);
385         if (pl != null) {
386             pl.remove(listener);
387             if (pl.isEmpty()) {
388                 listeners.remove(senderIdToListenTo);
389             }
390         }
391     }
392
393     public void addEventMessageListener(EventListener listener) {
394         eventListeners.add(listener);
395     }
396
397     public void removeEventMessageListener(EventListener listener) {
398         eventListeners.remove(listener);
399     }
400
401     public void startDiscovery(TeachInListener teachInListener) {
402         this.teachInListener = teachInListener;
403     }
404
405     public void stopDiscovery() {
406         this.teachInListener = null;
407     }
408
409     public void setFilteredDeviceId(byte[] filteredDeviceId) {
410         if (filteredDeviceId != null) {
411             System.arraycopy(filteredDeviceId, 0, filteredDeviceId, 0, filteredDeviceId.length);
412         }
413     }
414
415     @Override
416     public void serialEvent(SerialPortEvent event) {
417         if (event.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
418             synchronized (this) {
419                 this.notify();
420             }
421         }
422     }
423 }