summaryrefslogtreecommitdiffstats
path: root/src/com/jogamp/opencl/util/MultiQueueBarrier.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/com/jogamp/opencl/util/MultiQueueBarrier.java')
-rw-r--r--src/com/jogamp/opencl/util/MultiQueueBarrier.java141
1 files changed, 141 insertions, 0 deletions
diff --git a/src/com/jogamp/opencl/util/MultiQueueBarrier.java b/src/com/jogamp/opencl/util/MultiQueueBarrier.java
new file mode 100644
index 00000000..59398b5e
--- /dev/null
+++ b/src/com/jogamp/opencl/util/MultiQueueBarrier.java
@@ -0,0 +1,141 @@
+package com.jogamp.opencl.util;
+
+import com.jogamp.opencl.CLCommandQueue;
+import com.jogamp.opencl.CLEventList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An utility for synchronizing multiple concurrent {@link CLCommandQueue}s.
+ * This Barrier can be reused after it has been broken.
+ * @author Michael Bien
+ */
+public class MultiQueueBarrier {
+
+ private CountDownLatch latch;
+ private final Set<CLCommandQueue> queues;
+ private final int count;
+
+ /**
+ * Creates a new MultiQueueBarrier with the given queueCount.
+ * It is recommented to use {@link #MultiQueueBarrier(CLCommandQueue... allowedQueues)} if possible
+ * which restricts the set of allowed queues for the barrier.
+ */
+ public MultiQueueBarrier(int queueCount) {
+ if(queueCount == 0) {
+ throw new IllegalArgumentException("queueCount was 0");
+ }
+ this.latch = new CountDownLatch(queueCount);
+ this.queues = null;
+ this.count = queueCount;
+ }
+
+ /**
+ * Creates a new MultiQueueBarrier for the given queues.
+ */
+ public MultiQueueBarrier(CLCommandQueue... allowedQueues) {
+ if(allowedQueues.length == 0) {
+ throw new IllegalArgumentException("allowedQueues was empty");
+ }
+ this.latch = new CountDownLatch(allowedQueues.length);
+ this.count = allowedQueues.length;
+
+ HashSet<CLCommandQueue> set = new HashSet<CLCommandQueue>(allowedQueues.length);
+ for (CLCommandQueue queue : allowedQueues) {
+ set.add(queue);
+ }
+ this.queues = Collections.unmodifiableSet(set);
+ }
+
+ /**
+ * Blocks the current Thread until all commands on the {@link CLCommandQueue} finished excecution.
+ * This method may be invoked concurrently without synchronization on the MultiQueueBarrier object
+ * as long each Thread passes a distinct CLCommandQueue as parameter to this method.
+ */
+ public MultiQueueBarrier waitFor(CLCommandQueue queue) {
+ checkQueue(queue);
+
+ queue.putBarrier();
+ synchronized(this) {
+ latch.countDown();
+ }
+ return this;
+ }
+
+ /**
+ * Blocks the current Thread until the given events on the {@link CLCommandQueue} occurred.
+ * This method may be invoked concurrently without synchronization on the MultiQueueBarrier object
+ * as long each Thread passes a distinct CLCommandQueue as parameter to this method.
+ */
+ public MultiQueueBarrier waitFor(CLCommandQueue queue, CLEventList events) {
+ checkQueue(queue);
+
+ queue.putWaitForEvents(events, true);
+ synchronized(this) {
+ latch.countDown();
+ }
+ return this;
+ }
+
+ /**
+ * Blocks until all Threads which called {@link #waitFor}
+ * continue execution.
+ * This method blocks only once, all subsequent calls are ignored.
+ */
+ public MultiQueueBarrier await() throws InterruptedException {
+ latch.await();
+ rebuildBarrierIfBroken();
+ return this;
+ }
+
+ /**
+ * @see #await()
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the {@code timeout} argument
+ */
+ public MultiQueueBarrier await(long timeout, TimeUnit unit) throws InterruptedException {
+ latch.await(timeout, unit);
+ rebuildBarrierIfBroken();
+ return this;
+ }
+
+ /**
+ * Resets this barrier and unblocks all waiting threads.
+ */
+ public void resetBarrier() {
+ synchronized(this) {
+ while(latch.getCount() > 0) {
+ latch.countDown();
+ }
+ // thats OK. Another Thread can not rebuild the barrier since we have the lock.
+ // we have to rebuid it here in case there was no thread waiting.
+ latch = new CountDownLatch(count);
+ }
+ }
+
+ private void rebuildBarrierIfBroken() {
+ synchronized (this) {
+ if (latch.getCount() == 0) {
+ latch = new CountDownLatch(count);
+ }
+ }
+ }
+
+ /**
+ * Returns the current number of events which must occure before this barrier unblocks the waiting threads.
+ * This method is typically used for debugging and testing purposes.
+ */
+ public long getCount() {
+ return latch.getCount();
+ }
+
+ private void checkQueue(CLCommandQueue queue) throws IllegalArgumentException {
+ if (queues != null && !queues.contains(queue)) {
+ throw new IllegalArgumentException(queue + " is not in the allowedQueues Set: " + queues);
+ }
+ }
+
+}