2 * Copyright (c) 2010-2021 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.lcn.internal.connection;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.Channel;
import java.nio.channels.CompletionHandler;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.lcn.internal.common.LcnAddr;
import org.openhab.binding.lcn.internal.common.LcnAddrGrp;
import org.openhab.binding.lcn.internal.common.LcnAddrMod;
import org.openhab.binding.lcn.internal.common.LcnDefs;
import org.openhab.binding.lcn.internal.common.LcnException;
import org.openhab.binding.lcn.internal.common.PckGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
47 * This class represents a configured connection to one LCN-PCHK.
48 * It uses a {@link AsynchronousSocketChannel} to connect to LCN-PCHK.
51 * <li>Reconnection on connection loss
52 * <li>Segment scan (to detect the local segment ID)
53 * <li>Acknowledge handling
54 * <li>Periodic value requests
55 * <li>Caching of runtime data about the underlying LCN bus
58 * @author Fabian Wolter - Initial Contribution
61 public class Connection {
62 private final Logger logger = LoggerFactory.getLogger(Connection.class);
63 private static final int BROADCAST_MODULE_ID = 3;
64 private static final int BROADCAST_SEGMENT_ID = 3;
65 private final ConnectionSettings settings;
66 private final ConnectionCallback callback;
68 private AsynchronousSocketChannel channel;
69 /** The local segment id. -1 means "unknown". */
70 private int localSegId;
71 private final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
72 private final ByteArrayOutputStream sendBuffer = new ByteArrayOutputStream();
73 private final Queue<@Nullable SendData> sendQueue = new LinkedBlockingQueue<>();
74 private final BlockingQueue<PckQueueItem> offlineSendQueue = new LinkedBlockingQueue<>();
75 private final Map<LcnAddr, ModInfo> modData = Collections.synchronizedMap(new HashMap<>());
76 private volatile boolean writeInProgress;
77 private final ScheduledExecutorService scheduler;
78 private final ConnectionStateMachine connectionStateMachine;
/**
 * Constructs a clean (disconnected) connection with the given settings.
 * This does not start the actual connection process.
 *
 * @param sets the settings to use for the new connection
 * @param scheduler the executor handed to the connection state machine and used to dispatch received messages
 * @param callback the callback to the owner
 */
public Connection(ConnectionSettings sets, ScheduledExecutorService scheduler, ConnectionCallback callback) {
    // The final field must be assigned here; it is read by getSettings() and by all timeout look-ups
    this.settings = sets;
    this.callback = callback;
    this.scheduler = scheduler;
    this.clearRuntimeData();

    connectionStateMachine = new ConnectionStateMachine(this, scheduler);
}
97 /** Clears all runtime data. */
98 void clearRuntimeData() {
100 this.localSegId = -1;
101 this.readBuffer.clear();
102 this.sendQueue.clear();
103 this.sendBuffer.reset();
107 * Retrieves the settings for this connection (never changed).
109 * @return the settings
111 public ConnectionSettings getSettings() {
112 return this.settings;
115 private boolean isSocketConnected() {
117 AsynchronousSocketChannel localChannel = channel;
118 return localChannel != null && localChannel.getRemoteAddress() != null;
119 } catch (IOException e) {
125 * Sets the local segment id.
127 * @param localSegId the new local segment id
129 public void setLocalSegId(int localSegId) {
130 this.localSegId = localSegId;
134 * Called whenever an acknowledge is received.
136 * @param addr the source LCN module
137 * @param code the LCN internal code (-1 = "positive")
139 public void onAck(LcnAddrMod addr, int code) {
140 synchronized (modData) {
141 if (modData.containsKey(addr)) {
142 ModInfo modInfo = modData.get(addr);
143 if (modInfo != null) {
144 modInfo.onAck(code, this, this.settings.getTimeout(), System.nanoTime());
151 * Creates and/or returns cached data for the given LCN module.
153 * @param addr the module's address
156 public ModInfo updateModuleData(LcnAddrMod addr) {
157 return Objects.requireNonNull(modData.computeIfAbsent(addr, ModInfo::new));
161 * Reads and processes input from the underlying channel.
162 * Fragmented input is kept in {@link #readBuffer} and will be processed with the next call.
164 * @throws IOException if connection was closed or a generic channel error occurred
166 void readAndProcess() {
167 AsynchronousSocketChannel localChannel = channel;
168 if (localChannel != null && isSocketConnected()) {
169 localChannel.read(readBuffer, null, new CompletionHandler<@Nullable Integer, @Nullable Void>() {
171 public void completed(@Nullable Integer transmittedByteCount, @Nullable Void attachment) {
172 synchronized (Connection.this) {
173 if (transmittedByteCount == null || transmittedByteCount == -1) {
174 String msg = "Connection was closed by foreign host.";
175 connectionStateMachine.handleConnectionFailed(new LcnException(msg));
177 // read data chunks from socket and separate frames
179 int aPos = readBuffer.position(); // 0
180 String s = new String(readBuffer.array(), aPos, transmittedByteCount, LcnDefs.LCN_ENCODING);
181 int pos1 = 0, pos2 = s.indexOf(PckGenerator.TERMINATION, pos1);
183 String data = s.substring(pos1, pos2);
184 if (logger.isTraceEnabled()) {
185 logger.trace("Received: '{}'", data);
187 scheduler.submit(() -> {
188 connectionStateMachine.onInputReceived(data);
189 callback.onPckMessageReceived(data);
191 // Seek position in input array
192 aPos += s.substring(pos1, pos2 + 1).getBytes(LcnDefs.LCN_ENCODING).length;
195 pos2 = s.indexOf(PckGenerator.TERMINATION, pos1);
197 readBuffer.limit(readBuffer.capacity());
198 readBuffer.position(transmittedByteCount - aPos); // Keeps fragments for the next call
200 if (isSocketConnected()) {
208 public void failed(@Nullable Throwable e, @Nullable Void attachment) {
209 logger.debug("Lost connection");
210 connectionStateMachine.handleConnectionFailed(e);
214 connectionStateMachine.handleConnectionFailed(new LcnException("Socket not open"));
219 * Writes all queued data.
220 * Will try to write all data at once to reduce overhead.
222 public synchronized void triggerWriteToSocket() {
223 AsynchronousSocketChannel localChannel = channel;
224 if (localChannel == null || !isSocketConnected() || writeInProgress) {
228 SendData item = sendQueue.poll();
232 if (!item.write(sendBuffer, localSegId)) {
233 logger.warn("Data loss: Could not write packet into send buffer");
236 writeInProgress = true;
237 byte[] data = sendBuffer.toByteArray();
238 localChannel.write(ByteBuffer.wrap(data), null,
239 new CompletionHandler<@Nullable Integer, @Nullable Void>() {
241 public void completed(@Nullable Integer result, @Nullable Void attachment) {
242 synchronized (Connection.this) {
243 if (result != data.length) {
244 logger.warn("Data loss while writing to channel: {}", settings.getAddress());
246 if (logger.isTraceEnabled()) {
247 logger.trace("Sent: {}", new String(data, 0, data.length));
251 writeInProgress = false;
253 if (sendQueue.size() > 0) {
255 * This could lead to stack overflows, since the CompletionHandler may run in
256 * the same Thread as triggerWriteToSocket() is invoked (see
257 * {@link AsynchronousChannelGroup}/Threading), but we do not expect as much
258 * data in one chunk here, that the stack can be filled in a critical way.
260 triggerWriteToSocket();
266 public void failed(@Nullable Throwable exc, @Nullable Void attachment) {
267 synchronized (Connection.this) {
269 logger.warn("Writing to channel \"{}\" failed: {}", settings.getAddress(),
272 writeInProgress = false;
273 connectionStateMachine.handleConnectionFailed(new LcnException("write() failed"));
277 } catch (BufferOverflowException | IOException e) {
278 logger.warn("Sending failed: {}: {}: {}", item, e.getClass().getSimpleName(), e.getMessage());
284 * Queues plain text to be sent to LCN-PCHK.
285 * Sending will be done the next time {@link #triggerWriteToSocket()} is called.
287 * @param plainText the text
289 public void queueDirectlyPlainText(String plainText) {
290 this.queueAndSend(new SendDataPlainText(plainText));
294 * Queues a PCK command to be sent.
296 * @param addr the target LCN address
297 * @param wantsAck true to wait for acknowledge on receipt (should be false for group addresses)
298 * @param pck the pure PCK command (without address header)
300 void queueDirectly(LcnAddr addr, boolean wantsAck, String pck) {
301 this.queueDirectly(addr, wantsAck, pck.getBytes(LcnDefs.LCN_ENCODING));
305 * Queues a PCK command for immediate sending, regardless of the Connection state. The PCK command is automatically
306 * re-sent if the destination is not a group, an Ack is requested and the module did not answer within the expected
309 * @param addr the target LCN address
310 * @param wantsAck true to wait for acknowledge on receipt (should be false for group addresses)
311 * @param data the pure PCK command (without address header)
313 void queueDirectly(LcnAddr addr, boolean wantsAck, byte[] data) {
314 if (!addr.isGroup() && wantsAck) {
315 this.updateModuleData((LcnAddrMod) addr).queuePckCommandWithAck(data, this, this.settings.getTimeout(),
318 this.queueAndSend(new SendDataPck(addr, false, data));
323 * Enqueues a raw PCK command and triggers the socket to start sending, if it does not already. Does not take care
326 * @param data raw PCK command
328 synchronized void queueAndSend(SendData data) {
329 this.sendQueue.add(data);
331 triggerWriteToSocket();
335 * Enqueues a PCK command to the offline queue. Data will be sent when the Connection state will enter
336 * {@link ConnectionStateConnected}.
338 * @param addr LCN module address
339 * @param wantsAck true, if the LCN module shall respond with an Ack on successful processing
340 * @param data the pure PCK command (without address header)
342 void queueOffline(LcnAddr addr, boolean wantsAck, byte[] data) {
343 offlineSendQueue.add(new PckQueueItem(addr, wantsAck, data));
347 * Enqueues a PCK command for sending. Takes care of the Connection state and buffers the command for a specific
348 * time if the Connection is not ready. If an Ack is requested, the PCK command is automatically
349 * re-sent, if the module did not answer in the expected time.
351 * @param addr LCN module address
352 * @param wantsAck true, if the LCN module shall respond with an Ack on successful processing
353 * @param pck the pure PCK command (without address header)
355 public void queue(LcnAddr addr, boolean wantsAck, String pck) {
356 this.queue(addr, wantsAck, pck.getBytes(LcnDefs.LCN_ENCODING));
360 * Enqueues a PCK command for sending. Takes care of the Connection state and buffers the command for a specific
361 * time if the Connection is not ready. If an Ack is requested, the PCK command is automatically
362 * re-sent, if the module did not answer in the expected time.
364 * @param addr LCN module address
365 * @param wantsAck true, if the LCN module shall respond with an Ack on successful processing
366 * @param pck the pure PCK command (without address header)
368 public void queue(LcnAddr addr, boolean wantsAck, byte[] pck) {
369 connectionStateMachine.queue(addr, wantsAck, pck);
373 * Process the offline PCK command queue. Does only send recently enqueued PCK commands, the rest is discarded.
375 void sendOfflineQueue() {
376 List<PckQueueItem> allItems = new ArrayList<>(offlineSendQueue.size());
377 offlineSendQueue.drainTo(allItems);
379 allItems.forEach(item -> {
380 // only send messages that were enqueued recently, discard older messages
381 long timeout = settings.getTimeout();
382 if (item.getEnqueued().isAfter(Instant.now().minus(timeout * 4, ChronoUnit.MILLIS))) {
383 queueDirectly(item.getAddr(), item.isWantsAck(), item.getData());
389 * Gets the Connection's callback.
391 * @return the callback
393 public ConnectionCallback getCallback() {
398 * Sets the SocketChannel of this Connection
400 * @param channel the new Channel
402 public void setSocketChannel(AsynchronousSocketChannel channel) {
403 this.channel = channel;
407 * Gets the SocketChannel of the Connection.
409 * @returnthe socket channel
412 public Channel getSocketChannel() {
417 * Gets the local segment ID. When no segments are used, the local segment ID is 0.
419 * @return the local segment ID
421 public int getLocalSegId() {
426 * Runs the periodic updates on all ModInfos.
428 public void updateModInfos() {
429 synchronized (modData) {
430 modData.values().forEach(i -> i.update(this, settings.getTimeout(), System.nanoTime()));
435 * Removes an LCN module from the ModData list.
437 * @param addr the module's address to be removed
439 public void removeLcnModule(LcnAddr addr) {
440 modData.remove(addr);
444 * Invoked when this Connection shall be shut-down finally.
446 public void shutdown() {
447 connectionStateMachine.shutdownFinally();
451 * Sends a broadcast to all LCN modules with a reuqest to respond with an Ack.
453 public void sendModuleDiscoveryCommand() {
454 queueAndSend(new SendDataPck(new LcnAddrGrp(BROADCAST_SEGMENT_ID, BROADCAST_MODULE_ID), true,
455 PckGenerator.nullCommand().getBytes(LcnDefs.LCN_ENCODING)));
456 queueAndSend(new SendDataPck(new LcnAddrGrp(0, BROADCAST_MODULE_ID), true,
457 PckGenerator.nullCommand().getBytes(LcnDefs.LCN_ENCODING)));
461 * Requests the serial number and the firmware version of the given LCN module.
463 * @param addr module's address
465 public void sendSerialNumberRequest(LcnAddrMod addr) {
466 queueDirectly(addr, false, PckGenerator.requestSn());
470 * Requests the programmed name of the given LCN module.
472 * @param addr module's address
474 public void sendModuleNameRequest(LcnAddrMod addr) {
475 queueDirectly(addr, false, PckGenerator.requestModuleName(0));
476 queueDirectly(addr, false, PckGenerator.requestModuleName(1));