]> git.basschouten.com Git - openhab-addons.git/blob
008a53fa6115bfd9d4c027317ccf05c36c5ddf25
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.anel.internal;
14
15 import java.io.IOException;
16 import java.net.DatagramPacket;
17 import java.net.DatagramSocket;
18 import java.net.InetAddress;
19 import java.net.SocketException;
20 import java.util.Arrays;
21 import java.util.Objects;
22 import java.util.concurrent.ExecutorService;
23 import java.util.function.Consumer;
24
25 import org.eclipse.jdt.annotation.NonNullByDefault;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.openhab.core.common.NamedThreadFactory;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 /**
32  * This class handles the actual communication to ANEL devices.
33  *
34  * @author Patrick Koenemann - Initial contribution
35  */
36 @NonNullByDefault
37 public class AnelUdpConnector {
38
39     /** Buffer for incoming UDP packages. */
40     private static final int MAX_PACKET_SIZE = 512;
41
42     private final Logger logger = LoggerFactory.getLogger(AnelUdpConnector.class);
43
44     /** The device IP this connector is listening to / sends to. */
45     private final String host;
46
47     /** The port this connector is listening to. */
48     private final int receivePort;
49
50     /** The port this connector is sending to. */
51     private final int sendPort;
52
53     /** Service to spawn new threads for handling status updates. */
54     private final ExecutorService executorService;
55
56     /** Thread factory for UDP listening thread. */
57     private final NamedThreadFactory listeningThreadFactory = new NamedThreadFactory(IAnelConstants.BINDING_ID, true);
58
59     /** Socket for receiving UDP packages. */
60     private @Nullable DatagramSocket receivingSocket = null;
61     /** Socket for sending UDP packages. */
62     private @Nullable DatagramSocket sendingSocket = null;
63
64     /** The listener that gets notified upon newly received messages. */
65     private @Nullable Consumer<String> listener;
66
67     private int receiveFailures = 0;
68     private boolean listenerActive = false;
69
70     /**
71      * Create a new connector to an Anel device via the given host and UDP
72      * ports.
73      *
74      * @param host
75      *            The IP address / network name of the device.
76      * @param udpReceivePort
77      *            The UDP port to listen for packages.
78      * @param udpSendPort
79      *            The UDP port to send packages.
80      */
81     public AnelUdpConnector(String host, int udpReceivePort, int udpSendPort, ExecutorService executorService) {
82         if (udpReceivePort <= 0) {
83             throw new IllegalArgumentException("Invalid udpReceivePort: " + udpReceivePort);
84         }
85         if (udpSendPort <= 0) {
86             throw new IllegalArgumentException("Invalid udpSendPort: " + udpSendPort);
87         }
88         if (host.trim().isEmpty()) {
89             throw new IllegalArgumentException("Missing host.");
90         }
91         this.host = host;
92         this.receivePort = udpReceivePort;
93         this.sendPort = udpSendPort;
94         this.executorService = executorService;
95     }
96
97     /**
98      * Initialize socket connection to the UDP receive port for the given listener.
99      *
100      * @throws SocketException Is only thrown if <code>logNotTHrowException = false</code>.
101      * @throws InterruptedException Typically happens during shutdown.
102      */
103     public void connect(Consumer<String> listener, boolean logNotThrowExcpetion)
104             throws SocketException, InterruptedException {
105         if (receivingSocket == null) {
106             try {
107                 receivingSocket = new DatagramSocket(receivePort);
108                 sendingSocket = new DatagramSocket();
109                 this.listener = listener;
110
111                 /*-
112                  * Due to the issue with 4 concurrently listening threads [1], we should follow Kais suggestion [2]
113                  * to create our own listening daemonized thread.
114                  *
115                  * [1] https://community.openhab.org/t/anel-net-pwrctrl-binding-for-oh3/123378
116                  * [2] https://www.eclipse.org/forums/index.php/m/1775932/?#msg_1775429
117                  */
118                 listeningThreadFactory.newThread(this::listen).start();
119
120                 // wait for the listening thread to be active
121                 for (int i = 0; i < 20 && !listenerActive; i++) {
122                     Thread.sleep(100); // wait at most 20 * 100ms = 2sec for the listener to be active
123                 }
124                 if (!listenerActive) {
125                     logger.warn(
126                             "Listener thread started but listener is not yet active after 2sec; something seems to be wrong with the JVM thread handling?!");
127                 }
128             } catch (SocketException e) {
129                 if (logNotThrowExcpetion) {
130                     logger.warn(
131                             "Failed to open socket connection on port {} (maybe there is already another socket listener on that port?)",
132                             receivePort, e);
133                 }
134
135                 disconnect();
136
137                 if (!logNotThrowExcpetion) {
138                     throw e;
139                 }
140             }
141         } else if (!Objects.equals(this.listener, listener)) {
142             throw new IllegalStateException("A listening thread is already running");
143         }
144     }
145
146     private void listen() {
147         try {
148             listenUnhandledInterruption();
149         } catch (InterruptedException e) {
150             // OH shutdown - don't log anything, just quit
151         }
152     }
153
154     private void listenUnhandledInterruption() throws InterruptedException {
155         logger.info("Anel NET-PwrCtrl listener started for: '{}:{}'", host, receivePort);
156
157         final Consumer<String> listener2 = listener;
158         final DatagramSocket socket2 = receivingSocket;
159         while (listener2 != null && socket2 != null && receivingSocket != null) {
160             try {
161                 final DatagramPacket packet = new DatagramPacket(new byte[MAX_PACKET_SIZE], MAX_PACKET_SIZE);
162
163                 listenerActive = true;
164                 socket2.receive(packet); // receive packet (blocking call)
165                 listenerActive = false;
166
167                 final byte[] data = Arrays.copyOfRange(packet.getData(), 0, packet.getLength() - 1);
168
169                 if (data == null || data.length == 0) {
170                     if (isConnected()) {
171                         logger.debug("Nothing received, this may happen during shutdown or some unknown error");
172                     }
173                     continue;
174                 }
175                 receiveFailures = 0; // message successfully received, unset failure counter
176
177                 /* useful for debugging without logger (e.g. in AnelUdpConnectorTest): */
178                 // System.out.println(String.format("%s [%s] received: %s", getClass().getSimpleName(),
179                 // new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()), new String(data).trim()));
180
181                 // log & notify listener in new thread (so that listener loop continues immediately)
182                 executorService.execute(() -> {
183                     final String message = new String(data);
184
185                     logger.debug("Received data on port {}: {}", receivePort, message);
186
187                     listener2.accept(message);
188                 });
189             } catch (Exception e) {
190                 listenerActive = false;
191
192                 if (receivingSocket == null) {
193                     logger.debug("Socket closed; stopping listener on port {}.", receivePort);
194                 } else {
195                     // if we get 3 errors in a row, we should better add a delay to stop spamming the log!
196                     if (receiveFailures++ > IAnelConstants.ATTEMPTS_WITH_COMMUNICATION_ERRORS) {
197                         logger.debug(
198                                 "Unexpected error while listening on port {}; waiting 10sec before the next attempt to listen on that port.",
199                                 receivePort, e);
200                         for (int i = 0; i < 50 && receivingSocket != null; i++) {
201                             Thread.sleep(200); // 50 * 200ms = 10sec
202                         }
203                     } else {
204                         logger.warn("Unexpected error while listening on port {}", receivePort, e);
205                     }
206                 }
207             }
208         }
209     }
210
211     /** Close the socket connection. */
212     public void disconnect() {
213         logger.debug("Anel NET-PwrCtrl listener stopped for: '{}:{}'", host, receivePort);
214         listener = null;
215         final DatagramSocket receivingSocket2 = receivingSocket;
216         if (receivingSocket2 != null) {
217             receivingSocket = null;
218             if (!receivingSocket2.isClosed()) {
219                 receivingSocket2.close(); // this interrupts and terminates the listening thread
220             }
221         }
222         final DatagramSocket sendingSocket2 = sendingSocket;
223         if (sendingSocket2 != null) {
224             synchronized (this) {
225                 if (Objects.equals(sendingSocket, sendingSocket2)) {
226                     sendingSocket = null;
227                     if (!sendingSocket2.isClosed()) {
228                         sendingSocket2.close();
229                     }
230                 }
231             }
232         }
233     }
234
235     public void send(String msg) throws IOException {
236         logger.debug("Sending message '{}' to {}:{}", msg, host, sendPort);
237         if (msg.isEmpty()) {
238             throw new IllegalArgumentException("Message must not be empty");
239         }
240
241         final InetAddress ipAddress = InetAddress.getByName(host);
242         final byte[] bytes = msg.getBytes();
243         final DatagramPacket packet = new DatagramPacket(bytes, bytes.length, ipAddress, sendPort);
244
245         // make sure we are not interrupted by a disconnect while sending this message
246         synchronized (this) {
247             final DatagramSocket sendingSocket2 = sendingSocket;
248             if (sendingSocket2 != null) {
249                 sendingSocket2.send(packet);
250
251                 /* useful for debugging without logger (e.g. in AnelUdpConnectorTest): */
252                 // System.out.println(String.format("%s [%s] sent: %s", getClass().getSimpleName(),
253                 // new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()), msg));
254
255                 logger.debug("Sending successful.");
256             }
257         }
258     }
259
260     public boolean isConnected() {
261         return receivingSocket != null;
262     }
263 }