if (audioStream == null) {
return;
}
+ addClientCount();
try (ConvertedInputStream normalizedPCMStream = new ConvertedInputStream(audioStream)) {
for (int countAttempt = 1; countAttempt <= 2; countAttempt++) { // two attempts allowed
try {
final Socket clientSocketLocal = clientSocket;
if (clientSocketLocal != null) {
// send raw audio to the socket and to pulse audio
- setIdle(false);
Instant start = Instant.now();
normalizedPCMStream.transferTo(clientSocketLocal.getOutputStream());
if (normalizedPCMStream.getDuration() != -1) { // ensure, if the sound has a duration
throw new UnsupportedAudioFormatException("Cannot send sound to the pulseaudio sink",
audioStream.getFormat(), e);
} finally {
- scheduleDisconnect();
+ minusClientCount();
}
- setIdle(true);
}
@Override
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.function.Consumer;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
if (!audioFormat.isCompatible(sourceFormat)) {
throw new AudioException("Incompatible audio format requested");
}
- setIdle(true);
var pipeOutput = new PipedOutputStream();
var pipeInput = new PipedInputStream(pipeOutput, 1024 * 10) {
@Override
};
registerPipe(pipeOutput);
// get raw audio from the pulse audio socket
- return new PulseAudioStream(sourceFormat, pipeInput, (idle) -> {
- setIdle(idle);
- if (idle) {
- scheduleDisconnect();
- } else {
- // ensure pipe is writing
- startPipeWrite();
- }
+ return new PulseAudioStream(sourceFormat, pipeInput, () -> {
+ // ensure pipe is writing
+ startPipeWrite();
});
} catch (IOException e) {
disconnect(); // disconnect to force clear connection in case of socket not cleanly shutdown
logger.warn(
"Error while trying to get audio from pulseaudio audio source. Cannot connect to {}:{}, error: {}",
pulseaudioHandler.getHost(), port, e.getMessage());
- setIdle(true);
throw e;
}
} catch (InterruptedException ie) {
logger.info("Interrupted during source audio connection: {}", ie.getMessage());
- setIdle(true);
throw new AudioException(ie);
}
countAttempt++;
}
} catch (IOException e) {
throw new AudioException(e);
- } finally {
- scheduleDisconnect();
}
- setIdle(true);
throw new AudioException("Unable to create input stream");
}
private synchronized void registerPipe(PipedOutputStream pipeOutput) {
- this.pipeOutputs.add(pipeOutput);
+ boolean isAdded = this.pipeOutputs.add(pipeOutput);
+ if (isAdded) {
+ addClientCount();
+ }
startPipeWrite();
}
- private synchronized void startPipeWrite() {
+ /**
+ * As startPipeWrite is called for every chunk read,
+ * this wrapper method make the test before effectively
+ * locking the object (which is a costly operation)
+ */
+ private void startPipeWrite() {
+ if (this.pipeWriteTask == null) {
+ startPipeWriteSynchronized();
+ }
+ }
+
+ private synchronized void startPipeWriteSynchronized() {
if (this.pipeWriteTask == null) {
this.pipeWriteTask = executor.submit(() -> {
int lengthRead;
}
private synchronized void unregisterPipe(PipedOutputStream pipeOutput) {
- this.pipeOutputs.remove(pipeOutput);
+ boolean isRemoved = this.pipeOutputs.remove(pipeOutput);
+ if (isRemoved) {
+ minusClientCount();
+ }
try {
Thread.sleep(0);
} catch (InterruptedException ignored) {
private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class);
private final AudioFormat format;
private final InputStream input;
- private final Consumer<Boolean> setIdle;
+ private final Runnable activity;
private boolean closed = false;
- public PulseAudioStream(AudioFormat format, InputStream input, Consumer<Boolean> setIdle) {
+ public PulseAudioStream(AudioFormat format, InputStream input, Runnable activity) {
this.input = input;
this.format = format;
- this.setIdle = setIdle;
+ this.activity = activity;
}
@Override
if (closed) {
throw new IOException("Stream is closed");
}
- setIdle.accept(false);
+ activity.run();
return input.read(b, off, len);
}
@Override
public void close() throws IOException {
closed = true;
- setIdle.accept(true);
input.close();
}
};
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
protected @Nullable Socket clientSocket;
- private boolean isIdle = true;
+ private ReentrantLock countClientLock = new ReentrantLock();
+ private Integer countClient = 0;
private @Nullable ScheduledFuture<?> scheduledDisconnection;
/**
* Connect to pulseaudio with the simple protocol
+ * Will schedule an attempt for disconnection after timeout
*
* @throws IOException
* @throws InterruptedException when interrupted during the loading module wait
public void connectIfNeeded() throws IOException, InterruptedException {
Socket clientSocketLocal = clientSocket;
if (clientSocketLocal == null || !clientSocketLocal.isConnected() || clientSocketLocal.isClosed()) {
- logger.debug("Simple TCP Stream connecting");
+ logger.debug("Simple TCP Stream connecting for {}", getLabel(null));
String host = pulseaudioHandler.getHost();
int port = pulseaudioHandler.getSimpleTcpPortAndLoadModuleIfNecessary();
var clientSocketFinal = new Socket(host, port);
clientSocketFinal.setSoTimeout(pulseaudioHandler.getBasicProtocolSOTimeout());
clientSocket = clientSocketFinal;
+ scheduleDisconnectIfNoClient();
}
}
*/
public void disconnect() {
final Socket clientSocketLocal = clientSocket;
- if (clientSocketLocal != null && isIdle) {
- logger.debug("Simple TCP Stream disconnecting");
+ if (clientSocketLocal != null) {
+ logger.debug("Simple TCP Stream disconnecting for {}", getLabel(null));
try {
clientSocketLocal.close();
} catch (IOException ignored) {
}
}
- public void scheduleDisconnect() {
- var scheduledDisconnectionFinal = scheduledDisconnection;
- if (scheduledDisconnectionFinal != null) {
- scheduledDisconnectionFinal.cancel(true);
- }
- int idleTimeout = pulseaudioHandler.getIdleTimeout();
- if (idleTimeout > -1) {
- logger.debug("Scheduling disconnect");
- scheduledDisconnection = scheduler.schedule(this::disconnect, idleTimeout, TimeUnit.MILLISECONDS);
+ private void scheduleDisconnectIfNoClient() {
+ countClientLock.lock();
+ try {
+ if (countClient <= 0) {
+ var scheduledDisconnectionFinal = scheduledDisconnection;
+ if (scheduledDisconnectionFinal != null) {
+ logger.debug("Aborting next disconnect");
+ scheduledDisconnectionFinal.cancel(true);
+ }
+ int idleTimeout = pulseaudioHandler.getIdleTimeout();
+ if (idleTimeout > -1) {
+ logger.debug("Scheduling next disconnect");
+ scheduledDisconnection = scheduler.schedule(this::disconnect, idleTimeout, TimeUnit.MILLISECONDS);
+ }
+ }
+ } finally {
+ countClientLock.unlock();
}
}
return label != null ? label : pulseaudioHandler.getThing().getUID().getId();
}
- public void setIdle(boolean idle) {
- isIdle = idle;
+ protected void addClientCount() {
+ countClientLock.lock();
+ try {
+ countClient += 1;
+ logger.debug("Adding new client for pulseaudio sink/source {}. Current count: {}", getLabel(null),
+ countClient);
+ if (countClient <= 0) { // safe against misuse
+ countClient = 1;
+ }
+ var scheduledDisconnectionFinal = scheduledDisconnection;
+ if (scheduledDisconnectionFinal != null) {
+ logger.debug("Aborting next disconnect");
+ scheduledDisconnectionFinal.cancel(true);
+ }
+ } finally {
+ countClientLock.unlock();
+ }
+ }
+
+ protected void minusClientCount() {
+ countClientLock.lock();
+ countClient -= 1;
+ logger.debug("Removing client for pulseaudio sink/source {}. Current count: {}", getLabel(null), countClient);
+ if (countClient < 0) { // safe against misuse
+ countClient = 0;
+ }
+ countClientLock.unlock();
+ if (countClient <= 0) {
+ scheduleDisconnectIfNoClient();
+ }
}
}
private static int parseVolume(String vol) {
int volumeTotal = 0;
int nChannels = 0;
- for (String channel : vol.split(", ")) {
+ for (String channel : vol.split(",")) {
Matcher matcher = VOLUME_PATTERN.matcher(channel.trim());
if (matcher.find()) {
volumeTotal += Integer.valueOf(matcher.group(3));
} catch (InterruptedException i) {
logger.info("Interrupted during sink audio connection: {}", i.getMessage());
return;
- } finally {
- audioSink.scheduleDisconnect();
}
}
});
} catch (InterruptedException i) {
logger.info("Interrupted during source audio connection: {}", i.getMessage());
return;
- } finally {
- audioSource.scheduleDisconnect();
}
}
});