/**
 * Copyright (c) 2010-2024 Contributors to the openHAB project
 *
 * See the NOTICE file(s) distributed with this work for additional
 * information.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0
 *
 * SPDX-License-Identifier: EPL-2.0
 */
13 package org.openhab.binding.emotiva.internal;
15 import java.net.DatagramPacket;
16 import java.net.DatagramSocket;
17 import java.net.SocketException;
18 import java.util.Arrays;
19 import java.util.Objects;
20 import java.util.concurrent.ExecutorService;
21 import java.util.function.Consumer;
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.openhab.binding.emotiva.internal.protocol.EmotivaUdpResponse;
26 import org.openhab.core.common.NamedThreadFactory;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
31 * This service is used for receiving UDP message from Emotiva devices.
33 * @author Patrick Koenemann - Initial contribution
34 * @author Espen Fossen - Adapted to Emotiva binding
37 public class EmotivaUdpReceivingService {
39 private final Logger logger = LoggerFactory.getLogger(EmotivaUdpReceivingService.class);
42 * Buffer for incoming UDP packages.
44 private static final int MAX_PACKET_SIZE = 10240;
47 * The device IP this connector is listening to / sends to.
49 private final String ipAddress;
52 * The port this connector is listening to notify message.
54 private final int receivingPort;
57 * Service to spawn new threads for handling status updates.
59 private final ExecutorService executorService;
62 * Thread factory for UDP listening thread.
64 private final NamedThreadFactory listeningThreadFactory = new NamedThreadFactory(EmotivaBindingConstants.BINDING_ID,
68 * Socket for receiving Notify UDP packages.
70 private @Nullable DatagramSocket receivingSocket = null;
73 * The listener that gets notified upon newly received messages.
75 private @Nullable Consumer<EmotivaUdpResponse> listener;
77 private int receiveNotifyFailures = 0;
78 private boolean listenerNotifyActive = false;
81 * Create a listener to an Emotiva device via the given configuration.
83 * @param receivingPort listening port
84 * @param config Emotiva configuration values
86 public EmotivaUdpReceivingService(int receivingPort, EmotivaConfiguration config, ExecutorService executorService) {
87 if (receivingPort <= 0) {
88 throw new IllegalArgumentException("Invalid receivingPort: " + receivingPort);
90 if (config.ipAddress.trim().isEmpty()) {
91 throw new IllegalArgumentException("Missing ipAddress");
93 this.ipAddress = config.ipAddress;
94 this.receivingPort = receivingPort;
95 this.executorService = executorService;
99 * Initialize socket connection to the UDP receive port for the given listener.
101 * @throws SocketException Is only thrown if <code>logNotThrowException = false</code>.
102 * @throws InterruptedException Typically happens during shutdown.
104 public void connect(Consumer<EmotivaUdpResponse> listener, boolean logNotThrowException)
105 throws SocketException, InterruptedException {
106 if (receivingSocket == null) {
108 receivingSocket = new DatagramSocket(receivingPort);
110 this.listener = listener;
112 listeningThreadFactory.newThread(this::listen).start();
114 // wait for the listening thread to be active
115 for (int i = 0; i < 20 && !listenerNotifyActive; i++) {
116 Thread.sleep(100); // wait at most 20 * 100ms = 2sec for the listener to be active
118 if (!listenerNotifyActive) {
120 "Listener thread started but listener is not yet active after 2sec; something seems to be wrong with the JVM thread handling?!");
122 } catch (SocketException e) {
123 if (logNotThrowException) {
124 logger.warn("Failed to open socket connection on port '{}'", receivingPort);
129 if (!logNotThrowException) {
133 } else if (!Objects.equals(this.listener, listener)) {
134 throw new IllegalStateException("A listening thread is already running");
138 private void listen() {
140 listenUnhandledInterruption();
141 } catch (InterruptedException e) {
142 // OH shutdown - don't log anything, just quit
146 private void listenUnhandledInterruption() throws InterruptedException {
147 logger.debug("Emotiva listener started for: '{}:{}'", ipAddress, receivingPort);
149 final Consumer<EmotivaUdpResponse> localListener = listener;
150 final DatagramSocket localReceivingSocket = receivingSocket;
151 while (localListener != null && localReceivingSocket != null && receivingSocket != null) {
153 final DatagramPacket answer = new DatagramPacket(new byte[MAX_PACKET_SIZE], MAX_PACKET_SIZE);
155 listenerNotifyActive = true;
156 localReceivingSocket.receive(answer); // receive packet (blocking call)
157 listenerNotifyActive = false;
159 final byte[] receivedData = Arrays.copyOfRange(answer.getData(), 0, answer.getLength() - 1);
161 if (receivedData.length == 0) {
163 logger.debug("Nothing received, this may happen during shutdown or some unknown error");
167 receiveNotifyFailures = 0; // message successfully received, unset failure counter
169 handleReceivedData(answer, receivedData, localListener);
170 } catch (Exception e) {
171 listenerNotifyActive = false;
173 if (receivingSocket == null) {
174 logger.debug("Socket closed; stopping listener on port '{}'", receivingPort);
176 logger.debug("Checkin receiveFailures count {}", receiveNotifyFailures);
177 // if we get 3 errors in a row, we should better add a delay to stop spamming the log!
178 if (receiveNotifyFailures++ > EmotivaBindingConstants.DEFAULT_CONNECTION_RETRIES) {
180 "Unexpected error while listening on port '{}'; waiting 10sec before the next attempt to listen on that port",
182 for (int i = 0; i < 50 && receivingSocket != null; i++) {
183 Thread.sleep(200); // 50 * 200ms = 10sec
186 logger.debug("Unexpected error while listening on port '{}'", receivingPort, e);
193 private void handleReceivedData(DatagramPacket answer, byte[] receivedData,
194 Consumer<EmotivaUdpResponse> localListener) {
195 // log & notify listener in new thread (so that listener loop continues immediately)
196 executorService.execute(() -> {
197 if (answer.getAddress() != null && answer.getLength() > 0) {
198 logger.trace("Received data on port '{}': {}", answer.getPort(), receivedData);
199 EmotivaUdpResponse emotivaUdpResponse = new EmotivaUdpResponse(
200 new String(answer.getData(), 0, answer.getLength()), answer.getAddress().getHostAddress());
201 localListener.accept(emotivaUdpResponse);
207 * Close the socket connection.
209 public void disconnect() {
210 logger.debug("Emotiva listener stopped for: '{}:{}'", ipAddress, receivingPort);
212 final DatagramSocket localReceivingSocket = receivingSocket;
213 if (localReceivingSocket != null) {
214 receivingSocket = null;
215 if (!localReceivingSocket.isClosed()) {
216 localReceivingSocket.close(); // this interrupts and terminates the listening thread
221 public boolean isConnected() {
222 return receivingSocket != null;