]> git.basschouten.com Git - openhab-addons.git/blob
6ffc5264f85a0e109f1ba7a4bcc209048d8ec3be
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.lcn.internal.connection;
14
15 import java.io.ByteArrayOutputStream;
16 import java.io.IOException;
17 import java.nio.BufferOverflowException;
18 import java.nio.ByteBuffer;
19 import java.nio.channels.AsynchronousSocketChannel;
20 import java.nio.channels.Channel;
21 import java.nio.channels.CompletionHandler;
22 import java.time.Instant;
23 import java.time.temporal.ChronoUnit;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Objects;
30 import java.util.Queue;
31 import java.util.concurrent.BlockingQueue;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.ScheduledExecutorService;
34
35 import org.eclipse.jdt.annotation.NonNullByDefault;
36 import org.eclipse.jdt.annotation.Nullable;
37 import org.openhab.binding.lcn.internal.common.LcnAddr;
38 import org.openhab.binding.lcn.internal.common.LcnAddrGrp;
39 import org.openhab.binding.lcn.internal.common.LcnAddrMod;
40 import org.openhab.binding.lcn.internal.common.LcnDefs;
41 import org.openhab.binding.lcn.internal.common.LcnException;
42 import org.openhab.binding.lcn.internal.common.PckGenerator;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * This class represents a configured connection to one LCN-PCHK.
48  * It uses an {@link AsynchronousSocketChannel} to connect to LCN-PCHK.
49  * Included logic:
50  * <ul>
51  * <li>Reconnection on connection loss
52  * <li>Segment scan (to detect the local segment ID)
53  * <li>Acknowledge handling
54  * <li>Periodic value requests
55  * <li>Caching of runtime data about the underlying LCN bus
56  * </ul>
57  *
58  * @author Fabian Wolter - Initial Contribution
59  */
60 @NonNullByDefault
61 public class Connection {
62     private final Logger logger = LoggerFactory.getLogger(Connection.class);
63     private static final int BROADCAST_MODULE_ID = 3;
64     private static final int BROADCAST_SEGMENT_ID = 3;
65     private final ConnectionSettings settings;
66     private final ConnectionCallback callback;
67     @Nullable
68     private AsynchronousSocketChannel channel;
69     /** The local segment id. -1 means "unknown". */
70     private int localSegId;
71     private final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
72     private final ByteArrayOutputStream sendBuffer = new ByteArrayOutputStream();
73     private final Queue<@Nullable SendData> sendQueue = new LinkedBlockingQueue<>();
74     private final BlockingQueue<PckQueueItem> offlineSendQueue = new LinkedBlockingQueue<>();
75     private final Map<LcnAddr, ModInfo> modData = Collections.synchronizedMap(new HashMap<>());
76     private volatile boolean writeInProgress;
77     private final ScheduledExecutorService scheduler;
78     private final ConnectionStateMachine connectionStateMachine;
79
80     /**
81      * Constructs a clean (disconnected) connection with the given settings.
82      * This does not start the actual connection process.
83      *
84      * @param sets the settings to use for the new connection
85      * @param callback the callback to the owner
86      * @throws IOException
87      */
88     public Connection(ConnectionSettings sets, ScheduledExecutorService scheduler, ConnectionCallback callback) {
89         this.settings = sets;
90         this.callback = callback;
91         this.scheduler = scheduler;
92         this.clearRuntimeData();
93
94         connectionStateMachine = new ConnectionStateMachine(this, scheduler);
95     }
96
97     /** Clears all runtime data. */
98     void clearRuntimeData() {
99         this.channel = null;
100         this.localSegId = -1;
101         this.readBuffer.clear();
102         this.sendQueue.clear();
103         this.sendBuffer.reset();
104     }
105
106     /**
107      * Retrieves the settings for this connection (never changed).
108      *
109      * @return the settings
110      */
111     public ConnectionSettings getSettings() {
112         return this.settings;
113     }
114
115     private boolean isSocketConnected() {
116         try {
117             AsynchronousSocketChannel localChannel = channel;
118             return localChannel != null && localChannel.getRemoteAddress() != null;
119         } catch (IOException e) {
120             return false;
121         }
122     }
123
124     /**
125      * Sets the local segment id.
126      *
127      * @param localSegId the new local segment id
128      */
129     public void setLocalSegId(int localSegId) {
130         this.localSegId = localSegId;
131     }
132
133     /**
134      * Called whenever an acknowledge is received.
135      *
136      * @param addr the source LCN module
137      * @param code the LCN internal code (-1 = "positive")
138      */
139     public void onAck(LcnAddrMod addr, int code) {
140         synchronized (modData) {
141             if (modData.containsKey(addr)) {
142                 ModInfo modInfo = modData.get(addr);
143                 if (modInfo != null) {
144                     modInfo.onAck(code, this, this.settings.getTimeout(), System.currentTimeMillis());
145                 }
146             }
147         }
148     }
149
150     /**
151      * Creates and/or returns cached data for the given LCN module.
152      *
153      * @param addr the module's address
154      * @return the data
155      */
156     public ModInfo updateModuleData(LcnAddrMod addr) {
157         return Objects.requireNonNull(modData.computeIfAbsent(addr, ModInfo::new));
158     }
159
160     /**
161      * Reads and processes input from the underlying channel.
162      * Fragmented input is kept in {@link #readBuffer} and will be processed with the next call.
163      *
164      * @throws IOException if connection was closed or a generic channel error occurred
165      */
166     void readAndProcess() {
167         AsynchronousSocketChannel localChannel = channel;
168         if (localChannel != null && isSocketConnected()) {
169             localChannel.read(readBuffer, null, new CompletionHandler<@Nullable Integer, @Nullable Void>() {
170                 @Override
171                 public void completed(@Nullable Integer transmittedByteCount, @Nullable Void attachment) {
172                     synchronized (Connection.this) {
173                         if (transmittedByteCount == null || transmittedByteCount == -1) {
174                             String msg = "Connection was closed by foreign host.";
175                             connectionStateMachine.handleConnectionFailed(new LcnException(msg));
176                         } else {
177                             // read data chunks from socket and separate frames
178                             readBuffer.flip();
179                             int aPos = readBuffer.position(); // 0
180                             String s = new String(readBuffer.array(), aPos, transmittedByteCount, LcnDefs.LCN_ENCODING);
181                             int pos1 = 0, pos2 = s.indexOf(PckGenerator.TERMINATION, pos1);
182                             while (pos2 != -1) {
183                                 String data = s.substring(pos1, pos2);
184                                 if (logger.isTraceEnabled()) {
185                                     logger.trace("Received: '{}'", data);
186                                 }
187                                 scheduler.submit(() -> {
188                                     connectionStateMachine.onInputReceived(data);
189                                     callback.onPckMessageReceived(data);
190                                 });
191                                 // Seek position in input array
192                                 aPos += s.substring(pos1, pos2 + 1).getBytes(LcnDefs.LCN_ENCODING).length;
193                                 // Next input
194                                 pos1 = pos2 + 1;
195                                 pos2 = s.indexOf(PckGenerator.TERMINATION, pos1);
196                             }
197                             readBuffer.limit(readBuffer.capacity());
198                             readBuffer.position(transmittedByteCount - aPos); // Keeps fragments for the next call
199
200                             if (isSocketConnected()) {
201                                 readAndProcess();
202                             }
203                         }
204                     }
205                 }
206
207                 @Override
208                 public void failed(@Nullable Throwable e, @Nullable Void attachment) {
209                     logger.debug("Lost connection");
210                     connectionStateMachine.handleConnectionFailed(e);
211                 }
212             });
213         } else {
214             connectionStateMachine.handleConnectionFailed(new LcnException("Socket not open"));
215         }
216     }
217
218     /**
219      * Writes all queued data.
220      * Will try to write all data at once to reduce overhead.
221      */
222     public synchronized void triggerWriteToSocket() {
223         AsynchronousSocketChannel localChannel = channel;
224         if (localChannel == null || !isSocketConnected() || writeInProgress) {
225             return;
226         }
227         sendBuffer.reset();
228         SendData item = sendQueue.poll();
229
230         if (item != null) {
231             try {
232                 if (!item.write(sendBuffer, localSegId)) {
233                     logger.warn("Data loss: Could not write packet into send buffer");
234                 }
235
236                 writeInProgress = true;
237                 byte[] data = sendBuffer.toByteArray();
238                 localChannel.write(ByteBuffer.wrap(data), null,
239                         new CompletionHandler<@Nullable Integer, @Nullable Void>() {
240                             @Override
241                             public void completed(@Nullable Integer result, @Nullable Void attachment) {
242                                 synchronized (Connection.this) {
243                                     if (result != data.length) {
244                                         logger.warn("Data loss while writing to channel: {}", settings.getAddress());
245                                     } else {
246                                         if (logger.isTraceEnabled()) {
247                                             logger.trace("Sent: {}",
248                                                     new String(data, 0, data.length, LcnDefs.LCN_ENCODING).trim());
249                                         }
250                                     }
251
252                                     writeInProgress = false;
253
254                                     if (sendQueue.size() > 0) {
255                                         /**
256                                          * This could lead to stack overflows, since the CompletionHandler may run in
257                                          * the same Thread as triggerWriteToSocket() is invoked (see
258                                          * {@link AsynchronousChannelGroup}/Threading), but we do not expect as much
259                                          * data in one chunk here, that the stack can be filled in a critical way.
260                                          */
261                                         triggerWriteToSocket();
262                                     }
263                                 }
264                             }
265
266                             @Override
267                             public void failed(@Nullable Throwable exc, @Nullable Void attachment) {
268                                 synchronized (Connection.this) {
269                                     if (exc != null) {
270                                         logger.warn("Writing to channel \"{}\" failed: {}", settings.getAddress(),
271                                                 exc.getMessage());
272                                     }
273                                     writeInProgress = false;
274                                     connectionStateMachine.handleConnectionFailed(new LcnException("write() failed"));
275                                 }
276                             }
277                         });
278             } catch (BufferOverflowException | IOException e) {
279                 logger.warn("Sending failed: {}: {}: {}", item, e.getClass().getSimpleName(), e.getMessage());
280             }
281         }
282     }
283
284     /**
285      * Queues plain text to be sent to LCN-PCHK.
286      * Sending will be done the next time {@link #triggerWriteToSocket()} is called.
287      *
288      * @param plainText the text
289      */
290     public void queueDirectlyPlainText(String plainText) {
291         this.queueAndSend(new SendDataPlainText(plainText));
292     }
293
294     /**
295      * Queues a PCK command to be sent.
296      *
297      * @param addr the target LCN address
298      * @param wantsAck true to wait for acknowledge on receipt (should be false for group addresses)
299      * @param pck the pure PCK command (without address header)
300      */
301     void queueDirectly(LcnAddr addr, boolean wantsAck, String pck) {
302         this.queueDirectly(addr, wantsAck, pck.getBytes(LcnDefs.LCN_ENCODING));
303     }
304
305     /**
306      * Queues a PCK command for immediate sending, regardless of the Connection state. The PCK command is automatically
307      * re-sent if the destination is not a group, an Ack is requested and the module did not answer within the expected
308      * time.
309      *
310      * @param addr the target LCN address
311      * @param wantsAck true to wait for acknowledge on receipt (should be false for group addresses)
312      * @param data the pure PCK command (without address header)
313      */
314     void queueDirectly(LcnAddr addr, boolean wantsAck, byte[] data) {
315         if (!addr.isGroup() && wantsAck) {
316             this.updateModuleData((LcnAddrMod) addr).queuePckCommandWithAck(data, this, this.settings.getTimeout(),
317                     System.currentTimeMillis());
318         } else {
319             this.queueAndSend(new SendDataPck(addr, false, data));
320         }
321     }
322
323     /**
324      * Enqueues a raw PCK command and triggers the socket to start sending, if it does not already. Does not take care
325      * of any Acks.
326      *
327      * @param data raw PCK command
328      */
329     synchronized void queueAndSend(SendData data) {
330         this.sendQueue.add(data);
331
332         triggerWriteToSocket();
333     }
334
335     /**
336      * Enqueues a PCK command to the offline queue. Data will be sent when the Connection state will enter
337      * {@link ConnectionStateConnected}.
338      *
339      * @param addr LCN module address
340      * @param wantsAck true, if the LCN module shall respond with an Ack on successful processing
341      * @param data the pure PCK command (without address header)
342      */
343     void queueOffline(LcnAddr addr, boolean wantsAck, byte[] data) {
344         offlineSendQueue.add(new PckQueueItem(addr, wantsAck, data));
345     }
346
347     /**
348      * Enqueues a PCK command for sending. Takes care of the Connection state and buffers the command for a specific
349      * time if the Connection is not ready. If an Ack is requested, the PCK command is automatically
350      * re-sent, if the module did not answer in the expected time.
351      *
352      * @param addr LCN module address
353      * @param wantsAck true, if the LCN module shall respond with an Ack on successful processing
354      * @param pck the pure PCK command (without address header)
355      */
356     public void queue(LcnAddr addr, boolean wantsAck, String pck) {
357         this.queue(addr, wantsAck, pck.getBytes(LcnDefs.LCN_ENCODING));
358     }
359
360     /**
361      * Enqueues a PCK command for sending. Takes care of the Connection state and buffers the command for a specific
362      * time if the Connection is not ready. If an Ack is requested, the PCK command is automatically
363      * re-sent, if the module did not answer in the expected time.
364      *
365      * @param addr LCN module address
366      * @param wantsAck true, if the LCN module shall respond with an Ack on successful processing
367      * @param pck the pure PCK command (without address header)
368      */
369     public void queue(LcnAddr addr, boolean wantsAck, byte[] pck) {
370         connectionStateMachine.queue(addr, wantsAck, pck);
371     }
372
373     /**
374      * Process the offline PCK command queue. Does only send recently enqueued PCK commands, the rest is discarded.
375      */
376     void sendOfflineQueue() {
377         List<PckQueueItem> allItems = new ArrayList<>(offlineSendQueue.size());
378         offlineSendQueue.drainTo(allItems);
379
380         allItems.forEach(item -> {
381             // only send messages that were enqueued recently, discard older messages
382             long timeout = settings.getTimeout();
383             if (item.getEnqueued().isAfter(Instant.now().minus(timeout * 4, ChronoUnit.MILLIS))) {
384                 queueDirectly(item.getAddr(), item.isWantsAck(), item.getData());
385             }
386         });
387     }
388
389     /**
390      * Gets the Connection's callback.
391      *
392      * @return the callback
393      */
394     public ConnectionCallback getCallback() {
395         return callback;
396     }
397
398     /**
399      * Sets the SocketChannel of this Connection
400      *
401      * @param channel the new Channel
402      */
403     public void setSocketChannel(AsynchronousSocketChannel channel) {
404         this.channel = channel;
405     }
406
407     /**
408      * Gets the SocketChannel of the Connection.
409      *
410      * @returnthe socket channel
411      */
412     @Nullable
413     public Channel getSocketChannel() {
414         return channel;
415     }
416
417     /**
418      * Gets the local segment ID. When no segments are used, the local segment ID is 0.
419      *
420      * @return the local segment ID
421      */
422     public int getLocalSegId() {
423         return localSegId;
424     }
425
426     /**
427      * Runs the periodic updates on all ModInfos.
428      */
429     public void updateModInfos() {
430         synchronized (modData) {
431             modData.values().forEach(i -> i.update(this, settings.getTimeout(), System.currentTimeMillis()));
432         }
433     }
434
435     /**
436      * Removes an LCN module from the ModData list.
437      *
438      * @param addr the module's address to be removed
439      */
440     public void removeLcnModule(LcnAddr addr) {
441         modData.remove(addr);
442     }
443
444     /**
445      * Invoked when this Connection shall be shut-down finally.
446      */
447     public void shutdown() {
448         connectionStateMachine.shutdownFinally();
449     }
450
451     /**
452      * Sends a broadcast to all LCN modules with a reuqest to respond with an Ack.
453      */
454     public void sendModuleDiscoveryCommand() {
455         queueAndSend(new SendDataPck(new LcnAddrGrp(BROADCAST_SEGMENT_ID, BROADCAST_MODULE_ID), true,
456                 PckGenerator.nullCommand().getBytes(LcnDefs.LCN_ENCODING)));
457         queueAndSend(new SendDataPck(new LcnAddrGrp(0, BROADCAST_MODULE_ID), true,
458                 PckGenerator.nullCommand().getBytes(LcnDefs.LCN_ENCODING)));
459     }
460
461     /**
462      * Requests the serial number and the firmware version of the given LCN module.
463      *
464      * @param addr module's address
465      */
466     public void sendSerialNumberRequest(LcnAddrMod addr) {
467         queueDirectly(addr, false, PckGenerator.requestSn());
468     }
469
470     /**
471      * Requests theprogrammed name of the given LCN module.
472      *
473      * @param addr module's address
474      */
475     public void sendModuleNameRequest(LcnAddrMod addr) {
476         queueDirectly(addr, false, PckGenerator.requestModuleName(0));
477         queueDirectly(addr, false, PckGenerator.requestModuleName(1));
478     }
479 }