2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.revogi.internal.udp;
15 import static java.util.stream.Collectors.toList;
17 import java.io.IOException;
18 import java.net.DatagramPacket;
19 import java.net.InetAddress;
20 import java.net.SocketException;
21 import java.net.SocketTimeoutException;
22 import java.net.UnknownHostException;
23 import java.nio.charset.Charset;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.List;
28 import java.util.concurrent.CompletableFuture;
29 import java.util.concurrent.ScheduledExecutorService;
30 import java.util.concurrent.TimeUnit;
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.openhab.core.common.ThreadPoolManager;
34 import org.openhab.core.net.NetUtil;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
39 * The {@link UdpSenderService} is responsible for sending and receiving UDP packets
41 * @author Andi Bräu - Initial contribution
44 public class UdpSenderService {
47 * Limit timeout waiting time, as we have to deal with UDP
49 * How it works: after every receive timeout we wait a bit longer, as the wait time is the timeout counter multiplied
50 * with the timeout base value. Receiving stops as soon as the timeout counter reaches the max timeout count. Let max
51 * timeout count be 2 and timeout base value 800, then at most 2 timeouts are tolerated and a single wait never exceeds 2 * 800ms = 1600ms.
53 private static final int MAX_TIMEOUT_COUNT = 2;
54 private static final long TIMEOUT_BASE_VALUE_MS = 800L;
55 private static final int REVOGI_PORT = 8888;
57 private final Logger logger = LoggerFactory.getLogger(UdpSenderService.class);
58 private final DatagramSocketWrapper datagramSocketWrapper;
59 private final ScheduledExecutorService scheduler;
60 private final long timeoutBaseValue;
/**
 * Creates a sender that waits with the default timeout base value ({@code TIMEOUT_BASE_VALUE_MS}) and submits
 * its send/receive work to the given scheduler.
 *
 * @param datagramSocketWrapper wrapper used to open the socket, send packets, receive answers and close it again
 * @param scheduler executor the asynchronous send tasks are submitted to
 */
public UdpSenderService(DatagramSocketWrapper datagramSocketWrapper, ScheduledExecutorService scheduler) {
    this.datagramSocketWrapper = datagramSocketWrapper;
    this.scheduler = scheduler;
    this.timeoutBaseValue = TIMEOUT_BASE_VALUE_MS;
}
/**
 * Creates a sender with a caller-supplied timeout base value. The scheduler is obtained from
 * {@link ThreadPoolManager} under the pool name "test pool" (presumably this constructor is meant for tests that
 * want short waits; confirm against callers).
 *
 * @param datagramSocketWrapper wrapper used to open the socket, send packets, receive answers and close it again
 * @param timeout timeout base value in milliseconds; it is multiplied with the timeout counter to get the wait time
 */
public UdpSenderService(DatagramSocketWrapper datagramSocketWrapper, long timeout) {
    this.datagramSocketWrapper = datagramSocketWrapper;
    this.scheduler = ThreadPoolManager.getScheduledPool("test pool");
    this.timeoutBaseValue = timeout;
}
74 public CompletableFuture<List<UdpResponseDTO>> broadcastUdpDatagram(String content) {
75 List<String> allBroadcastAddresses = NetUtil.getAllBroadcastAddresses();
76 CompletableFuture<List<UdpResponseDTO>> future = new CompletableFuture<>();
77 scheduler.submit(() -> future.complete(allBroadcastAddresses.stream().map(address -> {
79 return sendMessage(content, InetAddress.getByName(address));
80 } catch (UnknownHostException e) {
81 logger.warn("Could not find host with IP {}", address);
82 return new ArrayList<UdpResponseDTO>();
84 }).flatMap(Collection::stream).distinct().collect(toList())));
88 public CompletableFuture<List<UdpResponseDTO>> sendMessage(String content, String ipAddress) {
90 CompletableFuture<List<UdpResponseDTO>> future = new CompletableFuture<>();
91 InetAddress inetAddress = InetAddress.getByName(ipAddress);
92 scheduler.submit(() -> future.complete(sendMessage(content, inetAddress)));
94 } catch (UnknownHostException e) {
95 logger.warn("Could not find host with IP {}", ipAddress);
96 return CompletableFuture.completedFuture(Collections.emptyList());
100 private List<UdpResponseDTO> sendMessage(String content, InetAddress inetAddress) {
101 logger.debug("Using address {}", inetAddress);
102 byte[] buf = content.getBytes(Charset.defaultCharset());
103 DatagramPacket packet = new DatagramPacket(buf, buf.length, inetAddress, REVOGI_PORT);
104 List<UdpResponseDTO> responses = Collections.emptyList();
106 datagramSocketWrapper.initSocket();
107 datagramSocketWrapper.sendPacket(packet);
108 responses = getUdpResponses();
109 } catch (IOException e) {
110 logger.warn("Error sending message or reading anwser {}", e.getMessage());
112 datagramSocketWrapper.closeSocket();
117 private List<UdpResponseDTO> getUdpResponses() {
118 int timeoutCounter = 0;
119 List<UdpResponseDTO> list = new ArrayList<>();
120 while (timeoutCounter < MAX_TIMEOUT_COUNT && !Thread.interrupted()) {
121 byte[] receivedBuf = new byte[512];
122 DatagramPacket answer = new DatagramPacket(receivedBuf, receivedBuf.length);
124 datagramSocketWrapper.receiveAnswer(answer);
125 } catch (SocketTimeoutException | SocketException e) {
128 TimeUnit.MILLISECONDS.sleep(timeoutCounter * timeoutBaseValue);
129 } catch (InterruptedException ex) {
130 logger.debug("Interrupted sleep");
131 Thread.currentThread().interrupt();
134 } catch (IOException e) {
135 logger.warn("Error sending message or reading anwser {}", e.getMessage());
138 if (answer.getAddress() != null && answer.getLength() > 0) {
139 list.add(new UdpResponseDTO(new String(answer.getData(), 0, answer.getLength()),
140 answer.getAddress().getHostAddress()));