]> git.basschouten.com Git - openhab-addons.git/blob
954af61a2f38e81733950aef29bf81589b39dcee
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2024 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.emotiva.internal;
14
15 import java.net.DatagramPacket;
16 import java.net.DatagramSocket;
17 import java.net.SocketException;
18 import java.util.Arrays;
19 import java.util.Objects;
20 import java.util.concurrent.ExecutorService;
21 import java.util.function.Consumer;
22
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.openhab.binding.emotiva.internal.protocol.EmotivaUdpResponse;
26 import org.openhab.core.common.NamedThreadFactory;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 /**
31  * This service is used for receiving UDP message from Emotiva devices.
32  *
33  * @author Patrick Koenemann - Initial contribution
34  * @author Espen Fossen - Adapted to Emotiva binding
35  */
36 @NonNullByDefault
37 public class EmotivaUdpReceivingService {
38
39     private final Logger logger = LoggerFactory.getLogger(EmotivaUdpReceivingService.class);
40
41     /**
42      * Buffer for incoming UDP packages.
43      */
44     private static final int MAX_PACKET_SIZE = 10240;
45
46     /**
47      * The device IP this connector is listening to / sends to.
48      */
49     private final String ipAddress;
50
51     /**
52      * The port this connector is listening to notify message.
53      */
54     private final int receivingPort;
55
56     /**
57      * Service to spawn new threads for handling status updates.
58      */
59     private final ExecutorService executorService;
60
61     /**
62      * Thread factory for UDP listening thread.
63      */
64     private final NamedThreadFactory listeningThreadFactory = new NamedThreadFactory(EmotivaBindingConstants.BINDING_ID,
65             true);
66
67     /**
68      * Socket for receiving Notify UDP packages.
69      */
70     private @Nullable DatagramSocket receivingSocket = null;
71
72     /**
73      * The listener that gets notified upon newly received messages.
74      */
75     private @Nullable Consumer<EmotivaUdpResponse> listener;
76
77     private int receiveNotifyFailures = 0;
78     private boolean listenerNotifyActive = false;
79
80     /**
81      * Create a listener to an Emotiva device via the given configuration.
82      *
83      * @param receivingPort listening port
84      * @param config Emotiva configuration values
85      */
86     public EmotivaUdpReceivingService(int receivingPort, EmotivaConfiguration config, ExecutorService executorService) {
87         if (receivingPort <= 0) {
88             throw new IllegalArgumentException("Invalid receivingPort: " + receivingPort);
89         }
90         if (config.ipAddress.trim().isEmpty()) {
91             throw new IllegalArgumentException("Missing ipAddress");
92         }
93         this.ipAddress = config.ipAddress;
94         this.receivingPort = receivingPort;
95         this.executorService = executorService;
96     }
97
98     /**
99      * Initialize socket connection to the UDP receive port for the given listener.
100      *
101      * @throws SocketException Is only thrown if <code>logNotThrowException = false</code>.
102      * @throws InterruptedException Typically happens during shutdown.
103      */
104     public void connect(Consumer<EmotivaUdpResponse> listener, boolean logNotThrowException)
105             throws SocketException, InterruptedException {
106         if (receivingSocket == null) {
107             try {
108                 receivingSocket = new DatagramSocket(receivingPort);
109
110                 this.listener = listener;
111
112                 listeningThreadFactory.newThread(this::listen).start();
113
114                 // wait for the listening thread to be active
115                 for (int i = 0; i < 20 && !listenerNotifyActive; i++) {
116                     Thread.sleep(100); // wait at most 20 * 100ms = 2sec for the listener to be active
117                 }
118                 if (!listenerNotifyActive) {
119                     logger.warn(
120                             "Listener thread started but listener is not yet active after 2sec; something seems to be wrong with the JVM thread handling?!");
121                 }
122             } catch (SocketException e) {
123                 if (logNotThrowException) {
124                     logger.warn("Failed to open socket connection on port '{}'", receivingPort);
125                 }
126
127                 disconnect();
128
129                 if (!logNotThrowException) {
130                     throw e;
131                 }
132             }
133         } else if (!Objects.equals(this.listener, listener)) {
134             throw new IllegalStateException("A listening thread is already running");
135         }
136     }
137
138     private void listen() {
139         try {
140             listenUnhandledInterruption();
141         } catch (InterruptedException e) {
142             // OH shutdown - don't log anything, just quit
143         }
144     }
145
146     private void listenUnhandledInterruption() throws InterruptedException {
147         logger.debug("Emotiva listener started for: '{}:{}'", ipAddress, receivingPort);
148
149         final Consumer<EmotivaUdpResponse> localListener = listener;
150         final DatagramSocket localReceivingSocket = receivingSocket;
151         while (localListener != null && localReceivingSocket != null && receivingSocket != null) {
152             try {
153                 final DatagramPacket answer = new DatagramPacket(new byte[MAX_PACKET_SIZE], MAX_PACKET_SIZE);
154
155                 listenerNotifyActive = true;
156                 localReceivingSocket.receive(answer); // receive packet (blocking call)
157                 listenerNotifyActive = false;
158
159                 final byte[] receivedData = Arrays.copyOfRange(answer.getData(), 0, answer.getLength() - 1);
160
161                 if (receivedData.length == 0) {
162                     if (isConnected()) {
163                         logger.debug("Nothing received, this may happen during shutdown or some unknown error");
164                     }
165                     continue;
166                 }
167                 receiveNotifyFailures = 0; // message successfully received, unset failure counter
168
169                 handleReceivedData(answer, receivedData, localListener);
170             } catch (Exception e) {
171                 listenerNotifyActive = false;
172
173                 if (receivingSocket == null) {
174                     logger.debug("Socket closed; stopping listener on port '{}'", receivingPort);
175                 } else {
176                     logger.debug("Checkin receiveFailures count {}", receiveNotifyFailures);
177                     // if we get 3 errors in a row, we should better add a delay to stop spamming the log!
178                     if (receiveNotifyFailures++ > EmotivaBindingConstants.DEFAULT_CONNECTION_RETRIES) {
179                         logger.debug(
180                                 "Unexpected error while listening on port '{}'; waiting 10sec before the next attempt to listen on that port",
181                                 receivingPort, e);
182                         for (int i = 0; i < 50 && receivingSocket != null; i++) {
183                             Thread.sleep(200); // 50 * 200ms = 10sec
184                         }
185                     } else {
186                         logger.debug("Unexpected error while listening on port '{}'", receivingPort, e);
187                     }
188                 }
189             }
190         }
191     }
192
193     private void handleReceivedData(DatagramPacket answer, byte[] receivedData,
194             Consumer<EmotivaUdpResponse> localListener) {
195         // log & notify listener in new thread (so that listener loop continues immediately)
196         executorService.execute(() -> {
197             if (answer.getAddress() != null && answer.getLength() > 0) {
198                 logger.trace("Received data on port '{}': {}", answer.getPort(), receivedData);
199                 EmotivaUdpResponse emotivaUdpResponse = new EmotivaUdpResponse(
200                         new String(answer.getData(), 0, answer.getLength()), answer.getAddress().getHostAddress());
201                 localListener.accept(emotivaUdpResponse);
202             }
203         });
204     }
205
206     /**
207      * Close the socket connection.
208      */
209     public void disconnect() {
210         logger.debug("Emotiva listener stopped for: '{}:{}'", ipAddress, receivingPort);
211         listener = null;
212         final DatagramSocket localReceivingSocket = receivingSocket;
213         if (localReceivingSocket != null) {
214             receivingSocket = null;
215             if (!localReceivingSocket.isClosed()) {
216                 localReceivingSocket.close(); // this interrupts and terminates the listening thread
217             }
218         }
219     }
220
221     public boolean isConnected() {
222         return receivingSocket != null;
223     }
224 }