2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.nikobus.internal.handler;
15 import static org.openhab.binding.nikobus.internal.NikobusBindingConstants.CONFIG_REFRESH_INTERVAL;
17 import java.io.IOException;
18 import java.io.OutputStream;
19 import java.util.Collection;
20 import java.util.Collections;
21 import java.util.HashMap;
22 import java.util.LinkedList;
23 import java.util.List;
25 import java.util.concurrent.ScheduledFuture;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28 import java.util.function.Consumer;
29 import java.util.stream.Collectors;
30 import java.util.stream.Stream;
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.openhab.binding.nikobus.internal.NikobusBindingConstants;
35 import org.openhab.binding.nikobus.internal.discovery.NikobusDiscoveryService;
36 import org.openhab.binding.nikobus.internal.protocol.NikobusCommand;
37 import org.openhab.binding.nikobus.internal.protocol.NikobusConnection;
38 import org.openhab.binding.nikobus.internal.utils.Utils;
39 import org.openhab.core.io.transport.serial.SerialPortManager;
40 import org.openhab.core.thing.Bridge;
41 import org.openhab.core.thing.ChannelUID;
42 import org.openhab.core.thing.Thing;
43 import org.openhab.core.thing.ThingStatus;
44 import org.openhab.core.thing.ThingStatusDetail;
45 import org.openhab.core.thing.binding.BaseBridgeHandler;
46 import org.openhab.core.thing.binding.ThingHandler;
47 import org.openhab.core.thing.binding.ThingHandlerService;
48 import org.openhab.core.types.Command;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
53 * The {@link NikobusPcLinkHandler} is responsible for handling commands, which
54 * are sent to or received from the PC-Link Nikobus component.
56 * @author Boris Krivonog - Initial contribution
59 public class NikobusPcLinkHandler extends BaseBridgeHandler {
// NOTE(review): in this view every line carries a leading number and some short lines (e.g. closing braces)
// appear to be absent - verify against the complete file before changing any code.
60 private final Logger logger = LoggerFactory.getLogger(NikobusPcLinkHandler.class);
// Received command string -> Runnable registered through addListener(); wrapped in a synchronized map.
61 private final Map<String, Runnable> commandListeners = Collections.synchronizedMap(new HashMap<>());
// Commands waiting to be sent; also used as the monitor object in synchronized (pendingCommands) sections.
62 private final LinkedList<NikobusCommand> pendingCommands = new LinkedList<>();
// Receive buffer: processReceivedValue() appends each incoming byte (cast to char) here.
63 private final StringBuilder stringBuilder = new StringBuilder();
// Passed to the NikobusConnection created in initialize().
64 private final SerialPortManager serialPortManager;
// Created in initialize(), set back to null in dispose().
65 private @Nullable NikobusConnection connection;
// Command currently being processed; appears to be guarded by the pendingCommands monitor - TODO confirm.
66 private @Nullable NikobusCommand currentCommand;
// Future of the periodic refresh() task scheduled in initialize().
67 private @Nullable ScheduledFuture<?> scheduledRefreshFuture;
// Future of the most recently scheduled send timeout/retry task (see scheduleSendCommandTimeout()).
68 private @Nullable ScheduledFuture<?> scheduledSendCommandWatchdogFuture;
// Presumably the last received "$05.." ack payload; the assignment is not visible in this view - TODO confirm.
69 private @Nullable String ack;
// Optional consumer that processReceivedValue() hands received command strings to.
70 private @Nullable Consumer<String> unhandledCommandsProcessor;
// Index into the list of module things; advanced round-robin by refresh().
71 private int refreshThingIndex = 0;
/**
 * Creates the handler and keeps the serial port manager, which initialize() later passes to the
 * {@link NikobusConnection} it creates.
 * NOTE(review): the call that hands {@code bridge} to the superclass is presumably on a line not visible in this
 * view - confirm.
 *
 * @param bridge the bridge this handler belongs to
 * @param serialPortManager manager passed on to the Nikobus connection
 */
73 public NikobusPcLinkHandler(Bridge bridge, SerialPortManager serialPortManager) {
75 this.serialPortManager = serialPortManager;
/**
 * Prepares the bridge: clears the receive buffer, reports status UNKNOWN, reads the configured port name,
 * creates the {@link NikobusConnection} (with processReceivedValue as its byte callback) and schedules the
 * periodic refresh task.
 * NOTE(review): the statement that leaves the method after the "Port must be set!" error, and the last argument
 * line of the scheduleWithFixedDelay call, are not visible in this view.
 */
79 public void initialize() {
// Drop any partially received data left in the buffer.
81 stringBuilder.setLength(0);
83 updateStatus(ThingStatus.UNKNOWN);
85 String portName = (String) getConfig().get(NikobusBindingConstants.CONFIG_PORT_NAME);
86 if (portName == null) {
87 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.CONFIGURATION_ERROR, "Port must be set!");
91 connection = new NikobusConnection(serialPortManager, portName, this::processReceivedValue);
// NOTE(review): this throws a NullPointerException if no refresh interval is present in the configuration -
// assumes a default value is always supplied; confirm.
93 int refreshInterval = ((Number) getConfig().get(CONFIG_REFRESH_INTERVAL)).intValue();
// The same value is used for the initial delay and for the delay between runs.
94 scheduledRefreshFuture = scheduler.scheduleWithFixedDelay(this::refresh, refreshInterval, refreshInterval,
/**
 * Releases the handler's resources: cancels the send watchdog and refresh futures, clears the connection field
 * and then acts on the previous connection if there was one.
 * NOTE(review): the body of the final {@code if} is not visible in this view; presumably it closes the
 * connection - confirm.
 */
99 public void dispose() {
102 Utils.cancel(scheduledSendCommandWatchdogFuture);
103 scheduledSendCommandWatchdogFuture = null;
105 Utils.cancel(scheduledRefreshFuture);
106 scheduledRefreshFuture = null;
// Copy to a local first so the field is already null while the old connection is being handled.
108 NikobusConnection connection = this.connection;
109 this.connection = null;
111 if (connection != null) {
/**
 * Handles a command sent to one of this bridge's channels.
 * NOTE(review): the method body is not visible in this view, so its behaviour cannot be described here.
 *
 * @param channelUID the channel the command was sent to
 * @param command the command to handle
 */
117 public void handleCommand(ChannelUID channelUID, Command command) {
/**
 * Returns the handler service classes offered by this bridge.
 *
 * @return a single-element collection containing {@link NikobusDiscoveryService}
 */
122 public Collection<Class<? extends ThingHandlerService>> getServices() {
123 return Collections.singleton(NikobusDiscoveryService.class);
/**
 * Consumes one byte received from the connection (this method is registered as the connection's callback in
 * initialize()). Bytes are collected in stringBuilder. A completed command is dispatched to processResponse()
 * when it starts with "$", otherwise to a listener registered for that exact command string or to the
 * unhandled-commands processor.
 * NOTE(review): the condition that marks a command as complete, the else-branches linking the dispatch targets
 * and the assignment of the ack field are not visible in this view. The comment near the end of the method
 * suggests that "\r" terminates a command - TODO confirm.
 *
 * @param value the received byte
 */
126 private void processReceivedValue(byte value) {
127 logger.trace("Received {}", value);
// Take the buffered text as the command and clear the buffer for the next one.
130 String command = stringBuilder.toString();
131 stringBuilder.setLength(0);
133 logger.debug("Received command '{}', ack = '{}'", command, ack);
// Commands starting with "$" are treated as responses and checked against the current command.
136 if (command.startsWith("$")) {
137 String ack = this.ack;
140 processResponse(command, ack);
// Look up a listener registered for exactly this command string.
142 Runnable listener = commandListeners.get(command);
143 if (listener != null) {
// Read the field once into a local so the null check and the call use the same instance.
146 Consumer<String> processor = unhandledCommandsProcessor;
147 if (processor != null) {
148 processor.accept(command);
// Runtime failures while processing a command are logged at debug level.
152 } catch (RuntimeException e) {
153 logger.debug("Processing command '{}' failed due {}", command, e.getMessage(), e);
156 stringBuilder.append((char) value);
158 // Take ACK part, i.e. "$0512"
159 if (stringBuilder.length() == 5) {
160 String payload = stringBuilder.toString();
// A 5-character buffer starting with "$05" is handled as an ack and removed from the buffer.
161 if (payload.startsWith("$05")) {
163 logger.debug("Received ack '{}'", ack);
164 stringBuilder.setLength(0);
166 } else if (stringBuilder.length() > 128) {
167 // Fuse, if for some reason we don't receive \r don't fill buffer.
168 stringBuilder.setLength(0);
169 logger.warn("Resetting read buffer, should not happen, am I connected to Nikobus?");
/**
 * Registers a listener for the given command string. If a listener was already registered for that command it
 * is replaced (Map.put semantics) and a warning is logged.
 *
 * @param command the command string to listen for
 * @param listener the runnable to associate with the command
 */
174 public void addListener(String command, Runnable listener) {
175 if (commandListeners.put(command, listener) != null) {
176 logger.warn("Multiple registrations for '{}'", command);
/**
 * Removes the listener registered for the given command string, if there is one.
 *
 * @param command the command string whose listener should be removed
 */
180 public void removeListener(String command) {
181 commandListeners.remove(command);
/**
 * Stores the consumer that processReceivedValue() hands received command strings to. If a processor is already
 * set, a debug message is logged before it is overwritten.
 *
 * @param processor the consumer to store
 */
184 public void setUnhandledCommandProcessor(Consumer<String> processor) {
185 if (unhandledCommandsProcessor != null) {
186 logger.debug("Unexpected override of unhandledCommandsProcessor");
188 unhandledCommandsProcessor = processor;
/**
 * Clears the unhandled-commands processor by setting the field back to null.
 */
191 public void resetUnhandledCommandProcessor() {
192 unhandledCommandsProcessor = null;
/**
 * Checks a received response against the command currently being processed and, if every check passes, hands
 * the response to that command's response handler. When the handler's complete() call returns true, processing
 * is reset and the next pending command is started.
 * <p>
 * A debug message is logged when: no command is current; the command has no response handler; no ack was
 * received; the ack's command id differs from the request's; the response length or response code is not what
 * the handler expects; or the address in the response differs from the request's.
 * NOTE(review): the statements following each failed check (presumably an early return) and the condition
 * guarding the "No ack received" message are not visible in this view - confirm.
 *
 * @param commandPayload the received response text
 * @param ack the ack captured before the response, or null
 */
195 private void processResponse(String commandPayload, @Nullable String ack) {
196 NikobusCommand command;
// Read the current command while holding the pendingCommands monitor.
197 synchronized (pendingCommands) {
198 command = currentCommand;
201 if (command == null) {
202 logger.debug("Processing response but no command pending");
206 NikobusCommand.ResponseHandler responseHandler = command.getResponseHandler();
207 if (responseHandler == null) {
208 logger.debug("No response expected for current command");
213 logger.debug("No ack received");
// The command id is taken from characters 3..4 of both the request payload and the ack.
217 String requestCommandId = command.getPayload().substring(3, 5);
218 String ackCommandId = ack.substring(3, 5);
219 if (!ackCommandId.equals(requestCommandId)) {
220 logger.debug("Unexpected command's ack '{}' != '{}'", requestCommandId, ackCommandId);
224 // Check if response has expected length.
225 if (commandPayload.length() != responseHandler.getResponseLength()) {
226 logger.debug("Unexpected response length");
230 if (!commandPayload.startsWith(responseHandler.getResponseCode())) {
231 logger.debug("Unexpected response command code");
// The request address is characters 5..8 of the request payload; the response carries its 4-character address
// at the offset reported by the response handler.
235 String requestCommandAddress = command.getPayload().substring(5, 9);
236 String ackCommandAddress = commandPayload.substring(responseHandler.getAddressStart(),
237 responseHandler.getAddressStart() + 4);
238 if (!requestCommandAddress.equals(ackCommandAddress)) {
239 logger.debug("Unexpected response address");
// Only move on to the next command when complete() returns true.
243 if (responseHandler.complete(commandPayload)) {
244 resetProcessingAndProcessNext();
/**
 * Queues a command for sending: appends it to the end of pendingCommands while holding that list's monitor and
 * submits processCommand() to the scheduler.
 *
 * @param command the command to queue
 */
248 public void sendCommand(NikobusCommand command) {
249 synchronized (pendingCommands) {
250 pendingCommands.addLast(command);
253 scheduler.submit(this::processCommand);
/**
 * Takes the first pending command, records it as the current command and sends it with a retry count of 3.
 * NOTE(review): what happens when a command is already current, or when the queue is empty, is not visible in
 * this view; presumably the method returns without sending - confirm.
 */
256 private void processCommand() {
257 NikobusCommand command;
258 synchronized (pendingCommands) {
259 if (currentCommand != null) {
// pollFirst() returns null when the queue is empty.
263 command = pendingCommands.pollFirst();
264 if (command == null) {
268 currentCommand = command;
270 sendCommand(command, 3);
/**
 * Writes the command's payload to the connection's output stream (connecting first if needed) and then arranges
 * what happens next. With no response handler, the next command is processed immediately. With retries left, a
 * timeout task is scheduled that re-sends the command with {@code retry - 1} unless the handler has completed.
 * The remaining scheduleSendCommandTimeout call schedules processTimeout() for the handler.
 * An IOException is logged at debug level and the bridge is reported OFFLINE with COMMUNICATION_ERROR.
 * NOTE(review): the try/catch block structure, the handling of a null connection or null output stream, and the
 * branch introducing the final scheduleSendCommandTimeout call are not visible in this view - confirm.
 *
 * @param command the command to send
 * @param retry number of re-send attempts still allowed
 */
273 private void sendCommand(NikobusCommand command, int retry) {
274 logger.debug("Sending retry = {}, command '{}'", retry, command.getPayload());
276 NikobusConnection connection = this.connection;
277 if (connection == null) {
282 connectIfNeeded(connection);
284 OutputStream outputStream = connection.getOutputStream();
285 if (outputStream == null) {
// NOTE(review): getBytes() without a charset uses the platform default charset.
288 outputStream.write(command.getPayload().getBytes());
289 outputStream.flush();
290 } catch (IOException e) {
291 logger.debug("Sending command failed due {}", e.getMessage(), e);
293 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getMessage());
295 NikobusCommand.ResponseHandler responseHandler = command.getResponseHandler();
296 if (responseHandler == null) {
// No response is expected, so continue with the next pending command straight away.
297 resetProcessingAndProcessNext();
298 } else if (retry > 0) {
// Re-send with one retry fewer if the handler has not completed when the timeout task runs.
299 scheduleSendCommandTimeout(() -> {
300 if (!responseHandler.isCompleted()) {
301 sendCommand(command, retry - 1);
305 scheduleSendCommandTimeout(() -> processTimeout(responseHandler));
/**
 * Schedules the given task to run once after 2 seconds and stores the resulting future in
 * scheduledSendCommandWatchdogFuture. A future stored there earlier is overwritten without being cancelled here.
 *
 * @param command the task to run when the timeout elapses
 */
310 private void scheduleSendCommandTimeout(Runnable command) {
311 scheduledSendCommandWatchdogFuture = scheduler.schedule(command, 2, TimeUnit.SECONDS);
/**
 * Completes the response handler exceptionally with a {@link TimeoutException}. If that call returns true,
 * processing is reset and the next pending command is started.
 *
 * @param responseHandler the handler of the command that timed out
 */
314 private void processTimeout(NikobusCommand.ResponseHandler responseHandler) {
315 if (responseHandler.completeExceptionally(new TimeoutException("Waiting for response timed-out."))) {
316 resetProcessingAndProcessNext();
/**
 * Ends processing of the current command: cancels the send watchdog future, clears currentCommand while holding
 * the pendingCommands monitor and submits processCommand() to the scheduler so the next queued command is sent.
 */
320 private void resetProcessingAndProcessNext() {
321 Utils.cancel(scheduledSendCommandWatchdogFuture);
322 synchronized (pendingCommands) {
323 currentCommand = null;
325 scheduler.submit(this::processCommand);
/**
 * Periodic task scheduled in initialize(). Collects the child things whose handler is a NikobusModuleHandler.
 * If any command listeners are registered it makes sure the connection is open; an IOException there sets the
 * bridge OFFLINE with COMMUNICATION_ERROR. It then advances refreshThingIndex round-robin and calls
 * refreshModule() on the handler of the thing at that index.
 * NOTE(review): the statements taken when the connection is null, after the IOException, when there are no
 * things and when the handler is null are not visible in this view; presumably each returns early - confirm.
 */
328 private void refresh() {
329 List<Thing> things = getThing().getThings().stream()
330 .filter(thing -> thing.getHandler() instanceof NikobusModuleHandler).collect(Collectors.toList());
332 // if there are command listeners (buttons) then we need an open connection even if no modules exist
333 if (!commandListeners.isEmpty()) {
334 NikobusConnection connection = this.connection;
335 if (connection == null) {
339 connectIfNeeded(connection);
340 } catch (IOException e) {
342 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getMessage());
347 if (things.isEmpty()) {
348 logger.debug("Nothing to refresh");
// Move to the next thing, wrapping around at the end of the list.
352 refreshThingIndex = (refreshThingIndex + 1) % things.size();
// getHandler() is called again here, so the result is checked for null before the cast.
354 ThingHandler thingHandler = things.get(refreshThingIndex).getHandler();
355 if (thingHandler == null) {
359 NikobusModuleHandler handler = (NikobusModuleHandler) thingHandler;
360 handler.refreshModule();
/**
 * Opens the given connection if it is not connected yet, queues the start-up command sequence through
 * sendCommand() and reports the bridge as ONLINE. The method is synchronized on this handler.
 * NOTE(review): closing braces are not visible in this view, so it cannot be told from here whether the ONLINE
 * status update runs only after a fresh connect or on every call - confirm.
 *
 * @param connection the connection to open if necessary
 * @throws IOException declared by the method; presumably raised by connection.connect() - confirm
 */
363 private synchronized void connectIfNeeded(NikobusConnection connection) throws IOException {
364 if (!connection.isConnected()) {
365 connection.connect();
367 // Send connection sequence, mimicking the Nikobus software. If this is not
368 // sent, PC-Link sometimes does not forward button presses via serial interface.
// Each string is wrapped in a NikobusCommand and queued in the order listed.
369 Stream.of(new String[] { "++++", "ATH0", "ATZ", "$10110000B8CF9D", "#L0", "#E0", "#L0", "#E1" })
370 .map(NikobusCommand::new).forEach(this::sendCommand);
372 updateStatus(ThingStatus.ONLINE);