2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.keba.internal.handler;
15 import java.io.IOException;
16 import java.io.UnsupportedEncodingException;
17 import java.net.InetSocketAddress;
18 import java.net.PortUnreachableException;
19 import java.nio.ByteBuffer;
20 import java.nio.channels.CancelledKeyException;
21 import java.nio.channels.ClosedChannelException;
22 import java.nio.channels.ClosedSelectorException;
23 import java.nio.channels.DatagramChannel;
24 import java.nio.channels.NotYetConnectedException;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.Iterator;
33 import java.util.concurrent.locks.ReentrantLock;
35 import org.openhab.core.thing.ThingStatus;
36 import org.openhab.core.thing.ThingStatusDetail;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
/**
 * The {@link KeContactTransceiver} is responsible for receiving UDP broadcast messages sent by the KEBA Charging
 * Stations. Every {@link KeContactHandler} that wants to receive these messages has to register itself with the
 * {@link KeContactTransceiver}.
 *
 * @author Karel Goderis - Initial contribution
 */
// Class header, constants and shared state of the transceiver.
48 public class KeContactTransceiver {
// UDP port the broadcast channel is bound to locally; also used as the remote port when connecting to a station.
50 public static final int LISTENER_PORT_NUMBER = 7090;
// Value passed to Thread.sleep() by the transceiver loop between two selector polls.
51 public static final int LISTENING_INTERVAL = 100;
// Capacity of every ByteBuffer allocated for incoming data.
52 public static final int BUFFER_SIZE = 1024;
// Channel bound to LISTENER_PORT_NUMBER on which datagrams are received.
54 private DatagramChannel broadcastChannel;
// Key obtained when registering broadcastChannel with the selector; used to tell its traffic from per-handler traffic.
55 private SelectionKey broadcastKey;
// Selector shared by all channels; its monitor also guards registration, cancellation and polling.
56 private Selector selector;
// Thread that executes transceiverRunnable.
57 private Thread transceiverThread;
// NOTE(review): only the declaration is visible in this excerpt; presumably tracks whether the transceiver is running.
58 private boolean isStarted = false;
// Handlers currently registered with this transceiver.
59 private Set<KeContactHandler> handlers = Collections.synchronizedSet(new HashSet<>());
// Per-handler channel, added by establishConnection() and removed by removeConnection().
60 private Map<KeContactHandler, DatagramChannel> datagramChannels = Collections.synchronizedMap(new HashMap<>());
// Per-handler hand-over slot: send() stores the outgoing buffer here, the transceiver loop stores received data here.
61 private Map<KeContactHandler, ByteBuffer> buffers = Collections.synchronizedMap(new HashMap<>());
// Per-handler lock created on registration; held by send() while a request is pending.
62 private Map<KeContactHandler, ReentrantLock> locks = Collections.synchronizedMap(new HashMap<>());
// Per-handler flag: TRUE once send() has queued a message, FALSE once the transceiver loop has picked it up.
63 private Map<KeContactHandler, Boolean> flags = Collections.synchronizedMap(new HashMap<>());
65 private final Logger logger = LoggerFactory.getLogger(KeContactTransceiver.class);
// NOTE(review): the signature of the enclosing method is not visible in this excerpt; the statements below
// open the selector, start the transceiver thread and bind the broadcast channel.
// NOTE(review): the log message contains a duplicated "the".
69 logger.debug("Starting the the KEBA KeContact transceiver");
71 selector = Selector.open();
// The transceiver thread is created only when none exists yet.
73 if (transceiverThread == null) {
74 transceiverThread = new Thread(transceiverRunnable, "OH-binding-Keba-Transceiver");
75 transceiverThread.start();
// Bind a non-blocking datagram channel to the listener port.
78 broadcastChannel = DatagramChannel.open();
79 broadcastChannel.socket().bind(new InetSocketAddress(LISTENER_PORT_NUMBER));
80 broadcastChannel.configureBlocking(false);
82 logger.info("Listening for incoming data on {}", broadcastChannel.getLocalAddress());
// Registration happens under the selector's monitor, which the transceiver loop also takes while polling.
84 synchronized (selector) {
86 broadcastKey = broadcastChannel.register(selector, broadcastChannel.validOps());
// Open a dedicated channel for every handler that is already registered.
// NOTE(review): 'handlers' is a Collections.synchronizedSet and is iterated here without synchronizing on it.
89 for (KeContactHandler listener : handlers) {
90 establishConnection(listener);
// Failures are only logged; the exception is neither rethrown nor attached to the log entry.
94 } catch (ClosedSelectorException | CancelledKeyException | IOException e) {
95 logger.error("An exception occurred while registering the selector: {}", e.getMessage());
// NOTE(review): the signature of the enclosing method is not visible in this excerpt; the statements below
// tear down all handler connections, close the broadcast channel and stop the transceiver thread.
102 for (KeContactHandler listener : handlers) {
103 this.removeConnection(listener);
// Closing is best effort: an IOException is logged and the shutdown continues.
107 broadcastChannel.close();
108 } catch (IOException e) {
109 logger.error("An exception occurred while closing the broadcast channel on port number {} : '{}'",
110 LISTENER_PORT_NUMBER, e.getMessage(), e);
// NOTE(review): the statement guarded by this catch is not visible; the message indicates the selector is closed here.
115 } catch (IOException e) {
116 logger.error("An exception occurred while closing the selector: '{}'", e.getMessage(), e);
// NOTE(review): the log message contains a duplicated "the".
119 logger.debug("Stopping the the KEBA KeContact transceiver");
// Interrupt the transceiver thread and wait until it has terminated before dropping the reference.
120 if (transceiverThread != null) {
121 transceiverThread.interrupt();
123 transceiverThread.join();
124 } catch (InterruptedException e) {
// Preserve the interrupt status of the calling thread.
125 Thread.currentThread().interrupt();
127 transceiverThread = null;
// NOTE(review): the body of this method is not visible in this excerpt; judging by its name it presumably
// restarts the transceiver - confirm against the full file.
137 private void reset() {
/**
 * Registers a handler with the transceiver, creates its lock and opens a connection for it if it has none yet.
 * A {@code null} handler is ignored.
 *
 * @param handler the handler to register
 */
143 public void registerHandler(KeContactHandler handler) {
144 if (handler != null) {
145 handlers.add(handler);
// Every handler gets its own lock, which send() holds while a request is pending.
146 locks.put(handler, new ReentrantLock());
148 if (logger.isTraceEnabled()) {
149 logger.trace("There are now {} KEBA KeContact handlers registered with the transceiver",
// NOTE(review): the statement executed for the first registered handler is not visible in this excerpt.
153 if (handlers.size() == 1) {
// Only open a channel when the handler does not already have one.
157 if (!isConnected(handler)) {
158 establishConnection(handler);
/**
 * Removes a handler and its lock from the transceiver. A {@code null} handler is ignored.
 *
 * @param handler the handler to unregister
 */
163 public void unRegisterHandler(KeContactHandler handler) {
164 if (handler != null) {
165 locks.remove(handler);
166 handlers.remove(handler);
168 if (logger.isTraceEnabled()) {
169 logger.trace("There are now {} KEBA KeContact handlers registered with the transceiver",
// NOTE(review): the statement executed once the last handler is gone is not visible in this excerpt.
// NOTE(review): the visible lines do not remove the handler's entry from datagramChannels - confirm whether
// the connection is torn down elsewhere.
173 if (handlers.isEmpty()) {
179 protected ByteBuffer send(String message, KeContactHandler handler) {
180 ReentrantLock handlerLock = locks.get(handler);
182 if (handlerLock != null) {
185 ByteBuffer buffer = ByteBuffer.allocate(message.getBytes().length);
186 buffer.put(message.getBytes("ASCII"));
188 flags.put(handler, Boolean.TRUE);
189 buffers.put(handler, buffer);
191 synchronized (handlerLock) {
192 if (logger.isTraceEnabled()) {
193 logger.trace("{} waiting on handerLock {}", Thread.currentThread().getName(),
194 handlerLock.toString());
196 handlerLock.wait(KeContactHandler.REPORT_INTERVAL);
199 return buffers.remove(handler);
200 } catch (UnsupportedEncodingException | InterruptedException e) {
201 Thread.currentThread().interrupt();
202 handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getMessage());
204 handlerLock.unlock();
207 if (logger.isDebugEnabled()) {
208 logger.debug("The handler for '{}' is not yet registered with the KeContactTransceiver",
209 handler.getThing().getUID());
/**
 * Work executed by the transceiver thread: polls the selector without blocking, writes queued messages on
 * writable handler channels, reads from the broadcast channel and the handler channels, and then sleeps for
 * {@link #LISTENING_INTERVAL} unless the thread has been interrupted.
 * NOTE(review): several statements of this block (loop header, try openers, some assignments) are not visible
 * in this excerpt.
 */
215 public Runnable transceiverRunnable = () -> {
// The selector's monitor is held for the whole poll; registering and cancelling channels take the same monitor.
218 synchronized (selector) {
220 selector.selectNow();
221 } catch (IOException e) {
222 logger.error("An exception occurred while selecting: {}", e.getMessage());
225 Iterator<SelectionKey> it = selector.selectedKeys().iterator();
226 while (it.hasNext()) {
227 SelectionKey selKey = it.next();
// ---- Write readiness: send a message queued by send(), if there is one for this channel ----
230 if (selKey.isValid() && selKey.isWritable()) {
231 DatagramChannel theChannel = (DatagramChannel) selKey.channel();
232 KeContactHandler theHandler = null;
233 boolean error = false;
// Find the handler that owns this channel; the comparison is null-safe for handlers without a channel.
235 for (KeContactHandler handler : handlers) {
236 if (theChannel.equals(datagramChannels.get(handler))) {
237 theHandler = handler;
242 if (theHandler != null) {
243 ReentrantLock theLock = locks.get(theHandler);
244 Boolean theFlag = flags.get(theHandler);
// A message is pending only while send() holds the handler's lock and has set the flag to TRUE.
245 if (theLock != null && theLock.isLocked() && theFlag != null
246 && theFlag.equals(Boolean.TRUE)) {
// Take the outgoing buffer out of the slot and clear the flag.
247 ByteBuffer theBuffer = buffers.remove(theHandler);
248 flags.put(theHandler, Boolean.FALSE);
250 if (theBuffer != null) {
// NOTE(review): new String(byte[]) decodes with the platform default charset.
253 logger.debug("Sending '{}' on the channel '{}'->'{}'",
254 new Object[] { new String(theBuffer.array()),
255 theChannel.getLocalAddress(),
256 theChannel.getRemoteAddress() });
257 theChannel.write(theBuffer);
// Each failure mode takes the handler offline with its own description.
258 } catch (NotYetConnectedException e) {
259 theHandler.updateStatus(ThingStatus.OFFLINE,
260 ThingStatusDetail.COMMUNICATION_ERROR,
261 "The remote host is not yet connected");
263 } catch (ClosedChannelException e) {
264 theHandler.updateStatus(ThingStatus.OFFLINE,
265 ThingStatusDetail.COMMUNICATION_ERROR,
266 "The connection to the remote host is closed");
268 } catch (IOException e) {
269 theHandler.updateStatus(ThingStatus.OFFLINE,
270 ThingStatusDetail.COMMUNICATION_ERROR, "An IO exception occurred");
// Re-create the handler's channel.
// NOTE(review): the condition guarding these two calls is not visible; presumably the 'error' flag.
275 removeConnection(theHandler);
276 establishConnection(theHandler);
// ---- Read readiness: data arrived on the broadcast channel or on a handler channel ----
283 if (selKey.isValid() && selKey.isReadable()) {
284 int numberBytesRead = 0;
285 InetSocketAddress clientAddress = null;
286 ByteBuffer readBuffer = null;
287 boolean error = false;
// Case 1: the datagram arrived on the broadcast channel.
289 if (selKey.equals(broadcastKey)) {
291 readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
292 clientAddress = (InetSocketAddress) broadcastChannel.receive(readBuffer);
// NOTE(review): this logs the whole BUFFER_SIZE backing array, decoded with the platform default charset.
293 logger.debug("Received {} from {} on the transceiver listener port ",
294 new String(readBuffer.array()), clientAddress);
295 numberBytesRead = readBuffer.position();
296 } catch (IOException e) {
298 "An exception occurred while receiving data on the transceiver listener port: '{}'",
// NOTE(review): in this branch numberBytesRead is either 0 or a buffer position, so it can never be -1 here.
303 if (numberBytesRead == -1) {
309 if (readBuffer.remaining() > 0) {
// Dispatch the datagram to every handler whose configured IP address matches the sender.
310 for (KeContactHandler handler : handlers) {
311 if (clientAddress != null && handler.getIPAddress()
312 .equals(clientAddress.getAddress().getHostAddress())) {
313 ReentrantLock theLock = locks.get(handler);
// A held lock means send() is waiting: hand the data over through the buffer slot.
314 if (theLock != null && theLock.isLocked()) {
315 buffers.put(handler, readBuffer);
316 synchronized (theLock) {
317 if (logger.isTraceEnabled()) {
318 logger.trace("{} notifyall on handerLock {}",
319 Thread.currentThread().getName(),
// NOTE(review): presumably the alternative when no send() is pending; the branch keyword is not visible.
325 handler.onData(readBuffer);
// NOTE(review): presumably the error path of the broadcast branch; all handlers are taken offline.
331 handlers.forEach(listener -> listener.updateStatus(ThingStatus.OFFLINE,
332 ThingStatusDetail.COMMUNICATION_ERROR, "The transceiver is offline"));
// Case 2: the data arrived on a handler's own channel.
336 DatagramChannel theChannel = (DatagramChannel) selKey.channel();
337 KeContactHandler theHandler = null;
// NOTE(review): the loop variable 'handlers' shadows the field of the same name, and unlike the lookup in the
// write branch, datagramChannels.get(...) is dereferenced here and throws a NullPointerException for a
// handler that has no channel.
339 for (KeContactHandler handlers : handlers) {
340 if (datagramChannels.get(handlers).equals(theChannel)) {
341 theHandler = handlers;
346 if (theHandler != null) {
348 readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
349 numberBytesRead = theChannel.read(readBuffer);
350 logger.debug("Received {} from {} on the transceiver listener port ",
351 new String(readBuffer.array()), theChannel.getRemoteAddress());
352 } catch (NotYetConnectedException e) {
353 theHandler.updateStatus(ThingStatus.OFFLINE,
354 ThingStatusDetail.COMMUNICATION_ERROR,
355 "The remote host is not yet connected");
// An unreachable port is reported as a configuration problem rather than a communication problem.
357 } catch (PortUnreachableException e) {
358 theHandler.updateStatus(ThingStatus.OFFLINE,
359 ThingStatusDetail.CONFIGURATION_ERROR,
360 "The remote host is probably not a KEBA KeContact");
362 } catch (IOException e) {
363 theHandler.updateStatus(ThingStatus.OFFLINE,
364 ThingStatusDetail.COMMUNICATION_ERROR, "An IO exception occurred");
368 if (numberBytesRead == -1) {
374 if (readBuffer.remaining() > 0) {
375 ReentrantLock theLock = locks.get(theHandler);
// Data on the handler channel is only handed over while send() holds the lock.
376 if (theLock != null && theLock.isLocked()) {
377 buffers.put(theHandler, readBuffer);
378 synchronized (theLock) {
// Re-create the handler's channel.
// NOTE(review): the condition guarding these two calls is not visible; presumably the 'error' flag.
384 removeConnection(theHandler);
385 establishConnection(theHandler);
// Pause between polls unless the thread has been asked to stop.
393 if (!Thread.currentThread().isInterrupted()) {
394 Thread.sleep(LISTENING_INTERVAL);
// Both an interruption and a closed selector re-assert the interrupt status of the thread.
398 } catch (InterruptedException | ClosedSelectorException e) {
399 Thread.currentThread().interrupt();
/**
 * Opens a non-blocking datagram channel for the handler, registers it with the selector for read and write
 * readiness, connects it to the handler's IP address on {@link #LISTENER_PORT_NUMBER} and sets the handler
 * ONLINE. Each failing step sets the handler OFFLINE with a step-specific description.
 *
 * @param handler the handler to connect
 */
405 private void establishConnection(KeContactHandler handler) {
406 String ipAddress = handler.getIPAddress();
// Connect only when the thing is not in CONFIGURATION_ERROR and the address is not the empty string.
// NOTE(review): a null ipAddress passes this check, because "".equals(null) is false.
407 if (handler.getThing().getStatusInfo().getStatusDetail() != ThingStatusDetail.CONFIGURATION_ERROR
408 && !"".equals(ipAddress)) {
409 logger.debug("Establishing the connection to the KEBA KeContact '{}'", handler.getThing().getUID());
411 DatagramChannel datagramChannel = null;
413 datagramChannel = DatagramChannel.open();
414 } catch (Exception e2) {
415 handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
416 "An exception occurred while opening a datagram channel");
419 if (datagramChannel != null) {
// The channel is recorded before it is configured, registered or connected.
// NOTE(review): a failure in a later step leaves this entry in the map, so isConnected() reports true.
420 datagramChannels.put(handler, datagramChannel);
423 datagramChannel.configureBlocking(false);
424 } catch (IOException e2) {
425 handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
426 "An exception occurred while configuring a datagram channel");
// Registration takes the selector's monitor, which the transceiver loop also holds while polling.
429 synchronized (selector) {
431 int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
433 datagramChannel.register(selector, interestSet);
434 } catch (ClosedChannelException e1) {
435 handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
436 "An exception occurred while registering a selector");
439 InetSocketAddress remoteAddress = new InetSocketAddress(ipAddress, LISTENER_PORT_NUMBER);
442 if (logger.isTraceEnabled()) {
443 logger.trace("Connecting the channel for {} ", remoteAddress);
445 datagramChannel.connect(remoteAddress);
// The handler goes ONLINE as soon as connect() returns.
447 handler.updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE, "");
448 } catch (Exception e) {
449 logger.debug("An exception occurred while connecting connecting to '{}:{}' : {}",
450 new Object[] { ipAddress, LISTENER_PORT_NUMBER, e.getMessage() });
451 handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
452 "An exception occurred while connecting");
// NOTE(review): presumably the branch taken when the precondition above fails (the branch keyword is not
// visible); it re-publishes CONFIGURATION_ERROR with the thing's current status description.
457 handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR,
458 handler.getThing().getStatusInfo().getDescription());
462 private void removeConnection(KeContactHandler handler) {
463 logger.debug("Tearing down the connection to the KEBA KeContact '{}'", handler.getThing().getUID());
464 DatagramChannel datagramChannel = datagramChannels.remove(handler);
466 if (datagramChannel != null) {
467 synchronized (selector) {
469 datagramChannel.keyFor(selector).cancel();
470 datagramChannel.close();
471 handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE, "");
472 } catch (Exception e) {
473 logger.debug("An exception occurred while closing the datagramchannel for '{}': {}",
474 handler.getThing().getUID(), e.getMessage());
475 handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
476 "An exception occurred while closing the datagramchannel");
482 private boolean isConnected(KeContactHandler handler) {
483 return datagramChannels.get(handler) != null ? true : false;