2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.govee.internal;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.NetworkInterface;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.govee.internal.model.DiscoveryResponse;
import org.openhab.binding.govee.internal.model.GenericGoveeRequest;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;
import com.google.gson.JsonParseException;
 * The {@link CommunicationManager} owns the receiver thread that handles the answers of all devices.
 * It therefore needs to route each answer to the right thing handler.
42 * Discovery uses the same response code, so we must not refresh the status during discovery.
44 * @author Stefan Höhn - Initial contribution
45 * @author Danny Baumann - Thread-Safe design refactoring
48 @Component(service = CommunicationManager.class)
49 public class CommunicationManager {
50 private final Gson gson = new Gson();
51 // Holds a list of all thing handlers to send them thing updates via the receiver-Thread
52 private final Map<String, GoveeHandler> thingHandlers = new HashMap<>();
54 private StatusReceiver receiverThread;
56 private static final String DISCOVERY_MULTICAST_ADDRESS = "239.255.255.250";
57 private static final int DISCOVERY_PORT = 4001;
58 private static final int RESPONSE_PORT = 4002;
59 private static final int REQUEST_PORT = 4003;
61 private static final int INTERFACE_TIMEOUT_SEC = 5;
63 private static final String DISCOVER_REQUEST = "{\"msg\": {\"cmd\": \"scan\", \"data\": {\"account_topic\": \"reserve\"}}}";
65 public interface DiscoveryResultReceiver {
66 void onResultReceived(DiscoveryResponse result);
70 public CommunicationManager() {
73 public void registerHandler(GoveeHandler handler) {
74 synchronized (thingHandlers) {
75 thingHandlers.put(handler.getHostname(), handler);
76 if (receiverThread == null) {
77 receiverThread = new StatusReceiver();
78 receiverThread.start();
83 public void unregisterHandler(GoveeHandler handler) {
84 synchronized (thingHandlers) {
85 thingHandlers.remove(handler.getHostname());
86 if (thingHandlers.isEmpty()) {
87 StatusReceiver receiver = receiverThread;
88 if (receiver != null) {
89 receiver.stopReceiving();
91 receiverThread = null;
96 public void sendRequest(GoveeHandler handler, GenericGoveeRequest request) throws IOException {
97 final String hostname = handler.getHostname();
98 final DatagramSocket socket = new DatagramSocket();
99 socket.setReuseAddress(true);
100 final String message = gson.toJson(request);
101 final byte[] data = message.getBytes();
102 final InetAddress address = InetAddress.getByName(hostname);
103 DatagramPacket packet = new DatagramPacket(data, data.length, address, REQUEST_PORT);
104 // logger.debug("Sending {} to {}", message, hostname);
109 public void runDiscoveryForInterface(NetworkInterface intf, DiscoveryResultReceiver receiver) throws IOException {
110 synchronized (receiver) {
111 StatusReceiver localReceiver = null;
112 StatusReceiver activeReceiver = null;
115 if (receiverThread == null) {
116 localReceiver = new StatusReceiver();
117 localReceiver.start();
118 activeReceiver = localReceiver;
120 activeReceiver = receiverThread;
123 if (activeReceiver != null) {
124 activeReceiver.setDiscoveryResultsReceiver(receiver);
127 final InetAddress broadcastAddress = InetAddress.getByName(DISCOVERY_MULTICAST_ADDRESS);
128 final InetSocketAddress socketAddress = new InetSocketAddress(broadcastAddress, RESPONSE_PORT);
129 final Instant discoveryStartTime = Instant.now();
130 final Instant discoveryEndTime = discoveryStartTime.plusSeconds(INTERFACE_TIMEOUT_SEC);
132 try (MulticastSocket sendSocket = new MulticastSocket(socketAddress)) {
133 sendSocket.setSoTimeout(INTERFACE_TIMEOUT_SEC * 1000);
134 sendSocket.setReuseAddress(true);
135 sendSocket.setBroadcast(true);
136 sendSocket.setTimeToLive(2);
137 sendSocket.joinGroup(new InetSocketAddress(broadcastAddress, RESPONSE_PORT), intf);
139 byte[] requestData = DISCOVER_REQUEST.getBytes();
141 DatagramPacket request = new DatagramPacket(requestData, requestData.length, broadcastAddress,
143 sendSocket.send(request);
148 receiver.wait(INTERFACE_TIMEOUT_SEC * 1000);
149 } catch (InterruptedException e) {
150 Thread.currentThread().interrupt();
152 } while (Instant.now().isBefore(discoveryEndTime));
154 if (activeReceiver != null) {
155 activeReceiver.setDiscoveryResultsReceiver(null);
157 if (localReceiver != null) {
158 localReceiver.stopReceiving();
164 private class StatusReceiver extends Thread {
165 private final Logger logger = LoggerFactory.getLogger(CommunicationManager.class);
166 private boolean stopped = false;
167 private @Nullable DiscoveryResultReceiver discoveryResultReceiver;
169 private @Nullable MulticastSocket socket;
172 super("GoveeStatusReceiver");
175 synchronized void setDiscoveryResultsReceiver(@Nullable DiscoveryResultReceiver receiver) {
176 discoveryResultReceiver = receiver;
179 void stopReceiving() {
182 if (socket != null) {
188 } catch (InterruptedException e) {
189 Thread.currentThread().interrupt();
197 socket = new MulticastSocket(RESPONSE_PORT);
198 byte[] buffer = new byte[10240];
199 socket.setReuseAddress(true);
201 DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
202 socket.receive(packet);
207 String response = new String(packet.getData(), packet.getOffset(), packet.getLength());
208 String deviceIPAddress = packet.getAddress().toString().replace("/", "");
209 logger.trace("Response from {} = {}", deviceIPAddress, response);
211 final DiscoveryResultReceiver discoveryReceiver;
212 synchronized (this) {
213 discoveryReceiver = discoveryResultReceiver;
215 if (discoveryReceiver != null) {
216 // We're in discovery mode: try to parse result as discovery message and signal the receiver
217 // if parsing was successful
219 DiscoveryResponse result = gson.fromJson(response, DiscoveryResponse.class);
220 if (result != null) {
221 synchronized (discoveryReceiver) {
222 discoveryReceiver.onResultReceived(result);
223 discoveryReceiver.notifyAll();
226 } catch (JsonParseException e) {
227 // this probably was a status message
230 final @Nullable GoveeHandler handler;
231 synchronized (thingHandlers) {
232 handler = thingHandlers.get(deviceIPAddress);
234 if (handler == null) {
235 logger.warn("thing Handler for {} couldn't be found.", deviceIPAddress);
237 logger.debug("processing status updates for thing {} ", handler.getThing().getLabel());
238 handler.handleIncomingStatus(response);
242 } catch (IOException e) {
243 logger.warn("exception when receiving status packet", e);
244 // as we haven't received a packet we also don't know where it should have come from
245 // hence, we don't know which thing put offline.
246 // a way to monitor this would be to keep track in a list, which device answers we expect
247 // and supervise an expected answer within a given time but that will make the whole
248 // mechanism much more complicated and may be added in the future
250 if (socket != null) {