2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.enocean.internal.transceiver;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.io.OutputStream;
18 import java.util.HashMap;
19 import java.util.HashSet;
21 import java.util.Queue;
22 import java.util.TooManyListenersException;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.concurrent.TimeUnit;
28 import org.eclipse.jdt.annotation.NonNullByDefault;
29 import org.eclipse.jdt.annotation.Nullable;
30 import org.openhab.binding.enocean.internal.EnOceanBindingConstants;
31 import org.openhab.binding.enocean.internal.EnOceanException;
32 import org.openhab.binding.enocean.internal.Helper;
33 import org.openhab.binding.enocean.internal.messages.BasePacket;
34 import org.openhab.binding.enocean.internal.messages.BasePacket.ESPPacketType;
35 import org.openhab.binding.enocean.internal.messages.ERP1Message;
36 import org.openhab.binding.enocean.internal.messages.ERP1Message.RORG;
37 import org.openhab.binding.enocean.internal.messages.EventMessage;
38 import org.openhab.binding.enocean.internal.messages.EventMessage.EventMessageType;
39 import org.openhab.binding.enocean.internal.messages.Response;
40 import org.openhab.core.io.transport.serial.PortInUseException;
41 import org.openhab.core.io.transport.serial.SerialPort;
42 import org.openhab.core.io.transport.serial.SerialPortEvent;
43 import org.openhab.core.io.transport.serial.SerialPortEventListener;
44 import org.openhab.core.io.transport.serial.SerialPortIdentifier;
45 import org.openhab.core.io.transport.serial.SerialPortManager;
46 import org.openhab.core.io.transport.serial.UnsupportedCommOperationException;
47 import org.openhab.core.util.HexUtils;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
53 * @author Daniel Weber - Initial contribution
56 public abstract class EnOceanTransceiver implements SerialPortEventListener {
/** Upper bound for EnOcean packet data (presumably 65535 data bytes + 255 optional-data bytes — TODO confirm). */
58 public static final int ENOCEAN_MAX_DATA = 65790;
// Handle of the receive task submitted by startReceiving(); cancelled by shutDown() and by read() on a null stream.
61 protected @Nullable Future<?> readingTask = null;
// Handle of the delayed follow-up task scheduled by RequestQueue.send(); cancelled by shutDown() and before re-scheduling.
62 private @Nullable Future<?> timeOut = null;
64 protected Logger logger = LoggerFactory.getLogger(EnOceanTransceiver.class);
// Used by initialize() to look up the serial port for 'path'; initialize() throws an IOException when it is null.
66 private @Nullable SerialPortManager serialPortManager;
// Baud rate passed to setSerialPortParams() in initialize().
67 private static final int ENOCEAN_DEFAULT_BAUD = 57600;
// Port name handed to SerialPortManager.getIdentifier() in initialize().
68 protected String path;
// Port opened by initialize() and closed by shutDown().
69 private @Nullable SerialPort serialPort;
// NOTE(review): the declaration enclosing these three members is not visible in this view; elsewhere in the
// file they are accessed through Request instances (sendBasePacket(), handleResponse(), RequestQueue.send()).
// Packet to transmit; assigned by sendBasePacket() and serialized by RequestQueue.send().
73 BasePacket requestPacket;
// Response recorded by handleResponse() before the listener is invoked.
75 Response responsePacket;
// Callback invoked by handleResponse(); sendBasePacket() stores whatever the caller passed, including null.
77 ResponseListener<? extends @Nullable Response> responseListener;
/**
 * Queue of pending send requests. {@code send()} takes the request at the head of the queue, serializes its
 * packet, writes it to the transceiver's output stream and then schedules a follow-up task 250 ms later.
 * NOTE(review): several lines of this class are not visible in this view; the comments below only describe
 * the statements that are.
 */
80 private class RequestQueue {
81 private Queue<Request> queue = new LinkedBlockingQueue<>();
// Executor used to schedule the delayed follow-up task in send().
82 private ScheduledExecutorService scheduler;
/**
 * @param scheduler executor on which the delayed follow-up task is scheduled
 */
84 public RequestQueue(ScheduledExecutorService scheduler) {
85 this.scheduler = scheduler;
/**
 * Appends a request to the queue and logs an error when the queue does not accept it.
 * NOTE(review): the statements executed after a successful offer are not visible here; 'wasEmpty' is
 * presumably used to start sending only when the queue was idle — confirm.
 *
 * @param request request to enqueue
 * @throws IOException declared by the method; the throwing statement is not visible in this view
 */
88 public synchronized void enqueRequest(Request request) throws IOException {
// remember the state before the offer; the new element makes the queue non-empty
89 boolean wasEmpty = queue.isEmpty();
91 if (queue.offer(request)) {
96 logger.error("Transmit queue overflow. Lost message: {}", request);
/**
 * NOTE(review): the body of this method is not visible in this view; presumably it drops the head of the
 * queue and calls send() — confirm.
 */
100 private synchronized void sendNext() throws IOException {
/**
 * Sends the request at the head of the queue, if any. The head is only peeked, so it stays in the queue and
 * is also published as {@code currentRequest}, which handleResponse() reads to find the matching listener.
 */
105 private synchronized void send() throws IOException {
106 if (!queue.isEmpty()) {
107 currentRequest = queue.peek();
// work on a local copy so the null checks below stay valid
109 Request localCurrentRequest = currentRequest;
110 if (localCurrentRequest != null && localCurrentRequest.requestPacket != null) {
111 synchronized (localCurrentRequest) {
112 BasePacket rqPacket = localCurrentRequest.requestPacket;
113 if (currentRequest != null && rqPacket != null) {
114 logger.debug("Sending data, type {}, payload {}{}", rqPacket.getPacketType().name(),
115 HexUtils.bytesToHex(rqPacket.getPayload()),
116 HexUtils.bytesToHex(rqPacket.getOptionalPayload()));
// serializePacket() is implemented by the concrete transceiver subclass
117 byte[] b = serializePacket(rqPacket);
118 logger.trace("Sending raw data: {}", HexUtils.bytesToHex(b));
// when no output stream is available the packet is not written
119 OutputStream localOutPutStream = outputStream;
120 if (localOutPutStream != null) {
121 localOutPutStream.write(b);
122 localOutPutStream.flush();
// cancel a still pending follow-up task before scheduling a new one
124 Future<?> localTimeOut = timeOut;
125 if (localTimeOut != null) {
126 localTimeOut.cancel(true);
129 // slow down sending of messages to avoid hiccups at receivers
130 // TODO: tweak the sending interval (250 ms is just a first try)
131 timeOut = scheduler.schedule(() -> {
// NOTE(review): the guarded statements of the scheduled task are not visible here (presumably sendNext()).
// An IOException raised by them is logged and reported to the error listener, if one is still set.
134 } catch (IOException e) {
135 logger.trace("Unable to process message", e);
136 TransceiverErrorListener localListener = errorListener;
137 if (localListener != null) {
138 localListener.errorOccured(e);
142 }, 250, TimeUnit.MILLISECONDS);
// serialization failures are logged here and do not propagate to the caller
148 } catch (EnOceanException e) {
149 logger.error("exception while sending data", e);
// Outgoing request queue; created in the constructor and fed by sendBasePacket().
155 RequestQueue requestQueue;
// Request most recently picked for sending by RequestQueue.send(); handleResponse() routes responses to it.
157 Request currentRequest = null;
// Packet listeners keyed by the numeric sender id they were registered for (see addPacketListener()).
159 protected Map<Long, HashSet<PacketListener>> listeners;
// Listeners notified by informListeners() for EVENT packets.
160 protected HashSet<EventListener> eventListeners;
// Set by startDiscovery() and cleared by stopDiscovery() and shutDown(); receives teach-in telegrams while set.
161 protected @Nullable TeachInListener teachInListener;
// Streams obtained from the serial port in initialize() and closed in shutDown().
163 protected @Nullable InputStream inputStream;
164 protected @Nullable OutputStream outputStream;
// Device id whose first three bytes informListeners() compares against the sender id of received ERP1 messages.
166 private byte[] filteredDeviceId = new byte[0];
// Receives I/O errors from read() and from the delayed send task; set to null by shutDown().
168 TransceiverErrorListener errorListener;
/**
 * Creates the transceiver with an empty request queue and empty listener collections. No port is opened
 * here; that happens in initialize().
 * NOTE(review): the assignment of {@code path} is not visible in this view — confirm it is stored.
 *
 * @param path serial port name later resolved by initialize()
 * @param errorListener listener informed about I/O errors
 * @param scheduler executor handed to the request queue for its delayed send task
 * @param serialPortManager manager used to look up the port; may be null, in which case initialize() fails
 */
170 public EnOceanTransceiver(String path, TransceiverErrorListener errorListener, ScheduledExecutorService scheduler,
171 @Nullable SerialPortManager serialPortManager) {
172 requestQueue = new RequestQueue(scheduler);
174 listeners = new HashMap<>();
175 eventListeners = new HashSet<>();
176 teachInListener = null;
178 this.errorListener = errorListener;
179 this.serialPortManager = serialPortManager;
/**
 * Opens the serial port identified by {@code path}, configures it (ENOCEAN_DEFAULT_BAUD, 8 data bits,
 * 1 stop bit, no parity) and stores its input and output streams.
 * NOTE(review): several guard/try/return lines are not visible in this view; the comments below mark where.
 *
 * @throws IOException when the SerialPortManager is null or no port is found for {@code path}
 * @throws UnsupportedCommOperationException declared; setSerialPortParams() is called outside the visible catch
 * @throws PortInUseException declared, although the visible code catches it around open() and logs a warning
 * @throws TooManyListenersException declared; no throwing statement is visible in this view
 */
183 public void initialize()
184 throws UnsupportedCommOperationException, PortInUseException, IOException, TooManyListenersException {
185 SerialPortManager localSerialPortManager = serialPortManager;
186 if (localSerialPortManager == null) {
187 throw new IOException("Could access the SerialPortManager, it was null");
189 SerialPortIdentifier id = localSerialPortManager.getIdentifier(path);
// NOTE(review): the condition guarding this throw is not visible here (presumably id == null).
191 throw new IOException("Could not find a gateway on given path '" + path + "', "
192 + localSerialPortManager.getIdentifiers().count() + " ports available.");
// open the port under the binding id; 1000 is the timeout argument of open()
196 serialPort = id.open(EnOceanBindingConstants.BINDING_ID, 1000);
197 } catch (PortInUseException e) {
198 logger.warn("EnOceanSerialTransceiver not initialized, port allready in use", e);
// NOTE(review): the statement that leaves the method after this warning is not visible here.
201 SerialPort localSerialPort = serialPort;
202 if (localSerialPort == null) {
203 logger.debug("EnOceanSerialTransceiver not initialized, serialPort was null");
206 localSerialPort.setSerialPortParams(ENOCEAN_DEFAULT_BAUD, SerialPort.DATABITS_8, SerialPort.STOPBITS_1,
207 SerialPort.PARITY_NONE);
// threshold and timeout are best effort: an unsupported operation is only logged at debug level
210 localSerialPort.enableReceiveThreshold(1);
211 localSerialPort.enableReceiveTimeout(100); // In ms. Small values mean faster shutdown but more cpu usage.
212 } catch (UnsupportedCommOperationException e) {
213 // rfc connections do not allow a ReceiveThreshold
214 logger.debug("EnOceanSerialTransceiver encountered an UnsupportedCommOperationException while initilizing",
218 inputStream = localSerialPort.getInputStream();
219 outputStream = localSerialPort.getOutputStream();
220 logger.info("EnOceanSerialTransceiver initialized");
/**
 * Submits the receive task to the given executor unless a previously submitted task exists that has not
 * been cancelled.
 * NOTE(review): the body of the submitted Runnable is not visible in this view; presumably it runs
 * receivePackets() — confirm.
 *
 * @param scheduler executor on which the receive task is submitted
 */
223 public void startReceiving(ScheduledExecutorService scheduler) {
225 Future<?> localReadingTask = readingTask;
226 if (localReadingTask == null || localReadingTask.isCancelled()) {
227 readingTask = scheduler.submit(new Runnable() {
234 logger.info("EnOceanSerialTransceiver RX thread started");
/**
 * Stops the transceiver: cancels the pending send-delay task and the receive task, drops the registered
 * listeners, closes both streams and finally closes the serial port. IOExceptions raised while closing the
 * streams are logged at debug level and otherwise ignored.
 * NOTE(review): some statements of this method are not visible in this view.
 */
237 public void shutDown() {
238 logger.debug("shutting down transceiver");
239 logger.debug("Interrupt rx Thread");
// cancel the delayed follow-up task of the request queue
241 Future<?> localTimeOut = timeOut;
242 if (localTimeOut != null) {
243 localTimeOut.cancel(true);
246 Future<?> localReadingTask = readingTask;
247 if (localReadingTask != null) {
248 localReadingTask.cancel(true);
// first close of the input stream; the variable lives in a nested scope because the same name is declared again below
250 InputStream localInputStream = inputStream;
251 if (localInputStream != null) {
253 localInputStream.close();
254 } catch (IOException e) {
255 logger.debug("IOException occured while closing the stream", e);
263 eventListeners.clear();
264 teachInListener = null;
// from here on errors are no longer reported; users of errorListener null-check a local copy
265 errorListener = null;
267 OutputStream localOutputStream = outputStream;
268 if (localOutputStream != null) {
270 localOutputStream.close();
271 } catch (IOException e) {
272 logger.debug("IOException occured while closing the output stream", e);
// the input stream is closed (again) here, independently of whether a receive task existed
276 InputStream localInputStream = inputStream;
277 if (localInputStream != null) {
279 localInputStream.close();
280 } catch (IOException e) {
281 logger.debug("IOException occured while closing the input stream", e);
285 SerialPort localSerialPort = serialPort;
286 if (localSerialPort != null) {
287 logger.debug("Closing the serial port");
288 localSerialPort.close();
295 logger.info("Transceiver shutdown");
/**
 * Receive loop: reads one byte at a time and hands it to processMessage() until the receive task is
 * cancelled. The task handle is read once before the loop, so only cancellation of that handle ends it.
 * NOTE(review): a line between the read and the processMessage() call is not visible in this view
 * (presumably a check that a byte was actually read) — confirm.
 */
298 private void receivePackets() {
299 byte[] buffer = new byte[1];
301 Future<?> localReadingTask = readingTask;
302 while (localReadingTask != null && !localReadingTask.isCancelled()) {
303 int bytesRead = read(buffer, 1);
305 processMessage(buffer[0]);
/**
 * Called by the receive loop with the byte it has just read; implemented by the concrete transceiver.
 *
 * @param firstByte byte obtained from the input stream by the receive loop
 */
310 protected abstract void processMessage(byte firstByte);
/**
 * Reads up to {@code length} bytes from the input stream into {@code buffer}, starting at index 0.
 * An IOException during the read is logged at debug level. When no input stream is available, a warning is
 * logged, the receive task is cancelled and the error listener (if still set) is informed.
 * NOTE(review): the return statements of the failure paths are not visible in this view.
 *
 * @param buffer destination buffer
 * @param length maximum number of bytes to read
 * @return the value returned by {@link InputStream#read(byte[], int, int)} on the success path
 */
312 protected int read(byte[] buffer, int length) {
313 InputStream localInputStream = inputStream;
314 if (localInputStream != null) {
316 return localInputStream.read(buffer, 0, length);
317 } catch (IOException e) {
318 logger.debug("IOException occured while reading the input stream", e);
322 logger.warn("Cannot read from null stream");
// without a stream the receive loop cannot make progress, so stop it
323 Future<?> localReadingTask = readingTask;
324 if (localReadingTask != null) {
325 localReadingTask.cancel(true);
328 TransceiverErrorListener localListener = errorListener;
329 if (localListener != null) {
330 localListener.errorOccured(new IOException("Cannot read from null stream"));
/**
 * Dispatches a received packet to the registered listeners. RADIO_ERP1 packets go to the teach-in listener
 * and/or the packet listeners registered for the sender id; EVENT packets go to the teach-in listener (for
 * SA_CONFIRM_LEARN) and/or the event listeners. Any exception raised while dispatching is caught and logged.
 * NOTE(review): many lines of this method are not visible in this view, so the exact branching between the
 * visible statements cannot be confirmed from here.
 *
 * @param packet packet to dispatch; other packet types are not handled by the visible branches
 */
336 protected void informListeners(BasePacket packet) {
338 if (packet.getPacketType() == ESPPacketType.RADIO_ERP1) {
339 ERP1Message msg = (ERP1Message) packet;
340 byte[] senderId = msg.getSenderId();
// payload and optional payload are concatenated for logging only
341 byte[] d = Helper.concatAll(msg.getPayload(), msg.getOptionalPayload());
343 logger.debug("{} with RORG {} for {} payload {} received", packet.getPacketType().name(),
344 msg.getRORG().name(), HexUtils.bytesToHex(msg.getSenderId()), HexUtils.bytesToHex(d));
346 if (msg.getRORG() != RORG.Unknown) {
347 if (senderId.length > 0) {
// compare the first three bytes of the sender id with the configured filter id
348 if (senderId.length > 2 && filteredDeviceId.length > 2 && senderId[0] == filteredDeviceId[0]
349 && senderId[1] == filteredDeviceId[1] && senderId[2] == filteredDeviceId[2]) {
350 // filter away own messages which are received through a repeater
// while discovery is active, teach-in telegrams and all RPS telegrams are offered to the teach-in listener
354 if (teachInListener != null && (msg.getIsTeachIn() || msg.getRORG() == RORG.RPS)) {
355 logger.info("Received teach in message from {}", HexUtils.bytesToHex(msg.getSenderId()));
// re-read into a local: the field may be cleared concurrently by stopDiscovery()/shutDown()
357 TeachInListener localListener = teachInListener;
358 if (localListener != null) {
359 localListener.packetReceived(msg);
362 } else if (teachInListener == null && msg.getIsTeachIn()) {
363 logger.info("Discard message because this is a teach-in telegram from {}!",
364 HexUtils.bytesToHex(msg.getSenderId()));
// listeners are keyed by the sender id interpreted as a hexadecimal number
368 long s = Long.parseLong(HexUtils.bytesToHex(senderId), 16);
369 synchronized (this) {
370 HashSet<PacketListener> pl = listeners.get(s);
// NOTE(review): the null check on 'pl' is not visible in this view.
372 pl.forEach(l -> l.packetReceived(msg));
377 logger.debug("Received unknown RORG");
379 } else if (packet.getPacketType() == ESPPacketType.EVENT) {
380 EventMessage event = (EventMessage) packet;
382 byte[] d = Helper.concatAll(packet.getPayload(), packet.getOptionalPayload());
383 logger.debug("{} with type {} payload {} received", ESPPacketType.EVENT.name(),
384 event.getEventMessageType().name(), HexUtils.bytesToHex(d));
386 if (event.getEventMessageType() == EventMessageType.SA_CONFIRM_LEARN) {
// four payload bytes taken at offset (data length - 5) are treated as the sender id
387 byte[] senderId = event.getPayload(EventMessageType.SA_CONFIRM_LEARN.getDataLength() - 5, 4);
389 if (teachInListener != null) {
390 logger.info("Received smart teach in from {}", HexUtils.bytesToHex(senderId));
391 TeachInListener localListener = teachInListener;
392 if (localListener != null) {
393 localListener.eventReceived(event);
397 logger.info("Discard message because this is a smart teach-in telegram from {}!",
398 HexUtils.bytesToHex(senderId));
// event listeners are iterated under the same monitor that guards add/removeEventMessageListener()
403 synchronized (this) {
404 eventListeners.forEach(l -> l.eventReceived(event));
// a failing listener must not break the caller, so everything is caught and logged
407 } catch (Exception e) {
408 logger.error("Exception in informListeners", e);
/**
 * Routes a response to the listener of the current request. The response is stored on the request before
 * the listener is called; when there is no current request or it has no listener, only a trace message is
 * written.
 *
 * @param response response received from the gateway
 * @throws IOException declared by the method; no throwing statement is visible in this view
 */
412 protected void handleResponse(Response response) throws IOException {
413 Request localCurrentRequest = currentRequest;
414 if (localCurrentRequest != null) {
415 ResponseListener<? extends @Nullable Response> listener = localCurrentRequest.responseListener;
416 if (listener != null) {
417 localCurrentRequest.responsePacket = response;
419 listener.handleResponse(response);
420 } catch (Exception e) {
// NOTE(review): the caught exception is not passed to the logger, so its cause is lost.
421 logger.debug("Exception during response handling");
423 logger.trace("Response handled");
426 logger.trace("Response without listener");
429 logger.trace("Response without request");
/**
 * Wraps the packet and the optional callback into a Request and appends it to the request queue.
 * NOTE(review): the body of the null guard is not visible in this view; presumably the method returns
 * without doing anything when {@code packet} is null — confirm.
 *
 * @param packet packet to send
 * @param responseCallback listener for the response to this packet, or null when none is wanted
 * @throws IOException propagated from RequestQueue.enqueRequest()
 */
433 public void sendBasePacket(@Nullable BasePacket packet,
434 @Nullable ResponseListener<? extends @Nullable Response> responseCallback) throws IOException {
435 if (packet == null) {
439 logger.debug("Enqueue new send request with ESP3 type {} {} callback", packet.getPacketType().name(),
440 responseCallback == null ? "without" : "with");
441 Request r = new Request();
442 r.requestPacket = packet;
443 r.responseListener = responseCallback;
445 requestQueue.enqueRequest(r);
/**
 * Converts a packet into the bytes that RequestQueue.send() writes to the output stream; implemented by
 * the concrete transceiver.
 *
 * @param packet packet to serialize
 * @return raw bytes to transmit
 * @throws EnOceanException when the packet cannot be serialized; send() logs it as an error
 */
448 protected abstract byte[] serializePacket(BasePacket packet) throws EnOceanException;
450 public synchronized void addPacketListener(PacketListener listener, long senderIdToListenTo) {
451 if (listeners.computeIfAbsent(senderIdToListenTo, k -> new HashSet<>()).add(listener)) {
452 logger.debug("Listener added: {}", senderIdToListenTo);
/**
 * Unregisters a listener that was registered for the given sender id.
 * NOTE(review): three lines between the lookup and the map removal are not visible in this view;
 * presumably the listener is removed from the set and the map entry is dropped once the set is empty —
 * confirm.
 *
 * @param listener listener to unregister
 * @param senderIdToListenTo numeric sender id the listener was registered for
 */
456 public synchronized void removePacketListener(PacketListener listener, long senderIdToListenTo) {
457 HashSet<PacketListener> pl = listeners.get(senderIdToListenTo);
461 listeners.remove(senderIdToListenTo);
466 public synchronized void addEventMessageListener(EventListener listener) {
467 eventListeners.add(listener);
470 public synchronized void removeEventMessageListener(EventListener listener) {
471 eventListeners.remove(listener);
474 public void startDiscovery(TeachInListener teachInListener) {
475 this.teachInListener = teachInListener;
478 public void stopDiscovery() {
479 this.teachInListener = null;
482 public void setFilteredDeviceId(byte[] filteredDeviceId) {
483 System.arraycopy(filteredDeviceId, 0, filteredDeviceId, 0, filteredDeviceId.length);
487 public void serialEvent(SerialPortEvent event) {
488 if (event.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
489 synchronized (this) {