]> git.basschouten.com Git - openhab-addons.git/blob
fbb7138b3118e3821fddfde58f27a2e9cbfe020c
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.paradoxalarm.internal.communication;
14
15 import java.io.DataInputStream;
16 import java.io.DataOutputStream;
17 import java.io.IOException;
18 import java.net.Socket;
19 import java.net.SocketException;
20 import java.net.UnknownHostException;
21 import java.util.Arrays;
22 import java.util.Collection;
23 import java.util.concurrent.ScheduledExecutorService;
24
25 import org.openhab.binding.paradoxalarm.internal.util.ParadoxUtil;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 /**
30  * The {@link AbstractCommunicator} Abstract class with common low-level communication logic. Extended by the
31  * communicator classes.
32  *
33  * @author Konstantin Polihronov - Initial contribution
34  */
35 public abstract class AbstractCommunicator implements IParadoxInitialLoginCommunicator {
36
37     protected static final int SOCKET_TIMEOUT = 4000;
38
39     private static final long PACKET_EXPIRATION_TRESHOLD_MILLISECONDS = 2000;
40
41     private final Logger logger = LoggerFactory.getLogger(AbstractCommunicator.class);
42
43     protected ScheduledExecutorService scheduler;
44     protected Collection<IDataUpdateListener> listeners;
45
46     protected Socket socket;
47     private ISocketTimeOutListener stoListener;
48
49     private final String ipAddress;
50     private final int tcpPort;
51     private DataOutputStream tx;
52     private DataInputStream rx;
53
54     private boolean isOnline;
55
56     public AbstractCommunicator(String ipAddress, int tcpPort, ScheduledExecutorService scheduler)
57             throws UnknownHostException, IOException {
58         this.ipAddress = ipAddress;
59         this.tcpPort = tcpPort;
60         logger.debug("IP Address={}, TCP Port={}", ipAddress, tcpPort);
61         this.scheduler = scheduler;
62
63         initializeSocket();
64     }
65
66     protected void initializeSocket() throws IOException, UnknownHostException {
67         if (socket != null && !socket.isClosed()) {
68             close();
69         }
70         socket = new Socket(ipAddress, tcpPort);
71         socket.setSoTimeout(SOCKET_TIMEOUT);
72         tx = new DataOutputStream(socket.getOutputStream());
73         rx = new DataInputStream(socket.getInputStream());
74     }
75
76     @Override
77     public synchronized void close() {
78         logger.info("Stopping communication to Paradox system");
79         setOnline(false);
80         try {
81             tx.close();
82             rx.close();
83             socket.close();
84         } catch (IOException e) {
85             logger.warn("IO exception during socket/stream close operation.", e);
86         }
87     }
88
89     @Override
90     public void submitRequest(IRequest request) {
91         if (isEncrypted()) {
92             request.getRequestPacket().encrypt();
93         }
94         SyncQueue syncQueue = SyncQueue.getInstance();
95         syncQueue.add(request);
96         communicateToParadox();
97     }
98
99     protected void communicateToParadox() {
100         SyncQueue syncQueue = SyncQueue.getInstance();
101         synchronized (syncQueue) {
102             if (syncQueue.hasPacketToReceive()) {
103                 receivePacket();
104             } else if (syncQueue.hasPacketsToSend()) {
105                 sendPacket();
106             }
107
108             // Recursively check if there are more packets to send in TX queue until it becomes empty
109             if (syncQueue.hasPacketsToSend() || syncQueue.hasPacketToReceive()) {
110                 communicateToParadox();
111             }
112         }
113     }
114
115     protected void sendPacket() {
116         SyncQueue syncQueue = SyncQueue.getInstance();
117         IRequest request = syncQueue.peekSendQueue();
118         try {
119             logger.trace("Sending packet with request={}", request);
120             byte[] packetBytes = request.getRequestPacket().getBytes();
121             ParadoxUtil.printPacket("Tx Packet:", packetBytes);
122             tx.write(packetBytes);
123             syncQueue.moveRequest();
124         } catch (SocketException e) {
125             logger.debug("Socket time out occurred. Informing listener. Request={}. Exception=", request, e);
126             syncQueue.removeSendRequest();
127             stoListener.onSocketTimeOutOccurred(e);
128         } catch (IOException e) {
129             logger.debug("Error while sending packet with request={}. IOException=", request, e);
130             syncQueue.removeSendRequest();
131         }
132     }
133
134     protected void receivePacket() {
135         SyncQueue syncQueue = SyncQueue.getInstance();
136         try {
137             logger.trace("Found packet to receive in queue...");
138             byte[] result = new byte[256];
139             int readBytes = rx.read(result);
140             if (readBytes > 0 && result[1] > 0 && result[1] + 16 < 256) {
141                 logger.trace("Successfully read valid packet from Rx");
142                 IRequest request = syncQueue.poll();
143                 byte[] bytesData = Arrays.copyOfRange(result, 0, readBytes);
144                 IResponse response = new Response(request, bytesData, isEncrypted());
145
146                 if (response.getPayload() == null || response.getHeader() == null) {
147                     handleWrongPacket(result, request);
148                 }
149
150                 IResponseReceiver responseReceiver = request.getResponseReceiver();
151                 if (responseReceiver != null) {
152                     responseReceiver.receiveResponse(response, this);
153                 }
154             } else if (SyncQueue.getInstance().peekReceiveQueue()
155                     .isTimeStampExpired(PACKET_EXPIRATION_TRESHOLD_MILLISECONDS)) {
156                 logger.trace("Unable to receive proper package for {} time. Rescheduling...",
157                         PACKET_EXPIRATION_TRESHOLD_MILLISECONDS);
158             } else {
159                 IRequest requestInQueue = syncQueue.poll();
160                 logger.debug("Error receiving packet after reaching the set timeout of {}ms. Request: {}",
161                         PACKET_EXPIRATION_TRESHOLD_MILLISECONDS, requestInQueue);
162             }
163         } catch (SocketException e) {
164             IRequest request = syncQueue.poll();
165             logger.debug("Socket time out occurred. Informing listener. Request={}, SocketException=", request, e);
166             stoListener.onSocketTimeOutOccurred(e);
167         } catch (IOException e) {
168             IRequest request = syncQueue.poll();
169             logger.debug("Unable to receive package due to IO Exception. Request {}, IOException=", request, e);
170         }
171     }
172
173     protected void handleWrongPacket(byte[] result, IRequest request) throws IOException {
174         logger.trace(
175                 "Payload or header are null. Probably unexpected package has been read. Need to retry read the same request.");
176         rx.read(result);
177         ParadoxUtil.printPacket("Flushing packet:", result);
178         submitRequest(request);
179     }
180
181     @Override
182     public boolean isOnline() {
183         return isOnline;
184     }
185
186     @Override
187     public void setOnline(boolean flag) {
188         this.isOnline = flag;
189     }
190
191     public DataInputStream getRx() {
192         return rx;
193     }
194
195     protected abstract void receiveEpromResponse(IResponse response);
196
197     protected abstract void receiveRamResponse(IResponse response);
198
199     public ISocketTimeOutListener getStoListener() {
200         return stoListener;
201     }
202
203     @Override
204     public void setStoListener(ISocketTimeOutListener stoListener) {
205         this.stoListener = stoListener;
206     }
207 }