]> git.basschouten.com Git - openhab-addons.git/blob
6fac447240d0359a3e362a4237552d1c7fdc51ba
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.enocean.internal.transceiver;
14
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.io.OutputStream;
18 import java.util.HashMap;
19 import java.util.HashSet;
20 import java.util.Map;
21 import java.util.Queue;
22 import java.util.TooManyListenersException;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.concurrent.TimeUnit;
27
28 import org.openhab.binding.enocean.internal.EnOceanBindingConstants;
29 import org.openhab.binding.enocean.internal.EnOceanException;
30 import org.openhab.binding.enocean.internal.Helper;
31 import org.openhab.binding.enocean.internal.messages.BasePacket;
32 import org.openhab.binding.enocean.internal.messages.BasePacket.ESPPacketType;
33 import org.openhab.binding.enocean.internal.messages.ERP1Message;
34 import org.openhab.binding.enocean.internal.messages.ERP1Message.RORG;
35 import org.openhab.binding.enocean.internal.messages.EventMessage;
36 import org.openhab.binding.enocean.internal.messages.EventMessage.EventMessageType;
37 import org.openhab.binding.enocean.internal.messages.Response;
38 import org.openhab.core.io.transport.serial.PortInUseException;
39 import org.openhab.core.io.transport.serial.SerialPort;
40 import org.openhab.core.io.transport.serial.SerialPortEvent;
41 import org.openhab.core.io.transport.serial.SerialPortEventListener;
42 import org.openhab.core.io.transport.serial.SerialPortIdentifier;
43 import org.openhab.core.io.transport.serial.SerialPortManager;
44 import org.openhab.core.io.transport.serial.UnsupportedCommOperationException;
45 import org.openhab.core.util.HexUtils;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
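/**
 * Base class for EnOcean transceivers that talk to a gateway over a serial port. It opens the port, queues
 * outgoing packets, reads incoming bytes on a background task and dispatches received packets to the
 * registered listeners. Subclasses provide the packet framing via {@code processMessage} and
 * {@code serializePacket}.
 *
 * @author Daniel Weber - Initial contribution
 */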
53 public abstract class EnOceanTransceiver implements SerialPortEventListener {
54
55     public static final int ENOCEAN_MAX_DATA = 65790;
56
57     // Thread management
58     protected Future<?> readingTask = null;
59     private Future<?> timeOut = null;
60
61     protected Logger logger = LoggerFactory.getLogger(EnOceanTransceiver.class);
62
63     private SerialPortManager serialPortManager;
64     private static final int ENOCEAN_DEFAULT_BAUD = 57600;
65     protected String path;
66     SerialPort serialPort;
67
68     class Request {
69         BasePacket RequestPacket;
70
71         Response ResponsePacket;
72         ResponseListener<? extends Response> ResponseListener;
73     }
74
75     private class RequestQueue {
76         private Queue<Request> queue = new LinkedBlockingQueue<>();
77         private ScheduledExecutorService scheduler;
78
79         public RequestQueue(ScheduledExecutorService scheduler) {
80             this.scheduler = scheduler;
81         }
82
83         public synchronized void enqueRequest(Request request) throws IOException {
84             boolean wasEmpty = queue.isEmpty();
85
86             if (queue.offer(request)) {
87                 if (wasEmpty) {
88                     send();
89                 }
90             } else {
91                 logger.error("Transmit queue overflow. Lost message: {}", request);
92             }
93         }
94
95         private synchronized void sendNext() throws IOException {
96             queue.poll();
97             send();
98         }
99
100         private synchronized void send() throws IOException {
101             if (!queue.isEmpty()) {
102                 currentRequest = queue.peek();
103                 try {
104                     if (currentRequest != null && currentRequest.RequestPacket != null) {
105                         synchronized (currentRequest) {
106                             logger.debug("Sending data, type {}, payload {}{}",
107                                     currentRequest.RequestPacket.getPacketType().name(),
108                                     HexUtils.bytesToHex(currentRequest.RequestPacket.getPayload()),
109                                     HexUtils.bytesToHex(currentRequest.RequestPacket.getOptionalPayload()));
110
111                             byte[] b = serializePacket(currentRequest.RequestPacket);
112                             logger.trace("Sending raw data: {}", HexUtils.bytesToHex(b));
113                             outputStream.write(b);
114                             outputStream.flush();
115
116                             if (timeOut != null) {
117                                 timeOut.cancel(true);
118                             }
119
120                             // slowdown sending of message to avoid hickups at receivers
121                             // Todo tweak sending intervall (250 ist just a first try)
122                             timeOut = scheduler.schedule(() -> {
123                                 try {
124                                     sendNext();
125                                 } catch (IOException e) {
126                                     errorListener.ErrorOccured(e);
127                                     return;
128                                 }
129                             }, 250, TimeUnit.MILLISECONDS);
130                         }
131                     } else {
132                         sendNext();
133                     }
134                 } catch (EnOceanException e) {
135                     logger.error("exception while sending data", e);
136                 }
137             }
138         }
139     }
140
141     RequestQueue requestQueue;
142     Request currentRequest = null;
143
144     protected Map<Long, HashSet<PacketListener>> listeners;
145     protected HashSet<EventListener> eventListeners;
146     protected TeachInListener teachInListener;
147
148     protected InputStream inputStream;
149     protected OutputStream outputStream;
150
151     private byte[] filteredDeviceId;
152     TransceiverErrorListener errorListener;
153
154     public EnOceanTransceiver(String path, TransceiverErrorListener errorListener, ScheduledExecutorService scheduler,
155             SerialPortManager serialPortManager) {
156         requestQueue = new RequestQueue(scheduler);
157
158         listeners = new HashMap<>();
159         eventListeners = new HashSet<>();
160         teachInListener = null;
161
162         this.errorListener = errorListener;
163         this.serialPortManager = serialPortManager;
164         this.path = path;
165     }
166
167     public void Initialize()
168             throws UnsupportedCommOperationException, PortInUseException, IOException, TooManyListenersException {
169         SerialPortIdentifier id = serialPortManager.getIdentifier(path);
170         if (id == null) {
171             throw new IOException("Could not find a gateway on given path '" + path + "', "
172                     + serialPortManager.getIdentifiers().count() + " ports available.");
173         }
174
175         serialPort = id.open(EnOceanBindingConstants.BINDING_ID, 1000);
176         serialPort.setSerialPortParams(ENOCEAN_DEFAULT_BAUD, SerialPort.DATABITS_8, SerialPort.STOPBITS_1,
177                 SerialPort.PARITY_NONE);
178
179         try {
180             serialPort.enableReceiveThreshold(1);
181             serialPort.enableReceiveTimeout(100); // In ms. Small values mean faster shutdown but more cpu usage.
182         } catch (UnsupportedCommOperationException e) {
183             // rfc connections do not allow a ReceiveThreshold
184         }
185
186         inputStream = serialPort.getInputStream();
187         outputStream = serialPort.getOutputStream();
188
189         logger.info("EnOceanSerialTransceiver initialized");
190     }
191
192     public void StartReceiving(ScheduledExecutorService scheduler) {
193         if (readingTask == null || readingTask.isCancelled()) {
194             readingTask = scheduler.submit(new Runnable() {
195                 @Override
196                 public void run() {
197                     receivePackets();
198                 }
199             });
200         }
201         logger.info("EnOceanSerialTransceiver RX thread started");
202     }
203
204     public void ShutDown() {
205         logger.debug("shutting down transceiver");
206         logger.debug("Interrupt rx Thread");
207
208         if (timeOut != null) {
209             timeOut.cancel(true);
210         }
211
212         if (readingTask != null) {
213             readingTask.cancel(true);
214             try {
215                 inputStream.close();
216             } catch (Exception e) {
217             }
218         }
219
220         readingTask = null;
221         timeOut = null;
222         listeners.clear();
223         eventListeners.clear();
224         teachInListener = null;
225         errorListener = null;
226
227         if (outputStream != null) {
228             logger.debug("Closing serial output stream");
229             try {
230                 outputStream.close();
231             } catch (IOException e) {
232                 logger.debug("Error while closing the output stream: {}", e.getMessage());
233             }
234         }
235         if (inputStream != null) {
236             logger.debug("Closeing serial input stream");
237             try {
238                 inputStream.close();
239             } catch (IOException e) {
240                 logger.debug("Error while closing the input stream: {}", e.getMessage());
241             }
242         }
243
244         if (serialPort != null) {
245             logger.debug("Closing serial port");
246             serialPort.close();
247         }
248
249         serialPort = null;
250         outputStream = null;
251         inputStream = null;
252
253         logger.info("Transceiver shutdown");
254     }
255
256     private void receivePackets() {
257         byte[] buffer = new byte[1];
258
259         while (readingTask != null && !readingTask.isCancelled()) {
260             int bytesRead = read(buffer, 1);
261             if (bytesRead > 0) {
262                 processMessage(buffer[0]);
263             }
264         }
265     }
266
267     protected abstract void processMessage(byte firstByte);
268
269     protected int read(byte[] buffer, int length) {
270         try {
271             return this.inputStream.read(buffer, 0, length);
272         } catch (IOException e) {
273             return 0;
274         }
275     }
276
277     protected void informListeners(BasePacket packet) {
278         try {
279             if (packet.getPacketType() == ESPPacketType.RADIO_ERP1) {
280                 ERP1Message msg = (ERP1Message) packet;
281                 byte[] senderId = msg.getSenderId();
282                 byte[] d = Helper.concatAll(msg.getPayload(), msg.getOptionalPayload());
283
284                 logger.debug("{} with RORG {} for {} payload {} received", packet.getPacketType().name(),
285                         msg.getRORG().name(), HexUtils.bytesToHex(msg.getSenderId()), HexUtils.bytesToHex(d));
286
287                 if (msg.getRORG() != RORG.Unknown) {
288                     if (senderId != null) {
289                         if (filteredDeviceId != null && senderId[0] == filteredDeviceId[0]
290                                 && senderId[1] == filteredDeviceId[1] && senderId[2] == filteredDeviceId[2]) {
291                             // filter away own messages which are received through a repeater
292                             return;
293                         }
294
295                         if (teachInListener != null && (msg.getIsTeachIn() || msg.getRORG() == RORG.RPS)) {
296                             logger.info("Received teach in message from {}", HexUtils.bytesToHex(msg.getSenderId()));
297                             teachInListener.packetReceived(msg);
298                             return;
299                         } else if (teachInListener == null && msg.getIsTeachIn()) {
300                             logger.info("Discard message because this is a teach-in telegram from {}!",
301                                     HexUtils.bytesToHex(msg.getSenderId()));
302                             return;
303                         }
304
305                         long s = Long.parseLong(HexUtils.bytesToHex(senderId), 16);
306                         synchronized (this) {
307                             HashSet<PacketListener> pl = listeners.get(s);
308                             if (pl != null) {
309                                 pl.forEach(l -> l.packetReceived(msg));
310                             }
311                         }
312                     }
313                 } else {
314                     logger.debug("Received unknown RORG");
315                 }
316             } else if (packet.getPacketType() == ESPPacketType.EVENT) {
317                 EventMessage event = (EventMessage) packet;
318
319                 byte[] d = Helper.concatAll(packet.getPayload(), packet.getOptionalPayload());
320                 logger.debug("{} with type {} payload {} received", ESPPacketType.EVENT.name(),
321                         event.getEventMessageType().name(), HexUtils.bytesToHex(d));
322
323                 if (event.getEventMessageType() == EventMessageType.SA_CONFIRM_LEARN) {
324                     byte[] senderId = event.getPayload(EventMessageType.SA_CONFIRM_LEARN.getDataLength() - 5, 4);
325
326                     if (teachInListener != null) {
327                         logger.info("Received smart teach in from {}", HexUtils.bytesToHex(senderId));
328                         teachInListener.eventReceived(event);
329                         return;
330                     } else {
331                         logger.info("Discard message because this is a smart teach-in telegram from {}!",
332                                 HexUtils.bytesToHex(senderId));
333                         return;
334                     }
335                 }
336
337                 synchronized (this) {
338                     eventListeners.forEach(l -> l.eventReceived(event));
339                 }
340             }
341         } catch (Exception e) {
342             logger.error("Exception in informListeners", e);
343         }
344     }
345
346     protected void handleResponse(Response response) throws IOException {
347         if (currentRequest != null) {
348             if (currentRequest.ResponseListener != null) {
349                 currentRequest.ResponsePacket = response;
350                 try {
351                     currentRequest.ResponseListener.handleResponse(response);
352                 } catch (Exception e) {
353                     logger.debug("Exception during response handling");
354                 } finally {
355                     logger.trace("Response handled");
356                 }
357             } else {
358                 logger.trace("Response without listener");
359             }
360         } else {
361             logger.trace("Response without request");
362         }
363     }
364
365     public void sendBasePacket(BasePacket packet, ResponseListener<? extends Response> responseCallback)
366             throws IOException {
367         if (packet == null) {
368             return;
369         }
370
371         logger.debug("Enqueue new send request with ESP3 type {} {} callback", packet.getPacketType().name(),
372                 responseCallback == null ? "without" : "with");
373         Request r = new Request();
374         r.RequestPacket = packet;
375         r.ResponseListener = responseCallback;
376
377         requestQueue.enqueRequest(r);
378     }
379
380     protected abstract byte[] serializePacket(BasePacket packet) throws EnOceanException;
381
382     public synchronized void addPacketListener(PacketListener listener, long senderIdToListenTo) {
383         if (listeners.computeIfAbsent(senderIdToListenTo, k -> new HashSet<>()).add(listener)) {
384             logger.debug("Listener added: {}", senderIdToListenTo);
385         }
386     }
387
388     public synchronized void removePacketListener(PacketListener listener, long senderIdToListenTo) {
389         HashSet<PacketListener> pl = listeners.get(senderIdToListenTo);
390         if (pl != null) {
391             pl.remove(listener);
392             if (pl.isEmpty()) {
393                 listeners.remove(senderIdToListenTo);
394             }
395         }
396     }
397
398     public synchronized void addEventMessageListener(EventListener listener) {
399         eventListeners.add(listener);
400     }
401
402     public synchronized void removeEventMessageListener(EventListener listener) {
403         eventListeners.remove(listener);
404     }
405
406     public void startDiscovery(TeachInListener teachInListener) {
407         this.teachInListener = teachInListener;
408     }
409
410     public void stopDiscovery() {
411         this.teachInListener = null;
412     }
413
414     public void setFilteredDeviceId(byte[] filteredDeviceId) {
415         if (filteredDeviceId != null) {
416             System.arraycopy(filteredDeviceId, 0, filteredDeviceId, 0, filteredDeviceId.length);
417         }
418     }
419
420     @Override
421     public void serialEvent(SerialPortEvent event) {
422         if (event.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
423             synchronized (this) {
424                 this.notify();
425             }
426         }
427     }
428 }