]> git.basschouten.com Git - openhab-addons.git/blob
d31def2073b47bde3e5015e3b0fc414b1e44a465
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2022 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.digiplex.internal.handler;
14
15 import static org.openhab.binding.digiplex.internal.DigiplexBindingConstants.*;
16
17 import java.io.IOException;
18 import java.io.InputStream;
19 import java.io.OutputStream;
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.Optional;
23 import java.util.Set;
24 import java.util.TooManyListenersException;
25 import java.util.concurrent.BlockingQueue;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.ScheduledFuture;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicLong;
31
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.openhab.binding.digiplex.internal.DigiplexBridgeConfiguration;
35 import org.openhab.binding.digiplex.internal.communication.CommunicationStatus;
36 import org.openhab.binding.digiplex.internal.communication.DigiplexMessageHandler;
37 import org.openhab.binding.digiplex.internal.communication.DigiplexRequest;
38 import org.openhab.binding.digiplex.internal.communication.DigiplexResponse;
39 import org.openhab.binding.digiplex.internal.communication.DigiplexResponseResolver;
40 import org.openhab.binding.digiplex.internal.communication.events.AbstractEvent;
41 import org.openhab.binding.digiplex.internal.communication.events.TroubleEvent;
42 import org.openhab.binding.digiplex.internal.communication.events.TroubleStatus;
43 import org.openhab.binding.digiplex.internal.discovery.DigiplexDiscoveryService;
44 import org.openhab.core.io.transport.serial.PortInUseException;
45 import org.openhab.core.io.transport.serial.SerialPort;
46 import org.openhab.core.io.transport.serial.SerialPortEvent;
47 import org.openhab.core.io.transport.serial.SerialPortEventListener;
48 import org.openhab.core.io.transport.serial.SerialPortIdentifier;
49 import org.openhab.core.io.transport.serial.SerialPortManager;
50 import org.openhab.core.io.transport.serial.UnsupportedCommOperationException;
51 import org.openhab.core.library.types.DecimalType;
52 import org.openhab.core.library.types.OnOffType;
53 import org.openhab.core.thing.Bridge;
54 import org.openhab.core.thing.ChannelUID;
55 import org.openhab.core.thing.ThingStatus;
56 import org.openhab.core.thing.ThingStatusDetail;
57 import org.openhab.core.thing.binding.BaseBridgeHandler;
58 import org.openhab.core.thing.binding.ThingHandlerService;
59 import org.openhab.core.types.Command;
60 import org.openhab.core.types.RefreshType;
61 import org.openhab.core.types.State;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64
65 /**
66  * The {@link DigiplexBridgeHandler} is responsible for handling communication with PRT3 module
67  *
68  * @author Robert Michalak - Initial contribution
69  */
70 @NonNullByDefault
71 public class DigiplexBridgeHandler extends BaseBridgeHandler implements SerialPortEventListener {
72
73     private static final int REINITIALIZE_DELAY = 1; // in minutes
74     private static final int STALLED_MESSAGES_THRESHOLD = 5;
75     private static final int END_OF_MESSAGE = '\r';
76     private static final int END_OF_STREAM = -1;
77
78     private final Logger logger = LoggerFactory.getLogger(DigiplexBridgeHandler.class);
79
80     private @Nullable DigiplexBridgeConfiguration config;
81     private @Nullable SerialPort serialPort;
82     private @Nullable DigiplexReceiverThread receiverThread;
83     private @Nullable DigiplexSenderThread senderThread;
84     private final BlockingQueue<DigiplexRequest> sendQueue = new LinkedBlockingQueue<>();
85     private final SerialPortManager serialPortManager;
86     private final Set<DigiplexMessageHandler> handlers = ConcurrentHashMap.newKeySet();
87
88     @Nullable
89     private ScheduledFuture<?> reinitializeTask;
90
91     private AtomicLong messagesSent = new AtomicLong(0);
92     private AtomicLong responsesReceived = new AtomicLong(0);
93     private AtomicLong eventsReceived = new AtomicLong(0);
94
95     public DigiplexBridgeHandler(Bridge bridge, SerialPortManager serialPortManager) {
96         super(bridge);
97         this.serialPortManager = serialPortManager;
98     }
99
100     @SuppressWarnings("null")
101     @Override
102     public void initialize() {
103         config = getConfigAs(DigiplexBridgeConfiguration.class);
104         if (config.port == null) {
105             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.CONFIGURATION_ERROR, "Port must be set!");
106             return;
107         }
108
109         SerialPortIdentifier portId = serialPortManager.getIdentifier(config.port);
110         if (portId == null) {
111             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.CONFIGURATION_ERROR,
112                     "No such port: " + config.port);
113             return;
114         }
115
116         try {
117             serialPort = initializeSerialPort(portId);
118
119             InputStream inputStream = serialPort.getInputStream();
120             OutputStream outputStream = serialPort.getOutputStream();
121
122             if (inputStream == null || outputStream == null) {
123                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
124                         "Input/Output stream null");
125                 return;
126             }
127
128             receiverThread = new DigiplexReceiverThread(inputStream);
129             senderThread = new DigiplexSenderThread(outputStream);
130
131             registerMessageHandler(new BridgeMessageHandler());
132
133             messagesSent.set(0);
134             responsesReceived.set(0);
135             eventsReceived.set(0);
136
137             receiverThread.start();
138             senderThread.start();
139
140             updateStatus(ThingStatus.ONLINE);
141         } catch (PortInUseException e) {
142             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
143                     "Port in use: " + config.port);
144         } catch (Exception e) {
145             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
146                     "Communication error: " + e.getMessage());
147         }
148     }
149
150     @SuppressWarnings("null")
151     private @Nullable SerialPort initializeSerialPort(SerialPortIdentifier portId)
152             throws PortInUseException, TooManyListenersException, UnsupportedCommOperationException {
153         SerialPort serialPort = portId.open(getThing().getUID().toString(), 2000);
154         serialPort.setSerialPortParams(config.baudrate, SerialPort.DATABITS_8, SerialPort.STOPBITS_1,
155                 SerialPort.PARITY_NONE);
156         serialPort.enableReceiveThreshold(0);
157         serialPort.enableReceiveTimeout(1000);
158
159         // RXTX serial port library causes high CPU load
160         // Start event listener, which will just sleep and slow down event loop
161         serialPort.addEventListener(this);
162         serialPort.notifyOnDataAvailable(true);
163
164         return serialPort;
165     }
166
167     @Override
168     public void handleCommand(ChannelUID channelUID, Command command) {
169         if (command == RefreshType.REFRESH && isLinked(channelUID.getId())) {
170             switch (channelUID.getId()) {
171                 case BRIDGE_MESSAGES_SENT:
172                     updateState(BRIDGE_MESSAGES_SENT, new DecimalType(messagesSent.get()));
173                     break;
174                 case BRIDGE_RESPONSES_RECEIVED:
175                     updateState(BRIDGE_RESPONSES_RECEIVED, new DecimalType(responsesReceived.get()));
176                     break;
177                 case BRIDGE_EVENTS_RECEIVED:
178                     updateState(BRIDGE_EVENTS_RECEIVED, new DecimalType(eventsReceived.get()));
179                     break;
180             }
181         }
182     }
183
184     public void sendRequest(DigiplexRequest request) {
185         sendQueue.add(request);
186     }
187
188     public void handleResponse(String message) {
189         DigiplexResponse response = DigiplexResponseResolver.resolveResponse(message);
190         handlers.forEach(visitor -> response.accept(visitor));
191         if (response instanceof AbstractEvent) {
192             updateState(BRIDGE_EVENTS_RECEIVED, new DecimalType(eventsReceived.incrementAndGet()));
193         } else {
194             updateState(BRIDGE_RESPONSES_RECEIVED, new DecimalType(responsesReceived.incrementAndGet()));
195         }
196     }
197
198     public void registerMessageHandler(DigiplexMessageHandler handler) {
199         handlers.add(handler);
200     }
201
202     public void unregisterMessageHandler(DigiplexMessageHandler handler) {
203         handlers.remove(handler);
204     }
205
206     /**
207      * Closes the connection to the PRT3 module.
208      */
209     @SuppressWarnings("null")
210     @Override
211     public void dispose() {
212         stopThread(senderThread);
213         stopThread(receiverThread);
214         senderThread = null;
215         receiverThread = null;
216         if (serialPort != null) {
217             try {
218                 InputStream inputStream = serialPort.getInputStream();
219                 if (inputStream != null) {
220                     inputStream.close();
221                 }
222             } catch (IOException e) {
223                 logger.debug("Error closing input stream", e);
224             }
225
226             try {
227                 OutputStream outputStream = serialPort.getOutputStream();
228                 if (outputStream != null) {
229                     outputStream.close();
230                 }
231             } catch (IOException e) {
232                 logger.debug("Error closing output stream", e);
233             }
234
235             serialPort.close();
236             serialPort = null;
237         }
238         logger.info("Stopped Digiplex serial handler");
239
240         super.dispose();
241     }
242
243     private void stopThread(@Nullable Thread thread) {
244         if (thread != null) {
245             thread.interrupt();
246             try {
247                 thread.join(1000);
248             } catch (InterruptedException e) {
249             }
250         }
251     }
252
253     public void handleCommunicationError() {
254         updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
255         if (reinitializeTask == null) {
256             reinitializeTask = scheduler.schedule(() -> {
257                 logger.info("Reconnecting to PRT3 device...");
258                 thingUpdated(getThing());
259                 reinitializeTask = null;
260             }, REINITIALIZE_DELAY, TimeUnit.MINUTES);
261         }
262     }
263
264     @Override
265     public void serialEvent(@Nullable SerialPortEvent arg0) {
266         try {
267             logger.trace("RXTX library CPU load workaround, sleep forever");
268             Thread.sleep(Long.MAX_VALUE);
269         } catch (InterruptedException ignored) {
270         }
271     }
272
273     @Override
274     public Collection<Class<? extends ThingHandlerService>> getServices() {
275         return Collections.singletonList(DigiplexDiscoveryService.class);
276     }
277
278     private class BridgeMessageHandler implements DigiplexMessageHandler {
279
280         @Override
281         public void handleCommunicationStatus(CommunicationStatus response) {
282             if (response.success) {
283                 updateStatus(ThingStatus.ONLINE);
284             } else {
285                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
286             }
287         }
288
289         @Override
290         public void handleTroubleEvent(TroubleEvent troubleEvent) {
291             if (troubleEvent.getAreaNo() == GLOBAL_AREA_NO) {
292                 String channel = troubleEvent.getType().getBridgeChannel();
293                 State state = OnOffType.from(troubleEvent.getStatus() == TroubleStatus.TROUBLE_STARTED);
294                 updateState(channel, state);
295             }
296         }
297     }
298
299     private class DigiplexReceiverThread extends Thread {
300
301         private final Logger logger = LoggerFactory.getLogger(DigiplexReceiverThread.class);
302
303         private final InputStream stream;
304
305         DigiplexReceiverThread(InputStream stream) {
306             super("DigiplexReceiveThread");
307             this.stream = stream;
308         }
309
310         @Override
311         public void run() {
312             logger.debug("Receiver thread started");
313             while (!interrupted()) {
314                 try {
315                     Optional<String> message = readLineBlocking();
316                     message.ifPresent(m -> {
317                         logger.debug("message received: '{}'", m);
318                         handleResponse(m);
319                     });
320                     if (messagesSent.get() - responsesReceived.get() > STALLED_MESSAGES_THRESHOLD) {
321                         throw new IOException("PRT3 module is not responding!");
322                     }
323
324                 } catch (IOException e) {
325                     handleCommunicationError();
326                     break;
327                 }
328             }
329             logger.debug("Receiver thread finished");
330         }
331
332         private Optional<String> readLineBlocking() throws IOException {
333             StringBuilder s = new StringBuilder();
334             while (true) {
335                 int c = stream.read();
336                 if (c == END_OF_STREAM) {
337                     return Optional.empty();
338                 }
339                 if (c == END_OF_MESSAGE) {
340                     break;
341                 }
342                 s.append((char) c);
343             }
344             return Optional.of(s.toString());
345         }
346     }
347
348     private class DigiplexSenderThread extends Thread {
349
350         private static final int SLEEP_TIME = 150;
351
352         private final Logger logger = LoggerFactory.getLogger(DigiplexSenderThread.class);
353
354         private OutputStream stream;
355
356         public DigiplexSenderThread(OutputStream stream) {
357             super("DigiplexSenderThread");
358             this.stream = stream;
359         }
360
361         @Override
362         public void run() {
363             logger.debug("Sender thread started");
364             while (!interrupted()) {
365                 try {
366                     DigiplexRequest request = sendQueue.take();
367                     stream.write(request.getSerialMessage().getBytes());
368                     stream.flush();
369                     updateState(BRIDGE_MESSAGES_SENT, new DecimalType(messagesSent.incrementAndGet()));
370                     logger.debug("message sent: '{}'", request.getSerialMessage().replaceAll("\r", ""));
371                     Thread.sleep(SLEEP_TIME); // do not flood PRT3 with messages as it creates unpredictable responses
372                 } catch (IOException e) {
373                     handleCommunicationError();
374                     break;
375                 } catch (InterruptedException e) {
376                     break;
377                 }
378             }
379             logger.debug("Sender thread finished");
380         }
381     }
382 }