2 * Copyright (c) 2010-2022 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.pulseaudio.internal;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.io.InterruptedIOException;
18 import java.io.PipedInputStream;
19 import java.io.PipedOutputStream;
20 import java.net.Socket;
21 import java.util.HashSet;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.ScheduledExecutorService;
27 import org.eclipse.jdt.annotation.NonNullByDefault;
28 import org.eclipse.jdt.annotation.Nullable;
29 import org.openhab.binding.pulseaudio.internal.handler.PulseaudioHandler;
30 import org.openhab.core.audio.AudioException;
31 import org.openhab.core.audio.AudioFormat;
32 import org.openhab.core.audio.AudioSource;
33 import org.openhab.core.audio.AudioStream;
34 import org.openhab.core.common.ThreadPoolManager;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
39 * The audio source for openhab, implemented by a connection to a pulseaudio source using Simple TCP protocol
41 * @author Miguel Álvarez - Initial contribution
45 public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implements AudioSource {
47 private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class);
48 private final ConcurrentLinkedQueue<PipedOutputStream> pipeOutputs = new ConcurrentLinkedQueue<>();
49 private final ScheduledExecutorService executor;
51 private @Nullable Future<?> pipeWriteTask;
53 public PulseAudioAudioSource(PulseaudioHandler pulseaudioHandler, ScheduledExecutorService scheduler) {
54 super(pulseaudioHandler, scheduler);
55 executor = ThreadPoolManager
56 .getScheduledPool("OH-binding-" + pulseaudioHandler.getThing().getUID() + "-source");
60 public Set<AudioFormat> getSupportedFormats() {
61 var supportedFormats = new HashSet<AudioFormat>();
62 var audioFormat = pulseaudioHandler.getSourceAudioFormat();
63 if (audioFormat != null) {
64 supportedFormats.add(audioFormat);
66 return supportedFormats;
70 public AudioStream getInputStream(AudioFormat audioFormat) throws AudioException {
72 for (int countAttempt = 1; countAttempt <= 2; countAttempt++) { // two attempts allowed
75 final Socket clientSocketLocal = clientSocket;
76 if (clientSocketLocal == null) {
79 var sourceFormat = pulseaudioHandler.getSourceAudioFormat();
80 if (sourceFormat == null) {
81 throw new AudioException("Unable to get source audio format");
83 if (!audioFormat.isCompatible(sourceFormat)) {
84 throw new AudioException("Incompatible audio format requested");
86 var pipeOutput = new PipedOutputStream();
87 var pipeInput = new PipedInputStream(pipeOutput, 1024 * 10) {
89 public void close() throws IOException {
90 unregisterPipe(pipeOutput);
94 registerPipe(pipeOutput);
95 // get raw audio from the pulse audio socket
96 return new PulseAudioStream(sourceFormat, pipeInput, () -> {
97 // ensure pipe is writing
100 } catch (IOException e) {
101 disconnect(); // disconnect to force clear connection in case of socket not cleanly shutdown
102 if (countAttempt == 2) { // we won't retry : log and quit
103 final Socket clientSocketLocal = clientSocket;
104 String port = clientSocketLocal != null ? Integer.toString(clientSocketLocal.getPort())
107 "Error while trying to get audio from pulseaudio audio source. Cannot connect to {}:{}, error: {}",
108 pulseaudioHandler.getHost(), port, e.getMessage());
111 } catch (InterruptedException ie) {
112 logger.info("Interrupted during source audio connection: {}", ie.getMessage());
113 throw new AudioException(ie);
117 } catch (IOException e) {
118 throw new AudioException(e);
120 throw new AudioException("Unable to create input stream");
123 private synchronized void registerPipe(PipedOutputStream pipeOutput) {
124 boolean isAdded = this.pipeOutputs.add(pipeOutput);
132 * As startPipeWrite is called for every chunk read,
133 * this wrapper method make the test before effectively
134 * locking the object (which is a costly operation)
136 private void startPipeWrite() {
137 if (this.pipeWriteTask == null) {
138 startPipeWriteSynchronized();
142 private synchronized void startPipeWriteSynchronized() {
143 if (this.pipeWriteTask == null) {
144 this.pipeWriteTask = executor.submit(() -> {
146 byte[] buffer = new byte[1024];
148 while (!pipeOutputs.isEmpty()) {
149 var stream = getSourceInputStream();
150 if (stream != null) {
152 lengthRead = stream.read(buffer);
154 for (var output : pipeOutputs) {
156 output.write(buffer, 0, lengthRead);
157 if (pipeOutputs.contains(output)) {
160 } catch (InterruptedIOException e) {
161 if (pipeOutputs.isEmpty()) {
162 // task has been ended while writing
165 logger.warn("InterruptedIOException while writing to from pulse source pipe: {}",
166 getExceptionMessage(e));
167 } catch (IOException e) {
168 logger.warn("IOException while writing to from pulse source pipe: {}",
169 getExceptionMessage(e));
170 } catch (RuntimeException e) {
171 logger.warn("RuntimeException while writing to pulse source pipe: {}",
172 getExceptionMessage(e));
175 } catch (IOException e) {
176 logger.warn("IOException while reading from pulse source: {}", getExceptionMessage(e));
177 if (readRetries == 0) {
178 // force reconnection on persistent IOException
183 } catch (RuntimeException e) {
184 logger.warn("RuntimeException while reading from pulse source: {}", getExceptionMessage(e));
187 logger.warn("Unable to get source input stream");
190 this.pipeWriteTask = null;
195 private synchronized void unregisterPipe(PipedOutputStream pipeOutput) {
196 boolean isRemoved = this.pipeOutputs.remove(pipeOutput);
202 } catch (InterruptedException ignored) {
207 } catch (IOException ignored) {
211 private synchronized void stopPipeWriteTask() {
212 var pipeWriteTask = this.pipeWriteTask;
213 if (pipeOutputs.isEmpty() && pipeWriteTask != null) {
214 pipeWriteTask.cancel(true);
215 this.pipeWriteTask = null;
219 private @Nullable String getExceptionMessage(Exception e) {
220 String message = e.getMessage();
221 var cause = e.getCause();
222 if (message == null && cause != null) {
223 message = cause.getMessage();
228 private @Nullable InputStream getSourceInputStream() {
231 } catch (IOException | InterruptedException ignored) {
234 var clientSocketFinal = clientSocket;
235 return (clientSocketFinal != null) ? clientSocketFinal.getInputStream() : null;
236 } catch (IOException ignored) {
242 public void disconnect() {
247 static class PulseAudioStream extends AudioStream {
248 private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class);
249 private final AudioFormat format;
250 private final InputStream input;
251 private final Runnable activity;
252 private boolean closed = false;
254 public PulseAudioStream(AudioFormat format, InputStream input, Runnable activity) {
256 this.format = format;
257 this.activity = activity;
261 public AudioFormat getFormat() {
266 public int read() throws IOException {
267 byte[] b = new byte[1];
268 int bytesRead = read(b);
269 if (-1 == bytesRead) {
272 Byte bb = Byte.valueOf(b[0]);
273 return bb.intValue();
277 public int read(byte @Nullable [] b) throws IOException {
278 return read(b, 0, b == null ? 0 : b.length);
282 public int read(byte @Nullable [] b, int off, int len) throws IOException {
284 throw new IOException("Buffer is null");
286 logger.trace("reading from pulseaudio stream");
288 throw new IOException("Stream is closed");
291 return input.read(b, off, len);
295 public void close() throws IOException {