diff options
-rw-r--r-- | src/com/jsyn/io/AudioFifo.java | 65 | ||||
-rw-r--r-- | tests/com/jsyn/engine/TestFifo.java | 31 |
2 files changed, 73 insertions, 23 deletions
diff --git a/src/com/jsyn/io/AudioFifo.java b/src/com/jsyn/io/AudioFifo.java index 43f16d3..889bacc 100644 --- a/src/com/jsyn/io/AudioFifo.java +++ b/src/com/jsyn/io/AudioFifo.java @@ -16,6 +16,10 @@ package com.jsyn.io; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + /** * FIFO that implements AudioInputStream, AudioOutputStream interfaces. This can be used to send * audio data between different threads. The reads or writes may or may not wait based on flags. @@ -33,8 +37,9 @@ public class AudioFifo implements AudioInputStream, AudioOutputStream { private int sizeMask; private boolean writeWaitEnabled = true; private boolean readWaitEnabled = true; - private Object writeSemaphore = new Object(); - private Object readSemaphore = new Object(); + final Lock lock = new ReentrantLock(); + final Condition notFull = lock.newCondition(); + final Condition notEmpty = lock.newCondition(); /** * @param size Number of doubles in the FIFO. Must be a power of 2. Eg. 1024. @@ -70,62 +75,76 @@ public class AudioFifo implements AudioInputStream, AudioOutputStream { @Override public double read() { double value = Double.NaN; - if (readWaitEnabled) { + if (readWaitEnabled) { + lock.lock(); try { - while (available() < 1) { - synchronized (writeSemaphore) { - writeSemaphore.wait(); - } + while (available() < 1) { + try { + notEmpty.await(); + } catch (InterruptedException e) { + return Double.NaN; } - value = readOneInternal(); - } catch (InterruptedException e) { + } + value = readOneInternal(); + } finally { + lock.unlock(); } + } else { if (readIndex != writeIndex) { value = readOneInternal(); } } + + if (writeWaitEnabled) { + lock.lock(); + notFull.signal(); + lock.unlock(); + } + return value; } private double readOneInternal() { double value = buffer[readIndex & accessMask]; readIndex = (readIndex + 1) & sizeMask; - if (writeWaitEnabled) { - synchronized (readSemaphore) { - readSemaphore.notify(); - } - } return value; } @Override public void write(double value) { if (writeWaitEnabled) { + lock.lock(); try { - while (available() == buffer.length) { - synchronized (readSemaphore) { - readSemaphore.wait(); + while (available() == buffer.length) + { + try { + notFull.await(); + } catch (InterruptedException e) { + return; // Silently fail } } writeOneInternal(value); - } catch (InterruptedException e) { + } finally { + lock.unlock(); } + } else { if (available() != buffer.length) { writeOneInternal(value); } } + + if (readWaitEnabled) { + lock.lock(); + notEmpty.signal(); + lock.unlock(); + } } private void writeOneInternal(double value) { buffer[writeIndex & accessMask] = value; writeIndex = (writeIndex + 1) & sizeMask; - if (readWaitEnabled) { - synchronized (writeSemaphore) { - writeSemaphore.notify(); - } - } } @Override diff --git a/tests/com/jsyn/engine/TestFifo.java b/tests/com/jsyn/engine/TestFifo.java index e504a0b..cc539d1 100644 --- a/tests/com/jsyn/engine/TestFifo.java +++ b/tests/com/jsyn/engine/TestFifo.java @@ -216,4 +216,35 @@ public class TestFifo extends TestCase { watchdog.interrupt(); } + + public void testBlockReadAndWriteWaitStress() { + final int chunk = 10000000; // 10 Megabytes + final AudioFifo fifo = new AudioFifo(); + fifo.allocate(8); + + fifo.setWriteWaitEnabled(true); + fifo.setReadWaitEnabled(true); + final double value = 50.0; + + // Schedule a delayed write in another thread. + new Thread() { + @Override + public void run() { + try { + sleep(200); + for (int i = 0; i < chunk; i++) { + fifo.write(value + i); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }.start(); + + Thread watchdog = startWatchdog(10000); + for (int i = 0; i < chunk; i++) { + assertEquals("reading back data", value + i, fifo.read()); + } + watchdog.interrupt(); + } } |