aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/com/mbien/opencl/util/MultiQueueBarrier.java42
1 files changed, 36 insertions, 6 deletions
diff --git a/src/com/mbien/opencl/util/MultiQueueBarrier.java b/src/com/mbien/opencl/util/MultiQueueBarrier.java
index a23ca9ad..7ac45f6a 100644
--- a/src/com/mbien/opencl/util/MultiQueueBarrier.java
+++ b/src/com/mbien/opencl/util/MultiQueueBarrier.java
@@ -10,12 +10,14 @@ import java.util.concurrent.TimeUnit;
/**
* An utility for synchronizing multiple concurrent {@link CLCommandQueue}s.
+ * This Barrier can be reused after it has been broken.
* @author Michael Bien
*/
public class MultiQueueBarrier {
private CountDownLatch latch;
private final Set<CLCommandQueue> queues;
+ private final int count;
/**
* Creates a new MultiQueueBarrier with the given queueCount.
@@ -23,15 +25,23 @@ public class MultiQueueBarrier {
* which restricts the set of allowed queues for the barrier.
*/
public MultiQueueBarrier(int queueCount) {
+ if(queueCount == 0) {
+ throw new IllegalArgumentException("queueCount was 0");
+ }
this.latch = new CountDownLatch(queueCount);
this.queues = null;
+ this.count = queueCount;
}
/**
* Creates a new MultiQueueBarrier for the given queues.
*/
public MultiQueueBarrier(CLCommandQueue... allowedQueues) {
+ if(allowedQueues.length == 0) {
+ throw new IllegalArgumentException("allowedQueues was empty");
+ }
this.latch = new CountDownLatch(allowedQueues.length);
+ this.count = allowedQueues.length;
HashSet<CLCommandQueue> set = new HashSet<CLCommandQueue>(allowedQueues.length);
for (CLCommandQueue queue : allowedQueues) {
@@ -49,7 +59,9 @@ public class MultiQueueBarrier {
checkQueue(queue);
queue.putBarrier();
- latch.countDown();
+ synchronized(this) {
+ latch.countDown();
+ }
return this;
}
@@ -62,7 +74,9 @@ public class MultiQueueBarrier {
checkQueue(queue);
queue.putWaitForEvents(events, true);
- latch.countDown();
+ synchronized(this) {
+ latch.countDown();
+ }
return this;
}
@@ -73,6 +87,7 @@ public class MultiQueueBarrier {
*/
public MultiQueueBarrier await() throws InterruptedException {
latch.await();
+ rebuildBarrierIfBroken();
return this;
}
@@ -83,15 +98,30 @@ public class MultiQueueBarrier {
*/
public MultiQueueBarrier await(long timeout, TimeUnit unit) throws InterruptedException {
latch.await(timeout, unit);
+ rebuildBarrierIfBroken();
return this;
}
/**
- * Cancels this barrier and unblocks all waiting threads.
+ * Resets this barrier and unblocks all waiting threads.
*/
- public void cancelBarrier() {
- while(latch.getCount() > 0)
- latch.countDown();
+ public void resetBarrier() {
+ synchronized(this) {
+ while(latch.getCount() > 0) {
+ latch.countDown();
+ }
+ // thats OK. Another Thread can not rebuild the barrier since we have the lock.
+ // we have to rebuid it here in case there was no thread waiting.
+ latch = new CountDownLatch(count);
+ }
+ }
+
+ private void rebuildBarrierIfBroken() {
+ synchronized (this) {
+ if (latch.getCount() == 0) {
+ latch = new CountDownLatch(count);
+ }
+ }
}
/**