2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.smartmeter.internal;
15 import java.time.Duration;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.HashMap;
19 import java.util.List;
21 import java.util.Map.Entry;
22 import java.util.concurrent.CopyOnWriteArrayList;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import java.util.function.Supplier;
28 import javax.measure.Quantity;
30 import org.eclipse.jdt.annotation.NonNullByDefault;
31 import org.eclipse.jdt.annotation.Nullable;
32 import org.openhab.binding.smartmeter.connectors.IMeterReaderConnector;
33 import org.openhab.binding.smartmeter.internal.helper.ProtocolMode;
34 import org.openhab.core.io.transport.serial.SerialPortManager;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
38 import io.reactivex.Flowable;
39 import io.reactivex.disposables.Disposable;
40 import io.reactivex.plugins.RxJavaPlugins;
41 import io.reactivex.schedulers.Schedulers;
44 * This represents a meter device.
45 * All read values of the device are cached here and can be obtained. The reading can be started with
46 * {@link #readValues(ScheduledExecutorService, Duration)}
48 * @author Matthias Steigenberger - Initial contribution
50 * @param <T> The type of Payload which is read from the device.
53 public abstract class MeterDevice<T> {
55 private static final int RETRY_DELAY = 2;
56 private final Logger logger = LoggerFactory.getLogger(MeterDevice.class);
58 * Controls wether the device info is logged to the OSGi console.
60 private boolean printMeterInfo;
62 * Map of all values captured from the device during the read request.
64 private Map<String, MeterValue<?>> valueCache;
65 private byte @Nullable [] initMessage;
67 * The id of the SML device from openHAB configuration.
69 private String deviceId;
71 * Used to establish the device connection
73 IMeterReaderConnector<T> connector;
74 private List<MeterValueListener> valueChangeListeners;
76 public MeterDevice(Supplier<SerialPortManager> serialPortManagerSupplier, String deviceId, String serialPort,
77 byte @Nullable [] initMessage, int baudrate, int baudrateChangeDelay, ProtocolMode protocolMode) {
79 this.deviceId = deviceId;
80 this.valueCache = new HashMap<>();
81 this.valueChangeListeners = new CopyOnWriteArrayList<>();
82 this.printMeterInfo = true;
83 this.connector = createConnector(serialPortManagerSupplier, serialPort, baudrate, baudrateChangeDelay,
85 RxJavaPlugins.setErrorHandler(error -> {
86 logger.error("Fatal error occured", error);
91 * Creates the actual connector that handles the serial port communication and protocol.
93 * @param serialPortManagerSupplier Supplies the {@link SerialPortManager} which is used to obtain the serial port
95 * @param serialPort The name of the port to communicate with.
96 * @param baudrate The Baudrate to set for communication.
97 * @param baudrateChangeDelay The delay which is used before changing the baudrate (used only for specific
99 * @param protocolMode The {@link ProtocolMode} to use.
100 * @return The connector which handles the serial port communication.
102 protected abstract IMeterReaderConnector<T> createConnector(Supplier<SerialPortManager> serialPortManagerSupplier,
103 String serialPort, int baudrate, int baudrateChangeDelay, ProtocolMode protocolMode);
106 * Gets the configured deviceId.
108 * @return the id of the SmlDevice from openHAB configuration.
110 public String getDeviceId() {
115 * Returns the specified OBIS value if available.
117 * @param obis the OBIS code which value should be retrieved.
118 * @return the OBIS value as String if available - otherwise null.
121 public String getValue(String obisId) {
122 MeterValue<?> smlValue = getMeterValue(obisId);
123 if (smlValue != null) {
124 return smlValue.getValue();
130 * Returns the specified OBIS value if available.
132 * @param obis the OBIS code which value should be retrieved.
133 * @return the OBIS value if available - otherwise null.
135 @SuppressWarnings("unchecked")
137 public <Q extends Quantity<Q>> MeterValue<Q> getMeterValue(String obisId) {
138 if (valueCache.containsKey(obisId)) {
139 return (MeterValue<Q>) valueCache.get(obisId);
145 * Gets all currently available OBIS codes.
147 * @return All cached OBIS codes.
149 public Collection<String> getObisCodes() {
150 return new ArrayList<>(this.valueCache.keySet());
154 * Read values from this device a store them locally against their OBIS code.
156 * If there is an error in reading, it will be retried {@value #NUMBER_OF_RETRIES} times. The retry will be delayed
157 * by {@code period} seconds.
158 * If its still failing, the connection will be closed and opened again.
160 * @return The {@link Disposable} which needs to be disposed whenever not used anymore.
163 public Disposable readValues(long timeout, ScheduledExecutorService executorService, Duration period) {
164 return Flowable.fromPublisher(connector.getMeterValues(initMessage, period, executorService))
165 .timeout(timeout + period.toMillis(), TimeUnit.MILLISECONDS, Schedulers.from(executorService))
166 .doOnSubscribe(sub -> {
167 logger.debug("Opening connection to {}", getDeviceId());
168 connector.openConnection();
170 if (ex instanceof TimeoutException) {
171 logger.debug("Timeout occured for {}; {}", getDeviceId(), ex.getMessage());
173 logger.debug("Failed to read: {}. Closing connection and trying again in {} seconds...; {}",
174 ex.getMessage(), RETRY_DELAY, getDeviceId(), ex);
176 connector.closeConnection();
177 notifyReadingError(ex);
178 }).doOnCancel(connector::closeConnection).doOnComplete(connector::closeConnection).share()
180 publisher -> publisher.delay(RETRY_DELAY, TimeUnit.SECONDS, Schedulers.from(executorService)))
181 .subscribeOn(Schedulers.from(executorService), true).subscribe((value) -> {
182 Map<String, MeterValue<?>> obisCodes = new HashMap<>(valueCache);
184 populateValueCache(value);
186 Collection<String> newObisCodes = getObisCodes();
187 // notify every removed obis code.
188 obisCodes.values().stream().filter((val) -> !newObisCodes.contains(val.getObisCode()))
189 .forEach((val) -> notifyValuesRemoved(val));
194 * Deletes all cached values.
196 * The method will always be called before new values are populated.
198 protected void clearValueCache() {
203 * Called whenever a new value was made available. The value cache needs to be filled here with
204 * {@link #addObisCache(MeterValue)}.
206 * @param payload The actual payload value.
208 protected abstract <Q extends Quantity<Q>> void populateValueCache(T payload);
211 * Adds a {@link MeterValue} to the current cache.
213 * @param value The value to add.
215 protected <Q extends Quantity<Q>> void addObisCache(MeterValue<Q> value) {
216 logger.debug("Value changed: {}", value);
217 this.valueCache.put(value.getObisCode(), value);
218 this.valueChangeListeners.forEach((listener) -> {
220 listener.valueChanged(value);
221 } catch (Exception e) {
222 logger.error("Meter listener failed", e);
228 public String toString() {
229 StringBuilder stringBuilder = new StringBuilder();
230 stringBuilder.append("Device: ");
231 stringBuilder.append(getDeviceId());
232 stringBuilder.append(System.lineSeparator());
234 for (Entry<String, MeterValue<?>> entry : valueCache.entrySet()) {
235 stringBuilder.append("Obis: " + entry.getKey() + " " + entry.getValue().toString());
236 stringBuilder.append(System.lineSeparator());
238 return stringBuilder.toString();
242 * Adds a {@link MeterValueListener} to the list of listeners which gets notified on new values being read.
244 * @param valueChangeListener The new {@link MeterValueListener}
246 public void addValueChangeListener(MeterValueListener valueChangeListener) {
247 this.valueChangeListeners.add(valueChangeListener);
251 * Removes a {@link MeterValueListener} from the list of listeners.
253 * @param valueChangeListener The listener to remove.
255 public void removeValueChangeListener(MeterValueListener valueChangeListener) {
256 this.valueChangeListeners.remove(valueChangeListener);
259 private <Q extends Quantity<Q>> void notifyValuesRemoved(MeterValue<Q> value) {
260 this.valueChangeListeners.forEach((listener) -> listener.valueRemoved(value));
263 private void notifyReadingError(Throwable e) {
264 this.valueChangeListeners.forEach((listener) -> listener.errorOccurred(e));
268 * Logs the object information with all given SML values to OSGi console.
270 * It's only called once - except the config was updated.
272 protected void printInfo() {
273 if (this.getPrintMeterInfo()) {
274 logger.info("Read out following values: {}", toString());
275 setPrintMeterInfo(false);
280 * Gets if the object information has to be logged to OSGi console.
282 * @return true if the object information should be logged, otherwise false.
284 private Boolean getPrintMeterInfo() {
285 return this.printMeterInfo;
289 * Sets if the object information has to be logged to OSGi console.
291 private void setPrintMeterInfo(Boolean printMeterInfo) {
292 this.printMeterInfo = printMeterInfo;