2 * Copyright (c) 2010-2024 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.smartmeter.connectors;
15 import java.io.IOException;
16 import java.time.Duration;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.TimeoutException;
21 import org.eclipse.jdt.annotation.NonNullByDefault;
22 import org.eclipse.jdt.annotation.Nullable;
23 import org.reactivestreams.Publisher;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
27 import io.reactivex.BackpressureStrategy;
28 import io.reactivex.Flowable;
29 import io.reactivex.FlowableEmitter;
30 import io.reactivex.schedulers.Schedulers;
33 * Represents a basic implementation of a SML device connector.
35 * @author Matthias Steigenberger - Initial contribution
36 * @author Mathias Gilhuber - Also-By
39 public abstract class ConnectorBase<T> implements IMeterReaderConnector<T> {
41 protected final Logger logger = LoggerFactory.getLogger(getClass());
43 * The name of the port where the device is connected as defined in openHAB configuration.
45 private String portName;
46 public static final int NUMBER_OF_RETRIES = 3;
49 * Contructor for basic members.
51 * This constructor has to be called from derived classes!
54 protected ConnectorBase(String portName) {
55 this.portName = portName;
59 * Reads a new IO message.
63 * @throws IOException Whenever there was a reading error.
65 protected abstract T readNext(byte @Nullable [] initMessage) throws IOException;
68 * Whether to periodically emit values.
70 * @return whether periodically emit values or not
72 protected boolean applyPeriod() {
77 * Whether to apply a retry handling whenever the read out failed.
79 * @return whether to use the retry handling or not.
81 protected boolean applyRetryHandling() {
86 * If reading of meter values fail a retry handling shall be implemented here.
87 * The provided publisher publishes the errors.
88 * If a retry shall happen, the returned publisher shall emit an event-
92 * @return The publisher which emits events for a retry.
94 protected Publisher<?> getRetryPublisher(Duration period, Publisher<Throwable> attempts) {
95 return Flowable.fromPublisher(attempts)
96 .zipWith(Flowable.range(1, NUMBER_OF_RETRIES + 1), (throwable, attempt) -> {
97 if (throwable instanceof TimeoutException || attempt == NUMBER_OF_RETRIES + 1) {
98 throw new RuntimeException(throwable);
100 logger.warn("{}. reading attempt failed: {}. Retrying {}...", attempt, throwable.getMessage(),
106 Duration additionalDelay = period;
107 logger.warn("Delaying retry by {}", additionalDelay);
108 return Flowable.timer(additionalDelay.toMillis(), TimeUnit.MILLISECONDS);
113 * Called whenever a retry shall happen. Clients can do something here.
115 * @param retryCount The current number of retries
117 protected void retryHook(int retryCount) {
121 public Publisher<T> getMeterValues(byte @Nullable [] initMessage, Duration period, ExecutorService executor) {
122 Flowable<T> itemPublisher = Flowable.<T> create((emitter) -> {
123 emitValues(initMessage, emitter);
124 }, BackpressureStrategy.DROP);
128 result = Flowable.timer(period.toMillis(), TimeUnit.MILLISECONDS, Schedulers.from(executor))
129 .flatMap(event -> itemPublisher).repeat();
131 result = itemPublisher;
133 if (applyRetryHandling()) {
134 return result.retryWhen(attempts -> {
135 return Flowable.fromPublisher(getRetryPublisher(period, attempts));
143 * Emitting of values shall happen here. If there is an event based emitting, this can be overriden.
145 * @param initMessage The message which shall be written before reading the values.
146 * @param emitter The {@link FlowableEmitter} to emit the values to.
147 * @throws IOException thrown if any reading error occurs.
149 protected void emitValues(byte @Nullable [] initMessage, FlowableEmitter<@Nullable T> emitter) throws IOException {
150 if (!emitter.isCancelled()) {
152 emitter.onNext(readNext(initMessage));
153 emitter.onComplete();
154 } catch (IOException e) {
155 if (!emitter.isCancelled()) {
163 * Gets the name of the serial port.
165 * @return The actual name of the serial port.
167 public String getPortName() {