/**
 * Copyright (c) 2010-2023 Contributors to the openHAB project
 *
 * See the NOTICE file(s) distributed with this work for additional
 * information.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0
 *
 * SPDX-License-Identifier: EPL-2.0
 */
package org.openhab.binding.digiplex.internal.handler;

import static org.openhab.binding.digiplex.internal.DigiplexBindingConstants.*;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TooManyListenersException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.digiplex.internal.DigiplexBridgeConfiguration;
import org.openhab.binding.digiplex.internal.communication.CommunicationStatus;
import org.openhab.binding.digiplex.internal.communication.DigiplexMessageHandler;
import org.openhab.binding.digiplex.internal.communication.DigiplexRequest;
import org.openhab.binding.digiplex.internal.communication.DigiplexResponse;
import org.openhab.binding.digiplex.internal.communication.DigiplexResponseResolver;
import org.openhab.binding.digiplex.internal.communication.events.AbstractEvent;
import org.openhab.binding.digiplex.internal.communication.events.TroubleEvent;
import org.openhab.binding.digiplex.internal.communication.events.TroubleStatus;
import org.openhab.binding.digiplex.internal.discovery.DigiplexDiscoveryService;
import org.openhab.core.io.transport.serial.PortInUseException;
import org.openhab.core.io.transport.serial.SerialPort;
import org.openhab.core.io.transport.serial.SerialPortEvent;
import org.openhab.core.io.transport.serial.SerialPortEventListener;
import org.openhab.core.io.transport.serial.SerialPortIdentifier;
import org.openhab.core.io.transport.serial.SerialPortManager;
import org.openhab.core.io.transport.serial.UnsupportedCommOperationException;
import org.openhab.core.library.types.DecimalType;
import org.openhab.core.library.types.OnOffType;
import org.openhab.core.thing.Bridge;
import org.openhab.core.thing.ChannelUID;
import org.openhab.core.thing.ThingStatus;
import org.openhab.core.thing.ThingStatusDetail;
import org.openhab.core.thing.binding.BaseBridgeHandler;
import org.openhab.core.thing.binding.ThingHandlerService;
import org.openhab.core.types.Command;
import org.openhab.core.types.RefreshType;
import org.openhab.core.types.State;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

66 * The {@link DigiplexBridgeHandler} is responsible for handling communication with PRT3 module
68 * @author Robert Michalak - Initial contribution
71 public class DigiplexBridgeHandler extends BaseBridgeHandler implements SerialPortEventListener {
73 private static final int REINITIALIZE_DELAY = 1; // in minutes
74 private static final int STALLED_MESSAGES_THRESHOLD = 5;
75 private static final int END_OF_MESSAGE = '\r';
76 private static final int END_OF_STREAM = -1;
78 private final Logger logger = LoggerFactory.getLogger(DigiplexBridgeHandler.class);
80 private @Nullable DigiplexBridgeConfiguration config;
81 private @Nullable SerialPort serialPort;
82 private @Nullable DigiplexReceiverThread receiverThread;
83 private @Nullable DigiplexSenderThread senderThread;
84 private final BlockingQueue<DigiplexRequest> sendQueue = new LinkedBlockingQueue<>();
85 private final SerialPortManager serialPortManager;
86 private final Set<DigiplexMessageHandler> handlers = ConcurrentHashMap.newKeySet();
89 private ScheduledFuture<?> reinitializeTask;
91 private AtomicLong messagesSent = new AtomicLong(0);
92 private AtomicLong responsesReceived = new AtomicLong(0);
93 private AtomicLong eventsReceived = new AtomicLong(0);
95 public DigiplexBridgeHandler(Bridge bridge, SerialPortManager serialPortManager) {
97 this.serialPortManager = serialPortManager;
100 @SuppressWarnings("null")
102 public void initialize() {
103 config = getConfigAs(DigiplexBridgeConfiguration.class);
104 if (config.port == null) {
105 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.CONFIGURATION_ERROR, "Port must be set!");
109 SerialPortIdentifier portId = serialPortManager.getIdentifier(config.port);
110 if (portId == null) {
111 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.CONFIGURATION_ERROR,
112 "No such port: " + config.port);
117 serialPort = initializeSerialPort(portId);
119 InputStream inputStream = serialPort.getInputStream();
120 OutputStream outputStream = serialPort.getOutputStream();
122 if (inputStream == null || outputStream == null) {
123 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
124 "Input/Output stream null");
128 receiverThread = new DigiplexReceiverThread(inputStream);
129 senderThread = new DigiplexSenderThread(outputStream);
131 registerMessageHandler(new BridgeMessageHandler());
134 responsesReceived.set(0);
135 eventsReceived.set(0);
137 receiverThread.start();
138 senderThread.start();
140 updateStatus(ThingStatus.ONLINE);
141 } catch (PortInUseException e) {
142 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
143 "Port in use: " + config.port);
144 } catch (Exception e) {
145 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
146 "Communication error: " + e.getMessage());
150 @SuppressWarnings("null")
151 private @Nullable SerialPort initializeSerialPort(SerialPortIdentifier portId)
152 throws PortInUseException, TooManyListenersException, UnsupportedCommOperationException {
153 SerialPort serialPort = portId.open(getThing().getUID().toString(), 2000);
154 serialPort.setSerialPortParams(config.baudrate, SerialPort.DATABITS_8, SerialPort.STOPBITS_1,
155 SerialPort.PARITY_NONE);
156 serialPort.enableReceiveThreshold(0);
157 serialPort.enableReceiveTimeout(1000);
159 // RXTX serial port library causes high CPU load
160 // Start event listener, which will just sleep and slow down event loop
161 serialPort.addEventListener(this);
162 serialPort.notifyOnDataAvailable(true);
168 public void handleCommand(ChannelUID channelUID, Command command) {
169 if (command == RefreshType.REFRESH && isLinked(channelUID.getId())) {
170 switch (channelUID.getId()) {
171 case BRIDGE_MESSAGES_SENT:
172 updateState(BRIDGE_MESSAGES_SENT, new DecimalType(messagesSent.get()));
174 case BRIDGE_RESPONSES_RECEIVED:
175 updateState(BRIDGE_RESPONSES_RECEIVED, new DecimalType(responsesReceived.get()));
177 case BRIDGE_EVENTS_RECEIVED:
178 updateState(BRIDGE_EVENTS_RECEIVED, new DecimalType(eventsReceived.get()));
184 public void sendRequest(DigiplexRequest request) {
185 sendQueue.add(request);
188 public void handleResponse(String message) {
189 DigiplexResponse response = DigiplexResponseResolver.resolveResponse(message);
190 handlers.forEach(visitor -> response.accept(visitor));
191 if (response instanceof AbstractEvent) {
192 updateState(BRIDGE_EVENTS_RECEIVED, new DecimalType(eventsReceived.incrementAndGet()));
194 updateState(BRIDGE_RESPONSES_RECEIVED, new DecimalType(responsesReceived.incrementAndGet()));
198 public void registerMessageHandler(DigiplexMessageHandler handler) {
199 handlers.add(handler);
202 public void unregisterMessageHandler(DigiplexMessageHandler handler) {
203 handlers.remove(handler);
207 * Closes the connection to the PRT3 module.
209 @SuppressWarnings("null")
211 public void dispose() {
212 stopThread(senderThread);
213 stopThread(receiverThread);
215 receiverThread = null;
216 if (serialPort != null) {
218 InputStream inputStream = serialPort.getInputStream();
219 if (inputStream != null) {
222 } catch (IOException e) {
223 logger.debug("Error closing input stream", e);
227 OutputStream outputStream = serialPort.getOutputStream();
228 if (outputStream != null) {
229 outputStream.close();
231 } catch (IOException e) {
232 logger.debug("Error closing output stream", e);
238 logger.info("Stopped Digiplex serial handler");
243 private void stopThread(@Nullable Thread thread) {
244 if (thread != null) {
248 } catch (InterruptedException e) {
253 public void handleCommunicationError() {
254 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
255 if (reinitializeTask == null) {
256 reinitializeTask = scheduler.schedule(() -> {
257 logger.info("Reconnecting to PRT3 device...");
258 thingUpdated(getThing());
259 reinitializeTask = null;
260 }, REINITIALIZE_DELAY, TimeUnit.MINUTES);
265 public void serialEvent(@Nullable SerialPortEvent arg0) {
267 logger.trace("RXTX library CPU load workaround, sleep forever");
268 Thread.sleep(Long.MAX_VALUE);
269 } catch (InterruptedException ignored) {
274 public Collection<Class<? extends ThingHandlerService>> getServices() {
275 return List.of(DigiplexDiscoveryService.class);
278 private class BridgeMessageHandler implements DigiplexMessageHandler {
281 public void handleCommunicationStatus(CommunicationStatus response) {
282 if (response.success) {
283 updateStatus(ThingStatus.ONLINE);
285 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
290 public void handleTroubleEvent(TroubleEvent troubleEvent) {
291 if (troubleEvent.getAreaNo() == GLOBAL_AREA_NO) {
292 String channel = troubleEvent.getType().getBridgeChannel();
293 State state = OnOffType.from(troubleEvent.getStatus() == TroubleStatus.TROUBLE_STARTED);
294 updateState(channel, state);
299 private class DigiplexReceiverThread extends Thread {
301 private final Logger logger = LoggerFactory.getLogger(DigiplexReceiverThread.class);
303 private final InputStream stream;
305 DigiplexReceiverThread(InputStream stream) {
306 super("DigiplexReceiveThread");
307 this.stream = stream;
312 logger.debug("Receiver thread started");
313 while (!interrupted()) {
315 Optional<String> message = readLineBlocking();
316 message.ifPresent(m -> {
317 logger.debug("message received: '{}'", m);
320 if (messagesSent.get() - responsesReceived.get() > STALLED_MESSAGES_THRESHOLD) {
321 throw new IOException("PRT3 module is not responding!");
324 } catch (IOException e) {
325 handleCommunicationError();
329 logger.debug("Receiver thread finished");
332 private Optional<String> readLineBlocking() throws IOException {
333 StringBuilder s = new StringBuilder();
335 int c = stream.read();
336 if (c == END_OF_STREAM) {
337 return Optional.empty();
339 if (c == END_OF_MESSAGE) {
344 return Optional.of(s.toString());
348 private class DigiplexSenderThread extends Thread {
350 private static final int SLEEP_TIME = 150;
352 private final Logger logger = LoggerFactory.getLogger(DigiplexSenderThread.class);
354 private OutputStream stream;
356 public DigiplexSenderThread(OutputStream stream) {
357 super("DigiplexSenderThread");
358 this.stream = stream;
363 logger.debug("Sender thread started");
364 while (!interrupted()) {
366 DigiplexRequest request = sendQueue.take();
367 stream.write(request.getSerialMessage().getBytes());
369 updateState(BRIDGE_MESSAGES_SENT, new DecimalType(messagesSent.incrementAndGet()));
370 logger.debug("message sent: '{}'", request.getSerialMessage().replace("\r", ""));
371 Thread.sleep(SLEEP_TIME); // do not flood PRT3 with messages as it creates unpredictable responses
372 } catch (IOException e) {
373 handleCommunicationError();
375 } catch (InterruptedException e) {
379 logger.debug("Sender thread finished");