diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/com/mbien/opencl/util/MultiQueueBarrier.java | 42 |
1 files changed, 36 insertions, 6 deletions
diff --git a/src/com/mbien/opencl/util/MultiQueueBarrier.java b/src/com/mbien/opencl/util/MultiQueueBarrier.java index a23ca9ad..7ac45f6a 100644 --- a/src/com/mbien/opencl/util/MultiQueueBarrier.java +++ b/src/com/mbien/opencl/util/MultiQueueBarrier.java @@ -10,12 +10,14 @@ import java.util.concurrent.TimeUnit; /** * An utility for synchronizing multiple concurrent {@link CLCommandQueue}s. + * This Barrier can be reused after it has been broken. * @author Michael Bien */ public class MultiQueueBarrier { private CountDownLatch latch; private final Set<CLCommandQueue> queues; + private final int count; /** * Creates a new MultiQueueBarrier with the given queueCount. @@ -23,15 +25,23 @@ public class MultiQueueBarrier { * which restricts the set of allowed queues for the barrier. */ public MultiQueueBarrier(int queueCount) { + if(queueCount == 0) { + throw new IllegalArgumentException("queueCount was 0"); + } this.latch = new CountDownLatch(queueCount); this.queues = null; + this.count = queueCount; } /** * Creates a new MultiQueueBarrier for the given queues. */ public MultiQueueBarrier(CLCommandQueue... allowedQueues) { + if(allowedQueues.length == 0) { + throw new IllegalArgumentException("allowedQueues was empty"); + } this.latch = new CountDownLatch(allowedQueues.length); + this.count = allowedQueues.length; HashSet<CLCommandQueue> set = new HashSet<CLCommandQueue>(allowedQueues.length); for (CLCommandQueue queue : allowedQueues) { @@ -49,7 +59,9 @@ public class MultiQueueBarrier { checkQueue(queue); queue.putBarrier(); - latch.countDown(); + synchronized(this) { + latch.countDown(); + } return this; } @@ -62,7 +74,9 @@ public class MultiQueueBarrier { checkQueue(queue); queue.putWaitForEvents(events, true); - latch.countDown(); + synchronized(this) { + latch.countDown(); + } return this; } @@ -73,6 +87,7 @@ public class MultiQueueBarrier { */ public MultiQueueBarrier await() throws InterruptedException { latch.await(); + rebuildBarrierIfBroken(); return this; } @@ -83,15 +98,30 @@ public class MultiQueueBarrier { */ public MultiQueueBarrier await(long timeout, TimeUnit unit) throws InterruptedException { latch.await(timeout, unit); + rebuildBarrierIfBroken(); return this; } /** - * Cancels this barrier and unblocks all waiting threads. + * Resets this barrier and unblocks all waiting threads. */ - public void cancelBarrier() { - while(latch.getCount() > 0) - latch.countDown(); + public void resetBarrier() { + synchronized(this) { + while(latch.getCount() > 0) { + latch.countDown(); + } + // thats OK. Another Thread can not rebuild the barrier since we have the lock. + // we have to rebuid it here in case there was no thread waiting. + latch = new CountDownLatch(count); + } + } + + private void rebuildBarrierIfBroken() { + synchronized (this) { + if (latch.getCount() == 0) { + latch = new CountDownLatch(count); + } + } } /** |