]> git.basschouten.com Git - openhab-addons.git/blob
513891a32d2c8e26e4e50cf901595ee59935750e
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.revogi.internal.udp;
14
15 import static java.util.stream.Collectors.toList;
16
17 import java.io.IOException;
18 import java.net.DatagramPacket;
19 import java.net.InetAddress;
20 import java.net.SocketException;
21 import java.net.SocketTimeoutException;
22 import java.net.UnknownHostException;
23 import java.nio.charset.Charset;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.List;
28 import java.util.concurrent.CompletableFuture;
29 import java.util.concurrent.ScheduledExecutorService;
30 import java.util.concurrent.TimeUnit;
31
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.openhab.core.common.ThreadPoolManager;
34 import org.openhab.core.net.NetUtil;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * The {@link UdpSenderService} is responsible for sending and receiving udp packets
40  *
41  * @author Andi Bräu - Initial contribution
42  */
43 @NonNullByDefault
44 public class UdpSenderService {
45
46     /**
47      * Limit timeout waiting time, as we have to deal with UDP
48      *
49      * How it works: for every loop, we'll wait a bit longer, so the timeout counter is multiplied with the timeout base
50      * value. Let max timeout count be 2 and timeout base value 800, then we'll have a maximum of loops of 3, waiting
51      * 800ms in the 1st loop, 1600ms in the 2nd loop and 2400ms in the third loop.
52      */
53     private static final int MAX_TIMEOUT_COUNT = 2;
54     private static final long TIMEOUT_BASE_VALUE_MS = 800L;
55     private static final int REVOGI_PORT = 8888;
56
57     private final Logger logger = LoggerFactory.getLogger(UdpSenderService.class);
58     private final DatagramSocketWrapper datagramSocketWrapper;
59     private final ScheduledExecutorService scheduler;
60     private final long timeoutBaseValue;
61
62     public UdpSenderService(DatagramSocketWrapper datagramSocketWrapper, ScheduledExecutorService scheduler) {
63         this.timeoutBaseValue = TIMEOUT_BASE_VALUE_MS;
64         this.datagramSocketWrapper = datagramSocketWrapper;
65         this.scheduler = scheduler;
66     }
67
68     public UdpSenderService(DatagramSocketWrapper datagramSocketWrapper, long timeout) {
69         this.timeoutBaseValue = timeout;
70         this.datagramSocketWrapper = datagramSocketWrapper;
71         this.scheduler = ThreadPoolManager.getScheduledPool("test pool");
72     }
73
74     public CompletableFuture<List<UdpResponseDTO>> broadcastUdpDatagram(String content) {
75         List<String> allBroadcastAddresses = NetUtil.getAllBroadcastAddresses();
76         CompletableFuture<List<UdpResponseDTO>> future = new CompletableFuture<>();
77         scheduler.submit(() -> future.complete(allBroadcastAddresses.stream().map(address -> {
78             try {
79                 return sendMessage(content, InetAddress.getByName(address));
80             } catch (UnknownHostException e) {
81                 logger.warn("Could not find host with IP {}", address);
82                 return new ArrayList<UdpResponseDTO>();
83             }
84         }).flatMap(Collection::stream).distinct().collect(toList())));
85         return future;
86     }
87
88     public CompletableFuture<List<UdpResponseDTO>> sendMessage(String content, String ipAddress) {
89         try {
90             CompletableFuture<List<UdpResponseDTO>> future = new CompletableFuture<>();
91             InetAddress inetAddress = InetAddress.getByName(ipAddress);
92             scheduler.submit(() -> future.complete(sendMessage(content, inetAddress)));
93             return future;
94         } catch (UnknownHostException e) {
95             logger.warn("Could not find host with IP {}", ipAddress);
96             return CompletableFuture.completedFuture(Collections.emptyList());
97         }
98     }
99
100     private List<UdpResponseDTO> sendMessage(String content, InetAddress inetAddress) {
101         logger.debug("Using address {}", inetAddress);
102         byte[] buf = content.getBytes(Charset.defaultCharset());
103         DatagramPacket packet = new DatagramPacket(buf, buf.length, inetAddress, REVOGI_PORT);
104         List<UdpResponseDTO> responses = Collections.emptyList();
105         try {
106             datagramSocketWrapper.initSocket();
107             datagramSocketWrapper.sendPacket(packet);
108             responses = getUdpResponses();
109         } catch (IOException e) {
110             logger.warn("Error sending message or reading anwser {}", e.getMessage());
111         } finally {
112             datagramSocketWrapper.closeSocket();
113         }
114         return responses;
115     }
116
117     private List<UdpResponseDTO> getUdpResponses() {
118         int timeoutCounter = 0;
119         List<UdpResponseDTO> list = new ArrayList<>();
120         while (timeoutCounter < MAX_TIMEOUT_COUNT && !Thread.interrupted()) {
121             byte[] receivedBuf = new byte[512];
122             DatagramPacket answer = new DatagramPacket(receivedBuf, receivedBuf.length);
123             try {
124                 datagramSocketWrapper.receiveAnswer(answer);
125             } catch (SocketTimeoutException | SocketException e) {
126                 timeoutCounter++;
127                 try {
128                     TimeUnit.MILLISECONDS.sleep(timeoutCounter * timeoutBaseValue);
129                 } catch (InterruptedException ex) {
130                     logger.debug("Interrupted sleep");
131                     Thread.currentThread().interrupt();
132                 }
133                 continue;
134             } catch (IOException e) {
135                 logger.warn("Error sending message or reading anwser {}", e.getMessage());
136             }
137
138             if (answer.getAddress() != null && answer.getLength() > 0) {
139                 list.add(new UdpResponseDTO(new String(answer.getData(), 0, answer.getLength()),
140                         answer.getAddress().getHostAddress()));
141             }
142         }
143         return list;
144     }
145 }