]> git.basschouten.com Git - openhab-addons.git/blob
7eb201259615ccebb8d0a8ef525645a4c717c8d7
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2024 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.smartmeter.connectors;
14
15 import java.io.IOException;
16 import java.time.Duration;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.TimeoutException;
20
21 import org.eclipse.jdt.annotation.NonNullByDefault;
22 import org.eclipse.jdt.annotation.Nullable;
23 import org.reactivestreams.Publisher;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 import io.reactivex.BackpressureStrategy;
28 import io.reactivex.Flowable;
29 import io.reactivex.FlowableEmitter;
30 import io.reactivex.schedulers.Schedulers;
31
32 /**
33  * Represents a basic implementation of a SML device connector.
34  *
35  * @author Matthias Steigenberger - Initial contribution
36  * @author Mathias Gilhuber - Also-By
37  */
38 @NonNullByDefault
39 public abstract class ConnectorBase<T> implements IMeterReaderConnector<T> {
40
41     protected final Logger logger = LoggerFactory.getLogger(getClass());
42     /**
43      * The name of the port where the device is connected as defined in openHAB configuration.
44      */
45     private String portName;
46     public static final int NUMBER_OF_RETRIES = 3;
47
48     /**
49      * Contructor for basic members.
50      *
51      * This constructor has to be called from derived classes!
52      *
53      */
54     protected ConnectorBase(String portName) {
55         this.portName = portName;
56     }
57
58     /**
59      * Reads a new IO message.
60      *
61      * @param initMessage
62      * @return The payload
63      * @throws IOException Whenever there was a reading error.
64      */
65     protected abstract T readNext(byte @Nullable [] initMessage) throws IOException;
66
67     /**
68      * Whether to periodically emit values.
69      *
70      * @return whether periodically emit values or not
71      */
72     protected boolean applyPeriod() {
73         return false;
74     }
75
76     /**
77      * Whether to apply a retry handling whenever the read out failed.
78      *
79      * @return whether to use the retry handling or not.
80      */
81     protected boolean applyRetryHandling() {
82         return false;
83     }
84
85     /**
86      * If reading of meter values fail a retry handling shall be implemented here.
87      * The provided publisher publishes the errors.
88      * If a retry shall happen, the returned publisher shall emit an event-
89      *
90      * @param period
91      * @param attempts
92      * @return The publisher which emits events for a retry.
93      */
94     protected Publisher<?> getRetryPublisher(Duration period, Publisher<Throwable> attempts) {
95         return Flowable.fromPublisher(attempts)
96                 .zipWith(Flowable.range(1, NUMBER_OF_RETRIES + 1), (throwable, attempt) -> {
97                     if (throwable instanceof TimeoutException || attempt == NUMBER_OF_RETRIES + 1) {
98                         throw new RuntimeException(throwable);
99                     } else {
100                         logger.warn("{}. reading attempt failed: {}. Retrying {}...", attempt, throwable.getMessage(),
101                                 getPortName());
102                         return attempt;
103                     }
104                 }).flatMap(i -> {
105                     retryHook(i);
106                     Duration additionalDelay = period;
107                     logger.warn("Delaying retry by {}", additionalDelay);
108                     return Flowable.timer(additionalDelay.toMillis(), TimeUnit.MILLISECONDS);
109                 });
110     }
111
112     /**
113      * Called whenever a retry shall happen. Clients can do something here.
114      *
115      * @param retryCount The current number of retries
116      */
117     protected void retryHook(int retryCount) {
118     }
119
120     @Override
121     public Publisher<T> getMeterValues(byte @Nullable [] initMessage, Duration period, ExecutorService executor) {
122         Flowable<T> itemPublisher = Flowable.<T> create((emitter) -> {
123             emitValues(initMessage, emitter);
124         }, BackpressureStrategy.DROP);
125
126         Flowable<T> result;
127         if (applyPeriod()) {
128             result = Flowable.timer(period.toMillis(), TimeUnit.MILLISECONDS, Schedulers.from(executor))
129                     .flatMap(event -> itemPublisher).repeat();
130         } else {
131             result = itemPublisher;
132         }
133         if (applyRetryHandling()) {
134             return result.retryWhen(attempts -> {
135                 return Flowable.fromPublisher(getRetryPublisher(period, attempts));
136             });
137         } else {
138             return result;
139         }
140     }
141
142     /**
143      * Emitting of values shall happen here. If there is an event based emitting, this can be overriden.
144      *
145      * @param initMessage The message which shall be written before reading the values.
146      * @param emitter The {@link FlowableEmitter} to emit the values to.
147      * @throws IOException thrown if any reading error occurs.
148      */
149     protected void emitValues(byte @Nullable [] initMessage, FlowableEmitter<@Nullable T> emitter) throws IOException {
150         if (!emitter.isCancelled()) {
151             try {
152                 emitter.onNext(readNext(initMessage));
153                 emitter.onComplete();
154             } catch (IOException e) {
155                 if (!emitter.isCancelled()) {
156                     throw e;
157                 }
158             }
159         }
160     }
161
162     /**
163      * Gets the name of the serial port.
164      *
165      * @return The actual name of the serial port.
166      */
167     public String getPortName() {
168         return portName;
169     }
170 }