aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/com/jsyn/io/AudioFifo.java65
-rw-r--r--tests/com/jsyn/engine/TestFifo.java31
2 files changed, 73 insertions, 23 deletions
diff --git a/src/com/jsyn/io/AudioFifo.java b/src/com/jsyn/io/AudioFifo.java
index 43f16d3..889bacc 100644
--- a/src/com/jsyn/io/AudioFifo.java
+++ b/src/com/jsyn/io/AudioFifo.java
@@ -16,6 +16,10 @@
package com.jsyn.io;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
/**
* FIFO that implements AudioInputStream, AudioOutputStream interfaces. This can be used to send
* audio data between different threads. The reads or writes may or may not wait based on flags.
@@ -33,8 +37,9 @@ public class AudioFifo implements AudioInputStream, AudioOutputStream {
private int sizeMask;
private boolean writeWaitEnabled = true;
private boolean readWaitEnabled = true;
- private Object writeSemaphore = new Object();
- private Object readSemaphore = new Object();
+ final Lock lock = new ReentrantLock();
+ final Condition notFull = lock.newCondition();
+ final Condition notEmpty = lock.newCondition();
/**
* @param size Number of doubles in the FIFO. Must be a power of 2. Eg. 1024.
@@ -70,62 +75,76 @@ public class AudioFifo implements AudioInputStream, AudioOutputStream {
@Override
public double read() {
double value = Double.NaN;
- if (readWaitEnabled) {
+ if (readWaitEnabled) {
+ lock.lock();
try {
- while (available() < 1) {
- synchronized (writeSemaphore) {
- writeSemaphore.wait();
- }
+ while (available() < 1) {
+ try {
+ notEmpty.await();
+ } catch (InterruptedException e) {
+ return Double.NaN;
}
- value = readOneInternal();
- } catch (InterruptedException e) {
+ }
+ value = readOneInternal();
+ } finally {
+ lock.unlock();
}
+
} else {
if (readIndex != writeIndex) {
value = readOneInternal();
}
}
+
+ if (writeWaitEnabled) {
+ lock.lock();
+ notFull.signal();
+ lock.unlock();
+ }
+
return value;
}
private double readOneInternal() {
double value = buffer[readIndex & accessMask];
readIndex = (readIndex + 1) & sizeMask;
- if (writeWaitEnabled) {
- synchronized (readSemaphore) {
- readSemaphore.notify();
- }
- }
return value;
}
@Override
public void write(double value) {
if (writeWaitEnabled) {
+ lock.lock();
try {
- while (available() == buffer.length) {
- synchronized (readSemaphore) {
- readSemaphore.wait();
+ while (available() == buffer.length)
+ {
+ try {
+ notFull.await();
+ } catch (InterruptedException e) {
+ return; // Silently fail
}
}
writeOneInternal(value);
- } catch (InterruptedException e) {
+ } finally {
+ lock.unlock();
}
+
} else {
if (available() != buffer.length) {
writeOneInternal(value);
}
}
+
+ if (readWaitEnabled) {
+ lock.lock();
+ notEmpty.signal();
+ lock.unlock();
+ }
}
private void writeOneInternal(double value) {
buffer[writeIndex & accessMask] = value;
writeIndex = (writeIndex + 1) & sizeMask;
- if (readWaitEnabled) {
- synchronized (writeSemaphore) {
- writeSemaphore.notify();
- }
- }
}
@Override
diff --git a/tests/com/jsyn/engine/TestFifo.java b/tests/com/jsyn/engine/TestFifo.java
index e504a0b..cc539d1 100644
--- a/tests/com/jsyn/engine/TestFifo.java
+++ b/tests/com/jsyn/engine/TestFifo.java
@@ -216,4 +216,35 @@ public class TestFifo extends TestCase {
watchdog.interrupt();
}
+
+ public void testBlockReadAndWriteWaitStress() {
+ final int chunk = 10000000; // 10 Megabytes
+ final AudioFifo fifo = new AudioFifo();
+ fifo.allocate(8);
+
+ fifo.setWriteWaitEnabled(true);
+ fifo.setReadWaitEnabled(true);
+ final double value = 50.0;
+
+ // Schedule a delayed write in another thread.
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ sleep(200);
+ for (int i = 0; i < chunk; i++) {
+ fifo.write(value + i);
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }.start();
+
+ Thread watchdog = startWatchdog(10000);
+ for (int i = 0; i < chunk; i++) {
+ assertEquals("reading back data", value + i, fifo.read());
+ }
+ watchdog.interrupt();
+ }
}