2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional information.
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.enocean.internal.transceiver;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.io.OutputStream;
18 import java.util.HashMap;
19 import java.util.HashSet;
21 import java.util.Queue;
22 import java.util.TooManyListenersException;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.concurrent.TimeUnit;
28 import org.openhab.binding.enocean.internal.EnOceanBindingConstants;
29 import org.openhab.binding.enocean.internal.EnOceanException;
30 import org.openhab.binding.enocean.internal.Helper;
31 import org.openhab.binding.enocean.internal.messages.BasePacket;
32 import org.openhab.binding.enocean.internal.messages.BasePacket.ESPPacketType;
33 import org.openhab.binding.enocean.internal.messages.ERP1Message;
34 import org.openhab.binding.enocean.internal.messages.ERP1Message.RORG;
35 import org.openhab.binding.enocean.internal.messages.EventMessage;
36 import org.openhab.binding.enocean.internal.messages.EventMessage.EventMessageType;
37 import org.openhab.binding.enocean.internal.messages.Response;
38 import org.openhab.core.io.transport.serial.PortInUseException;
39 import org.openhab.core.io.transport.serial.SerialPort;
40 import org.openhab.core.io.transport.serial.SerialPortEvent;
41 import org.openhab.core.io.transport.serial.SerialPortEventListener;
42 import org.openhab.core.io.transport.serial.SerialPortIdentifier;
43 import org.openhab.core.io.transport.serial.SerialPortManager;
44 import org.openhab.core.io.transport.serial.UnsupportedCommOperationException;
45 import org.openhab.core.util.HexUtils;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
51 * @author Daniel Weber - Initial contribution
53 public abstract class EnOceanTransceiver implements SerialPortEventListener {
// NOTE(review): presumably the upper bound, in bytes, of the data carried by one packet - confirm against the
// ESP3 specification; the constant is not used in the visible part of this file.
55 public static final int ENOCEAN_MAX_DATA = 65790;
// Handle of the background receive task; submitted in StartReceiving and cancelled in ShutDown.
58 protected Future<?> readingTask = null;
// Handle of the delayed task scheduled by RequestQueue.send after each write; cancelled before a new one is
// scheduled and in ShutDown.
59 private Future<?> timeOut = null;
61 protected Logger logger = LoggerFactory.getLogger(EnOceanTransceiver.class);
// Used by Initialize to resolve the serial port identifier for path.
63 private SerialPortManager serialPortManager;
// Baud rate applied to the serial port in Initialize.
64 private static final int ENOCEAN_DEFAULT_BAUD = 57600;
// Port name handed to SerialPortManager.getIdentifier in Initialize.
65 protected String path;
// Serial port opened in Initialize; inputStream and outputStream are obtained from it.
66 SerialPort serialPort;
// NOTE(review): the three members below are accessed through Request instances elsewhere in this file
// (e.g. r.RequestPacket, currentRequest.ResponseListener), so they presumably belong to a Request holder
// class whose declaration line is not visible in this view - confirm.
69 BasePacket RequestPacket;
71 Response ResponsePacket;
72 ResponseListener<? extends Response> ResponseListener;
/**
 * Queue of outgoing requests. The request at the head of the queue is serialized and written to the
 * enclosing transceiver's output stream, and a follow-up task is scheduled 250 ms after each write.
 *
 * NOTE(review): several lines of this class (branch bodies, the body of sendNext, the start of the scheduled
 * task) are not visible in this view; the comments below describe the visible statements only.
 */
75 private class RequestQueue {
76 private Queue<Request> queue = new LinkedBlockingQueue<>();
77 private ScheduledExecutorService scheduler;
/**
 * @param scheduler executor used to schedule the delayed follow-up task after each write
 */
79 public RequestQueue(ScheduledExecutorService scheduler) {
80 this.scheduler = scheduler;
/**
 * Offers the request to the queue, remembering whether the queue was empty before the offer.
 * NOTE(review): presumably sending is started when the queue was empty, and the overflow error below is
 * logged when the offer is rejected - the corresponding branch lines are not visible here.
 *
 * @param request request to append
 * @throws IOException declared; presumably propagated from send()
 */
83 public synchronized void enqueRequest(Request request) throws IOException {
84 boolean wasEmpty = queue.isEmpty();
86 if (queue.offer(request)) {
91 logger.error("Transmit queue overflow. Lost message: {}", request);
// NOTE(review): the body of sendNext is not visible in this view.
95 private synchronized void sendNext() throws IOException {
/**
 * Takes the request at the head of the queue (without removing it) as currentRequest and, if it carries a
 * packet, serializes it and writes it to the output stream.
 */
100 private synchronized void send() throws IOException {
101 if (!queue.isEmpty()) {
102 currentRequest = queue.peek();
104 if (currentRequest != null && currentRequest.RequestPacket != null) {
105 synchronized (currentRequest) {
106 logger.debug("Sending data, type {}, payload {}{}",
107 currentRequest.RequestPacket.getPacketType().name(),
108 HexUtils.bytesToHex(currentRequest.RequestPacket.getPayload()),
109 HexUtils.bytesToHex(currentRequest.RequestPacket.getOptionalPayload()));
// Serialization is delegated to the concrete transceiver; it may throw EnOceanException (handled below).
111 byte[] b = serializePacket(currentRequest.RequestPacket);
112 logger.trace("Sending raw data: {}", HexUtils.bytesToHex(b));
113 outputStream.write(b);
114 outputStream.flush();
// Cancel a still-pending follow-up task before scheduling a new one.
116 if (timeOut != null) {
117 timeOut.cancel(true);
120 // slow down sending of messages to avoid hiccups at receivers
121 // TODO: tweak the sending interval (250 ms is just a first try)
122 timeOut = scheduler.schedule(() -> {
// An IOException raised inside the scheduled task is reported to the transceiver's error listener.
125 } catch (IOException e) {
126 errorListener.ErrorOccured(e);
129 }, 250, TimeUnit.MILLISECONDS);
// Serialization failures are logged and not propagated to the caller.
134 } catch (EnOceanException e) {
135 logger.error("exception while sending data", e);
// Queue through which sendBasePacket submits all outgoing requests.
141 RequestQueue requestQueue;
// Request most recently taken from the head of the queue by RequestQueue.send; handleResponse delivers
// responses to its listener.
142 Request currentRequest = null;
// Packet listeners keyed by the numeric sender id they were registered for.
144 protected Map<Long, HashSet<PacketListener>> listeners;
// Listeners notified of EVENT packets in informListeners.
145 protected HashSet<EventListener> eventListeners;
// Listener for teach-in telegrams; set by startDiscovery and reset to null by stopDiscovery.
146 protected TeachInListener teachInListener;
// Streams of the serial port, obtained in Initialize.
148 protected InputStream inputStream;
149 protected OutputStream outputStream;
// Device id whose first three bytes informListeners compares with a telegram's sender id in order to
// recognise the gateway's own messages.
151 private byte[] filteredDeviceId;
// Receives IOExceptions raised by the delayed task scheduled in RequestQueue.send.
152 TransceiverErrorListener errorListener;
/**
 * Creates the transceiver with empty listener registries and a request queue driven by the given scheduler.
 *
 * @param path name of the serial port; NOTE(review): the assignment to the path field is not visible in this
 *            view - confirm it is stored
 * @param errorListener listener stored for reporting transceiver errors
 * @param scheduler executor handed to the request queue for scheduling delayed sends
 * @param serialPortManager manager stored for resolving the serial port in Initialize
 */
154 public EnOceanTransceiver(String path, TransceiverErrorListener errorListener, ScheduledExecutorService scheduler,
155 SerialPortManager serialPortManager) {
156 requestQueue = new RequestQueue(scheduler);
158 listeners = new HashMap<>();
159 eventListeners = new HashSet<>();
// No teach-in listener until startDiscovery is called.
160 teachInListener = null;
162 this.errorListener = errorListener;
163 this.serialPortManager = serialPortManager;
/**
 * Opens the serial port identified by path, configures it for 57600 baud, 8 data bits, 1 stop bit and no
 * parity, and obtains its input and output streams.
 *
 * @throws IOException with a "Could not find a gateway ..." message; NOTE(review): the guarding condition is
 *             not visible here - presumably raised when no port identifier exists for path
 * @throws UnsupportedCommOperationException declared; presumably raised by setSerialPortParams
 * @throws PortInUseException declared; presumably raised when the port cannot be opened
 * @throws TooManyListenersException declared; NOTE(review): the statement that can raise it is not visible in
 *             this view
 */
167 public void Initialize()
168 throws UnsupportedCommOperationException, PortInUseException, IOException, TooManyListenersException {
169 SerialPortIdentifier id = serialPortManager.getIdentifier(path);
// The message reports how many ports the manager currently knows, to help diagnose a wrong path.
171 throw new IOException("Could not find a gateway on given path '" + path + "', "
172 + serialPortManager.getIdentifiers().count() + " ports available.");
// Open the port under the binding id as owner, passing 1000 as the timeout argument.
175 serialPort = id.open(EnOceanBindingConstants.BINDING_ID, 1000);
176 serialPort.setSerialPortParams(ENOCEAN_DEFAULT_BAUD, SerialPort.DATABITS_8, SerialPort.STOPBITS_1,
177 SerialPort.PARITY_NONE);
// Receive threshold and timeout are optional: an UnsupportedCommOperationException from them is caught below.
180 serialPort.enableReceiveThreshold(1);
181 serialPort.enableReceiveTimeout(100); // In ms. Small values mean faster shutdown but more cpu usage.
182 } catch (UnsupportedCommOperationException e) {
183 // rfc connections do not allow a ReceiveThreshold
186 inputStream = serialPort.getInputStream();
187 outputStream = serialPort.getOutputStream();
189 logger.info("EnOceanSerialTransceiver initialized");
/**
 * Submits the receive task to the given scheduler unless a reading task already exists that has not been
 * cancelled.
 * NOTE(review): the body of the submitted Runnable is not visible in this view - presumably it runs
 * receivePackets().
 *
 * @param scheduler executor on which the receive task is run
 */
192 public void StartReceiving(ScheduledExecutorService scheduler) {
193 if (readingTask == null || readingTask.isCancelled()) {
194 readingTask = scheduler.submit(new Runnable() {
201 logger.info("EnOceanSerialTransceiver RX thread started");
/**
 * Stops the transceiver: cancels the pending delayed send and the receive task, drops the registered
 * listeners, and closes the output stream, the input stream and the serial port.
 * NOTE(review): some statements of this method (e.g. inside the try/catch after cancelling the reading task
 * and the actual port close call) are not visible in this view.
 */
204 public void ShutDown() {
205 logger.debug("shutting down transceiver");
206 logger.debug("Interrupt rx Thread");
// Cancel the delayed follow-up send, interrupting it if it is running.
208 if (timeOut != null) {
209 timeOut.cancel(true);
// Cancel the receive task, interrupting it if it is running.
212 if (readingTask != null) {
213 readingTask.cancel(true);
216 } catch (Exception e) {
223 eventListeners.clear();
224 teachInListener = null;
225 errorListener = null;
227 if (outputStream != null) {
228 logger.debug("Closing serial output stream");
230 outputStream.close();
// A failure to close is only logged at debug level.
231 } catch (IOException e) {
232 logger.debug("Error while closing the output stream: {}", e.getMessage());
235 if (inputStream != null) {
236 logger.debug("Closeing serial input stream");
// A failure to close is only logged at debug level.
239 } catch (IOException e) {
240 logger.debug("Error while closing the input stream: {}", e.getMessage());
244 if (serialPort != null) {
245 logger.debug("Closing serial port");
253 logger.info("Transceiver shutdown");
/**
 * Receive loop: as long as the reading task exists and has not been cancelled, reads one byte at a time and
 * hands it to processMessage.
 * NOTE(review): the check applied to bytesRead before the byte is dispatched is not visible in this view.
 */
256 private void receivePackets() {
// Single-byte buffer: each read attempt fetches at most one byte.
257 byte[] buffer = new byte[1];
259 while (readingTask != null && !readingTask.isCancelled()) {
260 int bytesRead = read(buffer, 1);
262 processMessage(buffer[0]);
/**
 * Handles incoming data starting with the given byte; called by the receive loop for each byte it reads.
 * The handling itself is defined by the concrete transceiver.
 *
 * @param firstByte byte most recently read from the input stream
 */
267 protected abstract void processMessage(byte firstByte);
/**
 * Reads up to length bytes from the input stream into buffer, starting at offset 0.
 *
 * @param buffer destination array
 * @param length maximum number of bytes to read
 * @return the value returned by InputStream.read; NOTE(review): the value returned when an IOException is
 *         caught is not visible in this view
 */
269 protected int read(byte[] buffer, int length) {
271 return this.inputStream.read(buffer, 0, length);
272 } catch (IOException e) {
/**
 * Dispatches a received packet to the registered listeners.
 * <p>
 * RADIO_ERP1 packets are treated as ERP1 messages: teach-in telegrams go to the teach-in listener (if one is
 * set), other telegrams go to the packet listeners registered for the sender id. EVENT packets go to the
 * teach-in listener when they are SA_CONFIRM_LEARN events and to the registered event listeners.
 * Any exception raised while dispatching is caught and logged at error level.
 * <p>
 * NOTE(review): several structural lines (returns, else branches, null checks) are not visible in this view;
 * the comments describe the visible statements only.
 *
 * @param packet packet received from the gateway
 */
277 protected void informListeners(BasePacket packet) {
279 if (packet.getPacketType() == ESPPacketType.RADIO_ERP1) {
280 ERP1Message msg = (ERP1Message) packet;
281 byte[] senderId = msg.getSenderId();
// Payload and optional payload are concatenated for logging only.
282 byte[] d = Helper.concatAll(msg.getPayload(), msg.getOptionalPayload());
284 logger.debug("{} with RORG {} for {} payload {} received", packet.getPacketType().name(),
285 msg.getRORG().name(), HexUtils.bytesToHex(msg.getSenderId()), HexUtils.bytesToHex(d));
287 if (msg.getRORG() != RORG.Unknown) {
288 if (senderId != null) {
// Only the first three bytes of the sender id are compared with the filtered device id.
289 if (filteredDeviceId != null && senderId[0] == filteredDeviceId[0]
290 && senderId[1] == filteredDeviceId[1] && senderId[2] == filteredDeviceId[2]) {
291 // filter away own messages which are received through a repeater
// With a teach-in listener set, teach-in telegrams and all RPS telegrams are handed to it.
295 if (teachInListener != null && (msg.getIsTeachIn() || msg.getRORG() == RORG.RPS)) {
296 logger.info("Received teach in message from {}", HexUtils.bytesToHex(msg.getSenderId()));
297 teachInListener.packetReceived(msg);
// Without a teach-in listener, teach-in telegrams are reported as discarded.
299 } else if (teachInListener == null && msg.getIsTeachIn()) {
300 logger.info("Discard message because this is a teach-in telegram from {}!",
301 HexUtils.bytesToHex(msg.getSenderId()));
// The sender id is converted to the numeric key used by the listeners map via its hex representation.
305 long s = Long.parseLong(HexUtils.bytesToHex(senderId), 16);
// Locking on this matches the synchronized add/remove listener methods.
306 synchronized (this) {
307 HashSet<PacketListener> pl = listeners.get(s);
309 pl.forEach(l -> l.packetReceived(msg));
314 logger.debug("Received unknown RORG");
316 } else if (packet.getPacketType() == ESPPacketType.EVENT) {
317 EventMessage event = (EventMessage) packet;
319 byte[] d = Helper.concatAll(packet.getPayload(), packet.getOptionalPayload());
320 logger.debug("{} with type {} payload {} received", ESPPacketType.EVENT.name(),
321 event.getEventMessageType().name(), HexUtils.bytesToHex(d));
323 if (event.getEventMessageType() == EventMessageType.SA_CONFIRM_LEARN) {
// NOTE(review): presumably getPayload(offset, length), i.e. the 4-byte sender id starting 5 bytes before
// the end of the event data - confirm against EventMessage.
324 byte[] senderId = event.getPayload(EventMessageType.SA_CONFIRM_LEARN.getDataLength() - 5, 4);
326 if (teachInListener != null) {
327 logger.info("Received smart teach in from {}", HexUtils.bytesToHex(senderId));
328 teachInListener.eventReceived(event);
331 logger.info("Discard message because this is a smart teach-in telegram from {}!",
332 HexUtils.bytesToHex(senderId));
// Event listeners are notified under the same lock that guards their registration.
337 synchronized (this) {
338 eventListeners.forEach(l -> l.eventReceived(event));
341 } catch (Exception e) {
342 logger.error("Exception in informListeners", e);
/**
 * Delivers a response to the listener of the current request: the response is stored on the request and
 * passed to its response listener. Responses arriving without a current request or without a listener are
 * only traced.
 *
 * @param response response received from the gateway
 * @throws IOException declared; no statement raising it is visible in this view
 */
346 protected void handleResponse(Response response) throws IOException {
347 if (currentRequest != null) {
348 if (currentRequest.ResponseListener != null) {
349 currentRequest.ResponsePacket = response;
351 currentRequest.ResponseListener.handleResponse(response);
// An exception thrown by the listener is swallowed; only a debug message without the exception is logged.
352 } catch (Exception e) {
353 logger.debug("Exception during response handling");
355 logger.trace("Response handled");
358 logger.trace("Response without listener");
361 logger.trace("Response without request");
/**
 * Wraps the packet and the optional response callback in a new Request and enqueues it for sending.
 * NOTE(review): the throws clause and the statement executed for a null packet are not visible in this view;
 * presumably a null packet is ignored.
 *
 * @param packet packet to send
 * @param responseCallback listener for the response to this packet; may be null
 */
365 public void sendBasePacket(BasePacket packet, ResponseListener<? extends Response> responseCallback)
367 if (packet == null) {
371 logger.debug("Enqueue new send request with ESP3 type {} {} callback", packet.getPacketType().name(),
372 responseCallback == null ? "without" : "with");
373 Request r = new Request();
374 r.RequestPacket = packet;
375 r.ResponseListener = responseCallback;
377 requestQueue.enqueRequest(r);
/**
 * Converts a packet into the bytes that are written to the output stream; defined by the concrete
 * transceiver.
 *
 * @param packet packet to serialize
 * @return bytes to write to the serial port
 * @throws EnOceanException if the packet cannot be serialized
 */
380 protected abstract byte[] serializePacket(BasePacket packet) throws EnOceanException;
/**
 * Registers a listener for packets of the given sender id, creating the listener set for that id on first
 * use. A debug message is logged only when the listener was not registered before.
 *
 * @param listener listener to register
 * @param senderIdToListenTo numeric sender id the listener is interested in
 */
382 public synchronized void addPacketListener(PacketListener listener, long senderIdToListenTo) {
383 if (listeners.computeIfAbsent(senderIdToListenTo, k -> new HashSet<>()).add(listener)) {
384 logger.debug("Listener added: {}", senderIdToListenTo);
/**
 * Unregisters a listener for the given sender id.
 * NOTE(review): the statements between the lookup and the removal of the map entry are not visible in this
 * view - presumably the listener is removed from the set and the entry is dropped once the set is empty.
 *
 * @param listener listener to unregister
 * @param senderIdToListenTo numeric sender id the listener was registered for
 */
388 public synchronized void removePacketListener(PacketListener listener, long senderIdToListenTo) {
389 HashSet<PacketListener> pl = listeners.get(senderIdToListenTo);
393 listeners.remove(senderIdToListenTo);
/**
 * Registers a listener that informListeners notifies of EVENT packets.
 *
 * @param listener listener to register
 */
398 public synchronized void addEventMessageListener(EventListener listener) {
399 eventListeners.add(listener);
/**
 * Unregisters a listener previously added with addEventMessageListener.
 *
 * @param listener listener to unregister
 */
402 public synchronized void removeEventMessageListener(EventListener listener) {
403 eventListeners.remove(listener);
/**
 * Starts discovery by installing the listener that receives teach-in telegrams and smart teach-in events.
 *
 * @param teachInListener listener to notify; replaces any listener set before
 */
406 public void startDiscovery(TeachInListener teachInListener) {
407 this.teachInListener = teachInListener;
/**
 * Stops discovery by removing the teach-in listener.
 */
410 public void stopDiscovery() {
411 this.teachInListener = null;
414 public void setFilteredDeviceId(byte[] filteredDeviceId) {
415 if (filteredDeviceId != null) {
416 System.arraycopy(filteredDeviceId, 0, filteredDeviceId, 0, filteredDeviceId.length);
421 public void serialEvent(SerialPortEvent event) {
422 if (event.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
423 synchronized (this) {