/**
 * Copyright (c) 2010-2024 Contributors to the openHAB project
 *
 * See the NOTICE file(s) distributed with this work for additional
 * information.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0
 *
 * SPDX-License-Identifier: EPL-2.0
 */
13 package org.openhab.binding.govee.internal;
15 import java.io.IOException;
16 import java.net.DatagramPacket;
17 import java.net.DatagramSocket;
18 import java.net.InetAddress;
19 import java.net.InetSocketAddress;
20 import java.net.MulticastSocket;
21 import java.net.NetworkInterface;
22 import java.time.Instant;
23 import java.util.HashMap;
26 import org.eclipse.jdt.annotation.NonNullByDefault;
27 import org.eclipse.jdt.annotation.Nullable;
28 import org.openhab.binding.govee.internal.model.DiscoveryResponse;
29 import org.openhab.binding.govee.internal.model.GenericGoveeRequest;
30 import org.osgi.service.component.annotations.Activate;
31 import org.osgi.service.component.annotations.Component;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
35 import com.google.gson.Gson;
36 import com.google.gson.JsonParseException;
/**
 * The {@link CommunicationManager} owns the receiver thread that handles the answers of all devices
 * and applies the received information to the right thing.
 *
 * Discovery uses the same response code, so we must not refresh the status during discovery.
 *
 * @author Stefan Höhn - Initial contribution
 * @author Danny Baumann - Thread-Safe design refactoring
 */
48 @Component(service = CommunicationManager.class)
49 public class CommunicationManager {
50 private final Logger logger = LoggerFactory.getLogger(CommunicationManager.class);
51 private final Gson gson = new Gson();
52 // Holds a list of all thing handlers to send them thing updates via the receiver-Thread
53 private final Map<String, GoveeHandler> thingHandlers = new HashMap<>();
55 private StatusReceiver receiverThread;
57 private static final String DISCOVERY_MULTICAST_ADDRESS = "239.255.255.250";
58 private static final int DISCOVERY_PORT = 4001;
59 private static final int RESPONSE_PORT = 4002;
60 private static final int REQUEST_PORT = 4003;
62 private static final int INTERFACE_TIMEOUT_SEC = 5;
64 private static final String DISCOVER_REQUEST = "{\"msg\": {\"cmd\": \"scan\", \"data\": {\"account_topic\": \"reserve\"}}}";
66 public interface DiscoveryResultReceiver {
67 void onResultReceived(DiscoveryResponse result);
71 public CommunicationManager() {
74 public void registerHandler(GoveeHandler handler) {
75 synchronized (thingHandlers) {
76 thingHandlers.put(handler.getHostname(), handler);
77 if (receiverThread == null) {
78 receiverThread = new StatusReceiver();
79 receiverThread.start();
84 public void unregisterHandler(GoveeHandler handler) {
85 synchronized (thingHandlers) {
86 thingHandlers.remove(handler.getHostname());
87 if (thingHandlers.isEmpty()) {
88 StatusReceiver receiver = receiverThread;
89 if (receiver != null) {
90 receiver.stopReceiving();
92 receiverThread = null;
97 public void sendRequest(GoveeHandler handler, GenericGoveeRequest request) throws IOException {
98 final String hostname = handler.getHostname();
99 final DatagramSocket socket = new DatagramSocket();
100 socket.setReuseAddress(true);
101 final String message = gson.toJson(request);
102 final byte[] data = message.getBytes();
103 final InetAddress address = InetAddress.getByName(hostname);
104 DatagramPacket packet = new DatagramPacket(data, data.length, address, REQUEST_PORT);
105 logger.trace("Sending {} to {}", message, hostname);
110 public void runDiscoveryForInterface(NetworkInterface intf, DiscoveryResultReceiver receiver) throws IOException {
111 synchronized (receiver) {
112 StatusReceiver localReceiver = null;
113 StatusReceiver activeReceiver = null;
116 if (receiverThread == null) {
117 localReceiver = new StatusReceiver();
118 localReceiver.start();
119 activeReceiver = localReceiver;
121 activeReceiver = receiverThread;
124 if (activeReceiver != null) {
125 activeReceiver.setDiscoveryResultsReceiver(receiver);
128 final InetAddress broadcastAddress = InetAddress.getByName(DISCOVERY_MULTICAST_ADDRESS);
129 final InetSocketAddress socketAddress = new InetSocketAddress(broadcastAddress, RESPONSE_PORT);
130 final Instant discoveryStartTime = Instant.now();
131 final Instant discoveryEndTime = discoveryStartTime.plusSeconds(INTERFACE_TIMEOUT_SEC);
133 try (MulticastSocket sendSocket = new MulticastSocket(socketAddress)) {
134 sendSocket.setSoTimeout(INTERFACE_TIMEOUT_SEC * 1000);
135 sendSocket.setReuseAddress(true);
136 sendSocket.setBroadcast(true);
137 sendSocket.setTimeToLive(2);
138 sendSocket.joinGroup(new InetSocketAddress(broadcastAddress, RESPONSE_PORT), intf);
140 byte[] requestData = DISCOVER_REQUEST.getBytes();
142 DatagramPacket request = new DatagramPacket(requestData, requestData.length, broadcastAddress,
144 sendSocket.send(request);
149 receiver.wait(INTERFACE_TIMEOUT_SEC * 1000);
150 } catch (InterruptedException e) {
151 Thread.currentThread().interrupt();
153 } while (Instant.now().isBefore(discoveryEndTime));
155 if (activeReceiver != null) {
156 activeReceiver.setDiscoveryResultsReceiver(null);
158 if (localReceiver != null) {
159 localReceiver.stopReceiving();
165 private class StatusReceiver extends Thread {
166 private final Logger logger = LoggerFactory.getLogger(CommunicationManager.class);
167 private boolean stopped = false;
168 private @Nullable DiscoveryResultReceiver discoveryResultReceiver;
170 private @Nullable MulticastSocket socket;
173 super("GoveeStatusReceiver");
176 synchronized void setDiscoveryResultsReceiver(@Nullable DiscoveryResultReceiver receiver) {
177 discoveryResultReceiver = receiver;
180 void stopReceiving() {
183 if (socket != null) {
189 } catch (InterruptedException e) {
190 Thread.currentThread().interrupt();
198 socket = new MulticastSocket(RESPONSE_PORT);
199 byte[] buffer = new byte[10240];
200 socket.setReuseAddress(true);
202 DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
203 socket.receive(packet);
208 String response = new String(packet.getData(), packet.getOffset(), packet.getLength());
209 String deviceIPAddress = packet.getAddress().toString().replace("/", "");
210 logger.trace("Response from {} = {}", deviceIPAddress, response);
212 final DiscoveryResultReceiver discoveryReceiver;
213 synchronized (this) {
214 discoveryReceiver = discoveryResultReceiver;
216 if (discoveryReceiver != null) {
217 // We're in discovery mode: try to parse result as discovery message and signal the receiver
218 // if parsing was successful
220 DiscoveryResponse result = gson.fromJson(response, DiscoveryResponse.class);
221 if (result != null) {
222 synchronized (discoveryReceiver) {
223 discoveryReceiver.onResultReceived(result);
224 discoveryReceiver.notifyAll();
227 } catch (JsonParseException e) {
229 "JsonParseException when trying to parse the response, probably a status message",
233 final @Nullable GoveeHandler handler;
234 synchronized (thingHandlers) {
235 handler = thingHandlers.get(deviceIPAddress);
237 if (handler == null) {
238 logger.warn("thing Handler for {} couldn't be found.", deviceIPAddress);
240 logger.debug("processing status updates for thing {} ", handler.getThing().getLabel());
241 handler.handleIncomingStatus(response);
245 } catch (IOException e) {
246 logger.warn("exception when receiving status packet", e);
247 // as we haven't received a packet we also don't know where it should have come from
248 // hence, we don't know which thing put offline.
249 // a way to monitor this would be to keep track in a list, which device answers we expect
250 // and supervise an expected answer within a given time but that will make the whole
251 // mechanism much more complicated and may be added in the future
253 if (socket != null) {