2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.lcn.internal.connection;
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.nio.BufferOverflowException;
18 import java.nio.ByteBuffer;
19 import java.nio.channels.AsynchronousSocketChannel;
20 import java.nio.channels.Channel;
21 import java.nio.channels.CompletionHandler;
22 import java.time.Instant;
23 import java.time.temporal.ChronoUnit;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.List;
29 import java.util.Objects;
30 import java.util.Queue;
31 import java.util.concurrent.BlockingQueue;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.ScheduledExecutorService;
35 import org.eclipse.jdt.annotation.NonNullByDefault;
36 import org.eclipse.jdt.annotation.Nullable;
37 import org.openhab.binding.lcn.internal.common.LcnAddr;
38 import org.openhab.binding.lcn.internal.common.LcnAddrGrp;
39 import org.openhab.binding.lcn.internal.common.LcnAddrMod;
40 import org.openhab.binding.lcn.internal.common.LcnDefs;
41 import org.openhab.binding.lcn.internal.common.LcnException;
42 import org.openhab.binding.lcn.internal.common.PckGenerator;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
47 * This class represents a configured connection to one LCN-PCHK.
48 * It uses an {@link AsynchronousSocketChannel} to connect to LCN-PCHK.
51 * <li>Reconnection on connection loss
52 * <li>Segment scan (to detect the local segment ID)
53 * <li>Acknowledge handling
54 * <li>Periodic value requests
55 * <li>Caching of runtime data about the underlying LCN bus
58 * @author Fabian Wolter - Initial Contribution
61 public class Connection {
62 private final Logger logger = LoggerFactory.getLogger(Connection.class);
63 private static final int BROADCAST_MODULE_ID = 3;
64 private static final int BROADCAST_SEGMENT_ID = 3;
65 private final ConnectionSettings settings;
66 private final ConnectionCallback callback;
68 private AsynchronousSocketChannel channel;
69 /** The local segment id. -1 means "unknown". */
70 private int localSegId;
71 private final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
72 private final ByteArrayOutputStream sendBuffer = new ByteArrayOutputStream();
73 private final Queue<@Nullable SendData> sendQueue = new LinkedBlockingQueue<>();
74 private final BlockingQueue<PckQueueItem> offlineSendQueue = new LinkedBlockingQueue<>();
75 private final Map<LcnAddr, ModInfo> modData = Collections.synchronizedMap(new HashMap<>());
76 private volatile boolean writeInProgress;
77 private final ScheduledExecutorService scheduler;
78 private final ConnectionStateMachine connectionStateMachine;
81 * Constructs a clean (disconnected) connection with the given settings.
82 * This does not start the actual connection process.
84 * @param sets the settings to use for the new connection
85 * @param callback the callback to the owner
88 public Connection(ConnectionSettings sets, ScheduledExecutorService scheduler, ConnectionCallback callback) {
90 this.callback = callback;
91 this.scheduler = scheduler;
92 this.clearRuntimeData();
94 connectionStateMachine = new ConnectionStateMachine(this, scheduler);
97 /** Clears all runtime data. */
98 void clearRuntimeData() {
100 this.localSegId = -1;
101 this.readBuffer.clear();
102 this.sendQueue.clear();
103 this.sendBuffer.reset();
107 * Retrieves the settings for this connection (never changed).
109 * @return the settings
111 public ConnectionSettings getSettings() {
112 return this.settings;
115 private boolean isSocketConnected() {
117 AsynchronousSocketChannel localChannel = channel;
118 return localChannel != null && localChannel.getRemoteAddress() != null;
119 } catch (IOException e) {
125 * Sets the local segment id.
127 * @param localSegId the new local segment id
129 public void setLocalSegId(int localSegId) {
130 this.localSegId = localSegId;
134 * Called whenever an acknowledge is received.
136 * @param addr the source LCN module
137 * @param code the LCN internal code (-1 = "positive")
139 public void onAck(LcnAddrMod addr, int code) {
140 synchronized (modData) {
141 if (modData.containsKey(addr)) {
142 ModInfo modInfo = modData.get(addr);
143 if (modInfo != null) {
144 modInfo.onAck(code, this, this.settings.getTimeout(), System.currentTimeMillis());
151 * Creates and/or returns cached data for the given LCN module.
153 * @param addr the module's address
156 public ModInfo updateModuleData(LcnAddrMod addr) {
157 return Objects.requireNonNull(modData.computeIfAbsent(addr, ModInfo::new));
161 * Reads and processes input from the underlying channel.
162 * Fragmented input is kept in {@link #readBuffer} and will be processed with the next call.
164 * @throws IOException if connection was closed or a generic channel error occurred
166 void readAndProcess() {
167 AsynchronousSocketChannel localChannel = channel;
168 if (localChannel != null && isSocketConnected()) {
169 localChannel.read(readBuffer, null, new CompletionHandler<@Nullable Integer, @Nullable Void>() {
171 public void completed(@Nullable Integer transmittedByteCount, @Nullable Void attachment) {
172 synchronized (Connection.this) {
173 if (transmittedByteCount == null || transmittedByteCount == -1) {
174 String msg = "Connection was closed by foreign host.";
175 connectionStateMachine.handleConnectionFailed(new LcnException(msg));
177 // read data chunks from socket and separate frames
179 int aPos = readBuffer.position(); // 0
180 String s = new String(readBuffer.array(), aPos, transmittedByteCount, LcnDefs.LCN_ENCODING);
181 int pos1 = 0, pos2 = s.indexOf(PckGenerator.TERMINATION, pos1);
183 String data = s.substring(pos1, pos2);
184 if (logger.isTraceEnabled()) {
185 logger.trace("Received: '{}'", data);
187 scheduler.submit(() -> {
188 connectionStateMachine.onInputReceived(data);
189 callback.onPckMessageReceived(data);
191 // Seek position in input array
192 aPos += s.substring(pos1, pos2 + 1).getBytes(LcnDefs.LCN_ENCODING).length;
195 pos2 = s.indexOf(PckGenerator.TERMINATION, pos1);
197 readBuffer.limit(readBuffer.capacity());
198 readBuffer.position(transmittedByteCount - aPos); // Keeps fragments for the next call
200 if (isSocketConnected()) {
208 public void failed(@Nullable Throwable e, @Nullable Void attachment) {
209 logger.debug("Lost connection");
210 connectionStateMachine.handleConnectionFailed(e);
214 connectionStateMachine.handleConnectionFailed(new LcnException("Socket not open"));
219 * Writes all queued data.
220 * Will try to write all data at once to reduce overhead.
222 public synchronized void triggerWriteToSocket() {
223 AsynchronousSocketChannel localChannel = channel;
224 if (localChannel == null || !isSocketConnected() || writeInProgress) {
228 SendData item = sendQueue.poll();
232 if (!item.write(sendBuffer, localSegId)) {
233 logger.warn("Data loss: Could not write packet into send buffer");
236 writeInProgress = true;
237 byte[] data = sendBuffer.toByteArray();
238 localChannel.write(ByteBuffer.wrap(data), null,
239 new CompletionHandler<@Nullable Integer, @Nullable Void>() {
241 public void completed(@Nullable Integer result, @Nullable Void attachment) {
242 synchronized (Connection.this) {
243 if (result != data.length) {
244 logger.warn("Data loss while writing to channel: {}", settings.getAddress());
246 if (logger.isTraceEnabled()) {
247 logger.trace("Sent: {}",
248 new String(data, 0, data.length, LcnDefs.LCN_ENCODING).trim());
252 writeInProgress = false;
254 if (!sendQueue.isEmpty()) {
256 * This could lead to stack overflows, since the CompletionHandler may run in
257 * the same Thread as triggerWriteToSocket() is invoked (see
258 * {@link AsynchronousChannelGroup}/Threading), but we do not expect as much
259 * data in one chunk here, that the stack can be filled in a critical way.
261 triggerWriteToSocket();
267 public void failed(@Nullable Throwable exc, @Nullable Void attachment) {
268 synchronized (Connection.this) {
270 logger.warn("Writing to channel \"{}\" failed: {}", settings.getAddress(),
273 writeInProgress = false;
274 connectionStateMachine.handleConnectionFailed(new LcnException("write() failed"));
278 } catch (BufferOverflowException | IOException e) {
279 logger.warn("Sending failed: {}: {}: {}", item, e.getClass().getSimpleName(), e.getMessage());
285 * Queues plain text to be sent to LCN-PCHK.
286 * Sending will be done the next time {@link #triggerWriteToSocket()} is called.
288 * @param plainText the text
290 public void queueDirectlyPlainText(String plainText) {
291 this.queueAndSend(new SendDataPlainText(plainText));
295 * Queues a PCK command to be sent.
297 * @param addr the target LCN address
298 * @param wantsAck true to wait for acknowledge on receipt (should be false for group addresses)
299 * @param pck the pure PCK command (without address header)
301 void queueDirectly(LcnAddr addr, boolean wantsAck, String pck) {
302 this.queueDirectly(addr, wantsAck, pck.getBytes(LcnDefs.LCN_ENCODING));
306 * Queues a PCK command for immediate sending, regardless of the Connection state. The PCK command is automatically
307 * re-sent if the destination is not a group, an Ack is requested and the module did not answer within the expected
310 * @param addr the target LCN address
311 * @param wantsAck true to wait for acknowledge on receipt (should be false for group addresses)
312 * @param data the pure PCK command (without address header)
314 void queueDirectly(LcnAddr addr, boolean wantsAck, byte[] data) {
315 if (!addr.isGroup() && wantsAck) {
316 this.updateModuleData((LcnAddrMod) addr).queuePckCommandWithAck(data, this, this.settings.getTimeout(),
317 System.currentTimeMillis());
319 this.queueAndSend(new SendDataPck(addr, false, data));
324 * Enqueues a raw PCK command and triggers the socket to start sending, if it does not already. Does not take care
327 * @param data raw PCK command
329 synchronized void queueAndSend(SendData data) {
330 this.sendQueue.add(data);
332 triggerWriteToSocket();
336 * Enqueues a PCK command to the offline queue. Data will be sent when the Connection state will enter
337 * {@link ConnectionStateConnected}.
339 * @param addr LCN module address
340 * @param wantsAck true, if the LCN module shall respond with an Ack on successful processing
341 * @param data the pure PCK command (without address header)
343 void queueOffline(LcnAddr addr, boolean wantsAck, byte[] data) {
344 offlineSendQueue.add(new PckQueueItem(addr, wantsAck, data));
348 * Enqueues a PCK command for sending. Takes care of the Connection state and buffers the command for a specific
349 * time if the Connection is not ready. If an Ack is requested, the PCK command is automatically
350 * re-sent, if the module did not answer in the expected time.
352 * @param addr LCN module address
353 * @param wantsAck true, if the LCN module shall respond with an Ack on successful processing
354 * @param pck the pure PCK command (without address header)
356 public void queue(LcnAddr addr, boolean wantsAck, String pck) {
357 this.queue(addr, wantsAck, pck.getBytes(LcnDefs.LCN_ENCODING));
361 * Enqueues a PCK command for sending. Takes care of the Connection state and buffers the command for a specific
362 * time if the Connection is not ready. If an Ack is requested, the PCK command is automatically
363 * re-sent, if the module did not answer in the expected time.
365 * @param addr LCN module address
366 * @param wantsAck true, if the LCN module shall respond with an Ack on successful processing
367 * @param pck the pure PCK command (without address header)
369 public void queue(LcnAddr addr, boolean wantsAck, byte[] pck) {
370 connectionStateMachine.queue(addr, wantsAck, pck);
374 * Process the offline PCK command queue. Does only send recently enqueued PCK commands, the rest is discarded.
376 void sendOfflineQueue() {
377 List<PckQueueItem> allItems = new ArrayList<>(offlineSendQueue.size());
378 offlineSendQueue.drainTo(allItems);
380 allItems.forEach(item -> {
381 // only send messages that were enqueued recently, discard older messages
382 long timeout = settings.getTimeout();
383 if (item.getEnqueued().isAfter(Instant.now().minus(timeout * 4, ChronoUnit.MILLIS))) {
384 queueDirectly(item.getAddr(), item.isWantsAck(), item.getData());
390 * Gets the Connection's callback.
392 * @return the callback
394 public ConnectionCallback getCallback() {
399 * Sets the SocketChannel of this Connection
401 * @param channel the new Channel
403 public void setSocketChannel(AsynchronousSocketChannel channel) {
404 this.channel = channel;
408 * Gets the SocketChannel of the Connection.
410 * @return the socket channel
413 public Channel getSocketChannel() {
418 * Gets the local segment ID. When no segments are used, the local segment ID is 0.
420 * @return the local segment ID
422 public int getLocalSegId() {
427 * Runs the periodic updates on all ModInfos.
429 public void updateModInfos() {
430 synchronized (modData) {
431 modData.values().forEach(i -> i.update(this, settings.getTimeout(), System.currentTimeMillis()));
436 * Removes an LCN module from the ModData list.
438 * @param addr the module's address to be removed
440 public void removeLcnModule(LcnAddr addr) {
441 modData.remove(addr);
445 * Invoked when this Connection shall be shut-down finally.
447 public void shutdown() {
448 connectionStateMachine.shutdownFinally();
452 * Sends a broadcast to all LCN modules with a reuqest to respond with an Ack.
454 public void sendModuleDiscoveryCommand() {
455 queueAndSend(new SendDataPck(new LcnAddrGrp(BROADCAST_SEGMENT_ID, BROADCAST_MODULE_ID), true,
456 PckGenerator.nullCommand().getBytes(LcnDefs.LCN_ENCODING)));
457 queueAndSend(new SendDataPck(new LcnAddrGrp(0, BROADCAST_MODULE_ID), true,
458 PckGenerator.nullCommand().getBytes(LcnDefs.LCN_ENCODING)));
462 * Requests the serial number and the firmware version of the given LCN module.
464 * @param addr module's address
466 public void sendSerialNumberRequest(LcnAddrMod addr) {
467 queueDirectly(addr, false, PckGenerator.requestSn());
471 * Requests theprogrammed name of the given LCN module.
473 * @param addr module's address
475 public void sendModuleNameRequest(LcnAddrMod addr) {
476 queueDirectly(addr, false, PckGenerator.requestModuleName(0));
477 queueDirectly(addr, false, PckGenerator.requestModuleName(1));