]> git.basschouten.com Git - openhab-addons.git/blob
887e06c81416666123e1a6a69f10d000b0cca448
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.pulseaudio.internal;
14
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.io.InterruptedIOException;
18 import java.io.PipedInputStream;
19 import java.io.PipedOutputStream;
20 import java.net.Socket;
21 import java.util.HashSet;
22 import java.util.Set;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.ScheduledExecutorService;
26
27 import org.eclipse.jdt.annotation.NonNullByDefault;
28 import org.eclipse.jdt.annotation.Nullable;
29 import org.openhab.binding.pulseaudio.internal.handler.PulseaudioHandler;
30 import org.openhab.core.audio.AudioException;
31 import org.openhab.core.audio.AudioFormat;
32 import org.openhab.core.audio.AudioSource;
33 import org.openhab.core.audio.AudioStream;
34 import org.openhab.core.common.ThreadPoolManager;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * The audio source for openhab, implemented by a connection to a pulseaudio source using Simple TCP protocol
40  *
41  * @author Miguel Álvarez - Initial contribution
42  *
43  */
44 @NonNullByDefault
45 public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implements AudioSource {
46
47     private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class);
48     private final ConcurrentLinkedQueue<PipedOutputStream> pipeOutputs = new ConcurrentLinkedQueue<>();
49     private final ScheduledExecutorService executor;
50
51     private @Nullable Future<?> pipeWriteTask;
52
53     public PulseAudioAudioSource(PulseaudioHandler pulseaudioHandler, ScheduledExecutorService scheduler) {
54         super(pulseaudioHandler, scheduler);
55         executor = ThreadPoolManager
56                 .getScheduledPool("OH-binding-" + pulseaudioHandler.getThing().getUID() + "-source");
57     }
58
59     @Override
60     public Set<AudioFormat> getSupportedFormats() {
61         var supportedFormats = new HashSet<AudioFormat>();
62         var audioFormat = pulseaudioHandler.getSourceAudioFormat();
63         if (audioFormat != null) {
64             supportedFormats.add(audioFormat);
65         }
66         return supportedFormats;
67     }
68
69     @Override
70     public AudioStream getInputStream(AudioFormat audioFormat) throws AudioException {
71         try {
72             for (int countAttempt = 1; countAttempt <= 2; countAttempt++) { // two attempts allowed
73                 try {
74                     connectIfNeeded();
75                     final Socket clientSocketLocal = clientSocket;
76                     if (clientSocketLocal == null) {
77                         break;
78                     }
79                     var sourceFormat = pulseaudioHandler.getSourceAudioFormat();
80                     if (sourceFormat == null) {
81                         throw new AudioException("Unable to get source audio format");
82                     }
83                     if (!audioFormat.isCompatible(sourceFormat)) {
84                         throw new AudioException("Incompatible audio format requested");
85                     }
86                     var pipeOutput = new PipedOutputStream();
87                     var pipeInput = new PipedInputStream(pipeOutput, 1024 * 10) {
88                         @Override
89                         public void close() throws IOException {
90                             unregisterPipe(pipeOutput);
91                             super.close();
92                         }
93                     };
94                     registerPipe(pipeOutput);
95                     // get raw audio from the pulse audio socket
96                     return new PulseAudioStream(sourceFormat, pipeInput, () -> {
97                         // ensure pipe is writing
98                         startPipeWrite();
99                     });
100                 } catch (IOException e) {
101                     disconnect(); // disconnect to force clear connection in case of socket not cleanly shutdown
102                     if (countAttempt == 2) { // we won't retry : log and quit
103                         final Socket clientSocketLocal = clientSocket;
104                         String port = clientSocketLocal != null ? Integer.toString(clientSocketLocal.getPort())
105                                 : "unknown";
106                         logger.warn(
107                                 "Error while trying to get audio from pulseaudio audio source. Cannot connect to {}:{}, error: {}",
108                                 pulseaudioHandler.getHost(), port, e.getMessage());
109                         throw e;
110                     }
111                 } catch (InterruptedException ie) {
112                     logger.info("Interrupted during source audio connection: {}", ie.getMessage());
113                     throw new AudioException(ie);
114                 }
115                 countAttempt++;
116             }
117         } catch (IOException e) {
118             throw new AudioException(e);
119         }
120         throw new AudioException("Unable to create input stream");
121     }
122
123     private synchronized void registerPipe(PipedOutputStream pipeOutput) {
124         boolean isAdded = this.pipeOutputs.add(pipeOutput);
125         if (isAdded) {
126             addClientCount();
127         }
128         startPipeWrite();
129     }
130
131     /**
132      * As startPipeWrite is called for every chunk read,
133      * this wrapper method make the test before effectively
134      * locking the object (which is a costly operation)
135      */
136     private void startPipeWrite() {
137         if (this.pipeWriteTask == null) {
138             startPipeWriteSynchronized();
139         }
140     }
141
142     private synchronized void startPipeWriteSynchronized() {
143         if (this.pipeWriteTask == null) {
144             this.pipeWriteTask = executor.submit(() -> {
145                 int lengthRead;
146                 byte[] buffer = new byte[1024];
147                 int readRetries = 3;
148                 while (!pipeOutputs.isEmpty()) {
149                     var stream = getSourceInputStream();
150                     if (stream != null) {
151                         try {
152                             lengthRead = stream.read(buffer);
153                             readRetries = 3;
154                             for (var output : pipeOutputs) {
155                                 try {
156                                     output.write(buffer, 0, lengthRead);
157                                     if (pipeOutputs.contains(output)) {
158                                         output.flush();
159                                     }
160                                 } catch (InterruptedIOException e) {
161                                     if (pipeOutputs.isEmpty()) {
162                                         // task has been ended while writing
163                                         return;
164                                     }
165                                     logger.warn("InterruptedIOException while writing from pulse source to pipe: {}",
166                                             getExceptionMessage(e));
167                                 } catch (IOException e) {
168                                     logger.warn("IOException while writing from pulse source to pipe: {}",
169                                             getExceptionMessage(e));
170                                 } catch (RuntimeException e) {
171                                     logger.warn("RuntimeException while writing from pulse source to pipe: {}",
172                                             getExceptionMessage(e));
173                                 }
174                             }
175                         } catch (IOException e) {
176                             logger.warn("IOException while reading from pulse source: {}", getExceptionMessage(e));
177                             if (readRetries == 0) {
178                                 // force reconnection on persistent IOException
179                                 super.disconnect();
180                             } else {
181                                 readRetries--;
182                             }
183                         } catch (RuntimeException e) {
184                             logger.warn("RuntimeException while reading from pulse source: {}", getExceptionMessage(e));
185                         }
186                     } else {
187                         logger.warn("Unable to get source input stream");
188                     }
189                 }
190                 this.pipeWriteTask = null;
191             });
192         }
193     }
194
195     private synchronized void unregisterPipe(PipedOutputStream pipeOutput) {
196         boolean isRemoved = this.pipeOutputs.remove(pipeOutput);
197         if (isRemoved) {
198             minusClientCount();
199         }
200         try {
201             Thread.sleep(0);
202         } catch (InterruptedException ignored) {
203         }
204         stopPipeWriteTask();
205         try {
206             pipeOutput.close();
207         } catch (IOException ignored) {
208         }
209     }
210
211     private synchronized void stopPipeWriteTask() {
212         var pipeWriteTask = this.pipeWriteTask;
213         if (pipeOutputs.isEmpty() && pipeWriteTask != null) {
214             pipeWriteTask.cancel(true);
215             this.pipeWriteTask = null;
216         }
217     }
218
219     private @Nullable String getExceptionMessage(Exception e) {
220         String message = e.getMessage();
221         var cause = e.getCause();
222         if (message == null && cause != null) {
223             message = cause.getMessage();
224         }
225         return message;
226     }
227
228     private @Nullable InputStream getSourceInputStream() {
229         try {
230             connectIfNeeded();
231         } catch (IOException | InterruptedException ignored) {
232         }
233         try {
234             var clientSocketFinal = clientSocket;
235             return (clientSocketFinal != null) ? clientSocketFinal.getInputStream() : null;
236         } catch (IOException ignored) {
237             return null;
238         }
239     }
240
241     @Override
242     public void disconnect() {
243         stopPipeWriteTask();
244         super.disconnect();
245     }
246
247     static class PulseAudioStream extends AudioStream {
248         private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class);
249         private final AudioFormat format;
250         private final InputStream input;
251         private final Runnable activity;
252         private boolean closed = false;
253
254         public PulseAudioStream(AudioFormat format, InputStream input, Runnable activity) {
255             this.input = input;
256             this.format = format;
257             this.activity = activity;
258         }
259
260         @Override
261         public AudioFormat getFormat() {
262             return format;
263         }
264
265         @Override
266         public int read() throws IOException {
267             byte[] b = new byte[1];
268             int bytesRead = read(b);
269             if (-1 == bytesRead) {
270                 return bytesRead;
271             }
272             Byte bb = Byte.valueOf(b[0]);
273             return bb.intValue();
274         }
275
276         @Override
277         public int read(byte @Nullable [] b) throws IOException {
278             return read(b, 0, b == null ? 0 : b.length);
279         }
280
281         @Override
282         public int read(byte @Nullable [] b, int off, int len) throws IOException {
283             if (b == null) {
284                 throw new IOException("Buffer is null");
285             }
286             logger.trace("reading from pulseaudio stream");
287             if (closed) {
288                 throw new IOException("Stream is closed");
289             }
290             activity.run();
291             return input.read(b, off, len);
292         }
293
294         @Override
295         public void close() throws IOException {
296             closed = true;
297             input.close();
298         }
299     };
300 }