2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.paradoxalarm.internal.communication;
15 import java.io.DataInputStream;
16 import java.io.DataOutputStream;
17 import java.io.IOException;
18 import java.net.Socket;
19 import java.net.SocketException;
20 import java.net.UnknownHostException;
21 import java.util.Arrays;
22 import java.util.Collection;
23 import java.util.concurrent.ScheduledExecutorService;
25 import org.openhab.binding.paradoxalarm.internal.util.ParadoxUtil;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
30 * The {@link AbstractCommunicator} Abstract class with common low-level communication logic. Extended by the
31 * communicator classes.
33 * @author Konstantin Polihronov - Initial contribution
35 public abstract class AbstractCommunicator implements IParadoxInitialLoginCommunicator {
37 protected static final int SOCKET_TIMEOUT = 4000;
39 private static final long PACKET_EXPIRATION_TRESHOLD_MILLISECONDS = 2000;
41 private final Logger logger = LoggerFactory.getLogger(AbstractCommunicator.class);
43 protected ScheduledExecutorService scheduler;
44 protected Collection<IDataUpdateListener> listeners;
46 protected Socket socket;
47 private ISocketTimeOutListener stoListener;
49 private final String ipAddress;
50 private final int tcpPort;
51 private DataOutputStream tx;
52 private DataInputStream rx;
54 private boolean isOnline;
56 public AbstractCommunicator(String ipAddress, int tcpPort, ScheduledExecutorService scheduler)
57 throws UnknownHostException, IOException {
58 this.ipAddress = ipAddress;
59 this.tcpPort = tcpPort;
60 logger.debug("IP Address={}, TCP Port={}", ipAddress, tcpPort);
61 this.scheduler = scheduler;
66 protected void initializeSocket() throws IOException, UnknownHostException {
67 if (socket != null && !socket.isClosed()) {
70 socket = new Socket(ipAddress, tcpPort);
71 socket.setSoTimeout(SOCKET_TIMEOUT);
72 tx = new DataOutputStream(socket.getOutputStream());
73 rx = new DataInputStream(socket.getInputStream());
77 public synchronized void close() {
78 logger.info("Stopping communication to Paradox system");
84 } catch (IOException e) {
85 logger.warn("IO exception during socket/stream close operation.", e);
90 public void submitRequest(IRequest request) {
92 request.getRequestPacket().encrypt();
94 SyncQueue syncQueue = SyncQueue.getInstance();
95 syncQueue.add(request);
96 communicateToParadox();
99 protected void communicateToParadox() {
100 SyncQueue syncQueue = SyncQueue.getInstance();
101 synchronized (syncQueue) {
102 if (syncQueue.hasPacketToReceive()) {
104 } else if (syncQueue.hasPacketsToSend()) {
108 // Recursively check if there are more packets to send in TX queue until it becomes empty
109 if (syncQueue.hasPacketsToSend() || syncQueue.hasPacketToReceive()) {
110 communicateToParadox();
115 protected void sendPacket() {
116 SyncQueue syncQueue = SyncQueue.getInstance();
117 IRequest request = syncQueue.peekSendQueue();
119 logger.trace("Sending packet with request={}", request);
120 byte[] packetBytes = request.getRequestPacket().getBytes();
121 ParadoxUtil.printPacket("Tx Packet:", packetBytes);
122 tx.write(packetBytes);
123 syncQueue.moveRequest();
124 } catch (SocketException e) {
125 logger.debug("Socket time out occurred. Informing listener. Request={}. Exception=", request, e);
126 syncQueue.removeSendRequest();
127 stoListener.onSocketTimeOutOccurred(e);
128 } catch (IOException e) {
129 logger.debug("Error while sending packet with request={}. IOException=", request, e);
130 syncQueue.removeSendRequest();
134 protected void receivePacket() {
135 SyncQueue syncQueue = SyncQueue.getInstance();
137 logger.trace("Found packet to receive in queue...");
138 byte[] result = new byte[256];
139 int readBytes = rx.read(result);
140 if (readBytes > 0 && result[1] > 0 && result[1] + 16 < 256) {
141 logger.trace("Successfully read valid packet from Rx");
142 IRequest request = syncQueue.poll();
143 byte[] bytesData = Arrays.copyOfRange(result, 0, readBytes);
144 IResponse response = new Response(request, bytesData, isEncrypted());
146 if (response.getPayload() == null || response.getHeader() == null) {
147 handleWrongPacket(result, request);
150 IResponseReceiver responseReceiver = request.getResponseReceiver();
151 if (responseReceiver != null) {
152 responseReceiver.receiveResponse(response, this);
154 } else if (SyncQueue.getInstance().peekReceiveQueue()
155 .isTimeStampExpired(PACKET_EXPIRATION_TRESHOLD_MILLISECONDS)) {
156 logger.trace("Unable to receive proper package for {} time. Rescheduling...",
157 PACKET_EXPIRATION_TRESHOLD_MILLISECONDS);
159 IRequest requestInQueue = syncQueue.poll();
160 logger.debug("Error receiving packet after reaching the set timeout of {}ms. Request: {}",
161 PACKET_EXPIRATION_TRESHOLD_MILLISECONDS, requestInQueue);
163 } catch (SocketException e) {
164 IRequest request = syncQueue.poll();
165 logger.debug("Socket time out occurred. Informing listener. Request={}, SocketException=", request, e);
166 stoListener.onSocketTimeOutOccurred(e);
167 } catch (IOException e) {
168 IRequest request = syncQueue.poll();
169 logger.debug("Unable to receive package due to IO Exception. Request {}, IOException=", request, e);
173 protected void handleWrongPacket(byte[] result, IRequest request) throws IOException {
175 "Payload or header are null. Probably unexpected package has been read. Need to retry read the same request.");
177 ParadoxUtil.printPacket("Flushing packet:", result);
178 submitRequest(request);
182 public boolean isOnline() {
187 public void setOnline(boolean flag) {
188 this.isOnline = flag;
191 public DataInputStream getRx() {
195 protected abstract void receiveEpromResponse(IResponse response);
197 protected abstract void receiveRamResponse(IResponse response);
199 public ISocketTimeOutListener getStoListener() {
204 public void setStoListener(ISocketTimeOutListener stoListener) {
205 this.stoListener = stoListener;