]> git.basschouten.com Git - openhab-addons.git/commitdiff
Make sure that energy meter packets are not queued up. (#16841)
authorŁukasz Dywicki <luke@code-house.org>
Tue, 25 Jun 2024 19:27:43 +0000 (21:27 +0200)
committerGitHub <noreply@github.com>
Tue, 25 Jun 2024 19:27:43 +0000 (21:27 +0200)
Hide `open()` socket call beneath PacketListener, so caller do not need to care about that.
Signed-off-by: Łukasz Dywicki <luke@code-house.org>
bundles/org.openhab.binding.smaenergymeter/README.md
bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/discovery/SMAEnergyMeterDiscoveryService.java
bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/handler/SMAEnergyMeterHandler.java
bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/packet/DefaultPacketListenerRegistry.java
bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/packet/FilteringPayloadHandler.java [new file with mode: 0644]
bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/packet/PacketListener.java
bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/packet/ThrottlingPayloadHandler.java [new file with mode: 0644]

index 8f6be8db0d0809e4f393bb13de5907ed252fab3e..eb5ef3088025f1e5f47a70152ddde68fd4f73dc8 100644 (file)
@@ -20,12 +20,12 @@ No binding configuration required.
 Usually no manual configuration is required, as the multicast IP address and the port remain on their factory set values.
 Optionally, a refresh interval (in seconds) can be defined.
 
-| Parameter        | Name            | Description                           | Required | Default         |
-|------------------|-----------------|---------------------------------------|----------|-----------------|
-| `serialNumber`   | Serial number   | Serial number of a meter.             | yes      |                 |
-| `mcastGroup`     | Multicast Group | Multicast group used by meter.        | yes      | 239.12.255.254  |
-| `port`           | Port            | Port number used by meter.            | no       | 9522            |
-| `pollingPeriod`  | Polling Period  | Polling period used to readout meter. | no       | 30              |
+| Parameter        | Name            | Description                                                | Required | Default         |
+|------------------|-----------------|------------------------------------------------------------|----------|-----------------|
+| `serialNumber`   | Serial number   | Serial number of a meter.                                  | yes      |                 |
+| `mcastGroup`     | Multicast Group | Multicast group used by meter.                             | yes      | 239.12.255.254  |
+| `port`           | Port            | Port number used by meter.                                 | no       | 9522            |
+| `pollingPeriod`  | Polling Period  | Polling period used to publish meter reading (in seconds). | no       | 30              |
 
 The polling period parameter is used to trigger readout of meter. In case if two consecutive readout attempts fail thing will report offline status.
 
index e959f0466461b3eace697d102d70f5c81936dc80..295f39266657675ba4b6eacd113c74a751fa373f 100644 (file)
@@ -74,7 +74,6 @@ public class SMAEnergyMeterDiscoveryService extends AbstractDiscoveryService imp
         try {
             packetListener = listenerRegistry.getListener(PacketListener.DEFAULT_MCAST_GRP,
                     PacketListener.DEFAULT_MCAST_PORT);
-            packetListener.open(30);
         } catch (IOException e) {
             logger.warn("Could not start background discovery", e);
             return;
index 6308a4bf16a95be092caa917206236ee1911bf78..be470a88191966e76ad0efb816f4abd5f1ca6f07 100644 (file)
@@ -15,12 +15,15 @@ package org.openhab.binding.smaenergymeter.internal.handler;
 import static org.openhab.binding.smaenergymeter.internal.SMAEnergyMeterBindingConstants.*;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
 import org.eclipse.jdt.annotation.Nullable;
 import org.openhab.binding.smaenergymeter.internal.configuration.EnergyMeterConfig;
+import org.openhab.binding.smaenergymeter.internal.packet.FilteringPayloadHandler;
 import org.openhab.binding.smaenergymeter.internal.packet.PacketListener;
 import org.openhab.binding.smaenergymeter.internal.packet.PacketListenerRegistry;
 import org.openhab.binding.smaenergymeter.internal.packet.PayloadHandler;
+import org.openhab.binding.smaenergymeter.internal.packet.ThrottlingPayloadHandler;
 import org.openhab.core.thing.ChannelUID;
 import org.openhab.core.thing.Thing;
 import org.openhab.core.thing.ThingStatus;
@@ -42,7 +45,8 @@ public class SMAEnergyMeterHandler extends BaseThingHandler implements PayloadHa
     private final Logger logger = LoggerFactory.getLogger(SMAEnergyMeterHandler.class);
     private final PacketListenerRegistry listenerRegistry;
     private @Nullable PacketListener listener;
-    private @Nullable String serialNumber;
+    private @Nullable PayloadHandler handler;
+    private String serialNumber;
 
     public SMAEnergyMeterHandler(Thing thing, PacketListenerRegistry listenerRegistry) {
         super(thing);
@@ -84,9 +88,13 @@ public class SMAEnergyMeterHandler extends BaseThingHandler implements PayloadHa
             updateStatus(ThingStatus.UNKNOWN);
             logger.debug("Activated handler for SMA Energy Meter with S/N '{}'", serialNumber);
 
-            listener.addPayloadHandler(this);
-
-            listener.open(config.getPollingPeriod());
+            if (config.getPollingPeriod() <= 1) {
+                listener.addPayloadHandler(handler = new FilteringPayloadHandler(this, serialNumber));
+            } else {
+                listener.addPayloadHandler(handler = new FilteringPayloadHandler(
+                        new ThrottlingPayloadHandler(this, TimeUnit.SECONDS.toMillis(config.getPollingPeriod())),
+                        serialNumber));
+            }
             this.listener = listener;
             logger.debug("Polling job scheduled to run every {} sec. for '{}'", config.getPollingPeriod(),
                     getThing().getUID());
@@ -100,18 +108,15 @@ public class SMAEnergyMeterHandler extends BaseThingHandler implements PayloadHa
     public void dispose() {
         logger.debug("Disposing SMAEnergyMeter handler '{}'", getThing().getUID());
         PacketListener listener = this.listener;
-        if (listener != null) {
-            listener.removePayloadHandler(this);
+        PayloadHandler handler = this.handler;
+        if (listener != null && handler != null) {
+            listener.removePayloadHandler(handler);
             this.listener = null;
         }
     }
 
     @Override
     public void handle(EnergyMeter energyMeter) {
-        String serialNumber = this.serialNumber;
-        if (serialNumber == null || !serialNumber.equals(energyMeter.getSerialNumber())) {
-            return;
-        }
         updateStatus(ThingStatus.ONLINE);
 
         logger.debug("Update SMAEnergyMeter {} data '{}'", serialNumber, getThing().getUID());
index a479d6acae86e67c7aaf54ec36c22cf26201f565..9acd53572a9ef974b37ddcd6a04e4bdf18bc1d01 100644 (file)
@@ -16,7 +16,6 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -24,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.openhab.binding.smaenergymeter.internal.SMAEnergyMeterBindingConstants;
 import org.openhab.binding.smaenergymeter.internal.packet.PacketListener.ReceivingTask;
+import org.openhab.core.common.ThreadPoolManager;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
 import org.slf4j.Logger;
@@ -40,9 +40,8 @@ import org.slf4j.LoggerFactory;
 public class DefaultPacketListenerRegistry implements PacketListenerRegistry {
 
     private final Logger logger = LoggerFactory.getLogger(DefaultPacketListenerRegistry.class);
-    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1,
-            (runnable) -> new Thread(runnable,
-                    "OH-binding-" + SMAEnergyMeterBindingConstants.BINDING_ID + "-listener"));
+    private final ScheduledExecutorService scheduler = ThreadPoolManager
+            .getScheduledPool("OH-binding-" + SMAEnergyMeterBindingConstants.BINDING_ID + "-listener");
     private final Map<String, PacketListener> listeners = new ConcurrentHashMap<>();
 
     @Override
@@ -68,8 +67,8 @@ public class DefaultPacketListenerRegistry implements PacketListenerRegistry {
         scheduler.shutdownNow();
     }
 
-    public ScheduledFuture<?> addTask(Runnable runnable, int intervalSec) {
-        return scheduler.scheduleWithFixedDelay(runnable, 0, intervalSec, TimeUnit.SECONDS);
+    public ScheduledFuture<?> addTask(ReceivingTask runnable) {
+        return scheduler.scheduleWithFixedDelay(runnable, 0, 1000, TimeUnit.MILLISECONDS);
     }
 
     public void execute(ReceivingTask receivingTask) {
diff --git a/bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/packet/FilteringPayloadHandler.java b/bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/packet/FilteringPayloadHandler.java
new file mode 100644 (file)
index 0000000..576af21
--- /dev/null
@@ -0,0 +1,42 @@
+/**
+ * Copyright (c) 2010-2024 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.binding.smaenergymeter.internal.packet;
+
+import java.io.IOException;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.openhab.binding.smaenergymeter.internal.handler.EnergyMeter;
+
+/**
+ * Payload handler which define acceptance criteria for received meter data.
+ *
+ * @author Łukasz Dywicki - Initial contribution
+ */
+@NonNullByDefault
+public class FilteringPayloadHandler implements PayloadHandler {
+
+    private final PayloadHandler delegate;
+    private final String serialNumber;
+
+    public FilteringPayloadHandler(PayloadHandler delegate, String serialNumber) {
+        this.delegate = delegate;
+        this.serialNumber = serialNumber;
+    }
+
+    @Override
+    public void handle(EnergyMeter energyMeter) throws IOException {
+        if (this.serialNumber.equals(energyMeter.getSerialNumber())) {
+            delegate.handle(energyMeter);
+        }
+    }
+}
index 976c617712c004083b06e09dfd11a08628fb21e2..49a4a911dc2be1aff056362d2e20f21a0068871d 100644 (file)
@@ -56,6 +56,9 @@ public class PacketListener {
     }
 
     public void addPayloadHandler(PayloadHandler handler) {
+        if (handlers.isEmpty()) {
+            open();
+        }
         handlers.add(handler);
     }
 
@@ -72,18 +75,22 @@ public class PacketListener {
         return socket != null && socket.isConnected();
     }
 
-    public void open(int intervalSec) throws IOException {
+    private void open() {
         if (isOpen()) {
             // no need to bind socket second time
             return;
         }
-        MulticastSocket socket = new MulticastSocket(port);
-        socket.setSoTimeout(5000);
-        InetAddress address = InetAddress.getByName(multicastGroup);
-        socket.joinGroup(address);
+        try {
+            MulticastSocket socket = new MulticastSocket(port);
+            socket.setSoTimeout(5000);
+            InetAddress address = InetAddress.getByName(multicastGroup);
+            socket.joinGroup(address);
 
-        future = registry.addTask(new ReceivingTask(socket, multicastGroup + ":" + port, handlers), intervalSec);
-        this.socket = socket;
+            future = registry.addTask(new ReceivingTask(socket, multicastGroup + ":" + port, handlers));
+            this.socket = socket;
+        } catch (IOException e) {
+            throw new RuntimeException("Could not open socket", e);
+        }
     }
 
     void close() throws IOException {
@@ -122,24 +129,25 @@ public class PacketListener {
         }
 
         public void run() {
-            try {
-                byte[] bytes = new byte[608];
-                DatagramPacket msgPacket = new DatagramPacket(bytes, bytes.length);
-                DatagramSocket socket = this.socket;
-                socket.receive(msgPacket);
+            byte[] bytes = new byte[608];
+            DatagramPacket msgPacket = new DatagramPacket(bytes, bytes.length);
+            DatagramSocket socket = this.socket;
 
-                try {
+            try {
+                do {
+                    // this loop is intended to receive all packets queued on the socket,
+                    // having a receive() call without loop causes packets to get queued over time,
+                    // if more than one meter present because we consume one packet per second
+                    socket.receive(msgPacket);
                     EnergyMeter meter = new EnergyMeter();
                     meter.parse(bytes);
 
                     for (PayloadHandler handler : handlers) {
                         handler.handle(meter);
                     }
-                } catch (IOException e) {
-                    logger.debug("Unexpected payload received for group {}", group, e);
-                }
+                } while (msgPacket.getLength() == 608);
             } catch (IOException e) {
-                logger.warn("Failed to receive data for multicast group {}", group, e);
+                logger.debug("Unexpected payload received for group {}", group, e);
             }
         }
     }
diff --git a/bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/packet/ThrottlingPayloadHandler.java b/bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/packet/ThrottlingPayloadHandler.java
new file mode 100644 (file)
index 0000000..bdd173a
--- /dev/null
@@ -0,0 +1,45 @@
+/**
+ * Copyright (c) 2010-2024 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.binding.smaenergymeter.internal.packet;
+
+import java.io.IOException;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.openhab.binding.smaenergymeter.internal.handler.EnergyMeter;
+
+/**
+ * Payload handler which defer publishing of meter data by given amount of time.
+ *
+ * @author Łukasz Dywicki - Initial contribution
+ */
+@NonNullByDefault
+public class ThrottlingPayloadHandler implements PayloadHandler {
+
+    private final PayloadHandler delegate;
+    private final long pollingPeriodMs;
+    private long publishTime = 0;
+
+    public ThrottlingPayloadHandler(PayloadHandler delegate, long pollingPeriodMs) {
+        this.delegate = delegate;
+        this.pollingPeriodMs = pollingPeriodMs;
+    }
+
+    @Override
+    public void handle(EnergyMeter energyMeter) throws IOException {
+        long ts = System.currentTimeMillis();
+        if (publishTime <= ts) {
+            delegate.handle(energyMeter);
+            publishTime = ts + pollingPeriodMs;
+        }
+    }
+}