]> git.basschouten.com Git - openhab-addons.git/blob
c123190843cb18dd1d85849274af80a73f32547e
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2021 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.keba.internal.handler;
14
15 import java.io.IOException;
16 import java.io.UnsupportedEncodingException;
17 import java.net.InetSocketAddress;
18 import java.net.PortUnreachableException;
19 import java.nio.ByteBuffer;
20 import java.nio.channels.CancelledKeyException;
21 import java.nio.channels.ClosedChannelException;
22 import java.nio.channels.ClosedSelectorException;
23 import java.nio.channels.DatagramChannel;
24 import java.nio.channels.NotYetConnectedException;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.Iterator;
31 import java.util.Map;
32 import java.util.Set;
33 import java.util.concurrent.locks.ReentrantLock;
34
35 import org.openhab.core.thing.ThingStatus;
36 import org.openhab.core.thing.ThingStatusDetail;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 /**
41  * The {@link KeContactTransceiver} is responsible for receiving UDP broadcast messages sent by the KEBA Charging
42  * Stations. {@link KeContactHandler} willing to receive these messages have to register themselves with the
43  * {@link KeContactTransceiver}
44  *
45  * @author Karel Goderis - Initial contribution
46  */
47
48 public class KeContactTransceiver {
49
50     public static final int LISTENER_PORT_NUMBER = 7090;
51     public static final int REMOTE_PORT_NUMBER = 7090;
52     public static final int LISTENING_INTERVAL = 100;
53     public static final int BUFFER_SIZE = 1024;
54     public static final String IP_ADDRESS = "ipAddress";
55     public static final String POLLING_REFRESH_INTERVAL = "refreshInterval";
56
57     private DatagramChannel broadcastChannel;
58     private SelectionKey broadcastKey;
59     private Selector selector;
60     private Thread transceiverThread;
61     private boolean isStarted = false;
62     private Set<KeContactHandler> handlers = Collections.synchronizedSet(new HashSet<>());
63     private Map<KeContactHandler, DatagramChannel> datagramChannels = Collections.synchronizedMap(new HashMap<>());
64     private Map<KeContactHandler, ByteBuffer> buffers = Collections.synchronizedMap(new HashMap<>());
65     private Map<KeContactHandler, ReentrantLock> locks = Collections.synchronizedMap(new HashMap<>());
66     private Map<KeContactHandler, Boolean> flags = Collections.synchronizedMap(new HashMap<>());
67
68     private final Logger logger = LoggerFactory.getLogger(KeContactTransceiver.class);
69
70     public void start() {
71         if (!isStarted) {
72             logger.debug("Starting the the KEBA KeContact transceiver");
73             try {
74                 selector = Selector.open();
75
76                 if (transceiverThread == null) {
77                     transceiverThread = new Thread(transceiverRunnable, "OH-binding-Keba-Transceiver");
78                     transceiverThread.start();
79                 }
80
81                 broadcastChannel = DatagramChannel.open();
82                 broadcastChannel.socket().bind(new InetSocketAddress(LISTENER_PORT_NUMBER));
83                 broadcastChannel.configureBlocking(false);
84
85                 logger.info("Listening for incoming data on {}", broadcastChannel.getLocalAddress());
86
87                 synchronized (selector) {
88                     selector.wakeup();
89                     broadcastKey = broadcastChannel.register(selector, broadcastChannel.validOps());
90                 }
91
92                 for (KeContactHandler listener : handlers) {
93                     establishConnection(listener);
94                 }
95
96                 isStarted = true;
97             } catch (ClosedSelectorException | CancelledKeyException | IOException e) {
98                 logger.error("An exception occurred while registering the selector: {}", e.getMessage());
99             }
100         }
101     }
102
103     public void stop() {
104         if (isStarted) {
105             for (KeContactHandler listener : handlers) {
106                 this.removeConnection(listener);
107             }
108
109             try {
110                 broadcastChannel.close();
111             } catch (IOException e) {
112                 logger.error("An exception occurred while closing the broadcast channel on port number {} : '{}'",
113                         LISTENER_PORT_NUMBER, e.getMessage(), e);
114             }
115
116             try {
117                 selector.close();
118             } catch (IOException e) {
119                 logger.error("An exception occurred while closing the selector: '{}'", e.getMessage(), e);
120             }
121
122             logger.debug("Stopping the the KEBA KeContact transceiver");
123             if (transceiverThread != null) {
124                 transceiverThread.interrupt();
125                 try {
126                     transceiverThread.join();
127                 } catch (InterruptedException e) {
128                     Thread.currentThread().interrupt();
129                 }
130                 transceiverThread = null;
131             }
132
133             locks.clear();
134             flags.clear();
135
136             isStarted = false;
137         }
138     }
139
140     private void reset() {
141         stop();
142         isStarted = false;
143         start();
144     }
145
146     public void registerHandler(KeContactHandler handler) {
147         if (handler != null) {
148             handlers.add(handler);
149             locks.put(handler, new ReentrantLock());
150
151             if (logger.isTraceEnabled()) {
152                 logger.trace("There are now {} KEBA KeContact handlers registered with the transceiver",
153                         handlers.size());
154             }
155
156             if (handlers.size() == 1) {
157                 start();
158             }
159
160             if (!isConnected(handler)) {
161                 establishConnection(handler);
162             }
163         }
164     }
165
166     public void unRegisterHandler(KeContactHandler handler) {
167         if (handler != null) {
168             locks.remove(handler);
169             handlers.remove(handler);
170
171             if (logger.isTraceEnabled()) {
172                 logger.trace("There are now {} KEBA KeContact handlers registered with the transceiver",
173                         handlers.size());
174             }
175
176             if (handlers.isEmpty()) {
177                 stop();
178             }
179         }
180     }
181
182     protected ByteBuffer send(String message, KeContactHandler handler) {
183         ReentrantLock handlerLock = locks.get(handler);
184
185         if (handlerLock != null) {
186             handlerLock.lock();
187             try {
188                 ByteBuffer buffer = ByteBuffer.allocate(message.getBytes().length);
189                 buffer.put(message.getBytes("ASCII"));
190
191                 flags.put(handler, Boolean.TRUE);
192                 buffers.put(handler, buffer);
193
194                 synchronized (handlerLock) {
195                     if (logger.isTraceEnabled()) {
196                         logger.trace("{} waiting on handerLock {}", Thread.currentThread().getName(),
197                                 handlerLock.toString());
198                     }
199                     handlerLock.wait(KeContactHandler.REPORT_INTERVAL);
200                 }
201
202                 return buffers.remove(handler);
203             } catch (UnsupportedEncodingException | InterruptedException e) {
204                 Thread.currentThread().interrupt();
205                 handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getMessage());
206             } finally {
207                 handlerLock.unlock();
208             }
209         } else {
210             if (logger.isDebugEnabled()) {
211                 logger.debug("The handler for '{}' is not yet registered with the KeContactTransceiver",
212                         handler.getThing().getUID());
213             }
214         }
215         return null;
216     }
217
218     public Runnable transceiverRunnable = () -> {
219         while (true) {
220             try {
221                 synchronized (selector) {
222                     try {
223                         selector.selectNow();
224                     } catch (IOException e) {
225                         logger.error("An exception occurred while selecting: {}", e.getMessage());
226                     }
227
228                     Iterator<SelectionKey> it = selector.selectedKeys().iterator();
229                     while (it.hasNext()) {
230                         SelectionKey selKey = it.next();
231                         it.remove();
232
233                         if (selKey.isValid() && selKey.isWritable()) {
234                             DatagramChannel theChannel = (DatagramChannel) selKey.channel();
235                             KeContactHandler theHandler = null;
236                             boolean error = false;
237
238                             for (KeContactHandler handler : handlers) {
239                                 if (theChannel.equals(datagramChannels.get(handler))) {
240                                     theHandler = handler;
241                                     break;
242                                 }
243                             }
244
245                             if (theHandler != null) {
246                                 ReentrantLock theLock = locks.get(theHandler);
247                                 Boolean theFlag = flags.get(theHandler);
248                                 if (theLock != null && theLock.isLocked() && theFlag != null
249                                         && theFlag.equals(Boolean.TRUE)) {
250                                     ByteBuffer theBuffer = buffers.remove(theHandler);
251                                     flags.put(theHandler, Boolean.FALSE);
252
253                                     if (theBuffer != null) {
254                                         try {
255                                             theBuffer.rewind();
256                                             logger.debug("Sending '{}' on the channel '{}'->'{}'",
257                                                     new Object[] { new String(theBuffer.array()),
258                                                             theChannel.getLocalAddress(),
259                                                             theChannel.getRemoteAddress() });
260                                             int byteswritten = theChannel.write(theBuffer);
261                                         } catch (NotYetConnectedException e) {
262                                             theHandler.updateStatus(ThingStatus.OFFLINE,
263                                                     ThingStatusDetail.COMMUNICATION_ERROR,
264                                                     "The remote host is not yet connected");
265                                             error = true;
266                                         } catch (ClosedChannelException e) {
267                                             theHandler.updateStatus(ThingStatus.OFFLINE,
268                                                     ThingStatusDetail.COMMUNICATION_ERROR,
269                                                     "The connection to the remote host is closed");
270                                             error = true;
271                                         } catch (IOException e) {
272                                             theHandler.updateStatus(ThingStatus.OFFLINE,
273                                                     ThingStatusDetail.COMMUNICATION_ERROR, "An IO exception occurred");
274                                             error = true;
275                                         }
276
277                                         if (error) {
278                                             removeConnection(theHandler);
279                                             establishConnection(theHandler);
280                                         }
281                                     }
282                                 }
283                             }
284                         }
285
286                         if (selKey.isValid() && selKey.isReadable()) {
287                             int numberBytesRead = 0;
288                             InetSocketAddress clientAddress = null;
289                             ByteBuffer readBuffer = null;
290                             boolean error = false;
291
292                             if (selKey.equals(broadcastKey)) {
293                                 try {
294                                     readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
295                                     clientAddress = (InetSocketAddress) broadcastChannel.receive(readBuffer);
296                                     logger.debug("Received {} from {} on the transceiver listener port ",
297                                             new String(readBuffer.array()), clientAddress);
298                                     numberBytesRead = readBuffer.position();
299                                 } catch (IOException e) {
300                                     logger.error(
301                                             "An exception occurred while receiving data on the transceiver listener port: '{}'",
302                                             e.getMessage(), e);
303                                     error = true;
304                                 }
305
306                                 if (numberBytesRead == -1) {
307                                     error = true;
308                                 }
309
310                                 if (!error) {
311                                     readBuffer.flip();
312                                     if (readBuffer.remaining() > 0) {
313                                         for (KeContactHandler handler : handlers) {
314                                             if (clientAddress != null && handler.getIPAddress()
315                                                     .equals(clientAddress.getAddress().getHostAddress())) {
316                                                 ReentrantLock theLock = locks.get(handler);
317                                                 if (theLock != null && theLock.isLocked()) {
318                                                     buffers.put(handler, readBuffer);
319                                                     synchronized (theLock) {
320                                                         if (logger.isTraceEnabled()) {
321                                                             logger.trace("{} notifyall on handerLock {}",
322                                                                     Thread.currentThread().getName(),
323                                                                     theLock.toString());
324                                                         }
325                                                         theLock.notifyAll();
326                                                     }
327                                                 } else {
328                                                     handler.onData(readBuffer);
329                                                 }
330                                             }
331                                         }
332                                     }
333                                 } else {
334                                     handlers.forEach(listener -> listener.updateStatus(ThingStatus.OFFLINE,
335                                             ThingStatusDetail.COMMUNICATION_ERROR, "The transceiver is offline"));
336                                     reset();
337                                 }
338                             } else {
339                                 DatagramChannel theChannel = (DatagramChannel) selKey.channel();
340                                 KeContactHandler theHandler = null;
341
342                                 for (KeContactHandler handlers : handlers) {
343                                     if (datagramChannels.get(handlers).equals(theChannel)) {
344                                         theHandler = handlers;
345                                         break;
346                                     }
347                                 }
348
349                                 if (theHandler != null) {
350                                     try {
351                                         readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
352                                         numberBytesRead = theChannel.read(readBuffer);
353                                         logger.debug("Received {} from {} on the transceiver listener port ",
354                                                 new String(readBuffer.array()), theChannel.getRemoteAddress());
355                                     } catch (NotYetConnectedException e) {
356                                         theHandler.updateStatus(ThingStatus.OFFLINE,
357                                                 ThingStatusDetail.COMMUNICATION_ERROR,
358                                                 "The remote host is not yet connected");
359                                         error = true;
360                                     } catch (PortUnreachableException e) {
361                                         theHandler.updateStatus(ThingStatus.OFFLINE,
362                                                 ThingStatusDetail.CONFIGURATION_ERROR,
363                                                 "The remote host is probably not a KEBA KeContact");
364                                         error = true;
365                                     } catch (IOException e) {
366                                         theHandler.updateStatus(ThingStatus.OFFLINE,
367                                                 ThingStatusDetail.COMMUNICATION_ERROR, "An IO exception occurred");
368                                         error = true;
369                                     }
370
371                                     if (numberBytesRead == -1) {
372                                         error = true;
373                                     }
374
375                                     if (!error) {
376                                         readBuffer.flip();
377                                         if (readBuffer.remaining() > 0) {
378                                             ReentrantLock theLock = locks.get(theHandler);
379                                             if (theLock != null && theLock.isLocked()) {
380                                                 buffers.put(theHandler, readBuffer);
381                                                 synchronized (theLock) {
382                                                     theLock.notifyAll();
383                                                 }
384                                             }
385                                         }
386                                     } else {
387                                         removeConnection(theHandler);
388                                         establishConnection(theHandler);
389                                     }
390                                 }
391                             }
392                         }
393                     }
394                 }
395
396                 if (!Thread.currentThread().isInterrupted()) {
397                     Thread.sleep(LISTENING_INTERVAL);
398                 } else {
399                     return;
400                 }
401             } catch (InterruptedException | ClosedSelectorException e) {
402                 Thread.currentThread().interrupt();
403                 return;
404             }
405         }
406     };
407
408     private void establishConnection(KeContactHandler handler) {
409         if (handler.getThing().getStatusInfo().getStatusDetail() != ThingStatusDetail.CONFIGURATION_ERROR
410                 && handler.getConfig().get(IP_ADDRESS) != null && !handler.getConfig().get(IP_ADDRESS).equals("")) {
411             logger.debug("Establishing the connection to the KEBA KeContact '{}'", handler.getThing().getUID());
412
413             DatagramChannel datagramChannel = null;
414             try {
415                 datagramChannel = DatagramChannel.open();
416             } catch (Exception e2) {
417                 handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
418                         "An exception occurred while opening a datagram channel");
419             }
420
421             if (datagramChannel != null) {
422                 datagramChannels.put(handler, datagramChannel);
423
424                 try {
425                     datagramChannel.configureBlocking(false);
426                 } catch (IOException e2) {
427                     handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
428                             "An exception occurred while configuring a datagram channel");
429                 }
430
431                 synchronized (selector) {
432                     selector.wakeup();
433                     int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
434                     try {
435                         datagramChannel.register(selector, interestSet);
436                     } catch (ClosedChannelException e1) {
437                         handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
438                                 "An exception occurred while registering a selector");
439                     }
440
441                     InetSocketAddress remoteAddress = new InetSocketAddress(
442                             (String) handler.getConfig().get(IP_ADDRESS), REMOTE_PORT_NUMBER);
443
444                     try {
445                         if (logger.isTraceEnabled()) {
446                             logger.trace("Connecting the channel for {} ", remoteAddress);
447                         }
448                         datagramChannel.connect(remoteAddress);
449
450                         handler.updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE, "");
451                     } catch (Exception e) {
452                         logger.debug("An exception occurred while connecting connecting to '{}:{}' : {}", new Object[] {
453                                 (String) handler.getConfig().get(IP_ADDRESS), REMOTE_PORT_NUMBER, e.getMessage() });
454                         handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
455                                 "An exception occurred while connecting");
456                     }
457                 }
458             }
459         } else {
460             handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR,
461                     handler.getThing().getStatusInfo().getDescription());
462         }
463     }
464
465     private void removeConnection(KeContactHandler handler) {
466         logger.debug("Tearing down the connection to the KEBA KeContact '{}'", handler.getThing().getUID());
467         DatagramChannel datagramChannel = datagramChannels.remove(handler);
468
469         if (datagramChannel != null) {
470             synchronized (selector) {
471                 try {
472                     datagramChannel.keyFor(selector).cancel();
473                     datagramChannel.close();
474                     handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE, "");
475                 } catch (Exception e) {
476                     logger.debug("An exception occurred while closing the datagramchannel for '{}': {}",
477                             handler.getThing().getUID(), e.getMessage());
478                     handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
479                             "An exception occurred while closing the datagramchannel");
480                 }
481             }
482         }
483     }
484
485     private boolean isConnected(KeContactHandler handler) {
486         return datagramChannels.get(handler) != null ? true : false;
487     }
488 }