2 * Copyright (c) 2010-2020 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.enocean.internal.transceiver;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.io.OutputStream;
18 import java.util.HashMap;
19 import java.util.HashSet;
21 import java.util.Queue;
22 import java.util.TooManyListenersException;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.concurrent.TimeUnit;
28 import org.openhab.binding.enocean.internal.EnOceanBindingConstants;
29 import org.openhab.binding.enocean.internal.EnOceanException;
30 import org.openhab.binding.enocean.internal.messages.BasePacket;
31 import org.openhab.binding.enocean.internal.messages.ERP1Message;
32 import org.openhab.binding.enocean.internal.messages.ERP1Message.RORG;
33 import org.openhab.binding.enocean.internal.messages.Response;
34 import org.openhab.core.io.transport.serial.PortInUseException;
35 import org.openhab.core.io.transport.serial.SerialPort;
36 import org.openhab.core.io.transport.serial.SerialPortEvent;
37 import org.openhab.core.io.transport.serial.SerialPortEventListener;
38 import org.openhab.core.io.transport.serial.SerialPortIdentifier;
39 import org.openhab.core.io.transport.serial.SerialPortManager;
40 import org.openhab.core.io.transport.serial.UnsupportedCommOperationException;
41 import org.openhab.core.util.HexUtils;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
47 * @author Daniel Weber - Initial contribution
49 public abstract class EnOceanTransceiver implements SerialPortEventListener {
51 public static final int ENOCEAN_MAX_DATA = 65790;
54 protected Future<?> readingTask = null;
55 private Future<?> timeOut = null;
57 protected Logger logger = LoggerFactory.getLogger(EnOceanTransceiver.class);
59 private SerialPortManager serialPortManager;
60 private static final int ENOCEAN_DEFAULT_BAUD = 57600;
61 protected String path;
62 SerialPort serialPort;
65 BasePacket RequestPacket;
67 Response ResponsePacket;
68 ResponseListener<? extends Response> ResponseListener;
71 private class RequestQueue {
72 private Queue<Request> queue = new LinkedBlockingQueue<>();
73 private ScheduledExecutorService scheduler;
75 public RequestQueue(ScheduledExecutorService scheduler) {
76 this.scheduler = scheduler;
79 public synchronized void enqueRequest(Request request) throws IOException {
80 boolean wasEmpty = queue.isEmpty();
82 if (queue.offer(request)) {
87 logger.error("Transmit queue overflow. Lost message: {}", request);
91 private synchronized void sendNext() throws IOException {
96 private synchronized void send() throws IOException {
97 if (!queue.isEmpty()) {
98 currentRequest = queue.peek();
100 if (currentRequest != null && currentRequest.RequestPacket != null) {
101 synchronized (currentRequest) {
102 logger.debug("Sending data, type {}, payload {}{}",
103 currentRequest.RequestPacket.getPacketType().name(),
104 HexUtils.bytesToHex(currentRequest.RequestPacket.getPayload()),
105 HexUtils.bytesToHex(currentRequest.RequestPacket.getOptionalPayload()));
107 byte[] b = serializePacket(currentRequest.RequestPacket);
108 logger.trace("Sending raw data: {}", HexUtils.bytesToHex(b));
109 outputStream.write(b);
110 outputStream.flush();
112 if (timeOut != null) {
113 timeOut.cancel(true);
116 // slow down sending of messages to avoid hiccups at receivers
117 // TODO: tweak sending interval (250 ms is just a first try)
118 timeOut = scheduler.schedule(() -> {
121 } catch (IOException e) {
122 errorListener.ErrorOccured(e);
125 }, 250, TimeUnit.MILLISECONDS);
130 } catch (EnOceanException e) {
131 logger.error("exception while sending data", e);
137 RequestQueue requestQueue;
138 Request currentRequest = null;
140 protected Map<Long, HashSet<PacketListener>> listeners;
141 protected PacketListener teachInListener;
143 protected InputStream inputStream;
144 protected OutputStream outputStream;
146 private byte[] filteredDeviceId;
147 TransceiverErrorListener errorListener;
/**
 * Creates a transceiver bound to a serial port path. No port is opened here; that happens in
 * {@link #Initialize()}.
 *
 * @param path path of the serial port the gateway is attached to
 * @param errorListener listener notified when sending a queued request fails with an I/O error
 * @param scheduler executor handed to the request queue for scheduling delayed sends
 * @param serialPortManager manager used later to resolve {@code path} to a port identifier
 */
public EnOceanTransceiver(String path, TransceiverErrorListener errorListener, ScheduledExecutorService scheduler,
        SerialPortManager serialPortManager) {
    // collaborators handed in by the caller
    this.path = path;
    this.serialPortManager = serialPortManager;
    this.errorListener = errorListener;

    // internal state starts out empty: no packet listeners, no discovery running
    this.teachInListener = null;
    this.listeners = new HashMap<>();
    this.requestQueue = new RequestQueue(scheduler);
}
161 public void Initialize()
162 throws UnsupportedCommOperationException, PortInUseException, IOException, TooManyListenersException {
163 SerialPortIdentifier id = serialPortManager.getIdentifier(path);
165 throw new IOException("Could not find a gateway on given path '" + path + "', "
166 + serialPortManager.getIdentifiers().count() + " ports available.");
169 serialPort = id.open(EnOceanBindingConstants.BINDING_ID, 1000);
170 serialPort.setSerialPortParams(ENOCEAN_DEFAULT_BAUD, SerialPort.DATABITS_8, SerialPort.STOPBITS_1,
171 SerialPort.PARITY_NONE);
174 serialPort.enableReceiveThreshold(1);
175 serialPort.enableReceiveTimeout(100); // In ms. Small values mean faster shutdown but more cpu usage.
176 } catch (UnsupportedCommOperationException e) {
177 // rfc connections do not allow a ReceiveThreshold
180 inputStream = serialPort.getInputStream();
181 outputStream = serialPort.getOutputStream();
183 logger.info("EnOceanSerialTransceiver initialized");
186 public void StartReceiving(ScheduledExecutorService scheduler) {
187 if (readingTask == null || readingTask.isCancelled()) {
188 readingTask = scheduler.submit(new Runnable() {
197 public void ShutDown() {
198 logger.debug("shutting down transceiver");
199 logger.debug("Interrupt rx Thread");
201 if (timeOut != null) {
202 timeOut.cancel(true);
205 if (readingTask != null) {
206 readingTask.cancel(true);
209 } catch (Exception e) {
216 teachInListener = null;
217 errorListener = null;
219 if (outputStream != null) {
220 logger.debug("Closing serial output stream");
222 outputStream.close();
223 } catch (IOException e) {
224 logger.debug("Error while closing the output stream: {}", e.getMessage());
227 if (inputStream != null) {
228 logger.debug("Closeing serial input stream");
231 } catch (IOException e) {
232 logger.debug("Error while closing the input stream: {}", e.getMessage());
236 if (serialPort != null) {
237 logger.debug("Closing serial port");
245 logger.info("Transceiver shutdown");
248 private void receivePackets() {
249 byte[] buffer = new byte[1];
251 while (readingTask != null && !readingTask.isCancelled()) {
252 int bytesRead = read(buffer, 1);
254 processMessage(buffer[0]);
259 protected abstract void processMessage(byte firstByte);
261 protected int read(byte[] buffer, int length) {
263 return this.inputStream.read(buffer, 0, length);
264 } catch (IOException e) {
269 protected void informListeners(ERP1Message msg) {
271 byte[] senderId = msg.getSenderId();
273 if (senderId != null) {
274 if (filteredDeviceId != null && senderId[0] == filteredDeviceId[0] && senderId[1] == filteredDeviceId[1]
275 && senderId[2] == filteredDeviceId[2]) {
276 // filter away own messages which are received through a repeater
280 if (teachInListener != null) {
281 if (msg.getIsTeachIn() || (msg.getRORG() == RORG.RPS)) {
282 logger.info("Received teach in message from {}", HexUtils.bytesToHex(msg.getSenderId()));
283 teachInListener.packetReceived(msg);
287 if (msg.getIsTeachIn()) {
288 logger.info("Discard message because this is a teach-in telegram from {}!",
289 HexUtils.bytesToHex(msg.getSenderId()));
294 long s = Long.parseLong(HexUtils.bytesToHex(senderId), 16);
295 HashSet<PacketListener> pl = listeners.get(s);
297 pl.forEach(l -> l.packetReceived(msg));
300 } catch (Exception e) {
301 logger.error("Exception in informListeners", e);
305 protected void handleResponse(Response response) throws IOException {
306 if (currentRequest != null) {
307 if (currentRequest.ResponseListener != null) {
308 currentRequest.ResponsePacket = response;
310 currentRequest.ResponseListener.handleResponse(response);
311 } catch (Exception e) {
312 logger.debug("Exception during response handling");
314 logger.trace("Response handled");
317 logger.trace("Response without listener");
320 logger.trace("Response without request");
324 public void sendBasePacket(BasePacket packet, ResponseListener<? extends Response> responseCallback)
326 if (packet == null) {
330 logger.debug("Enqueue new send request with ESP3 type {} {} callback", packet.getPacketType().name(),
331 responseCallback == null ? "without" : "with");
332 Request r = new Request();
333 r.RequestPacket = packet;
334 r.ResponseListener = responseCallback;
336 requestQueue.enqueRequest(r);
339 protected abstract byte[] serializePacket(BasePacket packet) throws EnOceanException;
341 public void addPacketListener(PacketListener listener, long senderIdToListenTo) {
342 if (listeners.computeIfAbsent(senderIdToListenTo, k -> new HashSet<>()).add(listener)) {
343 logger.debug("Listener added: {}", senderIdToListenTo);
347 public void removePacketListener(PacketListener listener, long senderIdToListenTo) {
348 HashSet<PacketListener> pl = listeners.get(senderIdToListenTo);
352 listeners.remove(senderIdToListenTo);
357 public void startDiscovery(PacketListener teachInListener) {
358 this.teachInListener = teachInListener;
361 public void stopDiscovery() {
362 this.teachInListener = null;
365 public void setFilteredDeviceId(byte[] filteredDeviceId) {
366 if (filteredDeviceId != null) {
367 System.arraycopy(filteredDeviceId, 0, filteredDeviceId, 0, filteredDeviceId.length);
372 public void serialEvent(SerialPortEvent event) {
373 if (event.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
374 synchronized (this) {