2 * Copyright (c) 2010-2020 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.keba.internal.handler;
15 import java.io.IOException;
16 import java.io.UnsupportedEncodingException;
17 import java.net.InetSocketAddress;
18 import java.net.PortUnreachableException;
19 import java.nio.ByteBuffer;
20 import java.nio.channels.CancelledKeyException;
21 import java.nio.channels.ClosedChannelException;
22 import java.nio.channels.ClosedSelectorException;
23 import java.nio.channels.DatagramChannel;
24 import java.nio.channels.NotYetConnectedException;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.Iterator;
33 import java.util.concurrent.locks.ReentrantLock;
35 import org.openhab.core.thing.ThingStatus;
36 import org.openhab.core.thing.ThingStatusDetail;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
41 * The {@link KeContactTransceiver} is responsible for receiving UDP broadcast messages sent by the KEBA Charging
42 * Stations. {@link KeContactHandler} willing to receive these messages have to register themselves with the
43 * {@link KeContactTransceiver}
45 * @author Karel Goderis - Initial contribution
48 class KeContactTransceiver {
50 public static final int LISTENER_PORT_NUMBER = 7090;
51 public static final int REMOTE_PORT_NUMBER = 7090;
52 public static final int LISTENING_INTERVAL = 100;
53 public static final int BUFFER_SIZE = 1024;
54 public static final String IP_ADDRESS = "ipAddress";
55 public static final String POLLING_REFRESH_INTERVAL = "refreshInterval";
57 private DatagramChannel broadcastChannel;
58 private SelectionKey broadcastKey;
59 private Selector selector;
60 private Thread transceiverThread;
61 private boolean isStarted = false;
62 private Set<KeContactHandler> handlers = Collections.synchronizedSet(new HashSet<>());
63 private Map<KeContactHandler, DatagramChannel> datagramChannels = Collections.synchronizedMap(new HashMap<>());
64 private Map<KeContactHandler, ByteBuffer> buffers = Collections.synchronizedMap(new HashMap<>());
65 private Map<KeContactHandler, ReentrantLock> locks = Collections.synchronizedMap(new HashMap<>());
66 private Map<KeContactHandler, Boolean> flags = Collections.synchronizedMap(new HashMap<>());
68 private final Logger logger = LoggerFactory.getLogger(KeContactTransceiver.class);
72 logger.debug("Starting the the KEBA KeContact transceiver");
74 selector = Selector.open();
76 if (transceiverThread == null) {
77 transceiverThread = new Thread(transceiverRunnable, "openHAB-Keba-Transceiver");
78 transceiverThread.start();
81 broadcastChannel = DatagramChannel.open();
82 broadcastChannel.socket().bind(new InetSocketAddress(LISTENER_PORT_NUMBER));
83 broadcastChannel.configureBlocking(false);
85 logger.info("Listening for incoming data on {}", broadcastChannel.getLocalAddress());
87 synchronized (selector) {
89 broadcastKey = broadcastChannel.register(selector, broadcastChannel.validOps());
92 for (KeContactHandler listener : handlers) {
93 establishConnection(listener);
97 } catch (ClosedSelectorException | CancelledKeyException | IOException e) {
98 logger.error("An exception occurred while registering the selector: {}", e.getMessage());
105 for (KeContactHandler listener : handlers) {
106 this.removeConnection(listener);
110 broadcastChannel.close();
111 } catch (IOException e) {
112 logger.error("An exception occurred while closing the broadcast channel on port number {} : '{}'",
113 LISTENER_PORT_NUMBER, e.getMessage(), e);
118 } catch (IOException e) {
119 logger.error("An exception occurred while closing the selector: '{}'", e.getMessage(), e);
122 logger.debug("Stopping the the KEBA KeContact transceiver");
123 if (transceiverThread != null) {
124 transceiverThread.interrupt();
126 transceiverThread.join();
127 } catch (InterruptedException e) {
128 Thread.currentThread().interrupt();
130 transceiverThread = null;
140 private void reset() {
146 public void registerHandler(KeContactHandler handler) {
147 if (handler != null) {
148 handlers.add(handler);
149 locks.put(handler, new ReentrantLock());
151 if (logger.isTraceEnabled()) {
152 logger.trace("There are now {} KEBA KeContact handlers registered with the transceiver",
156 if (handlers.size() == 1) {
160 if (!isConnected(handler)) {
161 establishConnection(handler);
166 public void unRegisterHandler(KeContactHandler handler) {
167 if (handler != null) {
168 locks.remove(handler);
169 handlers.remove(handler);
171 if (logger.isTraceEnabled()) {
172 logger.trace("There are now {} KEBA KeContact handlers registered with the transceiver",
176 if (handlers.isEmpty()) {
182 protected ByteBuffer send(String message, KeContactHandler handler) {
183 ReentrantLock handlerLock = locks.get(handler);
185 if (handlerLock != null) {
188 ByteBuffer buffer = ByteBuffer.allocate(message.getBytes().length);
189 buffer.put(message.getBytes("ASCII"));
191 flags.put(handler, Boolean.TRUE);
192 buffers.put(handler, buffer);
194 synchronized (handlerLock) {
195 if (logger.isTraceEnabled()) {
196 logger.trace("{} waiting on handerLock {}", Thread.currentThread().getName(),
197 handlerLock.toString());
199 handlerLock.wait(KeContactHandler.REPORT_INTERVAL);
202 return buffers.remove(handler);
203 } catch (UnsupportedEncodingException | InterruptedException e) {
204 Thread.currentThread().interrupt();
205 handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getMessage());
207 handlerLock.unlock();
210 if (logger.isDebugEnabled()) {
211 logger.debug("The handler for '{}' is not yet registered with the KeContactTransceiver",
212 handler.getThing().getUID());
218 public Runnable transceiverRunnable = () -> {
221 synchronized (selector) {
223 selector.selectNow();
224 } catch (IOException e) {
225 logger.error("An exception occurred while selecting: {}", e.getMessage());
228 Iterator<SelectionKey> it = selector.selectedKeys().iterator();
229 while (it.hasNext()) {
230 SelectionKey selKey = it.next();
233 if (selKey.isValid() && selKey.isWritable()) {
234 DatagramChannel theChannel = (DatagramChannel) selKey.channel();
235 KeContactHandler theHandler = null;
236 boolean error = false;
238 for (KeContactHandler handler : handlers) {
239 if (theChannel.equals(datagramChannels.get(handler))) {
240 theHandler = handler;
245 if (theHandler != null) {
246 ReentrantLock theLock = locks.get(theHandler);
247 Boolean theFlag = flags.get(theHandler);
248 if (theLock != null && theLock.isLocked() && theFlag != null
249 && theFlag.equals(Boolean.TRUE)) {
250 ByteBuffer theBuffer = buffers.remove(theHandler);
251 flags.put(theHandler, Boolean.FALSE);
253 if (theBuffer != null) {
256 logger.debug("Sending '{}' on the channel '{}'->'{}'",
257 new Object[] { new String(theBuffer.array()),
258 theChannel.getLocalAddress(),
259 theChannel.getRemoteAddress() });
260 int byteswritten = theChannel.write(theBuffer);
261 } catch (NotYetConnectedException e) {
262 theHandler.updateStatus(ThingStatus.OFFLINE,
263 ThingStatusDetail.COMMUNICATION_ERROR,
264 "The remote host is not yet connected");
266 } catch (ClosedChannelException e) {
267 theHandler.updateStatus(ThingStatus.OFFLINE,
268 ThingStatusDetail.COMMUNICATION_ERROR,
269 "The connection to the remote host is closed");
271 } catch (IOException e) {
272 theHandler.updateStatus(ThingStatus.OFFLINE,
273 ThingStatusDetail.COMMUNICATION_ERROR, "An IO exception occurred");
278 removeConnection(theHandler);
279 establishConnection(theHandler);
286 if (selKey.isValid() && selKey.isReadable()) {
287 int numberBytesRead = 0;
288 InetSocketAddress clientAddress = null;
289 ByteBuffer readBuffer = null;
290 boolean error = false;
292 if (selKey.equals(broadcastKey)) {
294 readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
295 clientAddress = (InetSocketAddress) broadcastChannel.receive(readBuffer);
296 logger.debug("Received {} from {} on the transceiver listener port ",
297 new String(readBuffer.array()), clientAddress);
298 numberBytesRead = readBuffer.position();
299 } catch (IOException e) {
301 "An exception occurred while receiving data on the transceiver listener port: '{}'",
306 if (numberBytesRead == -1) {
312 if (readBuffer.remaining() > 0) {
313 for (KeContactHandler handler : handlers) {
314 if (clientAddress != null && handler.getIPAddress()
315 .equals(clientAddress.getAddress().getHostAddress())) {
316 ReentrantLock theLock = locks.get(handler);
317 if (theLock != null && theLock.isLocked()) {
318 buffers.put(handler, readBuffer);
319 synchronized (theLock) {
320 if (logger.isTraceEnabled()) {
321 logger.trace("{} notifyall on handerLock {}",
322 Thread.currentThread().getName(),
328 handler.onData(readBuffer);
334 handlers.forEach(listener -> listener.updateStatus(ThingStatus.OFFLINE,
335 ThingStatusDetail.COMMUNICATION_ERROR, "The transceiver is offline"));
339 DatagramChannel theChannel = (DatagramChannel) selKey.channel();
340 KeContactHandler theHandler = null;
342 for (KeContactHandler handlers : handlers) {
343 if (datagramChannels.get(handlers).equals(theChannel)) {
344 theHandler = handlers;
349 if (theHandler != null) {
351 readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
352 numberBytesRead = theChannel.read(readBuffer);
353 logger.debug("Received {} from {} on the transceiver listener port ",
354 new String(readBuffer.array()), theChannel.getRemoteAddress());
355 } catch (NotYetConnectedException e) {
356 theHandler.updateStatus(ThingStatus.OFFLINE,
357 ThingStatusDetail.COMMUNICATION_ERROR,
358 "The remote host is not yet connected");
360 } catch (PortUnreachableException e) {
361 theHandler.updateStatus(ThingStatus.OFFLINE,
362 ThingStatusDetail.CONFIGURATION_ERROR,
363 "The remote host is probably not a KEBA KeContact");
365 } catch (IOException e) {
366 theHandler.updateStatus(ThingStatus.OFFLINE,
367 ThingStatusDetail.COMMUNICATION_ERROR, "An IO exception occurred");
371 if (numberBytesRead == -1) {
377 if (readBuffer.remaining() > 0) {
378 ReentrantLock theLock = locks.get(theHandler);
379 if (theLock != null && theLock.isLocked()) {
380 buffers.put(theHandler, readBuffer);
381 synchronized (theLock) {
387 removeConnection(theHandler);
388 establishConnection(theHandler);
396 if (!Thread.currentThread().isInterrupted()) {
397 Thread.sleep(LISTENING_INTERVAL);
401 } catch (InterruptedException | ClosedSelectorException e) {
402 Thread.currentThread().interrupt();
408 private void establishConnection(KeContactHandler handler) {
409 if (handler.getThing().getStatusInfo().getStatusDetail() != ThingStatusDetail.CONFIGURATION_ERROR
410 && handler.getConfig().get(IP_ADDRESS) != null && !handler.getConfig().get(IP_ADDRESS).equals("")) {
411 logger.debug("Establishing the connection to the KEBA KeContact '{}'", handler.getThing().getUID());
413 DatagramChannel datagramChannel = null;
415 datagramChannel = DatagramChannel.open();
416 } catch (Exception e2) {
417 handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
418 "An exception occurred while opening a datagram channel");
421 if (datagramChannel != null) {
422 datagramChannels.put(handler, datagramChannel);
425 datagramChannel.configureBlocking(false);
426 } catch (IOException e2) {
427 handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
428 "An exception occurred while configuring a datagram channel");
431 synchronized (selector) {
433 int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
435 datagramChannel.register(selector, interestSet);
436 } catch (ClosedChannelException e1) {
437 handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
438 "An exception occurred while registering a selector");
441 InetSocketAddress remoteAddress = new InetSocketAddress(
442 (String) handler.getConfig().get(IP_ADDRESS), REMOTE_PORT_NUMBER);
445 if (logger.isTraceEnabled()) {
446 logger.trace("Connecting the channel for {} ", remoteAddress);
448 datagramChannel.connect(remoteAddress);
450 handler.updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE, "");
451 } catch (Exception e) {
452 logger.debug("An exception occurred while connecting connecting to '{}:{}' : {}", new Object[] {
453 (String) handler.getConfig().get(IP_ADDRESS), REMOTE_PORT_NUMBER, e.getMessage() });
454 handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
455 "An exception occurred while connecting");
460 handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR,
461 handler.getThing().getStatusInfo().getDescription());
465 private void removeConnection(KeContactHandler handler) {
466 logger.debug("Tearing down the connection to the KEBA KeContact '{}'", handler.getThing().getUID());
467 DatagramChannel datagramChannel = datagramChannels.remove(handler);
469 if (datagramChannel != null) {
470 synchronized (selector) {
472 datagramChannel.keyFor(selector).cancel();
473 datagramChannel.close();
474 handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE, "");
475 } catch (Exception e) {
476 logger.debug("An exception occurred while closing the datagramchannel for '{}': {}",
477 handler.getThing().getUID(), e.getMessage());
478 handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
479 "An exception occurred while closing the datagramchannel");
485 private boolean isConnected(KeContactHandler handler) {
486 return datagramChannels.get(handler) != null ? true : false;