]> git.basschouten.com Git - openhab-addons.git/blob
4a3eb1e00d9a732839f72eb3fca8e89bfcedb34b
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.keba.internal.handler;
14
15 import java.io.IOException;
16 import java.io.UnsupportedEncodingException;
17 import java.net.InetSocketAddress;
18 import java.net.PortUnreachableException;
19 import java.nio.ByteBuffer;
20 import java.nio.channels.CancelledKeyException;
21 import java.nio.channels.ClosedChannelException;
22 import java.nio.channels.ClosedSelectorException;
23 import java.nio.channels.DatagramChannel;
24 import java.nio.channels.NotYetConnectedException;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.Iterator;
31 import java.util.Map;
32 import java.util.Set;
33 import java.util.concurrent.locks.ReentrantLock;
34
35 import org.openhab.core.thing.ThingStatus;
36 import org.openhab.core.thing.ThingStatusDetail;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 /**
41  * The {@link KeContactTransceiver} is responsible for receiving UDP broadcast messages sent by the KEBA Charging
42  * Stations. {@link KeContactHandler} willing to receive these messages have to register themselves with the
43  * {@link KeContactTransceiver}
44  *
45  * @author Karel Goderis - Initial contribution
46  */
47
48 public class KeContactTransceiver {
49
50     public static final int LISTENER_PORT_NUMBER = 7090;
51     public static final int LISTENING_INTERVAL = 100;
52     public static final int BUFFER_SIZE = 1024;
53
54     private DatagramChannel broadcastChannel;
55     private SelectionKey broadcastKey;
56     private Selector selector;
57     private Thread transceiverThread;
58     private boolean isStarted = false;
59     private Set<KeContactHandler> handlers = Collections.synchronizedSet(new HashSet<>());
60     private Map<KeContactHandler, DatagramChannel> datagramChannels = Collections.synchronizedMap(new HashMap<>());
61     private Map<KeContactHandler, ByteBuffer> buffers = Collections.synchronizedMap(new HashMap<>());
62     private Map<KeContactHandler, ReentrantLock> locks = Collections.synchronizedMap(new HashMap<>());
63     private Map<KeContactHandler, Boolean> flags = Collections.synchronizedMap(new HashMap<>());
64
65     private final Logger logger = LoggerFactory.getLogger(KeContactTransceiver.class);
66
67     public void start() {
68         if (!isStarted) {
69             logger.debug("Starting the the KEBA KeContact transceiver");
70             try {
71                 selector = Selector.open();
72
73                 if (transceiverThread == null) {
74                     transceiverThread = new Thread(transceiverRunnable, "OH-binding-Keba-Transceiver");
75                     transceiverThread.start();
76                 }
77
78                 broadcastChannel = DatagramChannel.open();
79                 broadcastChannel.socket().bind(new InetSocketAddress(LISTENER_PORT_NUMBER));
80                 broadcastChannel.configureBlocking(false);
81
82                 logger.info("Listening for incoming data on {}", broadcastChannel.getLocalAddress());
83
84                 synchronized (selector) {
85                     selector.wakeup();
86                     broadcastKey = broadcastChannel.register(selector, broadcastChannel.validOps());
87                 }
88
89                 for (KeContactHandler listener : handlers) {
90                     establishConnection(listener);
91                 }
92
93                 isStarted = true;
94             } catch (ClosedSelectorException | CancelledKeyException | IOException e) {
95                 logger.error("An exception occurred while registering the selector: {}", e.getMessage());
96             }
97         }
98     }
99
100     public void stop() {
101         if (isStarted) {
102             for (KeContactHandler listener : handlers) {
103                 this.removeConnection(listener);
104             }
105
106             try {
107                 broadcastChannel.close();
108             } catch (IOException e) {
109                 logger.error("An exception occurred while closing the broadcast channel on port number {} : '{}'",
110                         LISTENER_PORT_NUMBER, e.getMessage(), e);
111             }
112
113             try {
114                 selector.close();
115             } catch (IOException e) {
116                 logger.error("An exception occurred while closing the selector: '{}'", e.getMessage(), e);
117             }
118
119             logger.debug("Stopping the the KEBA KeContact transceiver");
120             if (transceiverThread != null) {
121                 transceiverThread.interrupt();
122                 try {
123                     transceiverThread.join();
124                 } catch (InterruptedException e) {
125                     Thread.currentThread().interrupt();
126                 }
127                 transceiverThread = null;
128             }
129
130             locks.clear();
131             flags.clear();
132
133             isStarted = false;
134         }
135     }
136
137     private void reset() {
138         stop();
139         isStarted = false;
140         start();
141     }
142
143     public void registerHandler(KeContactHandler handler) {
144         if (handler != null) {
145             handlers.add(handler);
146             locks.put(handler, new ReentrantLock());
147
148             if (logger.isTraceEnabled()) {
149                 logger.trace("There are now {} KEBA KeContact handlers registered with the transceiver",
150                         handlers.size());
151             }
152
153             if (handlers.size() == 1) {
154                 start();
155             }
156
157             if (!isConnected(handler)) {
158                 establishConnection(handler);
159             }
160         }
161     }
162
163     public void unRegisterHandler(KeContactHandler handler) {
164         if (handler != null) {
165             locks.remove(handler);
166             handlers.remove(handler);
167
168             if (logger.isTraceEnabled()) {
169                 logger.trace("There are now {} KEBA KeContact handlers registered with the transceiver",
170                         handlers.size());
171             }
172
173             if (handlers.isEmpty()) {
174                 stop();
175             }
176         }
177     }
178
179     protected ByteBuffer send(String message, KeContactHandler handler) {
180         ReentrantLock handlerLock = locks.get(handler);
181
182         if (handlerLock != null) {
183             handlerLock.lock();
184             try {
185                 ByteBuffer buffer = ByteBuffer.allocate(message.getBytes().length);
186                 buffer.put(message.getBytes("ASCII"));
187
188                 flags.put(handler, Boolean.TRUE);
189                 buffers.put(handler, buffer);
190
191                 synchronized (handlerLock) {
192                     if (logger.isTraceEnabled()) {
193                         logger.trace("{} waiting on handerLock {}", Thread.currentThread().getName(),
194                                 handlerLock.toString());
195                     }
196                     handlerLock.wait(KeContactHandler.REPORT_INTERVAL);
197                 }
198
199                 return buffers.remove(handler);
200             } catch (UnsupportedEncodingException | InterruptedException e) {
201                 Thread.currentThread().interrupt();
202                 handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getMessage());
203             } finally {
204                 handlerLock.unlock();
205             }
206         } else {
207             if (logger.isDebugEnabled()) {
208                 logger.debug("The handler for '{}' is not yet registered with the KeContactTransceiver",
209                         handler.getThing().getUID());
210             }
211         }
212         return null;
213     }
214
215     public Runnable transceiverRunnable = () -> {
216         while (true) {
217             try {
218                 synchronized (selector) {
219                     try {
220                         selector.selectNow();
221                     } catch (IOException e) {
222                         logger.error("An exception occurred while selecting: {}", e.getMessage());
223                     }
224
225                     Iterator<SelectionKey> it = selector.selectedKeys().iterator();
226                     while (it.hasNext()) {
227                         SelectionKey selKey = it.next();
228                         it.remove();
229
230                         if (selKey.isValid() && selKey.isWritable()) {
231                             DatagramChannel theChannel = (DatagramChannel) selKey.channel();
232                             KeContactHandler theHandler = null;
233                             boolean error = false;
234
235                             for (KeContactHandler handler : handlers) {
236                                 if (theChannel.equals(datagramChannels.get(handler))) {
237                                     theHandler = handler;
238                                     break;
239                                 }
240                             }
241
242                             if (theHandler != null) {
243                                 ReentrantLock theLock = locks.get(theHandler);
244                                 Boolean theFlag = flags.get(theHandler);
245                                 if (theLock != null && theLock.isLocked() && theFlag != null
246                                         && theFlag.equals(Boolean.TRUE)) {
247                                     ByteBuffer theBuffer = buffers.remove(theHandler);
248                                     flags.put(theHandler, Boolean.FALSE);
249
250                                     if (theBuffer != null) {
251                                         try {
252                                             theBuffer.rewind();
253                                             logger.debug("Sending '{}' on the channel '{}'->'{}'",
254                                                     new Object[] { new String(theBuffer.array()),
255                                                             theChannel.getLocalAddress(),
256                                                             theChannel.getRemoteAddress() });
257                                             theChannel.write(theBuffer);
258                                         } catch (NotYetConnectedException e) {
259                                             theHandler.updateStatus(ThingStatus.OFFLINE,
260                                                     ThingStatusDetail.COMMUNICATION_ERROR,
261                                                     "The remote host is not yet connected");
262                                             error = true;
263                                         } catch (ClosedChannelException e) {
264                                             theHandler.updateStatus(ThingStatus.OFFLINE,
265                                                     ThingStatusDetail.COMMUNICATION_ERROR,
266                                                     "The connection to the remote host is closed");
267                                             error = true;
268                                         } catch (IOException e) {
269                                             theHandler.updateStatus(ThingStatus.OFFLINE,
270                                                     ThingStatusDetail.COMMUNICATION_ERROR, "An IO exception occurred");
271                                             error = true;
272                                         }
273
274                                         if (error) {
275                                             removeConnection(theHandler);
276                                             establishConnection(theHandler);
277                                         }
278                                     }
279                                 }
280                             }
281                         }
282
283                         if (selKey.isValid() && selKey.isReadable()) {
284                             int numberBytesRead = 0;
285                             InetSocketAddress clientAddress = null;
286                             ByteBuffer readBuffer = null;
287                             boolean error = false;
288
289                             if (selKey.equals(broadcastKey)) {
290                                 try {
291                                     readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
292                                     clientAddress = (InetSocketAddress) broadcastChannel.receive(readBuffer);
293                                     logger.debug("Received {} from {} on the transceiver listener port ",
294                                             new String(readBuffer.array()), clientAddress);
295                                     numberBytesRead = readBuffer.position();
296                                 } catch (IOException e) {
297                                     logger.error(
298                                             "An exception occurred while receiving data on the transceiver listener port: '{}'",
299                                             e.getMessage(), e);
300                                     error = true;
301                                 }
302
303                                 if (numberBytesRead == -1) {
304                                     error = true;
305                                 }
306
307                                 if (!error) {
308                                     readBuffer.flip();
309                                     if (readBuffer.remaining() > 0) {
310                                         for (KeContactHandler handler : handlers) {
311                                             if (clientAddress != null && handler.getIPAddress()
312                                                     .equals(clientAddress.getAddress().getHostAddress())) {
313                                                 ReentrantLock theLock = locks.get(handler);
314                                                 if (theLock != null && theLock.isLocked()) {
315                                                     buffers.put(handler, readBuffer);
316                                                     synchronized (theLock) {
317                                                         if (logger.isTraceEnabled()) {
318                                                             logger.trace("{} notifyall on handerLock {}",
319                                                                     Thread.currentThread().getName(),
320                                                                     theLock.toString());
321                                                         }
322                                                         theLock.notifyAll();
323                                                     }
324                                                 } else {
325                                                     handler.onData(readBuffer);
326                                                 }
327                                             }
328                                         }
329                                     }
330                                 } else {
331                                     handlers.forEach(listener -> listener.updateStatus(ThingStatus.OFFLINE,
332                                             ThingStatusDetail.COMMUNICATION_ERROR, "The transceiver is offline"));
333                                     reset();
334                                 }
335                             } else {
336                                 DatagramChannel theChannel = (DatagramChannel) selKey.channel();
337                                 KeContactHandler theHandler = null;
338
339                                 for (KeContactHandler handlers : handlers) {
340                                     if (datagramChannels.get(handlers).equals(theChannel)) {
341                                         theHandler = handlers;
342                                         break;
343                                     }
344                                 }
345
346                                 if (theHandler != null) {
347                                     try {
348                                         readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
349                                         numberBytesRead = theChannel.read(readBuffer);
350                                         logger.debug("Received {} from {} on the transceiver listener port ",
351                                                 new String(readBuffer.array()), theChannel.getRemoteAddress());
352                                     } catch (NotYetConnectedException e) {
353                                         theHandler.updateStatus(ThingStatus.OFFLINE,
354                                                 ThingStatusDetail.COMMUNICATION_ERROR,
355                                                 "The remote host is not yet connected");
356                                         error = true;
357                                     } catch (PortUnreachableException e) {
358                                         theHandler.updateStatus(ThingStatus.OFFLINE,
359                                                 ThingStatusDetail.CONFIGURATION_ERROR,
360                                                 "The remote host is probably not a KEBA KeContact");
361                                         error = true;
362                                     } catch (IOException e) {
363                                         theHandler.updateStatus(ThingStatus.OFFLINE,
364                                                 ThingStatusDetail.COMMUNICATION_ERROR, "An IO exception occurred");
365                                         error = true;
366                                     }
367
368                                     if (numberBytesRead == -1) {
369                                         error = true;
370                                     }
371
372                                     if (!error) {
373                                         readBuffer.flip();
374                                         if (readBuffer.remaining() > 0) {
375                                             ReentrantLock theLock = locks.get(theHandler);
376                                             if (theLock != null && theLock.isLocked()) {
377                                                 buffers.put(theHandler, readBuffer);
378                                                 synchronized (theLock) {
379                                                     theLock.notifyAll();
380                                                 }
381                                             }
382                                         }
383                                     } else {
384                                         removeConnection(theHandler);
385                                         establishConnection(theHandler);
386                                     }
387                                 }
388                             }
389                         }
390                     }
391                 }
392
393                 if (!Thread.currentThread().isInterrupted()) {
394                     Thread.sleep(LISTENING_INTERVAL);
395                 } else {
396                     return;
397                 }
398             } catch (InterruptedException | ClosedSelectorException e) {
399                 Thread.currentThread().interrupt();
400                 return;
401             }
402         }
403     };
404
405     private void establishConnection(KeContactHandler handler) {
406         String ipAddress = handler.getIPAddress();
407         if (handler.getThing().getStatusInfo().getStatusDetail() != ThingStatusDetail.CONFIGURATION_ERROR
408                 && !ipAddress.equals("")) {
409             logger.debug("Establishing the connection to the KEBA KeContact '{}'", handler.getThing().getUID());
410
411             DatagramChannel datagramChannel = null;
412             try {
413                 datagramChannel = DatagramChannel.open();
414             } catch (Exception e2) {
415                 handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
416                         "An exception occurred while opening a datagram channel");
417             }
418
419             if (datagramChannel != null) {
420                 datagramChannels.put(handler, datagramChannel);
421
422                 try {
423                     datagramChannel.configureBlocking(false);
424                 } catch (IOException e2) {
425                     handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
426                             "An exception occurred while configuring a datagram channel");
427                 }
428
429                 synchronized (selector) {
430                     selector.wakeup();
431                     int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
432                     try {
433                         datagramChannel.register(selector, interestSet);
434                     } catch (ClosedChannelException e1) {
435                         handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
436                                 "An exception occurred while registering a selector");
437                     }
438
439                     InetSocketAddress remoteAddress = new InetSocketAddress(ipAddress, LISTENER_PORT_NUMBER);
440
441                     try {
442                         if (logger.isTraceEnabled()) {
443                             logger.trace("Connecting the channel for {} ", remoteAddress);
444                         }
445                         datagramChannel.connect(remoteAddress);
446
447                         handler.updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE, "");
448                     } catch (Exception e) {
449                         logger.debug("An exception occurred while connecting connecting to '{}:{}' : {}",
450                                 new Object[] { ipAddress, LISTENER_PORT_NUMBER, e.getMessage() });
451                         handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
452                                 "An exception occurred while connecting");
453                     }
454                 }
455             }
456         } else {
457             handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR,
458                     handler.getThing().getStatusInfo().getDescription());
459         }
460     }
461
462     private void removeConnection(KeContactHandler handler) {
463         logger.debug("Tearing down the connection to the KEBA KeContact '{}'", handler.getThing().getUID());
464         DatagramChannel datagramChannel = datagramChannels.remove(handler);
465
466         if (datagramChannel != null) {
467             synchronized (selector) {
468                 try {
469                     datagramChannel.keyFor(selector).cancel();
470                     datagramChannel.close();
471                     handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE, "");
472                 } catch (Exception e) {
473                     logger.debug("An exception occurred while closing the datagramchannel for '{}': {}",
474                             handler.getThing().getUID(), e.getMessage());
475                     handler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
476                             "An exception occurred while closing the datagramchannel");
477                 }
478             }
479         }
480     }
481
482     private boolean isConnected(KeContactHandler handler) {
483         return datagramChannels.get(handler) != null ? true : false;
484     }
485 }