Hide `open()` socket call beneath PacketListener, so caller do not need to care about that.
Signed-off-by: Łukasz Dywicki <luke@code-house.org>
Usually no manual configuration is required, as the multicast IP address and the port remain on their factory set values.
Optionally, a refresh interval (in seconds) can be defined.
-| Parameter | Name | Description | Required | Default |
-|------------------|-----------------|---------------------------------------|----------|-----------------|
-| `serialNumber` | Serial number | Serial number of a meter. | yes | |
-| `mcastGroup` | Multicast Group | Multicast group used by meter. | yes | 239.12.255.254 |
-| `port` | Port | Port number used by meter. | no | 9522 |
-| `pollingPeriod` | Polling Period | Polling period used to readout meter. | no | 30 |
+| Parameter | Name | Description | Required | Default |
+|------------------|-----------------|------------------------------------------------------------|----------|-----------------|
+| `serialNumber` | Serial number | Serial number of a meter. | yes | |
+| `mcastGroup` | Multicast Group | Multicast group used by meter. | yes | 239.12.255.254 |
+| `port` | Port | Port number used by meter. | no | 9522 |
+| `pollingPeriod` | Polling Period | Polling period used to publish meter reading (in seconds). | no | 30 |
The polling period parameter is used to trigger readout of meter. In case if two consecutive readout attempts fail thing will report offline status.
try {
packetListener = listenerRegistry.getListener(PacketListener.DEFAULT_MCAST_GRP,
PacketListener.DEFAULT_MCAST_PORT);
- packetListener.open(30);
} catch (IOException e) {
logger.warn("Could not start background discovery", e);
return;
import static org.openhab.binding.smaenergymeter.internal.SMAEnergyMeterBindingConstants.*;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.smaenergymeter.internal.configuration.EnergyMeterConfig;
+import org.openhab.binding.smaenergymeter.internal.packet.FilteringPayloadHandler;
import org.openhab.binding.smaenergymeter.internal.packet.PacketListener;
import org.openhab.binding.smaenergymeter.internal.packet.PacketListenerRegistry;
import org.openhab.binding.smaenergymeter.internal.packet.PayloadHandler;
+import org.openhab.binding.smaenergymeter.internal.packet.ThrottlingPayloadHandler;
import org.openhab.core.thing.ChannelUID;
import org.openhab.core.thing.Thing;
import org.openhab.core.thing.ThingStatus;
private final Logger logger = LoggerFactory.getLogger(SMAEnergyMeterHandler.class);
private final PacketListenerRegistry listenerRegistry;
private @Nullable PacketListener listener;
- private @Nullable String serialNumber;
+ private @Nullable PayloadHandler handler;
+ private String serialNumber;
public SMAEnergyMeterHandler(Thing thing, PacketListenerRegistry listenerRegistry) {
super(thing);
updateStatus(ThingStatus.UNKNOWN);
logger.debug("Activated handler for SMA Energy Meter with S/N '{}'", serialNumber);
- listener.addPayloadHandler(this);
-
- listener.open(config.getPollingPeriod());
+ if (config.getPollingPeriod() <= 1) {
+ listener.addPayloadHandler(handler = new FilteringPayloadHandler(this, serialNumber));
+ } else {
+ listener.addPayloadHandler(handler = new FilteringPayloadHandler(
+ new ThrottlingPayloadHandler(this, TimeUnit.SECONDS.toMillis(config.getPollingPeriod())),
+ serialNumber));
+ }
this.listener = listener;
logger.debug("Polling job scheduled to run every {} sec. for '{}'", config.getPollingPeriod(),
getThing().getUID());
public void dispose() {
logger.debug("Disposing SMAEnergyMeter handler '{}'", getThing().getUID());
PacketListener listener = this.listener;
- if (listener != null) {
- listener.removePayloadHandler(this);
+ PayloadHandler handler = this.handler;
+ if (listener != null && handler != null) {
+ listener.removePayloadHandler(handler);
this.listener = null;
}
}
@Override
public void handle(EnergyMeter energyMeter) {
- String serialNumber = this.serialNumber;
- if (serialNumber == null || !serialNumber.equals(energyMeter.getSerialNumber())) {
- return;
- }
updateStatus(ThingStatus.ONLINE);
logger.debug("Update SMAEnergyMeter {} data '{}'", serialNumber, getThing().getUID());
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.binding.smaenergymeter.internal.SMAEnergyMeterBindingConstants;
import org.openhab.binding.smaenergymeter.internal.packet.PacketListener.ReceivingTask;
+import org.openhab.core.common.ThreadPoolManager;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.slf4j.Logger;
public class DefaultPacketListenerRegistry implements PacketListenerRegistry {
private final Logger logger = LoggerFactory.getLogger(DefaultPacketListenerRegistry.class);
- private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1,
- (runnable) -> new Thread(runnable,
- "OH-binding-" + SMAEnergyMeterBindingConstants.BINDING_ID + "-listener"));
+ private final ScheduledExecutorService scheduler = ThreadPoolManager
+ .getScheduledPool("OH-binding-" + SMAEnergyMeterBindingConstants.BINDING_ID + "-listener");
private final Map<String, PacketListener> listeners = new ConcurrentHashMap<>();
@Override
scheduler.shutdownNow();
}
- public ScheduledFuture<?> addTask(Runnable runnable, int intervalSec) {
- return scheduler.scheduleWithFixedDelay(runnable, 0, intervalSec, TimeUnit.SECONDS);
+ public ScheduledFuture<?> addTask(ReceivingTask runnable) {
+ return scheduler.scheduleWithFixedDelay(runnable, 0, 1000, TimeUnit.MILLISECONDS);
}
public void execute(ReceivingTask receivingTask) {
--- /dev/null
+/**
+ * Copyright (c) 2010-2024 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.binding.smaenergymeter.internal.packet;
+
+import java.io.IOException;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.openhab.binding.smaenergymeter.internal.handler.EnergyMeter;
+
+/**
+ * Payload handler which define acceptance criteria for received meter data.
+ *
+ * @author Łukasz Dywicki - Initial contribution
+ */
+@NonNullByDefault
+public class FilteringPayloadHandler implements PayloadHandler {
+
+ private final PayloadHandler delegate;
+ private final String serialNumber;
+
+ public FilteringPayloadHandler(PayloadHandler delegate, String serialNumber) {
+ this.delegate = delegate;
+ this.serialNumber = serialNumber;
+ }
+
+ @Override
+ public void handle(EnergyMeter energyMeter) throws IOException {
+ if (this.serialNumber.equals(energyMeter.getSerialNumber())) {
+ delegate.handle(energyMeter);
+ }
+ }
+}
}
public void addPayloadHandler(PayloadHandler handler) {
+ if (handlers.isEmpty()) {
+ open();
+ }
handlers.add(handler);
}
return socket != null && socket.isConnected();
}
- public void open(int intervalSec) throws IOException {
+ private void open() {
if (isOpen()) {
// no need to bind socket second time
return;
}
- MulticastSocket socket = new MulticastSocket(port);
- socket.setSoTimeout(5000);
- InetAddress address = InetAddress.getByName(multicastGroup);
- socket.joinGroup(address);
+ try {
+ MulticastSocket socket = new MulticastSocket(port);
+ socket.setSoTimeout(5000);
+ InetAddress address = InetAddress.getByName(multicastGroup);
+ socket.joinGroup(address);
- future = registry.addTask(new ReceivingTask(socket, multicastGroup + ":" + port, handlers), intervalSec);
- this.socket = socket;
+ future = registry.addTask(new ReceivingTask(socket, multicastGroup + ":" + port, handlers));
+ this.socket = socket;
+ } catch (IOException e) {
+ throw new RuntimeException("Could not open socket", e);
+ }
}
void close() throws IOException {
}
public void run() {
- try {
- byte[] bytes = new byte[608];
- DatagramPacket msgPacket = new DatagramPacket(bytes, bytes.length);
- DatagramSocket socket = this.socket;
- socket.receive(msgPacket);
+ byte[] bytes = new byte[608];
+ DatagramPacket msgPacket = new DatagramPacket(bytes, bytes.length);
+ DatagramSocket socket = this.socket;
- try {
+ try {
+ do {
+ // this loop is intended to receive all packets queued on the socket,
+ // having a receive() call without loop causes packets to get queued over time,
+ // if more than one meter present because we consume one packet per second
+ socket.receive(msgPacket);
EnergyMeter meter = new EnergyMeter();
meter.parse(bytes);
for (PayloadHandler handler : handlers) {
handler.handle(meter);
}
- } catch (IOException e) {
- logger.debug("Unexpected payload received for group {}", group, e);
- }
+ } while (msgPacket.getLength() == 608);
} catch (IOException e) {
- logger.warn("Failed to receive data for multicast group {}", group, e);
+ logger.debug("Unexpected payload received for group {}", group, e);
}
}
}
--- /dev/null
+/**
+ * Copyright (c) 2010-2024 Contributors to the openHAB project
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.openhab.binding.smaenergymeter.internal.packet;
+
+import java.io.IOException;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.openhab.binding.smaenergymeter.internal.handler.EnergyMeter;
+
+/**
+ * Payload handler which defer publishing of meter data by given amount of time.
+ *
+ * @author Łukasz Dywicki - Initial contribution
+ */
+@NonNullByDefault
+public class ThrottlingPayloadHandler implements PayloadHandler {
+
+ private final PayloadHandler delegate;
+ private final long pollingPeriodMs;
+ private long publishTime = 0;
+
+ public ThrottlingPayloadHandler(PayloadHandler delegate, long pollingPeriodMs) {
+ this.delegate = delegate;
+ this.pollingPeriodMs = pollingPeriodMs;
+ }
+
+ @Override
+ public void handle(EnergyMeter energyMeter) throws IOException {
+ long ts = System.currentTimeMillis();
+ if (publishTime <= ts) {
+ delegate.handle(energyMeter);
+ publishTime = ts + pollingPeriodMs;
+ }
+ }
+}