2 * Copyright (c) 2010-2020 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.lcn.internal.connection;
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.nio.BufferOverflowException;
18 import java.nio.ByteBuffer;
19 import java.nio.channels.AsynchronousSocketChannel;
20 import java.nio.channels.Channel;
21 import java.nio.channels.CompletionHandler;
22 import java.time.Instant;
23 import java.time.temporal.ChronoUnit;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.List;
29 import java.util.Queue;
30 import java.util.concurrent.BlockingQueue;
31 import java.util.concurrent.LinkedBlockingQueue;
32 import java.util.concurrent.ScheduledExecutorService;
34 import org.eclipse.jdt.annotation.NonNullByDefault;
35 import org.eclipse.jdt.annotation.Nullable;
36 import org.openhab.binding.lcn.internal.common.LcnAddr;
37 import org.openhab.binding.lcn.internal.common.LcnAddrGrp;
38 import org.openhab.binding.lcn.internal.common.LcnAddrMod;
39 import org.openhab.binding.lcn.internal.common.LcnDefs;
40 import org.openhab.binding.lcn.internal.common.LcnException;
41 import org.openhab.binding.lcn.internal.common.PckGenerator;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
/**
 * This class represents a configured connection to one LCN-PCHK.
 * It uses an {@link AsynchronousSocketChannel} to connect to LCN-PCHK.
 * <p>
 * Included logic:
 * <ul>
 * <li>Reconnection on connection loss
 * <li>Segment scan (to detect the local segment ID)
 * <li>Acknowledge handling
 * <li>Periodic value requests
 * <li>Caching of runtime data about the underlying LCN bus
 * </ul>
 *
 * @author Fabian Wolter - Initial Contribution
 */
public class Connection {
    private final Logger logger = LoggerFactory.getLogger(Connection.class);
    /** Group ID part of the broadcast address built in {@link #sendModuleDiscoveryCommand()}. */
    private static final int BROADCAST_MODULE_ID = 3;
    /** Segment ID part of the broadcast address built in {@link #sendModuleDiscoveryCommand()}. */
    private static final int BROADCAST_SEGMENT_ID = 3;
    /** The settings this connection was created with. */
    private final ConnectionSettings settings;
    /** Receives every complete PCK message read from the channel. */
    private final ConnectionCallback callback;
    /** The channel to LCN-PCHK; checked against {@code null} before every use. */
    private AsynchronousSocketChannel channel;
    /** The local segment id. -1 means "unknown". */
    private int localSegId;
    /** Receives raw input; incomplete frames stay in here until the next read completes. */
    private final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    /** Scratch buffer into which the item currently being sent is serialized. */
    private final ByteArrayOutputStream sendBuffer = new ByteArrayOutputStream();
    /** Items waiting to be written to the channel, polled one at a time by {@link #triggerWriteToSocket()}. */
    private final Queue<@Nullable SendData> sendQueue = new LinkedBlockingQueue<>();
    /** PCK commands parked by {@link #queueOffline} until {@link #sendOfflineQueue()} drains them. */
    private final BlockingQueue<PckQueueItem> offlineSendQueue = new LinkedBlockingQueue<>();
    /** Cached per-address module data; compound operations synchronize on the map itself. */
    private final Map<LcnAddr, ModInfo> modData = Collections.synchronizedMap(new HashMap<>());
    /** {@code true} while an asynchronous write has been started and its completion handler has not run yet. */
    private volatile boolean writeInProgress;
    /** Executor used to dispatch received messages; also handed to the state machine. */
    private final ScheduledExecutorService scheduler;
    private final ConnectionStateMachine connectionStateMachine;
/**
 * Constructs a clean (disconnected) connection with the given settings.
 * This does not start the actual connection process.
 *
 * @param sets the settings to use for the new connection
 * @param scheduler the executor used to dispatch received messages; it is also passed on to the
 *            {@link ConnectionStateMachine}
 * @param callback the callback to the owner
 */
public Connection(ConnectionSettings sets, ScheduledExecutorService scheduler, ConnectionCallback callback) {
    // settings is a final field and must be assigned exactly once, here.
    this.settings = sets;
    this.callback = callback;
    this.scheduler = scheduler;
    this.clearRuntimeData();

    // Created last: the state machine receives a reference to this (otherwise fully initialized) object.
    connectionStateMachine = new ConnectionStateMachine(this, scheduler);
}
96 /** Clears all runtime data. */
97 void clearRuntimeData() {
100 this.readBuffer.clear();
101 this.sendQueue.clear();
102 this.sendBuffer.reset();
106 * Retrieves the settings for this connection (never changed).
108 * @return the settings
110 public ConnectionSettings getSettings() {
111 return this.settings;
114 private boolean isSocketConnected() {
116 AsynchronousSocketChannel localChannel = channel;
117 return localChannel != null && localChannel.getRemoteAddress() != null;
118 } catch (IOException e) {
124 * Sets the local segment id.
126 * @param localSegId the new local segment id
128 public void setLocalSegId(int localSegId) {
129 this.localSegId = localSegId;
133 * Called whenever an acknowledge is received.
135 * @param addr the source LCN module
136 * @param code the LCN internal code (-1 = "positive")
138 public void onAck(LcnAddrMod addr, int code) {
139 synchronized (modData) {
140 if (modData.containsKey(addr)) {
141 modData.get(addr).onAck(code, this, this.settings.getTimeout(), System.nanoTime());
147 * Creates and/or returns cached data for the given LCN module.
149 * @param addr the module's address
152 public ModInfo updateModuleData(LcnAddrMod addr) {
153 return modData.computeIfAbsent(addr, ModInfo::new);
157 * Reads and processes input from the underlying channel.
158 * Fragmented input is kept in {@link #readBuffer} and will be processed with the next call.
160 * @throws IOException if connection was closed or a generic channel error occurred
162 void readAndProcess() {
163 AsynchronousSocketChannel localChannel = channel;
164 if (localChannel != null && isSocketConnected()) {
165 localChannel.read(readBuffer, null, new CompletionHandler<@Nullable Integer, @Nullable Void>() {
167 public void completed(@Nullable Integer transmittedByteCount, @Nullable Void attachment) {
168 synchronized (Connection.this) {
169 if (transmittedByteCount == null || transmittedByteCount == -1) {
170 String msg = "Connection was closed by foreign host.";
171 connectionStateMachine.handleConnectionFailed(new LcnException(msg));
173 // read data chunks from socket and separate frames
175 int aPos = readBuffer.position(); // 0
176 String s = new String(readBuffer.array(), aPos, transmittedByteCount, LcnDefs.LCN_ENCODING);
177 int pos1 = 0, pos2 = s.indexOf(PckGenerator.TERMINATION, pos1);
179 String data = s.substring(pos1, pos2);
180 if (logger.isTraceEnabled()) {
181 logger.trace("Received: '{}'", data);
183 scheduler.submit(() -> {
184 connectionStateMachine.onInputReceived(data);
185 callback.onPckMessageReceived(data);
187 // Seek position in input array
188 aPos += s.substring(pos1, pos2 + 1).getBytes(LcnDefs.LCN_ENCODING).length;
191 pos2 = s.indexOf(PckGenerator.TERMINATION, pos1);
193 readBuffer.limit(readBuffer.capacity());
194 readBuffer.position(transmittedByteCount - aPos); // Keeps fragments for the next call
196 if (isSocketConnected()) {
204 public void failed(@Nullable Throwable e, @Nullable Void attachment) {
205 logger.debug("Lost connection");
206 connectionStateMachine.handleConnectionFailed(e);
210 connectionStateMachine.handleConnectionFailed(new LcnException("Socket not open"));
215 * Writes all queued data.
216 * Will try to write all data at once to reduce overhead.
218 public synchronized void triggerWriteToSocket() {
219 AsynchronousSocketChannel localChannel = channel;
220 if (localChannel == null || !isSocketConnected() || writeInProgress) {
224 SendData item = sendQueue.poll();
228 if (!item.write(sendBuffer, localSegId)) {
229 logger.warn("Data loss: Could not write packet into send buffer");
232 writeInProgress = true;
233 byte[] data = sendBuffer.toByteArray();
234 localChannel.write(ByteBuffer.wrap(data), null,
235 new CompletionHandler<@Nullable Integer, @Nullable Void>() {
237 public void completed(@Nullable Integer result, @Nullable Void attachment) {
238 synchronized (Connection.this) {
239 if (result != data.length) {
240 logger.warn("Data loss while writing to channel: {}", settings.getAddress());
242 if (logger.isTraceEnabled()) {
243 logger.trace("Sent: {}", new String(data, 0, data.length));
247 writeInProgress = false;
249 if (sendQueue.size() > 0) {
251 * This could lead to stack overflows, since the CompletionHandler may run in
252 * the same Thread as triggerWriteToSocket() is invoked (see
253 * {@link AsynchronousChannelGroup}/Threading), but we do not expect as much
254 * data in one chunk here, that the stack can be filled in a critical way.
256 triggerWriteToSocket();
262 public void failed(@Nullable Throwable exc, @Nullable Void attachment) {
263 synchronized (Connection.this) {
265 logger.warn("Writing to channel \"{}\" failed: {}", settings.getAddress(),
268 writeInProgress = false;
269 connectionStateMachine.handleConnectionFailed(new LcnException("write() failed"));
273 } catch (BufferOverflowException | IOException e) {
274 logger.warn("Sending failed: {}: {}: {}", item, e.getClass().getSimpleName(), e.getMessage());
280 * Queues plain text to be sent to LCN-PCHK.
281 * Sending will be done the next time {@link #triggerWriteToSocket()} is called.
283 * @param plainText the text
285 public void queueDirectlyPlainText(String plainText) {
286 this.queueAndSend(new SendDataPlainText(plainText));
290 * Queues a PCK command to be sent.
292 * @param addr the target LCN address
293 * @param wantsAck true to wait for acknowledge on receipt (should be false for group addresses)
294 * @param pck the pure PCK command (without address header)
296 void queueDirectly(LcnAddr addr, boolean wantsAck, String pck) {
297 this.queueDirectly(addr, wantsAck, pck.getBytes(LcnDefs.LCN_ENCODING));
301 * Queues a PCK command for immediate sending, regardless of the Connection state. The PCK command is automatically
302 * re-sent if the destination is not a group, an Ack is requested and the module did not answer within the expected
305 * @param addr the target LCN address
306 * @param wantsAck true to wait for acknowledge on receipt (should be false for group addresses)
307 * @param data the pure PCK command (without address header)
309 void queueDirectly(LcnAddr addr, boolean wantsAck, byte[] data) {
310 if (!addr.isGroup() && wantsAck) {
311 this.updateModuleData((LcnAddrMod) addr).queuePckCommandWithAck(data, this, this.settings.getTimeout(),
314 this.queueAndSend(new SendDataPck(addr, false, data));
319 * Enqueues a raw PCK command and triggers the socket to start sending, if it does not already. Does not take care
322 * @param data raw PCK command
324 synchronized void queueAndSend(SendData data) {
325 this.sendQueue.add(data);
327 triggerWriteToSocket();
331 * Enqueues a PCK command to the offline queue. Data will be sent when the Connection state will enter
332 * {@link ConnectionStateConnected}.
334 * @param addr LCN module address
335 * @param wantsAck true, if the LCN module shall respond with an Ack on successful processing
336 * @param data the pure PCK command (without address header)
338 void queueOffline(LcnAddr addr, boolean wantsAck, byte[] data) {
339 offlineSendQueue.add(new PckQueueItem(addr, wantsAck, data));
343 * Enqueues a PCK command for sending. Takes care of the Connection state and buffers the command for a specific
344 * time if the Connection is not ready. If an Ack is requested, the PCK command is automatically
345 * re-sent, if the module did not answer in the expected time.
347 * @param addr LCN module address
348 * @param wantsAck true, if the LCN module shall respond with an Ack on successful processing
349 * @param pck the pure PCK command (without address header)
351 public void queue(LcnAddr addr, boolean wantsAck, String pck) {
352 this.queue(addr, wantsAck, pck.getBytes(LcnDefs.LCN_ENCODING));
356 * Enqueues a PCK command for sending. Takes care of the Connection state and buffers the command for a specific
357 * time if the Connection is not ready. If an Ack is requested, the PCK command is automatically
358 * re-sent, if the module did not answer in the expected time.
360 * @param addr LCN module address
361 * @param wantsAck true, if the LCN module shall respond with an Ack on successful processing
362 * @param pck the pure PCK command (without address header)
364 public void queue(LcnAddr addr, boolean wantsAck, byte[] pck) {
365 connectionStateMachine.queue(addr, wantsAck, pck);
369 * Process the offline PCK command queue. Does only send recently enqueued PCK commands, the rest is discarded.
371 void sendOfflineQueue() {
372 List<PckQueueItem> allItems = new ArrayList<>(offlineSendQueue.size());
373 offlineSendQueue.drainTo(allItems);
375 allItems.forEach(item -> {
376 // only send messages that were enqueued recently, discard older messages
377 long timeout = settings.getTimeout();
378 if (item.getEnqueued().isAfter(Instant.now().minus(timeout * 4, ChronoUnit.MILLIS))) {
379 queueDirectly(item.getAddr(), item.isWantsAck(), item.getData());
385 * Gets the Connection's callback.
387 * @return the callback
389 public ConnectionCallback getCallback() {
394 * Sets the SocketChannel of this Connection
396 * @param channel the new Channel
398 public void setSocketChannel(AsynchronousSocketChannel channel) {
399 this.channel = channel;
403 * Gets the SocketChannel of the Connection.
405 * @returnthe socket channel
408 public Channel getSocketChannel() {
413 * Gets the local segment ID. When no segments are used, the local segment ID is 0.
415 * @return the local segment ID
417 public int getLocalSegId() {
422 * Runs the periodic updates on all ModInfos.
424 public void updateModInfos() {
425 synchronized (modData) {
426 modData.values().forEach(i -> i.update(this, settings.getTimeout(), System.nanoTime()));
431 * Removes an LCN module from the ModData list.
433 * @param addr the module's address to be removed
435 public void removeLcnModule(LcnAddr addr) {
436 modData.remove(addr);
440 * Invoked when this Connection shall be shut-down finally.
442 public void shutdown() {
443 connectionStateMachine.shutdownFinally();
447 * Sends a broadcast to all LCN modules with a reuqest to respond with an Ack.
449 public void sendModuleDiscoveryCommand() {
450 queueAndSend(new SendDataPck(new LcnAddrGrp(BROADCAST_SEGMENT_ID, BROADCAST_MODULE_ID), true,
451 PckGenerator.nullCommand().getBytes(LcnDefs.LCN_ENCODING)));
452 queueAndSend(new SendDataPck(new LcnAddrGrp(0, BROADCAST_MODULE_ID), true,
453 PckGenerator.nullCommand().getBytes(LcnDefs.LCN_ENCODING)));
457 * Requests the serial number and the firmware version of the given LCN module.
459 * @param addr module's address
461 public void sendSerialNumberRequest(LcnAddrMod addr) {
462 queueDirectly(addr, false, PckGenerator.requestSn());
466 * Requests theprogrammed name of the given LCN module.
468 * @param addr module's address
470 public void sendModuleNameRequest(LcnAddrMod addr) {
471 queueDirectly(addr, false, PckGenerator.requestModuleName(0));
472 queueDirectly(addr, false, PckGenerator.requestModuleName(1));