summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichael Bien <[email protected]>2011-06-20 22:35:03 +0200
committerMichael Bien <[email protected]>2011-06-20 22:35:03 +0200
commit16c4aaeb9002f9d79732648c1a15e9750c9de35c (patch)
treef88e9d3337be414c5feb39f1b291c4b469cf265c
parenta256925d0f589e387bd6370a7b4c5ab7c8b0b01e (diff)
CLCommandQueuePool support for invokeAny(tasks).
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java24
-rw-r--r--test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java28
2 files changed, 48 insertions, 4 deletions
diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
index e8bd0124..31aeec47 100644
--- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
+++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
@@ -11,11 +11,13 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
* A multithreaded, fixed size pool of OpenCL command queues.
@@ -111,6 +113,26 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
return excecutor.invokeAll(wrapper, timeout, unit);
}
+ /**
+ * Submits all tasks for immediate execution (blocking) until a result can be returned.
+ * All other unfinished but started tasks are cancelled.
+ * @see ExecutorService#invokeAny(java.util.Collection)
+ */
+ public <R> R invokeAny(Collection<? extends CLTask<? super C, R>> tasks) throws InterruptedException, ExecutionException {
+ List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks);
+ return excecutor.invokeAny(wrapper);
+ }
+
+ /**
+ * Submits all tasks for immediate execution (blocking) until a result can be returned.
+ * All other unfinished but started tasks are cancelled.
+ * @see ExecutorService#invokeAny(java.util.Collection, long, java.util.concurrent.TimeUnit)
+ */
+ public <R> R invokeAny(Collection<? extends CLTask<? super C, R>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks);
+ return excecutor.invokeAny(wrapper, timeout, unit);
+ }
+
private <R> List<TaskWrapper<C, R>> wrapTasks(Collection<? extends CLTask<? super C, R>> tasks) {
List<TaskWrapper<C, R>> wrapper = new ArrayList<TaskWrapper<C, R>>(tasks.size());
for (CLTask<? super C, R> task : tasks) {
@@ -221,6 +243,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
this.index = 0;
}
+ @Override
public synchronized Thread newThread(Runnable runnable) {
SecurityManager sm = System.getSecurityManager();
@@ -253,6 +276,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
this.mode = mode;
}
+ @Override
public R call() throws Exception {
CLQueueContext context = ((QueueThread)Thread.currentThread()).context;
R result = task.execute((C)context);
diff --git a/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java
index 81d34907..68ca33f9 100644
--- a/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java
+++ b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java
@@ -78,6 +78,7 @@ public class CLMultiContextTest {
this.data = buffer;
}
+ @Override
public Buffer execute(CLSimpleQueueContext qc) {
CLCommandQueue queue = qc.getQueue();
@@ -134,27 +135,36 @@ public class CLMultiContextTest {
out.println("invoking "+tasks.size()+" tasks on "+pool.getSize()+" queues");
// blocking invoke
- pool.invokeAll(tasks);
+ List<Future<Buffer>> results = pool.invokeAll(tasks);
+ assertNotNull(results);
checkBuffer(1, data);
// submit blocking emediatly
for (CLTestTask task : tasks) {
- pool.submit(task).get();
+ Buffer ret = pool.submit(task).get();
+ assertNotNull(ret);
}
checkBuffer(2, data);
// submitAll using futures
List<Future<Buffer>> futures = pool.submitAll(tasks);
for (Future<Buffer> future : futures) {
- future.get();
+ Buffer ret = future.get();
+ assertNotNull(ret);
}
checkBuffer(3, data);
// switching contexts using different program
factory = CLQueueContextFactory.createSimple(programSource.replaceAll("\\+\\+", "--"));
pool.switchContext(factory);
- pool.invokeAll(tasks);
+ List<Future<Buffer>> results2 = pool.invokeAll(tasks);
+ assertNotNull(results2);
checkBuffer(2, data);
+
+ // submit any
+ Buffer buffer = pool.invokeAny(tasks);
+ assertNotNull(buffer);
+ checkContains(1, data);
pool.release();
}finally{
@@ -169,4 +179,14 @@ public class CLMultiContextTest {
data.rewind();
}
+ private void checkContains(int expected, IntBuffer data) {
+ while(data.hasRemaining()) {
+ if(expected == data.get()){
+ data.rewind();
+ return;
+ }
+ }
+ fail();
+ }
+
}