]> git.basschouten.com Git - openhab-addons.git/commitdiff
[pulseaudio] Make the process method asynchronous (#15179)
authorGwendal Roulleau <dalgwen@users.noreply.github.com>
Thu, 6 Jul 2023 17:21:12 +0000 (19:21 +0200)
committerGitHub <noreply@github.com>
Thu, 6 Jul 2023 17:21:12 +0000 (19:21 +0200)
* [pulseaudio] Make the process method asynchronous

And use the new 'complete' system to signal core that the sound is fully played.

---------

Signed-off-by: Gwendal Roulleau <gwendal.roulleau@gmail.com>
bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/ConvertedInputStream.java
bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSink.java
bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseaudioHandlerFactory.java
bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/handler/PulseaudioHandler.java

index cdda35fa688c99cdcba6bf2828e0bf304f9a9ec1..d96512562ac77a4f653fc95bb9d65c6dd26a148a 100644 (file)
@@ -28,7 +28,7 @@ import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
 import org.openhab.core.audio.AudioFormat;
 import org.openhab.core.audio.AudioStream;
-import org.openhab.core.audio.FixedLengthAudioStream;
+import org.openhab.core.audio.SizeableAudioStream;
 import org.openhab.core.audio.UnsupportedAudioFormatException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,8 +58,8 @@ public class ConvertedInputStream extends InputStream {
             throws UnsupportedAudioFormatException, UnsupportedAudioFileException, IOException {
         this.audioFormat = innerInputStream.getFormat();
 
-        if (innerInputStream instanceof FixedLengthAudioStream) {
-            length = ((FixedLengthAudioStream) innerInputStream).length();
+        if (innerInputStream instanceof SizeableAudioStream sizeableAudioStream) {
+            length = sizeableAudioStream.length();
         }
 
         pcmNormalizedInputStream = getPCMStreamNormalized(getPCMStream(new BufferedInputStream(innerInputStream)));
index 077bb5881a4642530ab9463bd39ffacad296e249..b9bd298caaf8c0dcaa0be88a805f1957d223d222 100644 (file)
@@ -16,9 +16,10 @@ import java.io.IOException;
 import java.net.Socket;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import javax.sound.sampled.UnsupportedAudioFileException;
 
@@ -28,9 +29,11 @@ import org.openhab.binding.pulseaudio.internal.handler.PulseaudioHandler;
 import org.openhab.core.audio.AudioFormat;
 import org.openhab.core.audio.AudioSink;
 import org.openhab.core.audio.AudioStream;
-import org.openhab.core.audio.FixedLengthAudioStream;
+import org.openhab.core.audio.FileAudioStream;
 import org.openhab.core.audio.UnsupportedAudioFormatException;
 import org.openhab.core.audio.UnsupportedAudioStreamException;
+import org.openhab.core.audio.utils.AudioSinkUtils;
+import org.openhab.core.common.Disposable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,24 +50,29 @@ public class PulseAudioAudioSink extends PulseaudioSimpleProtocolStream implemen
 
     private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSink.class);
 
-    private static final HashSet<AudioFormat> SUPPORTED_FORMATS = new HashSet<>();
-    private static final HashSet<Class<? extends AudioStream>> SUPPORTED_STREAMS = new HashSet<>();
+    private AudioSinkUtils audioSinkUtils;
 
-    static {
-        SUPPORTED_FORMATS.add(AudioFormat.WAV);
-        SUPPORTED_FORMATS.add(AudioFormat.MP3);
-        SUPPORTED_STREAMS.add(FixedLengthAudioStream.class);
-    }
+    private static final Set<AudioFormat> SUPPORTED_FORMATS = Set.of(AudioFormat.WAV, AudioFormat.MP3);
+    private static final Set<Class<? extends AudioStream>> SUPPORTED_STREAMS = Set.of(AudioStream.class);
+    private static final AudioFormat TARGET_FORMAT = new AudioFormat(AudioFormat.CONTAINER_WAVE,
+            AudioFormat.CODEC_PCM_SIGNED, false, 16, 4 * 44100, 44100L, 2);
 
-    public PulseAudioAudioSink(PulseaudioHandler pulseaudioHandler, ScheduledExecutorService scheduler) {
+    public PulseAudioAudioSink(PulseaudioHandler pulseaudioHandler, ScheduledExecutorService scheduler,
+            AudioSinkUtils audioSinkUtils) {
         super(pulseaudioHandler, scheduler);
+        this.audioSinkUtils = audioSinkUtils;
     }
 
     @Override
     public void process(@Nullable AudioStream audioStream)
             throws UnsupportedAudioFormatException, UnsupportedAudioStreamException {
+        processAndComplete(audioStream);
+    }
+
+    @Override
+    public CompletableFuture<@Nullable Void> processAndComplete(@Nullable AudioStream audioStream) {
         if (audioStream == null) {
-            return;
+            return CompletableFuture.completedFuture(null);
         }
         addClientCount();
         try (ConvertedInputStream normalizedPCMStream = new ConvertedInputStream(audioStream)) {
@@ -75,18 +83,38 @@ public class PulseAudioAudioSink extends PulseaudioSimpleProtocolStream implemen
                     if (clientSocketLocal != null) {
                         // send raw audio to the socket and to pulse audio
                         Instant start = Instant.now();
-                        normalizedPCMStream.transferTo(clientSocketLocal.getOutputStream());
-                        if (normalizedPCMStream.getDuration() != -1) { // ensure, if the sound has a duration
+                        if (normalizedPCMStream.getDuration() != -1) {
+                            // ensure, if the sound has a duration
                             // that we let at least this time for the system to play
+                            normalizedPCMStream.transferTo(clientSocketLocal.getOutputStream());
                             Instant end = Instant.now();
                             long millisSecondTimedToSendAudioData = Duration.between(start, end).toMillis();
                             if (millisSecondTimedToSendAudioData < normalizedPCMStream.getDuration()) {
-                                long timeToSleep = normalizedPCMStream.getDuration() - millisSecondTimedToSendAudioData;
-                                logger.debug("Sleep time to let the system play sound : {}", timeToSleep);
-                                Thread.sleep(timeToSleep);
+                                CompletableFuture<@Nullable Void> soundPlayed = new CompletableFuture<>();
+                                long timeToWait = normalizedPCMStream.getDuration() - millisSecondTimedToSendAudioData;
+                                logger.debug("Some time to let the system play sound : {}", timeToWait);
+                                scheduler.schedule(() -> soundPlayed.complete(null), timeToWait, TimeUnit.MILLISECONDS);
+                                return soundPlayed;
+                            } else {
+                                return CompletableFuture.completedFuture(null);
+                            }
+                        } else {
+                            // We have a second method available to guess the duration, and it is during transfer
+                            Long timeStampEnd = audioSinkUtils.transferAndAnalyzeLength(normalizedPCMStream,
+                                    clientSocketLocal.getOutputStream(), TARGET_FORMAT);
+                            CompletableFuture<@Nullable Void> soundPlayed = new CompletableFuture<>();
+                            if (timeStampEnd != null) {
+                                long now = System.nanoTime();
+                                long timeToWait = timeStampEnd - now;
+                                if (timeToWait > 0) {
+                                    scheduler.schedule(() -> soundPlayed.complete(null), timeToWait,
+                                            TimeUnit.NANOSECONDS);
+                                }
+                                return soundPlayed;
+                            } else {
+                                return CompletableFuture.completedFuture(null);
                             }
                         }
-                        break;
                     }
                 } catch (IOException e) {
                     disconnect(); // disconnect force to clear connection in case of socket not cleanly shutdown
@@ -97,19 +125,34 @@ public class PulseAudioAudioSink extends PulseaudioSimpleProtocolStream implemen
                         logger.warn(
                                 "Error while trying to send audio to pulseaudio audio sink. Cannot connect to {}:{}, error: {}",
                                 pulseaudioHandler.getHost(), port, e.getMessage());
-                        break;
+                        return CompletableFuture.completedFuture(null);
                     }
                 } catch (InterruptedException ie) {
                     logger.info("Interrupted during sink audio connection: {}", ie.getMessage());
-                    break;
+                    return CompletableFuture.completedFuture(null);
                 }
             }
-        } catch (UnsupportedAudioFileException | IOException e) {
-            throw new UnsupportedAudioFormatException("Cannot send sound to the pulseaudio sink",
-                    audioStream.getFormat(), e);
+        } catch (UnsupportedAudioFileException | UnsupportedAudioFormatException | IOException e) {
+            return CompletableFuture.failedFuture(new UnsupportedAudioFormatException(
+                    "Cannot send sound to the pulseaudio sink", audioStream.getFormat(), e));
         } finally {
             minusClientCount();
+            // if the stream is not needed anymore, then we should call back the AudioStream to let it a chance
+            // to auto dispose.
+            if (audioStream instanceof Disposable disposableAudioStream) {
+                try {
+                    disposableAudioStream.dispose();
+                } catch (IOException e) {
+                    String fileName = audioStream instanceof FileAudioStream file ? file.toString() : "unknown";
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Cannot dispose of stream {}", fileName, e);
+                    } else {
+                        logger.warn("Cannot dispose of stream {}, reason {}", fileName, e.getMessage());
+                    }
+                }
+            }
         }
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override
index 2d3d9ab4122171d692fec0068d7e557835e5cba2..73952f2494aade91f217a7c3f37a7a2688869c9c 100644 (file)
@@ -25,6 +25,7 @@ import org.eclipse.jdt.annotation.Nullable;
 import org.openhab.binding.pulseaudio.internal.discovery.PulseaudioDeviceDiscoveryService;
 import org.openhab.binding.pulseaudio.internal.handler.PulseaudioBridgeHandler;
 import org.openhab.binding.pulseaudio.internal.handler.PulseaudioHandler;
+import org.openhab.core.audio.utils.AudioSinkUtils;
 import org.openhab.core.config.core.Configuration;
 import org.openhab.core.config.discovery.DiscoveryService;
 import org.openhab.core.thing.Bridge;
@@ -39,6 +40,7 @@ import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,6 +63,13 @@ public class PulseaudioHandlerFactory extends BaseThingHandlerFactory {
 
     private PulseAudioBindingConfiguration configuration = new PulseAudioBindingConfiguration();
 
+    private AudioSinkUtils audioSinkUtils;
+
+    @Activate
+    public PulseaudioHandlerFactory(@Reference AudioSinkUtils audioSinkUtils) {
+        this.audioSinkUtils = audioSinkUtils;
+    }
+
     @Override
     public boolean supportsThingType(ThingTypeUID thingTypeUID) {
         return SUPPORTED_THING_TYPES_UIDS.contains(thingTypeUID);
@@ -119,7 +128,7 @@ public class PulseaudioHandlerFactory extends BaseThingHandlerFactory {
             registerDeviceDiscoveryService(handler);
             return handler;
         } else if (PulseaudioHandler.SUPPORTED_THING_TYPES_UIDS.contains(thingTypeUID)) {
-            return new PulseaudioHandler(thing, bundleContext);
+            return new PulseaudioHandler(thing, bundleContext, audioSinkUtils);
         }
 
         return null;
index 4472ca39ad1766f64b6cf9c53eac15bd4854bf12..9658e5741215bcdb7a6c717e5cace604320e66d5 100644 (file)
@@ -40,6 +40,7 @@ import org.openhab.binding.pulseaudio.internal.items.Source;
 import org.openhab.core.audio.AudioFormat;
 import org.openhab.core.audio.AudioSink;
 import org.openhab.core.audio.AudioSource;
+import org.openhab.core.audio.utils.AudioSinkUtils;
 import org.openhab.core.config.core.Configuration;
 import org.openhab.core.library.types.DecimalType;
 import org.openhab.core.library.types.IncreaseDecreaseType;
@@ -89,9 +90,12 @@ public class PulseaudioHandler extends BaseThingHandler {
 
     private final BundleContext bundleContext;
 
-    public PulseaudioHandler(Thing thing, BundleContext bundleContext) {
+    private AudioSinkUtils audioSinkUtils;
+
+    public PulseaudioHandler(Thing thing, BundleContext bundleContext, AudioSinkUtils audioSinkUtils) {
         super(thing);
         this.bundleContext = bundleContext;
+        this.audioSinkUtils = audioSinkUtils;
     }
 
     @Override
@@ -127,7 +131,7 @@ public class PulseaudioHandler extends BaseThingHandler {
             return;
         }
         final PulseaudioHandler thisHandler = this;
-        PulseAudioAudioSink audioSink = new PulseAudioAudioSink(thisHandler, scheduler);
+        PulseAudioAudioSink audioSink = new PulseAudioAudioSink(thisHandler, scheduler, audioSinkUtils);
         scheduler.submit(new Runnable() {
             @Override
             public void run() {