2 * Copyright (c) 2010-2020 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.lcn.internal.connection;
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.nio.BufferOverflowException;
18 import java.nio.ByteBuffer;
19 import java.nio.channels.AsynchronousSocketChannel;
20 import java.nio.channels.Channel;
21 import java.nio.channels.CompletionHandler;
22 import java.time.Instant;
23 import java.time.temporal.ChronoUnit;
25 import java.util.concurrent.BlockingQueue;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.ScheduledExecutorService;
29 import org.eclipse.jdt.annotation.NonNullByDefault;
30 import org.eclipse.jdt.annotation.Nullable;
31 import org.openhab.binding.lcn.internal.common.LcnAddr;
32 import org.openhab.binding.lcn.internal.common.LcnAddrGrp;
33 import org.openhab.binding.lcn.internal.common.LcnAddrMod;
34 import org.openhab.binding.lcn.internal.common.LcnDefs;
35 import org.openhab.binding.lcn.internal.common.LcnException;
36 import org.openhab.binding.lcn.internal.common.PckGenerator;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
41 * This class represents a configured connection to one LCN-PCHK.
42 * It uses an {@link AsynchronousSocketChannel} to connect to LCN-PCHK.
45 * <li>Reconnection on connection loss
46 * <li>Segment scan (to detect the local segment ID)
47 * <li>Acknowledge handling
48 * <li>Periodic value requests
49 * <li>Caching of runtime data about the underlying LCN bus
52 * @author Fabian Wolter - Initial Contribution
55 public class Connection {
56 private final Logger logger = LoggerFactory.getLogger(Connection.class);
57 private static final int BROADCAST_MODULE_ID = 3;
58 private static final int BROADCAST_SEGMENT_ID = 3;
59 private final ConnectionSettings settings;
60 private final ConnectionCallback callback;
62 private AsynchronousSocketChannel channel;
63 /** The local segment id. -1 means "unknown". */
64 private int localSegId;
65 private final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
66 private final ByteArrayOutputStream sendBuffer = new ByteArrayOutputStream();
67 private final Queue<@Nullable SendData> sendQueue = new LinkedBlockingQueue<>();
68 private final BlockingQueue<PckQueueItem> offlineSendQueue = new LinkedBlockingQueue<>();
69 private final Map<LcnAddr, ModInfo> modData = Collections.synchronizedMap(new HashMap<>());
70 private volatile boolean writeInProgress;
71 private final ScheduledExecutorService scheduler;
72 private final ConnectionStateMachine connectionStateMachine;
75 * Constructs a clean (disconnected) connection with the given settings.
76 * This does not start the actual connection process.
78 * @param sets the settings to use for the new connection
79 * @param callback the callback to the owner
82 public Connection(ConnectionSettings sets, ScheduledExecutorService scheduler, ConnectionCallback callback) {
84 this.callback = callback;
85 this.scheduler = scheduler;
86 this.clearRuntimeData();
88 connectionStateMachine = new ConnectionStateMachine(this, scheduler);
91 /** Clears all runtime data. */
92 void clearRuntimeData() {
95 this.readBuffer.clear();
96 this.sendQueue.clear();
97 this.sendBuffer.reset();
101 * Retrieves the settings for this connection (never changed).
103 * @return the settings
105 public ConnectionSettings getSettings() {
106 return this.settings;
109 private boolean isSocketConnected() {
111 AsynchronousSocketChannel localChannel = channel;
112 return localChannel != null && localChannel.getRemoteAddress() != null;
113 } catch (IOException e) {
119 * Sets the local segment id.
121 * @param localSegId the new local segment id
123 public void setLocalSegId(int localSegId) {
124 this.localSegId = localSegId;
128 * Called whenever an acknowledge is received.
130 * @param addr the source LCN module
131 * @param code the LCN internal code (-1 = "positive")
133 public void onAck(LcnAddrMod addr, int code) {
134 synchronized (modData) {
135 if (modData.containsKey(addr)) {
136 modData.get(addr).onAck(code, this, this.settings.getTimeout(), System.nanoTime());
142 * Creates and/or returns cached data for the given LCN module.
144 * @param addr the module's address
147 public ModInfo updateModuleData(LcnAddrMod addr) {
148 return Objects.requireNonNull(modData.computeIfAbsent(addr, ModInfo::new));
152 * Reads and processes input from the underlying channel.
153 * Fragmented input is kept in {@link #readBuffer} and will be processed with the next call.
155 * Connection loss and channel errors are not thrown; they are reported to the connection state machine.
157 void readAndProcess() {
158 AsynchronousSocketChannel localChannel = channel;
159 if (localChannel != null && isSocketConnected()) {
160 localChannel.read(readBuffer, null, new CompletionHandler<@Nullable Integer, @Nullable Void>() {
162 public void completed(@Nullable Integer transmittedByteCount, @Nullable Void attachment) {
163 synchronized (Connection.this) {
164 if (transmittedByteCount == null || transmittedByteCount == -1) {
165 String msg = "Connection was closed by foreign host.";
166 connectionStateMachine.handleConnectionFailed(new LcnException(msg));
168 // read data chunks from socket and separate frames
170 int aPos = readBuffer.position(); // 0
171 String s = new String(readBuffer.array(), aPos, transmittedByteCount, LcnDefs.LCN_ENCODING);
172 int pos1 = 0, pos2 = s.indexOf(PckGenerator.TERMINATION, pos1);
174 String data = s.substring(pos1, pos2);
175 if (logger.isTraceEnabled()) {
176 logger.trace("Received: '{}'", data);
178 scheduler.submit(() -> {
179 connectionStateMachine.onInputReceived(data);
180 callback.onPckMessageReceived(data);
182 // Seek position in input array
183 aPos += s.substring(pos1, pos2 + 1).getBytes(LcnDefs.LCN_ENCODING).length;
186 pos2 = s.indexOf(PckGenerator.TERMINATION, pos1);
188 readBuffer.limit(readBuffer.capacity());
189 readBuffer.position(transmittedByteCount - aPos); // Keeps fragments for the next call
191 if (isSocketConnected()) {
199 public void failed(@Nullable Throwable e, @Nullable Void attachment) {
200 logger.debug("Lost connection");
201 connectionStateMachine.handleConnectionFailed(e);
205 connectionStateMachine.handleConnectionFailed(new LcnException("Socket not open"));
210 * Writes all queued data.
211 * Will try to write all data at once to reduce overhead.
213 public synchronized void triggerWriteToSocket() {
214 AsynchronousSocketChannel localChannel = channel;
215 if (localChannel == null || !isSocketConnected() || writeInProgress) {
219 SendData item = sendQueue.poll();
223 if (!item.write(sendBuffer, localSegId)) {
224 logger.warn("Data loss: Could not write packet into send buffer");
227 writeInProgress = true;
228 byte[] data = sendBuffer.toByteArray();
229 localChannel.write(ByteBuffer.wrap(data), null,
230 new CompletionHandler<@Nullable Integer, @Nullable Void>() {
232 public void completed(@Nullable Integer result, @Nullable Void attachment) {
233 synchronized (Connection.this) {
234 if (result != data.length) {
235 logger.warn("Data loss while writing to channel: {}", settings.getAddress());
237 if (logger.isTraceEnabled()) {
238 logger.trace("Sent: {}", new String(data, 0, data.length));
242 writeInProgress = false;
244 if (sendQueue.size() > 0) {
246 * This could lead to stack overflows, since the CompletionHandler may run in
247 * the same Thread as triggerWriteToSocket() is invoked (see
248 * {@link AsynchronousChannelGroup}/Threading), but we do not expect as much
249 * data in one chunk here, that the stack can be filled in a critical way.
251 triggerWriteToSocket();
257 public void failed(@Nullable Throwable exc, @Nullable Void attachment) {
258 synchronized (Connection.this) {
260 logger.warn("Writing to channel \"{}\" failed: {}", settings.getAddress(),
263 writeInProgress = false;
264 connectionStateMachine.handleConnectionFailed(new LcnException("write() failed"));
268 } catch (BufferOverflowException | IOException e) {
269 logger.warn("Sending failed: {}: {}: {}", item, e.getClass().getSimpleName(), e.getMessage());
275 * Queues plain text to be sent to LCN-PCHK.
276 * Sending will be done the next time {@link #triggerWriteToSocket()} is called.
278 * @param plainText the text
280 public void queueDirectlyPlainText(String plainText) {
281 this.queueAndSend(new SendDataPlainText(plainText));
285 * Queues a PCK command to be sent.
287 * @param addr the target LCN address
288 * @param wantsAck true to wait for acknowledge on receipt (should be false for group addresses)
289 * @param pck the pure PCK command (without address header)
291 void queueDirectly(LcnAddr addr, boolean wantsAck, String pck) {
292 this.queueDirectly(addr, wantsAck, pck.getBytes(LcnDefs.LCN_ENCODING));
296 * Queues a PCK command for immediate sending, regardless of the Connection state. The PCK command is automatically
297 * re-sent if the destination is not a group, an Ack is requested and the module did not answer within the expected
300 * @param addr the target LCN address
301 * @param wantsAck true to wait for acknowledge on receipt (should be false for group addresses)
302 * @param data the pure PCK command (without address header)
304 void queueDirectly(LcnAddr addr, boolean wantsAck, byte[] data) {
305 if (!addr.isGroup() && wantsAck) {
306 this.updateModuleData((LcnAddrMod) addr).queuePckCommandWithAck(data, this, this.settings.getTimeout(),
309 this.queueAndSend(new SendDataPck(addr, false, data));
314 * Enqueues a raw PCK command and triggers the socket to start sending, if it does not already. Does not take care
317 * @param data raw PCK command
319 synchronized void queueAndSend(SendData data) {
320 this.sendQueue.add(data);
322 triggerWriteToSocket();
326 * Enqueues a PCK command to the offline queue. Data will be sent when the Connection state enters
327 * {@link ConnectionStateConnected}.
329 * @param addr LCN module address
330 * @param wantsAck true, if the LCN module shall respond with an Ack on successful processing
331 * @param data the pure PCK command (without address header)
333 void queueOffline(LcnAddr addr, boolean wantsAck, byte[] data) {
334 offlineSendQueue.add(new PckQueueItem(addr, wantsAck, data));
338 * Enqueues a PCK command for sending. Takes care of the Connection state and buffers the command for a specific
339 * time if the Connection is not ready. If an Ack is requested, the PCK command is automatically
340 * re-sent, if the module did not answer in the expected time.
342 * @param addr LCN module address
343 * @param wantsAck true, if the LCN module shall respond with an Ack on successful processing
344 * @param pck the pure PCK command (without address header)
346 public void queue(LcnAddr addr, boolean wantsAck, String pck) {
347 this.queue(addr, wantsAck, pck.getBytes(LcnDefs.LCN_ENCODING));
351 * Enqueues a PCK command for sending. Takes care of the Connection state and buffers the command for a specific
352 * time if the Connection is not ready. If an Ack is requested, the PCK command is automatically
353 * re-sent, if the module did not answer in the expected time.
355 * @param addr LCN module address
356 * @param wantsAck true, if the LCN module shall respond with an Ack on successful processing
357 * @param pck the pure PCK command (without address header)
359 public void queue(LcnAddr addr, boolean wantsAck, byte[] pck) {
360 connectionStateMachine.queue(addr, wantsAck, pck);
364 * Processes the offline PCK command queue. Only recently enqueued PCK commands are sent; older ones are discarded.
366 void sendOfflineQueue() {
367 List<PckQueueItem> allItems = new ArrayList<>(offlineSendQueue.size());
368 offlineSendQueue.drainTo(allItems);
370 allItems.forEach(item -> {
371 // only send messages that were enqueued recently, discard older messages
372 long timeout = settings.getTimeout();
373 if (item.getEnqueued().isAfter(Instant.now().minus(timeout * 4, ChronoUnit.MILLIS))) {
374 queueDirectly(item.getAddr(), item.isWantsAck(), item.getData());
380 * Gets the Connection's callback.
382 * @return the callback
384 public ConnectionCallback getCallback() {
389 * Sets the SocketChannel of this Connection
391 * @param channel the new Channel
393 public void setSocketChannel(AsynchronousSocketChannel channel) {
394 this.channel = channel;
398 * Gets the SocketChannel of the Connection.
400 * @return the socket channel
403 public Channel getSocketChannel() {
408 * Gets the local segment ID. When no segments are used, the local segment ID is 0.
410 * @return the local segment ID
412 public int getLocalSegId() {
417 * Runs the periodic updates on all ModInfos.
419 public void updateModInfos() {
420 synchronized (modData) {
421 modData.values().forEach(i -> i.update(this, settings.getTimeout(), System.nanoTime()));
426 * Removes an LCN module from the ModData list.
428 * @param addr the module's address to be removed
430 public void removeLcnModule(LcnAddr addr) {
431 modData.remove(addr);
435 * Invoked when this Connection shall be shut-down finally.
437 public void shutdown() {
438 connectionStateMachine.shutdownFinally();
442 * Sends a broadcast to all LCN modules with a request to respond with an Ack.
444 public void sendModuleDiscoveryCommand() {
445 queueAndSend(new SendDataPck(new LcnAddrGrp(BROADCAST_SEGMENT_ID, BROADCAST_MODULE_ID), true,
446 PckGenerator.nullCommand().getBytes(LcnDefs.LCN_ENCODING)));
447 queueAndSend(new SendDataPck(new LcnAddrGrp(0, BROADCAST_MODULE_ID), true,
448 PckGenerator.nullCommand().getBytes(LcnDefs.LCN_ENCODING)));
452 * Requests the serial number and the firmware version of the given LCN module.
454 * @param addr module's address
456 public void sendSerialNumberRequest(LcnAddrMod addr) {
457 queueDirectly(addr, false, PckGenerator.requestSn());
461 * Requests the programmed name of the given LCN module.
463 * @param addr module's address
465 public void sendModuleNameRequest(LcnAddrMod addr) {
466 queueDirectly(addr, false, PckGenerator.requestModuleName(0));
467 queueDirectly(addr, false, PckGenerator.requestModuleName(1));