2 * Copyright (c) 2010-2024 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.smartmeter.internal;
15 import java.time.Duration;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.HashMap;
19 import java.util.List;
21 import java.util.Map.Entry;
22 import java.util.concurrent.CopyOnWriteArrayList;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import java.util.function.Supplier;
28 import javax.measure.Quantity;
30 import org.eclipse.jdt.annotation.NonNullByDefault;
31 import org.eclipse.jdt.annotation.Nullable;
32 import org.openhab.binding.smartmeter.connectors.IMeterReaderConnector;
33 import org.openhab.binding.smartmeter.internal.helper.ProtocolMode;
34 import org.openhab.core.io.transport.serial.SerialPortManager;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
38 import io.reactivex.Flowable;
39 import io.reactivex.disposables.Disposable;
40 import io.reactivex.plugins.RxJavaPlugins;
41 import io.reactivex.schedulers.Schedulers;
44 * This represents a meter device.
45 * All read values of the device are cached here and can be obtained. The reading can be started with
46 * {@link #readValues(long, ScheduledExecutorService, Duration)}
48 * @author Matthias Steigenberger - Initial contribution
50 * @param <T> The type of Payload which is read from the device.
53 public abstract class MeterDevice<T> {
55 private static final int RETRY_DELAY = 2;
56 private final Logger logger = LoggerFactory.getLogger(MeterDevice.class);
58 * Controls wether the device info is logged to the OSGi console.
60 private boolean printMeterInfo;
62 * Map of all values captured from the device during the read request.
64 private Map<String, MeterValue<?>> valueCache;
65 private byte @Nullable [] initMessage;
67 * The id of the SML device from openHAB configuration.
69 private String deviceId;
71 * Used to establish the device connection
73 IMeterReaderConnector<T> connector;
74 private List<MeterValueListener> valueChangeListeners;
76 public MeterDevice(Supplier<SerialPortManager> serialPortManagerSupplier, String deviceId, String serialPort,
77 byte @Nullable [] initMessage, int baudrate, int baudrateChangeDelay, ProtocolMode protocolMode) {
79 this.deviceId = deviceId;
80 this.valueCache = new HashMap<>();
81 this.valueChangeListeners = new CopyOnWriteArrayList<>();
82 this.printMeterInfo = true;
83 this.connector = createConnector(serialPortManagerSupplier, serialPort, baudrate, baudrateChangeDelay,
85 RxJavaPlugins.setErrorHandler(error -> {
86 logger.error("Fatal error occured", error);
91 * Creates the actual connector that handles the serial port communication and protocol.
93 * @param serialPortManagerSupplier Supplies the {@link SerialPortManager} which is used to obtain the serial port
95 * @param serialPort The name of the port to communicate with.
96 * @param baudrate The Baudrate to set for communication.
97 * @param baudrateChangeDelay The delay which is used before changing the baudrate (used only for specific
99 * @param protocolMode The {@link ProtocolMode} to use.
100 * @return The connector which handles the serial port communication.
102 protected abstract IMeterReaderConnector<T> createConnector(Supplier<SerialPortManager> serialPortManagerSupplier,
103 String serialPort, int baudrate, int baudrateChangeDelay, ProtocolMode protocolMode);
106 * Gets the configured deviceId.
108 * @return the id of the SmlDevice from openHAB configuration.
110 public String getDeviceId() {
115 * Returns the specified OBIS value if available.
117 * @param obisId the OBIS code which value should be retrieved.
118 * @return the OBIS value as String if available - otherwise null.
121 public String getValue(String obisId) {
122 MeterValue<?> smlValue = getMeterValue(obisId);
123 if (smlValue != null) {
124 return smlValue.getValue();
130 * Returns the specified OBIS value if available.
132 * @param obisId the OBIS code which value should be retrieved.
133 * @return the OBIS value if available - otherwise null.
135 @SuppressWarnings("unchecked")
137 public <Q extends Quantity<Q>> MeterValue<Q> getMeterValue(String obisId) {
138 if (valueCache.containsKey(obisId)) {
139 return (MeterValue<Q>) valueCache.get(obisId);
145 * Gets all currently available OBIS codes.
147 * @return All cached OBIS codes.
149 public Collection<String> getObisCodes() {
150 return new ArrayList<>(this.valueCache.keySet());
154 * Read values from this device a store them locally against their OBIS code.
156 * If there is an error in reading, it will be retried
157 * {@value org.openhab.binding.smartmeter.connectors.ConnectorBase#NUMBER_OF_RETRIES} times.
158 * The retry will be delayed by {@code period} seconds.
159 * If its still failing, the connection will be closed and opened again.
161 * @return The {@link Disposable} which needs to be disposed whenever not used anymore.
164 public Disposable readValues(long timeout, ScheduledExecutorService executorService, Duration period) {
165 return Flowable.fromPublisher(connector.getMeterValues(initMessage, period, executorService))
166 .timeout(timeout + period.toMillis(), TimeUnit.MILLISECONDS, Schedulers.from(executorService))
167 .doOnSubscribe(sub -> {
168 logger.debug("Opening connection to {}", getDeviceId());
169 connector.openConnection();
171 if (ex instanceof TimeoutException) {
172 logger.debug("Timeout occured for {}; {}", getDeviceId(), ex.getMessage());
174 logger.debug("Failed to read: {}. Closing connection and trying again in {} seconds...; {}",
175 ex.getMessage(), RETRY_DELAY, getDeviceId(), ex);
177 connector.closeConnection();
178 notifyReadingError(ex);
179 }).doOnCancel(connector::closeConnection).doOnComplete(connector::closeConnection).share()
181 publisher -> publisher.delay(RETRY_DELAY, TimeUnit.SECONDS, Schedulers.from(executorService)))
182 .subscribeOn(Schedulers.from(executorService), true).subscribe((value) -> {
183 Map<String, MeterValue<?>> obisCodes = new HashMap<>(valueCache);
185 populateValueCache(value);
187 Collection<String> newObisCodes = getObisCodes();
188 // notify every removed obis code.
189 obisCodes.values().stream().filter((val) -> !newObisCodes.contains(val.getObisCode()))
190 .forEach((val) -> notifyValuesRemoved(val));
195 * Deletes all cached values.
197 * The method will always be called before new values are populated.
199 protected void clearValueCache() {
204 * Called whenever a new value was made available. The value cache needs to be filled here with
205 * {@link #addObisCache(MeterValue)}.
207 * @param payload The actual payload value.
209 protected abstract <Q extends Quantity<Q>> void populateValueCache(T payload);
212 * Adds a {@link MeterValue} to the current cache.
214 * @param value The value to add.
216 protected <Q extends Quantity<Q>> void addObisCache(MeterValue<Q> value) {
217 logger.debug("Value changed: {}", value);
218 this.valueCache.put(value.getObisCode(), value);
219 this.valueChangeListeners.forEach((listener) -> {
221 listener.valueChanged(value);
222 } catch (Exception e) {
223 logger.error("Meter listener failed", e);
229 public String toString() {
230 StringBuilder stringBuilder = new StringBuilder();
231 stringBuilder.append("Device: ");
232 stringBuilder.append(getDeviceId());
233 stringBuilder.append(System.lineSeparator());
235 for (Entry<String, MeterValue<?>> entry : valueCache.entrySet()) {
236 stringBuilder.append("Obis: " + entry.getKey() + " " + entry.getValue().toString());
237 stringBuilder.append(System.lineSeparator());
239 return stringBuilder.toString();
243 * Adds a {@link MeterValueListener} to the list of listeners which gets notified on new values being read.
245 * @param valueChangeListener The new {@link MeterValueListener}
247 public void addValueChangeListener(MeterValueListener valueChangeListener) {
248 this.valueChangeListeners.add(valueChangeListener);
252 * Removes a {@link MeterValueListener} from the list of listeners.
254 * @param valueChangeListener The listener to remove.
256 public void removeValueChangeListener(MeterValueListener valueChangeListener) {
257 this.valueChangeListeners.remove(valueChangeListener);
260 private <Q extends Quantity<Q>> void notifyValuesRemoved(MeterValue<Q> value) {
261 this.valueChangeListeners.forEach((listener) -> listener.valueRemoved(value));
264 private void notifyReadingError(Throwable e) {
265 this.valueChangeListeners.forEach((listener) -> listener.errorOccurred(e));
269 * Logs the object information with all given SML values to OSGi console.
271 * It's only called once - except the config was updated.
273 protected void printInfo() {
274 if (this.getPrintMeterInfo()) {
275 logger.info("Read out following values: {}", toString());
276 setPrintMeterInfo(false);
281 * Gets if the object information has to be logged to OSGi console.
283 * @return true if the object information should be logged, otherwise false.
285 private Boolean getPrintMeterInfo() {
286 return this.printMeterInfo;
290 * Sets if the object information has to be logged to OSGi console.
292 private void setPrintMeterInfo(Boolean printMeterInfo) {
293 this.printMeterInfo = printMeterInfo;