2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.proteusecometer.internal.ecometers;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.util.Optional;
18 import java.util.function.Supplier;
19 import java.util.stream.Stream;
21 import org.eclipse.jdt.annotation.NonNullByDefault;
22 import org.openhab.binding.proteusecometer.internal.WrappedException;
23 import org.openhab.binding.proteusecometer.internal.serialport.SerialPortService;
24 import org.openhab.core.util.HexUtils;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
29 * Read from Proteus EcoMeter S
31 * @author Matthias Herrmann - Initial contribution
35 public class ProteusEcoMeterSService {
37 private final Logger logger = LoggerFactory.getLogger(ProteusEcoMeterSService.class);
40 * Open the serial port and continuously read and parse the replies sent by the device.
42 * @return a stream of the replies received from the device; the stream ends once the consuming thread is interrupted
45 public Stream<ProteusEcoMeterSReply> read(final String portId, final SerialPortService serialPort)
47 logger.trace("communicate");
49 final InputStream inputStream = serialPort.getInputStream(portId, 115200, 8, 1, 0);
50 final Supplier<Optional<ProteusEcoMeterSReply>> supplier = () -> {
51 logger.trace("Input stream opened for the port");
54 final byte[] deviceBytes = new byte[22];
55 inputStream.read(deviceBytes, 0, 22);
56 final String hexString = HexUtils.bytesToHex(deviceBytes);
57 logger.trace("Received hex string: {}", hexString);
58 final ProteusEcoMeterSParser parser = new ProteusEcoMeterSParser();
59 final Optional<ProteusEcoMeterSReply> dataOpt = parser.parseFromBytes(deviceBytes);
61 if (dataOpt.isEmpty()) {
62 logger.warn("Received bytes I don't understand: {}", hexString);
65 } catch (final IOException e) {
66 throw new WrappedException(e);
70 } catch (final IOException e) {
75 return Stream.generate(supplier).takeWhile(reply -> !Thread.interrupted()).filter(Optional::isPresent)