import java.net.Socket;
import java.time.Duration;
import java.time.Instant;
-import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import javax.sound.sampled.UnsupportedAudioFileException;
import org.openhab.core.audio.AudioFormat;
import org.openhab.core.audio.AudioSink;
import org.openhab.core.audio.AudioStream;
-import org.openhab.core.audio.FixedLengthAudioStream;
+import org.openhab.core.audio.FileAudioStream;
import org.openhab.core.audio.UnsupportedAudioFormatException;
import org.openhab.core.audio.UnsupportedAudioStreamException;
+import org.openhab.core.audio.utils.AudioSinkUtils;
+import org.openhab.core.common.Disposable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSink.class);
- private static final HashSet<AudioFormat> SUPPORTED_FORMATS = new HashSet<>();
- private static final HashSet<Class<? extends AudioStream>> SUPPORTED_STREAMS = new HashSet<>();
+ private AudioSinkUtils audioSinkUtils;
- static {
- SUPPORTED_FORMATS.add(AudioFormat.WAV);
- SUPPORTED_FORMATS.add(AudioFormat.MP3);
- SUPPORTED_STREAMS.add(FixedLengthAudioStream.class);
- }
+ private static final Set<AudioFormat> SUPPORTED_FORMATS = Set.of(AudioFormat.WAV, AudioFormat.MP3);
+ private static final Set<Class<? extends AudioStream>> SUPPORTED_STREAMS = Set.of(AudioStream.class);
+ private static final AudioFormat TARGET_FORMAT = new AudioFormat(AudioFormat.CONTAINER_WAVE,
+ AudioFormat.CODEC_PCM_SIGNED, false, 16, 4 * 44100, 44100L, 2);
- public PulseAudioAudioSink(PulseaudioHandler pulseaudioHandler, ScheduledExecutorService scheduler) {
+ public PulseAudioAudioSink(PulseaudioHandler pulseaudioHandler, ScheduledExecutorService scheduler,
+ AudioSinkUtils audioSinkUtils) {
super(pulseaudioHandler, scheduler);
+ this.audioSinkUtils = audioSinkUtils;
}
@Override
public void process(@Nullable AudioStream audioStream)
throws UnsupportedAudioFormatException, UnsupportedAudioStreamException {
+ processAndComplete(audioStream);
+ }
+
+ @Override
+ public CompletableFuture<@Nullable Void> processAndComplete(@Nullable AudioStream audioStream) {
if (audioStream == null) {
- return;
+ return CompletableFuture.completedFuture(null);
}
addClientCount();
try (ConvertedInputStream normalizedPCMStream = new ConvertedInputStream(audioStream)) {
if (clientSocketLocal != null) {
// send raw audio to the socket and to pulse audio
Instant start = Instant.now();
- normalizedPCMStream.transferTo(clientSocketLocal.getOutputStream());
- if (normalizedPCMStream.getDuration() != -1) { // ensure, if the sound has a duration
+ if (normalizedPCMStream.getDuration() != -1) {
+ // ensure, if the sound has a duration
// that we let at least this time for the system to play
+ normalizedPCMStream.transferTo(clientSocketLocal.getOutputStream());
Instant end = Instant.now();
long millisSecondTimedToSendAudioData = Duration.between(start, end).toMillis();
if (millisSecondTimedToSendAudioData < normalizedPCMStream.getDuration()) {
- long timeToSleep = normalizedPCMStream.getDuration() - millisSecondTimedToSendAudioData;
- logger.debug("Sleep time to let the system play sound : {}", timeToSleep);
- Thread.sleep(timeToSleep);
+ CompletableFuture<@Nullable Void> soundPlayed = new CompletableFuture<>();
+ long timeToWait = normalizedPCMStream.getDuration() - millisSecondTimedToSendAudioData;
+ logger.debug("Some time to let the system play sound : {}", timeToWait);
+ scheduler.schedule(() -> soundPlayed.complete(null), timeToWait, TimeUnit.MILLISECONDS);
+ return soundPlayed;
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ } else {
+ // We have a second method available to guess the duration, and it is during transfer
+ Long timeStampEnd = audioSinkUtils.transferAndAnalyzeLength(normalizedPCMStream,
+ clientSocketLocal.getOutputStream(), TARGET_FORMAT);
+ CompletableFuture<@Nullable Void> soundPlayed = new CompletableFuture<>();
+ if (timeStampEnd != null) {
+ long now = System.nanoTime();
+ long timeToWait = timeStampEnd - now;
+ if (timeToWait > 0) {
+ scheduler.schedule(() -> soundPlayed.complete(null), timeToWait,
+ TimeUnit.NANOSECONDS);
+ }
+ return soundPlayed;
+ } else {
+ return CompletableFuture.completedFuture(null);
}
}
- break;
}
} catch (IOException e) {
disconnect(); // disconnect force to clear connection in case of socket not cleanly shutdown
logger.warn(
"Error while trying to send audio to pulseaudio audio sink. Cannot connect to {}:{}, error: {}",
pulseaudioHandler.getHost(), port, e.getMessage());
- break;
+ return CompletableFuture.completedFuture(null);
}
} catch (InterruptedException ie) {
logger.info("Interrupted during sink audio connection: {}", ie.getMessage());
- break;
+ return CompletableFuture.completedFuture(null);
}
}
- } catch (UnsupportedAudioFileException | IOException e) {
- throw new UnsupportedAudioFormatException("Cannot send sound to the pulseaudio sink",
- audioStream.getFormat(), e);
+ } catch (UnsupportedAudioFileException | UnsupportedAudioFormatException | IOException e) {
+ return CompletableFuture.failedFuture(new UnsupportedAudioFormatException(
+ "Cannot send sound to the pulseaudio sink", audioStream.getFormat(), e));
} finally {
minusClientCount();
+ // if the stream is not needed anymore, then we should call back the AudioStream to let it a chance
+ // to auto dispose.
+ if (audioStream instanceof Disposable disposableAudioStream) {
+ try {
+ disposableAudioStream.dispose();
+ } catch (IOException e) {
+ String fileName = audioStream instanceof FileAudioStream file ? file.toString() : "unknown";
+ if (logger.isDebugEnabled()) {
+ logger.debug("Cannot dispose of stream {}", fileName, e);
+ } else {
+ logger.warn("Cannot dispose of stream {}, reason {}", fileName, e.getMessage());
+ }
+ }
+ }
}
+ return CompletableFuture.completedFuture(null);
}
@Override
import org.openhab.binding.pulseaudio.internal.discovery.PulseaudioDeviceDiscoveryService;
import org.openhab.binding.pulseaudio.internal.handler.PulseaudioBridgeHandler;
import org.openhab.binding.pulseaudio.internal.handler.PulseaudioHandler;
+import org.openhab.core.audio.utils.AudioSinkUtils;
import org.openhab.core.config.core.Configuration;
import org.openhab.core.config.discovery.DiscoveryService;
import org.openhab.core.thing.Bridge;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private PulseAudioBindingConfiguration configuration = new PulseAudioBindingConfiguration();
+ private AudioSinkUtils audioSinkUtils;
+
+ @Activate
+ public PulseaudioHandlerFactory(@Reference AudioSinkUtils audioSinkUtils) {
+ this.audioSinkUtils = audioSinkUtils;
+ }
+
@Override
public boolean supportsThingType(ThingTypeUID thingTypeUID) {
return SUPPORTED_THING_TYPES_UIDS.contains(thingTypeUID);
registerDeviceDiscoveryService(handler);
return handler;
} else if (PulseaudioHandler.SUPPORTED_THING_TYPES_UIDS.contains(thingTypeUID)) {
- return new PulseaudioHandler(thing, bundleContext);
+ return new PulseaudioHandler(thing, bundleContext, audioSinkUtils);
}
return null;