2 * Copyright (c) 2010-2021 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.enocean.internal.transceiver;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.io.OutputStream;
18 import java.util.HashMap;
19 import java.util.HashSet;
21 import java.util.Queue;
22 import java.util.TooManyListenersException;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.concurrent.TimeUnit;
28 import org.openhab.binding.enocean.internal.EnOceanBindingConstants;
29 import org.openhab.binding.enocean.internal.EnOceanException;
30 import org.openhab.binding.enocean.internal.Helper;
31 import org.openhab.binding.enocean.internal.messages.BasePacket;
32 import org.openhab.binding.enocean.internal.messages.BasePacket.ESPPacketType;
33 import org.openhab.binding.enocean.internal.messages.ERP1Message;
34 import org.openhab.binding.enocean.internal.messages.ERP1Message.RORG;
35 import org.openhab.binding.enocean.internal.messages.EventMessage;
36 import org.openhab.binding.enocean.internal.messages.EventMessage.EventMessageType;
37 import org.openhab.binding.enocean.internal.messages.Response;
38 import org.openhab.core.io.transport.serial.PortInUseException;
39 import org.openhab.core.io.transport.serial.SerialPort;
40 import org.openhab.core.io.transport.serial.SerialPortEvent;
41 import org.openhab.core.io.transport.serial.SerialPortEventListener;
42 import org.openhab.core.io.transport.serial.SerialPortIdentifier;
43 import org.openhab.core.io.transport.serial.SerialPortManager;
44 import org.openhab.core.io.transport.serial.UnsupportedCommOperationException;
45 import org.openhab.core.util.HexUtils;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
51 * @author Daniel Weber - Initial contribution
53 public abstract class EnOceanTransceiver implements SerialPortEventListener {
55 public static final int ENOCEAN_MAX_DATA = 65790;
58 protected Future<?> readingTask = null;
59 private Future<?> timeOut = null;
61 protected Logger logger = LoggerFactory.getLogger(EnOceanTransceiver.class);
63 private SerialPortManager serialPortManager;
64 private static final int ENOCEAN_DEFAULT_BAUD = 57600;
65 protected String path;
66 SerialPort serialPort;
69 BasePacket RequestPacket;
71 Response ResponsePacket;
72 ResponseListener<? extends Response> ResponseListener;
75 private class RequestQueue {
76 private Queue<Request> queue = new LinkedBlockingQueue<>();
77 private ScheduledExecutorService scheduler;
79 public RequestQueue(ScheduledExecutorService scheduler) {
80 this.scheduler = scheduler;
83 public synchronized void enqueRequest(Request request) throws IOException {
84 boolean wasEmpty = queue.isEmpty();
86 if (queue.offer(request)) {
91 logger.error("Transmit queue overflow. Lost message: {}", request);
95 private synchronized void sendNext() throws IOException {
100 private synchronized void send() throws IOException {
101 if (!queue.isEmpty()) {
102 currentRequest = queue.peek();
104 if (currentRequest != null && currentRequest.RequestPacket != null) {
105 synchronized (currentRequest) {
106 logger.debug("Sending data, type {}, payload {}{}",
107 currentRequest.RequestPacket.getPacketType().name(),
108 HexUtils.bytesToHex(currentRequest.RequestPacket.getPayload()),
109 HexUtils.bytesToHex(currentRequest.RequestPacket.getOptionalPayload()));
111 byte[] b = serializePacket(currentRequest.RequestPacket);
112 logger.trace("Sending raw data: {}", HexUtils.bytesToHex(b));
113 outputStream.write(b);
114 outputStream.flush();
116 if (timeOut != null) {
117 timeOut.cancel(true);
120 // slowdown sending of message to avoid hickups at receivers
121 // Todo tweak sending intervall (250 ist just a first try)
122 timeOut = scheduler.schedule(() -> {
125 } catch (IOException e) {
126 errorListener.ErrorOccured(e);
129 }, 250, TimeUnit.MILLISECONDS);
134 } catch (EnOceanException e) {
135 logger.error("exception while sending data", e);
141 RequestQueue requestQueue;
142 Request currentRequest = null;
144 protected Map<Long, HashSet<PacketListener>> listeners;
145 protected HashSet<EventListener> eventListeners;
146 protected TeachInListener teachInListener;
148 protected InputStream inputStream;
149 protected OutputStream outputStream;
151 private byte[] filteredDeviceId;
152 TransceiverErrorListener errorListener;
154 public EnOceanTransceiver(String path, TransceiverErrorListener errorListener, ScheduledExecutorService scheduler,
155 SerialPortManager serialPortManager) {
156 requestQueue = new RequestQueue(scheduler);
158 listeners = new HashMap<>();
159 eventListeners = new HashSet<>();
160 teachInListener = null;
162 this.errorListener = errorListener;
163 this.serialPortManager = serialPortManager;
167 public void Initialize()
168 throws UnsupportedCommOperationException, PortInUseException, IOException, TooManyListenersException {
169 SerialPortIdentifier id = serialPortManager.getIdentifier(path);
171 throw new IOException("Could not find a gateway on given path '" + path + "', "
172 + serialPortManager.getIdentifiers().count() + " ports available.");
175 serialPort = id.open(EnOceanBindingConstants.BINDING_ID, 1000);
176 serialPort.setSerialPortParams(ENOCEAN_DEFAULT_BAUD, SerialPort.DATABITS_8, SerialPort.STOPBITS_1,
177 SerialPort.PARITY_NONE);
180 serialPort.enableReceiveThreshold(1);
181 serialPort.enableReceiveTimeout(100); // In ms. Small values mean faster shutdown but more cpu usage.
182 } catch (UnsupportedCommOperationException e) {
183 // rfc connections do not allow a ReceiveThreshold
186 inputStream = serialPort.getInputStream();
187 outputStream = serialPort.getOutputStream();
189 logger.info("EnOceanSerialTransceiver initialized");
192 public void StartReceiving(ScheduledExecutorService scheduler) {
193 if (readingTask == null || readingTask.isCancelled()) {
194 readingTask = scheduler.submit(new Runnable() {
201 logger.info("EnOceanSerialTransceiver RX thread started");
204 public void ShutDown() {
205 logger.debug("shutting down transceiver");
206 logger.debug("Interrupt rx Thread");
208 if (timeOut != null) {
209 timeOut.cancel(true);
212 if (readingTask != null) {
213 readingTask.cancel(true);
216 } catch (Exception e) {
223 teachInListener = null;
224 errorListener = null;
226 if (outputStream != null) {
227 logger.debug("Closing serial output stream");
229 outputStream.close();
230 } catch (IOException e) {
231 logger.debug("Error while closing the output stream: {}", e.getMessage());
234 if (inputStream != null) {
235 logger.debug("Closeing serial input stream");
238 } catch (IOException e) {
239 logger.debug("Error while closing the input stream: {}", e.getMessage());
243 if (serialPort != null) {
244 logger.debug("Closing serial port");
252 logger.info("Transceiver shutdown");
255 private void receivePackets() {
256 byte[] buffer = new byte[1];
258 while (readingTask != null && !readingTask.isCancelled()) {
259 int bytesRead = read(buffer, 1);
261 processMessage(buffer[0]);
266 protected abstract void processMessage(byte firstByte);
268 protected int read(byte[] buffer, int length) {
270 return this.inputStream.read(buffer, 0, length);
271 } catch (IOException e) {
276 protected void informListeners(BasePacket packet) {
278 if (packet.getPacketType() == ESPPacketType.RADIO_ERP1) {
279 ERP1Message msg = (ERP1Message) packet;
280 byte[] senderId = msg.getSenderId();
281 byte[] d = Helper.concatAll(msg.getPayload(), msg.getOptionalPayload());
283 logger.debug("{} with RORG {} for {} payload {} received", packet.getPacketType().name(),
284 msg.getRORG().name(), HexUtils.bytesToHex(msg.getSenderId()), HexUtils.bytesToHex(d));
286 if (msg.getRORG() != RORG.Unknown) {
287 if (senderId != null) {
288 if (filteredDeviceId != null && senderId[0] == filteredDeviceId[0]
289 && senderId[1] == filteredDeviceId[1] && senderId[2] == filteredDeviceId[2]) {
290 // filter away own messages which are received through a repeater
294 if (teachInListener != null && (msg.getIsTeachIn() || msg.getRORG() == RORG.RPS)) {
295 logger.info("Received teach in message from {}", HexUtils.bytesToHex(msg.getSenderId()));
296 teachInListener.packetReceived(msg);
298 } else if (teachInListener == null && msg.getIsTeachIn()) {
299 logger.info("Discard message because this is a teach-in telegram from {}!",
300 HexUtils.bytesToHex(msg.getSenderId()));
304 long s = Long.parseLong(HexUtils.bytesToHex(senderId), 16);
305 HashSet<PacketListener> pl = listeners.get(s);
307 pl.forEach(l -> l.packetReceived(msg));
311 logger.debug("Received unknown RORG");
313 } else if (packet.getPacketType() == ESPPacketType.EVENT) {
314 EventMessage event = (EventMessage) packet;
316 byte[] d = Helper.concatAll(packet.getPayload(), packet.getOptionalPayload());
317 logger.debug("{} with type {} payload {} received", ESPPacketType.EVENT.name(),
318 event.getEventMessageType().name(), HexUtils.bytesToHex(d));
320 if (event.getEventMessageType() == EventMessageType.SA_CONFIRM_LEARN) {
321 byte[] senderId = event.getPayload(EventMessageType.SA_CONFIRM_LEARN.getDataLength() - 5, 4);
323 if (teachInListener != null) {
324 logger.info("Received smart teach in from {}", HexUtils.bytesToHex(senderId));
325 teachInListener.eventReceived(event);
328 logger.info("Discard message because this is a smart teach-in telegram from {}!",
329 HexUtils.bytesToHex(senderId));
334 eventListeners.forEach(l -> l.eventReceived(event));
336 } catch (Exception e) {
337 logger.error("Exception in informListeners", e);
341 protected void handleResponse(Response response) throws IOException {
342 if (currentRequest != null) {
343 if (currentRequest.ResponseListener != null) {
344 currentRequest.ResponsePacket = response;
346 currentRequest.ResponseListener.handleResponse(response);
347 } catch (Exception e) {
348 logger.debug("Exception during response handling");
350 logger.trace("Response handled");
353 logger.trace("Response without listener");
356 logger.trace("Response without request");
360 public void sendBasePacket(BasePacket packet, ResponseListener<? extends Response> responseCallback)
362 if (packet == null) {
366 logger.debug("Enqueue new send request with ESP3 type {} {} callback", packet.getPacketType().name(),
367 responseCallback == null ? "without" : "with");
368 Request r = new Request();
369 r.RequestPacket = packet;
370 r.ResponseListener = responseCallback;
372 requestQueue.enqueRequest(r);
375 protected abstract byte[] serializePacket(BasePacket packet) throws EnOceanException;
377 public void addPacketListener(PacketListener listener, long senderIdToListenTo) {
378 if (listeners.computeIfAbsent(senderIdToListenTo, k -> new HashSet<>()).add(listener)) {
379 logger.debug("Listener added: {}", senderIdToListenTo);
383 public void removePacketListener(PacketListener listener, long senderIdToListenTo) {
384 HashSet<PacketListener> pl = listeners.get(senderIdToListenTo);
388 listeners.remove(senderIdToListenTo);
393 public void addEventMessageListener(EventListener listener) {
394 eventListeners.add(listener);
397 public void removeEventMessageListener(EventListener listener) {
398 eventListeners.remove(listener);
401 public void startDiscovery(TeachInListener teachInListener) {
402 this.teachInListener = teachInListener;
405 public void stopDiscovery() {
406 this.teachInListener = null;
409 public void setFilteredDeviceId(byte[] filteredDeviceId) {
410 if (filteredDeviceId != null) {
411 System.arraycopy(filteredDeviceId, 0, filteredDeviceId, 0, filteredDeviceId.length);
416 public void serialEvent(SerialPortEvent event) {
417 if (event.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
418 synchronized (this) {