/**
 * Copyright (c) 2010-2022 Contributors to the openHAB project
 *
 * See the NOTICE file(s) distributed with this work for additional
 * information.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0
 *
 * SPDX-License-Identifier: EPL-2.0
 */
13 package org.openhab.binding.pulseaudio.internal;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.io.InterruptedIOException;
18 import java.io.PipedInputStream;
19 import java.io.PipedOutputStream;
20 import java.net.Socket;
21 import java.util.HashSet;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.function.Consumer;
28 import org.eclipse.jdt.annotation.NonNullByDefault;
29 import org.eclipse.jdt.annotation.Nullable;
30 import org.openhab.binding.pulseaudio.internal.handler.PulseaudioHandler;
31 import org.openhab.core.audio.AudioException;
32 import org.openhab.core.audio.AudioFormat;
33 import org.openhab.core.audio.AudioSource;
34 import org.openhab.core.audio.AudioStream;
35 import org.openhab.core.common.ThreadPoolManager;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
/**
 * The audio source for openHAB, implemented by a connection to a pulseaudio source using the Simple TCP protocol.
 *
 * @author Miguel Álvarez - Initial contribution
 */
46 public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implements AudioSource {
// Logger of this audio source.
48 private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class);
// Output ends of the pipes currently registered to receive audio data (see registerPipe / unregisterPipe).
49 private final ConcurrentLinkedQueue<PipedOutputStream> pipeOutputs = new ConcurrentLinkedQueue<>();
// Scheduled pool, assigned in the constructor, on which the pipe writing task is submitted.
50 private final ScheduledExecutorService executor;
// Handle of the pipe writing task; null while no task is tracked.
52 private @Nullable Future<?> pipeWriteTask;
54 public PulseAudioAudioSource(PulseaudioHandler pulseaudioHandler, ScheduledExecutorService scheduler) {
55 super(pulseaudioHandler, scheduler);
56 executor = ThreadPoolManager
57 .getScheduledPool("OH-binding-" + pulseaudioHandler.getThing().getUID() + "-source");
61 public Set<AudioFormat> getSupportedFormats() {
62 var supportedFormats = new HashSet<AudioFormat>();
63 var audioFormat = pulseaudioHandler.getSourceAudioFormat();
64 if (audioFormat != null) {
65 supportedFormats.add(audioFormat);
67 return supportedFormats;
/**
 * Provides an audio stream delivering the raw audio of the pulseaudio source.
 *
 * Up to two attempts are made. Within an attempt the source format reported by the handler is checked against
 * the requested format, a piped output/input stream pair is created, the output end is registered with this
 * source and a {@link PulseAudioStream} reading from the input end is returned.
 *
 * NOTE(review): several statements of this method are not visible in this excerpt (the opening of the try
 * blocks, the body of the idle callback, the logging call); the comments only describe the visible lines.
 *
 * @param audioFormat the format requested by the caller; it must be compatible with the source format
 * @return a stream reading from a pipe registered with this source
 * @throws AudioException if the source format is unknown, the requested format is incompatible, an
 *             IOException or InterruptedException occurs, or no stream could be created
 */
71 public AudioStream getInputStream(AudioFormat audioFormat) throws AudioException {
73 for (int countAttempt = 1; countAttempt <= 2; countAttempt++) { // two attempts allowed
// work on a local copy of the socket reference
76 final Socket clientSocketLocal = clientSocket;
77 if (clientSocketLocal == null) {
80 var sourceFormat = pulseaudioHandler.getSourceAudioFormat();
81 if (sourceFormat == null) {
82 throw new AudioException("Unable to get source audio format");
84 if (!audioFormat.isCompatible(sourceFormat)) {
85 throw new AudioException("Incompatible audio format requested");
// pipe pair with a 10 KiB buffer; closing the input end unregisters the output end from this source
88 var pipeOutput = new PipedOutputStream();
89 var pipeInput = new PipedInputStream(pipeOutput, 1024 * 10) {
91 public void close() throws IOException {
92 unregisterPipe(pipeOutput);
96 registerPipe(pipeOutput);
97 // get raw audio from the pulse audio socket
// the callback receives the idle state reported by the returned stream
98 return new PulseAudioStream(sourceFormat, pipeInput, (idle) -> {
101 scheduleDisconnect();
103 // ensure pipe is writing
107 } catch (IOException e) {
108 disconnect(); // disconnect to force clear connection in case of socket not cleanly shutdown
109 if (countAttempt == 2) { // we won't retry : log and quit
// the port is only known while a client socket exists
110 String port = clientSocket != null ? Integer.toString(clientSocket.getPort()) : "unknown";
112 "Error while trying to get audio from pulseaudio audio source. Cannot connect to {}:{}, error: {}",
113 pulseaudioHandler.getHost(), port, e.getMessage());
117 } catch (InterruptedException ie) {
118 logger.info("Interrupted during source audio connection: {}", ie.getMessage());
120 throw new AudioException(ie);
// any IOException escaping the attempts is reported to the caller as AudioException
124 } catch (IOException e) {
125 throw new AudioException(e);
127 scheduleDisconnect();
// reached when no attempt returned a stream
130 throw new AudioException("Unable to create input stream");
/**
 * Adds the given pipe output to the queue of registered pipes.
 *
 * NOTE(review): a statement following the add is not visible in this excerpt; presumably it makes sure the
 * pipe writing task is running - confirm against the full file.
 *
 * @param pipeOutput output end of the pipe that should receive the audio data
 */
133 private synchronized void registerPipe(PipedOutputStream pipeOutput) {
134 this.pipeOutputs.add(pipeOutput);
/**
 * Submits the pipe writing task to the executor unless a task is already tracked in {@code pipeWriteTask}.
 *
 * While at least one pipe is registered, the task reads from the source input stream into a 1024 byte buffer
 * and writes the bytes read to every registered pipe output. Exceptions while reading or writing are logged
 * as warnings. When the task ends it resets {@code pipeWriteTask} to null.
 *
 * NOTE(review): several statements are not visible in this excerpt (declaration and update of
 * {@code lengthRead} and {@code readRetries}, the reconnection step, loop exits); the comments only describe
 * the visible lines.
 */
138 private synchronized void startPipeWrite() {
139 if (this.pipeWriteTask == null) {
140 this.pipeWriteTask = executor.submit(() -> {
142 byte[] buffer = new byte[1024];
// keep going as long as there is a registered pipe
144 while (!pipeOutputs.isEmpty()) {
145 var stream = getSourceInputStream();
146 if (stream != null) {
148 lengthRead = stream.read(buffer);
// copy the bytes just read to every registered pipe
150 for (var output : pipeOutputs) {
152 output.write(buffer, 0, lengthRead);
// NOTE(review): the guarded statement is not visible; it only runs while the output is still registered
153 if (pipeOutputs.contains(output)) {
156 } catch (IOException e) {
157 if (e instanceof InterruptedIOException && pipeOutputs.isEmpty()) {
158 // task has been ended while writing
161 logger.warn("IOException while writing to from pulse source pipe: {}",
162 getExceptionMessage(e));
163 } catch (RuntimeException e) {
164 logger.warn("RuntimeException while writing to pulse source pipe: {}",
165 getExceptionMessage(e));
168 } catch (IOException e) {
169 logger.warn("IOException while reading from pulse source: {}", getExceptionMessage(e));
170 if (readRetries == 0) {
171 // force reconnection on persistent IOException
176 } catch (RuntimeException e) {
177 logger.warn("RuntimeException while reading from pulse source: {}", getExceptionMessage(e));
180 logger.warn("Unable to get source input stream");
// allow a later call to start a new task
183 this.pipeWriteTask = null;
/**
 * Removes the given pipe output from the queue of registered pipes.
 *
 * NOTE(review): the statements inside the two try blocks are not visible in this excerpt; the visible code
 * only shows that an InterruptedException and an IOException raised by them are deliberately ignored.
 *
 * @param pipeOutput output end of the pipe that should no longer receive audio data
 */
188 private synchronized void unregisterPipe(PipedOutputStream pipeOutput) {
189 this.pipeOutputs.remove(pipeOutput);
192 } catch (InterruptedException ignored) {
197 } catch (IOException ignored) {
201 private synchronized void stopPipeWriteTask() {
202 var pipeWriteTask = this.pipeWriteTask;
203 if (pipeOutputs.isEmpty() && pipeWriteTask != null) {
204 pipeWriteTask.cancel(true);
205 this.pipeWriteTask = null;
209 private @Nullable String getExceptionMessage(Exception e) {
210 String message = e.getMessage();
211 var cause = e.getCause();
212 if (message == null && cause != null) {
213 message = cause.getMessage();
/**
 * Provides the input stream of the client socket.
 *
 * NOTE(review): the statements of the first try block are not visible in this excerpt; an IOException or
 * InterruptedException raised by them is ignored. The handling after the second catch is not visible either.
 *
 * @return the input stream of the client socket, or null when there is no client socket
 */
218 private @Nullable InputStream getSourceInputStream() {
221 } catch (IOException | InterruptedException ignored) {
224 return (clientSocket != null) ? clientSocket.getInputStream() : null;
225 } catch (IOException ignored) {
/**
 * Disconnects this audio source; getInputStream calls it after an IOException to force a clean connection.
 *
 * NOTE(review): the body of this method is not visible in this excerpt - confirm its steps in the full file.
 */
231 public void disconnect() {
236 static class PulseAudioStream extends AudioStream {
237 private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class);
238 private final AudioFormat format;
239 private final InputStream input;
240 private final Consumer<Boolean> setIdle;
241 private boolean closed = false;
243 public PulseAudioStream(AudioFormat format, InputStream input, Consumer<Boolean> setIdle) {
245 this.format = format;
246 this.setIdle = setIdle;
250 public AudioFormat getFormat() {
255 public int read() throws IOException {
256 byte[] b = new byte[1];
257 int bytesRead = read(b);
258 if (-1 == bytesRead) {
261 Byte bb = Byte.valueOf(b[0]);
262 return bb.intValue();
266 public int read(byte @Nullable [] b) throws IOException {
267 return read(b, 0, b.length);
271 public int read(byte @Nullable [] b, int off, int len) throws IOException {
272 logger.trace("reading from pulseaudio stream");
274 throw new IOException("Stream is closed");
276 setIdle.accept(false);
277 return input.read(b, off, len);
281 public void close() throws IOException {
283 setIdle.accept(true);