]> git.basschouten.com Git - openhab-addons.git/blob
63fb802e0df6b6efc428633aa2efdeb1b505d922
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.enocean.internal.transceiver;
14
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.io.OutputStream;
18 import java.util.HashMap;
19 import java.util.HashSet;
20 import java.util.Map;
21 import java.util.Queue;
22 import java.util.TooManyListenersException;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.concurrent.TimeUnit;
27
28 import org.openhab.binding.enocean.internal.EnOceanBindingConstants;
29 import org.openhab.binding.enocean.internal.EnOceanException;
30 import org.openhab.binding.enocean.internal.messages.BasePacket;
31 import org.openhab.binding.enocean.internal.messages.ERP1Message;
32 import org.openhab.binding.enocean.internal.messages.ERP1Message.RORG;
33 import org.openhab.binding.enocean.internal.messages.Response;
34 import org.openhab.core.io.transport.serial.PortInUseException;
35 import org.openhab.core.io.transport.serial.SerialPort;
36 import org.openhab.core.io.transport.serial.SerialPortEvent;
37 import org.openhab.core.io.transport.serial.SerialPortEventListener;
38 import org.openhab.core.io.transport.serial.SerialPortIdentifier;
39 import org.openhab.core.io.transport.serial.SerialPortManager;
40 import org.openhab.core.io.transport.serial.UnsupportedCommOperationException;
41 import org.openhab.core.util.HexUtils;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
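/**
 * Base class for EnOcean transceivers attached through a serial port. It opens the port, queues outgoing
 * packets, runs the receive loop and hands received ERP1 messages and responses to the registered listeners.
 * Subclasses supply the packet serialization and the parsing of the incoming byte stream.
 *
 * @author Daniel Weber - Initial contribution
 */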
49 public abstract class EnOceanTransceiver implements SerialPortEventListener {
50
51     public static final int ENOCEAN_MAX_DATA = 65790;
52
53     // Thread management
54     protected Future<?> readingTask = null;
55     private Future<?> timeOut = null;
56
57     protected Logger logger = LoggerFactory.getLogger(EnOceanTransceiver.class);
58
59     private SerialPortManager serialPortManager;
60     private static final int ENOCEAN_DEFAULT_BAUD = 57600;
61     protected String path;
62     SerialPort serialPort;
63
64     class Request {
65         BasePacket RequestPacket;
66
67         Response ResponsePacket;
68         ResponseListener<? extends Response> ResponseListener;
69     }
70
71     private class RequestQueue {
72         private Queue<Request> queue = new LinkedBlockingQueue<>();
73         private ScheduledExecutorService scheduler;
74
75         public RequestQueue(ScheduledExecutorService scheduler) {
76             this.scheduler = scheduler;
77         }
78
79         public synchronized void enqueRequest(Request request) throws IOException {
80             boolean wasEmpty = queue.isEmpty();
81
82             if (queue.offer(request)) {
83                 if (wasEmpty) {
84                     send();
85                 }
86             } else {
87                 logger.error("Transmit queue overflow. Lost message: {}", request);
88             }
89         }
90
91         private synchronized void sendNext() throws IOException {
92             queue.poll();
93             send();
94         }
95
96         private synchronized void send() throws IOException {
97             if (!queue.isEmpty()) {
98                 currentRequest = queue.peek();
99                 try {
100                     if (currentRequest != null && currentRequest.RequestPacket != null) {
101                         synchronized (currentRequest) {
102                             logger.debug("Sending data, type {}, payload {}{}",
103                                     currentRequest.RequestPacket.getPacketType().name(),
104                                     HexUtils.bytesToHex(currentRequest.RequestPacket.getPayload()),
105                                     HexUtils.bytesToHex(currentRequest.RequestPacket.getOptionalPayload()));
106
107                             byte[] b = serializePacket(currentRequest.RequestPacket);
108                             logger.trace("Sending raw data: {}", HexUtils.bytesToHex(b));
109                             outputStream.write(b);
110                             outputStream.flush();
111
112                             if (timeOut != null) {
113                                 timeOut.cancel(true);
114                             }
115
116                             // slowdown sending of message to avoid hickups at receivers
117                             // Todo tweak sending intervall (250 ist just a first try)
118                             timeOut = scheduler.schedule(() -> {
119                                 try {
120                                     sendNext();
121                                 } catch (IOException e) {
122                                     errorListener.ErrorOccured(e);
123                                     return;
124                                 }
125                             }, 250, TimeUnit.MILLISECONDS);
126                         }
127                     } else {
128                         sendNext();
129                     }
130                 } catch (EnOceanException e) {
131                     logger.error("exception while sending data", e);
132                 }
133             }
134         }
135     }
136
137     RequestQueue requestQueue;
138     Request currentRequest = null;
139
140     protected Map<Long, HashSet<PacketListener>> listeners;
141     protected PacketListener teachInListener;
142
143     protected InputStream inputStream;
144     protected OutputStream outputStream;
145
146     private byte[] filteredDeviceId;
147     TransceiverErrorListener errorListener;
148
149     public EnOceanTransceiver(String path, TransceiverErrorListener errorListener, ScheduledExecutorService scheduler,
150             SerialPortManager serialPortManager) {
151         requestQueue = new RequestQueue(scheduler);
152
153         listeners = new HashMap<>();
154         teachInListener = null;
155
156         this.errorListener = errorListener;
157         this.serialPortManager = serialPortManager;
158         this.path = path;
159     }
160
161     public void Initialize()
162             throws UnsupportedCommOperationException, PortInUseException, IOException, TooManyListenersException {
163         SerialPortIdentifier id = serialPortManager.getIdentifier(path);
164         if (id == null) {
165             throw new IOException("Could not find a gateway on given path '" + path + "', "
166                     + serialPortManager.getIdentifiers().count() + " ports available.");
167         }
168
169         serialPort = id.open(EnOceanBindingConstants.BINDING_ID, 1000);
170         serialPort.setSerialPortParams(ENOCEAN_DEFAULT_BAUD, SerialPort.DATABITS_8, SerialPort.STOPBITS_1,
171                 SerialPort.PARITY_NONE);
172
173         try {
174             serialPort.enableReceiveThreshold(1);
175             serialPort.enableReceiveTimeout(100); // In ms. Small values mean faster shutdown but more cpu usage.
176         } catch (UnsupportedCommOperationException e) {
177             // rfc connections do not allow a ReceiveThreshold
178         }
179
180         inputStream = serialPort.getInputStream();
181         outputStream = serialPort.getOutputStream();
182
183         logger.info("EnOceanSerialTransceiver initialized");
184     }
185
186     public void StartReceiving(ScheduledExecutorService scheduler) {
187         if (readingTask == null || readingTask.isCancelled()) {
188             readingTask = scheduler.submit(new Runnable() {
189                 @Override
190                 public void run() {
191                     receivePackets();
192                 }
193             });
194         }
195     }
196
197     public void ShutDown() {
198         logger.debug("shutting down transceiver");
199         logger.debug("Interrupt rx Thread");
200
201         if (timeOut != null) {
202             timeOut.cancel(true);
203         }
204
205         if (readingTask != null) {
206             readingTask.cancel(true);
207             try {
208                 inputStream.close();
209             } catch (Exception e) {
210             }
211         }
212
213         readingTask = null;
214         timeOut = null;
215         listeners.clear();
216         teachInListener = null;
217         errorListener = null;
218
219         if (outputStream != null) {
220             logger.debug("Closing serial output stream");
221             try {
222                 outputStream.close();
223             } catch (IOException e) {
224                 logger.debug("Error while closing the output stream: {}", e.getMessage());
225             }
226         }
227         if (inputStream != null) {
228             logger.debug("Closeing serial input stream");
229             try {
230                 inputStream.close();
231             } catch (IOException e) {
232                 logger.debug("Error while closing the input stream: {}", e.getMessage());
233             }
234         }
235
236         if (serialPort != null) {
237             logger.debug("Closing serial port");
238             serialPort.close();
239         }
240
241         serialPort = null;
242         outputStream = null;
243         inputStream = null;
244
245         logger.info("Transceiver shutdown");
246     }
247
248     private void receivePackets() {
249         byte[] buffer = new byte[1];
250
251         while (readingTask != null && !readingTask.isCancelled()) {
252             int bytesRead = read(buffer, 1);
253             if (bytesRead > 0) {
254                 processMessage(buffer[0]);
255             }
256         }
257     }
258
259     protected abstract void processMessage(byte firstByte);
260
261     protected int read(byte[] buffer, int length) {
262         try {
263             return this.inputStream.read(buffer, 0, length);
264         } catch (IOException e) {
265             return 0;
266         }
267     }
268
269     protected void informListeners(ERP1Message msg) {
270         try {
271             byte[] senderId = msg.getSenderId();
272
273             if (senderId != null) {
274                 if (filteredDeviceId != null && senderId[0] == filteredDeviceId[0] && senderId[1] == filteredDeviceId[1]
275                         && senderId[2] == filteredDeviceId[2]) {
276                     // filter away own messages which are received through a repeater
277                     return;
278                 }
279
280                 if (teachInListener != null) {
281                     if (msg.getIsTeachIn() || (msg.getRORG() == RORG.RPS)) {
282                         logger.info("Received teach in message from {}", HexUtils.bytesToHex(msg.getSenderId()));
283                         teachInListener.packetReceived(msg);
284                         return;
285                     }
286                 } else {
287                     if (msg.getIsTeachIn()) {
288                         logger.info("Discard message because this is a teach-in telegram from {}!",
289                                 HexUtils.bytesToHex(msg.getSenderId()));
290                         return;
291                     }
292                 }
293
294                 long s = Long.parseLong(HexUtils.bytesToHex(senderId), 16);
295                 HashSet<PacketListener> pl = listeners.get(s);
296                 if (pl != null) {
297                     pl.forEach(l -> l.packetReceived(msg));
298                 }
299             }
300         } catch (Exception e) {
301             logger.error("Exception in informListeners", e);
302         }
303     }
304
305     protected void handleResponse(Response response) throws IOException {
306         if (currentRequest != null) {
307             if (currentRequest.ResponseListener != null) {
308                 currentRequest.ResponsePacket = response;
309                 try {
310                     currentRequest.ResponseListener.handleResponse(response);
311                 } catch (Exception e) {
312                     logger.debug("Exception during response handling");
313                 } finally {
314                     logger.trace("Response handled");
315                 }
316             } else {
317                 logger.trace("Response without listener");
318             }
319         } else {
320             logger.trace("Response without request");
321         }
322     }
323
324     public void sendBasePacket(BasePacket packet, ResponseListener<? extends Response> responseCallback)
325             throws IOException {
326         if (packet == null) {
327             return;
328         }
329
330         logger.debug("Enqueue new send request with ESP3 type {} {} callback", packet.getPacketType().name(),
331                 responseCallback == null ? "without" : "with");
332         Request r = new Request();
333         r.RequestPacket = packet;
334         r.ResponseListener = responseCallback;
335
336         requestQueue.enqueRequest(r);
337     }
338
339     protected abstract byte[] serializePacket(BasePacket packet) throws EnOceanException;
340
341     public void addPacketListener(PacketListener listener, long senderIdToListenTo) {
342         if (listeners.computeIfAbsent(senderIdToListenTo, k -> new HashSet<>()).add(listener)) {
343             logger.debug("Listener added: {}", senderIdToListenTo);
344         }
345     }
346
347     public void removePacketListener(PacketListener listener, long senderIdToListenTo) {
348         HashSet<PacketListener> pl = listeners.get(senderIdToListenTo);
349         if (pl != null) {
350             pl.remove(listener);
351             if (pl.isEmpty()) {
352                 listeners.remove(senderIdToListenTo);
353             }
354         }
355     }
356
357     public void startDiscovery(PacketListener teachInListener) {
358         this.teachInListener = teachInListener;
359     }
360
361     public void stopDiscovery() {
362         this.teachInListener = null;
363     }
364
365     public void setFilteredDeviceId(byte[] filteredDeviceId) {
366         if (filteredDeviceId != null) {
367             System.arraycopy(filteredDeviceId, 0, filteredDeviceId, 0, filteredDeviceId.length);
368         }
369     }
370
371     @Override
372     public void serialEvent(SerialPortEvent event) {
373         if (event.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
374             synchronized (this) {
375                 this.notify();
376             }
377         }
378     }
379 }