2 * Copyright (c) 2010-2024 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.pulseaudio.internal;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.net.Socket;
19 import java.util.concurrent.Future;
20 import java.util.concurrent.ScheduledExecutorService;
22 import org.eclipse.jdt.annotation.NonNullByDefault;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.openhab.binding.pulseaudio.internal.handler.PulseaudioHandler;
25 import org.openhab.core.audio.AudioException;
26 import org.openhab.core.audio.AudioFormat;
27 import org.openhab.core.audio.AudioSource;
28 import org.openhab.core.audio.AudioStream;
29 import org.openhab.core.audio.PipedAudioStream;
30 import org.openhab.core.common.ThreadPoolManager;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
35 * The audio source for openHAB, implemented by a connection to a PulseAudio source using the Simple TCP protocol
37 * @author Miguel Álvarez - Initial contribution
41 public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implements AudioSource {
// Logger for this class.
43 private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class);
// Group of piped audio streams: the pipe-write task writes every chunk it reads to this group,
// and getInputStream() obtains its streams from it.
44 private final PipedAudioStream.Group streamGroup;
// Pool on which the pipe-write task is submitted.
45 private final ScheduledExecutorService executor;
// Audio format obtained from the handler; the only format reported by getSupportedFormats().
46 private final AudioFormat streamFormat;
// Handle of the submitted pipe-write task, or null when none is registered.
// NOTE(review): this field is read without the lock in startPipeWrite() and assigned inside the
// submitted task, but it is not volatile - confirm that cross-thread visibility is acceptable.
48 private @Nullable Future<?> pipeWriteTask;
/**
 * Creates the audio source for the given handler.
 *
 * @param pulseaudioHandler handler supplying the source audio format and the thing UID
 * @param scheduler scheduler forwarded to the superclass constructor
 */
50 public PulseAudioAudioSource(PulseaudioHandler pulseaudioHandler, ScheduledExecutorService scheduler) {
51 super(pulseaudioHandler, scheduler);
52 streamFormat = pulseaudioHandler.getSourceAudioFormat();
// Pool name is derived from the thing UID: "OH-binding-<uid>-source".
53 executor = ThreadPoolManager
54 .getScheduledPool("OH-binding-" + pulseaudioHandler.getThing().getUID() + "-source");
// The group is created for the same format that is handed out to callers.
55 streamGroup = PipedAudioStream.newGroup(streamFormat);
/**
 * Returns the formats this source can deliver.
 *
 * @return an immutable single-element set containing the stream format read from the handler
 */
59 public Set<AudioFormat> getSupportedFormats() {
60 return Set.of(streamFormat);
/**
 * Provides an audio stream taken from the piped stream group.
 *
 * @param audioFormat format requested by the caller; must be compatible with the stream format
 * @return NOTE(review): the return statement is not visible in this excerpt - presumably the stream
 *         obtained from the stream group; confirm
 * @throws AudioException if the requested format is incompatible, if the thread is interrupted,
 *         if an IOException reaches the outer handler, or with "Unable to create input stream"
 *         (presumably when no stream could be returned - confirm)
 */
64 public AudioStream getInputStream(AudioFormat audioFormat) throws AudioException {
// At most two passes through the connection logic.
66 for (int countAttempt = 1; countAttempt <= 2; countAttempt++) { // two attempts allowed
// Work on a local copy of the socket field for the null check.
69 final Socket clientSocketLocal = clientSocket;
70 if (clientSocketLocal == null) {
73 if (!audioFormat.isCompatible(streamFormat)) {
74 throw new AudioException("Incompatible audio format requested");
76 var audioStream = streamGroup.getAudioStreamInGroup();
// Register a callback that runs when the caller closes the stream.
77 audioStream.onClose(() -> {
83 // get raw audio from the pulse audio socket
85 } catch (IOException e) {
86 disconnect(); // disconnect to force clear connection in case of socket not cleanly shutdown
87 if (countAttempt == 2) { // we won't retry : log and quit
// The port is only used for the log message below.
88 final Socket clientSocketLocal = clientSocket;
89 String port = clientSocketLocal != null ? Integer.toString(clientSocketLocal.getPort())
92 "Error while trying to get audio from pulseaudio audio source. Cannot connect to {}:{}, error: {}",
93 pulseaudioHandler.getHost(), port, e.getMessage());
96 } catch (InterruptedException ie) {
// Interruption is logged and reported to the caller as an AudioException.
97 logger.info("Interrupted during source audio connection: {}", ie.getMessage());
98 throw new AudioException(ie);
102 } catch (IOException e) {
// Any IOException reaching this handler is wrapped in an AudioException.
103 throw new AudioException(e);
105 throw new AudioException("Unable to create input stream");
109 * As startPipeWrite is called for every chunk read,
110 * this wrapper method makes the test before actually
111 * locking the object (which is a costly operation)
113 private void startPipeWrite() {
// Unsynchronized pre-check; startPipeWriteSynchronized() repeats the same null test
// while holding the object lock before submitting the task.
114 if (this.pipeWriteTask == null) {
115 startPipeWriteSynchronized();
/**
 * Submits the pipe-write task to the executor if none is registered, while holding the object lock.
 * The task reads chunks from the source input stream and writes them to the stream group for as
 * long as the group is not empty.
 */
119 private synchronized void startPipeWriteSynchronized() {
// Re-check under the lock so that only one task is submitted.
120 if (this.pipeWriteTask == null) {
121 this.pipeWriteTask = executor.submit(() -> {
// Chunk buffer: at most 1200 bytes are requested per read.
123 byte[] buffer = new byte[1200];
// Keep pumping audio while the stream group is not empty.
125 while (!streamGroup.isEmpty()) {
126 var stream = getSourceInputStream();
127 if (stream != null) {
129 lengthRead = stream.read(buffer);
// Forward the bytes just read to the stream group.
131 streamGroup.write(buffer, 0, lengthRead);
133 } catch (IOException e) {
134 logger.warn("IOException while reading from pulse source: {}", getExceptionMessage(e));
// NOTE(review): the declaration and updates of readRetries are not visible in this excerpt.
135 if (readRetries == 0) {
136 // force reconnection on persistent IOException
141 } catch (RuntimeException e) {
142 logger.warn("RuntimeException while reading from pulse source: {}", getExceptionMessage(e));
// NOTE(review): presumably the branch taken when getSourceInputStream() returned null - confirm.
145 logger.warn("Unable to get source input stream");
// Clear the handle so that a later startPipeWrite() can submit a new task.
148 this.pipeWriteTask = null;
/**
 * Cancels the pipe-write task and clears its handle, but only when the stream group is empty
 * and a task is currently registered. Runs while holding the object lock.
 */
153 private synchronized void stopPipeWriteTask() {
// Local copy of the field used for the null check and the cancel call.
154 var pipeWriteTask = this.pipeWriteTask;
155 if (streamGroup.isEmpty() && pipeWriteTask != null) {
// cancel(true) allows a running task to be interrupted.
156 pipeWriteTask.cancel(true);
157 this.pipeWriteTask = null;
/**
 * Picks a message for the given exception: its own message, or the message of its cause
 * when the exception has none and a cause is present.
 *
 * @param e exception to describe
 * @return NOTE(review): the return statement is not visible in this excerpt - presumably the
 *         selected message, which may be null; confirm
 */
161 private @Nullable String getExceptionMessage(Exception e) {
162 String message = e.getMessage();
163 var cause = e.getCause();
// Fall back to the cause only when the exception itself carries no message.
164 if (message == null && cause != null) {
165 message = cause.getMessage();
/**
 * Returns the input stream of the current client socket.
 *
 * @return the socket input stream, or null when there is no client socket
 *         (NOTE(review): the handling inside the catch blocks is not visible - presumably
 *         null is also returned on failure; confirm)
 */
170 private @Nullable InputStream getSourceInputStream() {
// NOTE(review): the statement guarded by this catch is not visible in this excerpt.
173 } catch (IOException | InterruptedException ignored) {
// Read the socket field once into a local variable for the null check.
176 var clientSocketFinal = clientSocket;
177 return (clientSocketFinal != null) ? clientSocketFinal.getInputStream() : null;
178 } catch (IOException ignored) {
184 public void disconnect() {