From b801f7cad5c1600656ca9205452b01e6a8b192a4 Mon Sep 17 00:00:00 2001 From: Greg Date: Mon, 12 Dec 2016 17:54:22 -0800 Subject: Fix deadlock in AudioFifo. The deadlock scenario is the following: 1) AudioFifo is used with wait on write and read 2) Producer Thread calls write(). The buffer is full, then it gets into the while(available() == buffer.length) loop 3) a context switch happens right there, BEFORE readSemaphore.wait() 4) Consumer Thread calls read() multiple times until it depletes the buffer, then ends up in writeSemaphore.wait() 5) context switch back to the Producer Thread, which now calls readSemaphore.wait() Deadlock: the buffer is empty, and nobody is going to signal the producer that there is availability. This can be reproduced with a simple stress test. I added the stress test which is simply a copy of an existing one, with a very large value for the chunk variable. The race condition is more likely to be hit when the buffer is small, but I have hit it in "production" with 32K buffers, while generating large files (a few hundred megabytes). I am not sure the performance implications of the change, as my use cases are non-realtime. Still, all the tests pass. --- src/com/jsyn/io/AudioFifo.java | 65 ++++++++++++++++++++++++------------- tests/com/jsyn/engine/TestFifo.java | 31 ++++++++++++++++++ 2 files changed, 73 insertions(+), 23 deletions(-) diff --git a/src/com/jsyn/io/AudioFifo.java b/src/com/jsyn/io/AudioFifo.java index 43f16d3..889bacc 100644 --- a/src/com/jsyn/io/AudioFifo.java +++ b/src/com/jsyn/io/AudioFifo.java @@ -16,6 +16,10 @@ package com.jsyn.io; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + /** * FIFO that implements AudioInputStream, AudioOutputStream interfaces. This can be used to send * audio data between different threads. The reads or writes may or may not wait based on flags. @@ -33,8 +37,9 @@ public class AudioFifo implements AudioInputStream, AudioOutputStream { private int sizeMask; private boolean writeWaitEnabled = true; private boolean readWaitEnabled = true; - private Object writeSemaphore = new Object(); - private Object readSemaphore = new Object(); + final Lock lock = new ReentrantLock(); + final Condition notFull = lock.newCondition(); + final Condition notEmpty = lock.newCondition(); /** * @param size Number of doubles in the FIFO. Must be a power of 2. Eg. 1024. @@ -70,62 +75,76 @@ public class AudioFifo implements AudioInputStream, AudioOutputStream { @Override public double read() { double value = Double.NaN; - if (readWaitEnabled) { + if (readWaitEnabled) { + lock.lock(); try { - while (available() < 1) { - synchronized (writeSemaphore) { - writeSemaphore.wait(); - } + while (available() < 1) { + try { + notEmpty.await(); + } catch (InterruptedException e) { + return Double.NaN; } - value = readOneInternal(); - } catch (InterruptedException e) { + } + value = readOneInternal(); + } finally { + lock.unlock(); } + } else { if (readIndex != writeIndex) { value = readOneInternal(); } } + + if (writeWaitEnabled) { + lock.lock(); + notFull.signal(); + lock.unlock(); + } + return value; } private double readOneInternal() { double value = buffer[readIndex & accessMask]; readIndex = (readIndex + 1) & sizeMask; - if (writeWaitEnabled) { - synchronized (readSemaphore) { - readSemaphore.notify(); - } - } return value; } @Override public void write(double value) { if (writeWaitEnabled) { + lock.lock(); try { - while (available() == buffer.length) { - synchronized (readSemaphore) { - readSemaphore.wait(); + while (available() == buffer.length) + { + try { + notFull.await(); + } catch (InterruptedException e) { + return; // Silently fail } } writeOneInternal(value); - } catch (InterruptedException e) { + } finally { + lock.unlock(); } + } else { if (available() != buffer.length) { writeOneInternal(value); } } + + if (readWaitEnabled) { + lock.lock(); + notEmpty.signal(); + lock.unlock(); + } } private void writeOneInternal(double value) { buffer[writeIndex & accessMask] = value; writeIndex = (writeIndex + 1) & sizeMask; - if (readWaitEnabled) { - synchronized (writeSemaphore) { - writeSemaphore.notify(); - } - } } @Override diff --git a/tests/com/jsyn/engine/TestFifo.java b/tests/com/jsyn/engine/TestFifo.java index e504a0b..cc539d1 100644 --- a/tests/com/jsyn/engine/TestFifo.java +++ b/tests/com/jsyn/engine/TestFifo.java @@ -216,4 +216,35 @@ public class TestFifo extends TestCase { watchdog.interrupt(); } + + public void testBlockReadAndWriteWaitStress() { + final int chunk = 10000000; // 10 Megabytes + final AudioFifo fifo = new AudioFifo(); + fifo.allocate(8); + + fifo.setWriteWaitEnabled(true); + fifo.setReadWaitEnabled(true); + final double value = 50.0; + + // Schedule a delayed write in another thread. + new Thread() { + @Override + public void run() { + try { + sleep(200); + for (int i = 0; i < chunk; i++) { + fifo.write(value + i); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }.start(); + + Thread watchdog = startWatchdog(10000); + for (int i = 0; i < chunk; i++) { + assertEquals("reading back data", value + i, fifo.read()); + } + watchdog.interrupt(); + } } -- cgit v1.2.3