diff options
author | Michael Bien <[email protected]> | 2011-06-20 22:35:03 +0200 |
---|---|---|
committer | Michael Bien <[email protected]> | 2011-06-20 22:35:03 +0200 |
commit | 16c4aaeb9002f9d79732648c1a15e9750c9de35c (patch) | |
tree | f88e9d3337be414c5feb39f1b291c4b469cf265c | |
parent | a256925d0f589e387bd6370a7b4c5ab7c8b0b01e (diff) |
CLCommandQueuePool support for invokeAny(tasks).
-rw-r--r-- | src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java | 24 | ||||
-rw-r--r-- | test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java | 28 |
2 files changed, 48 insertions, 4 deletions
diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java index e8bd0124..31aeec47 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java +++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java @@ -11,11 +11,13 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * A multithreaded, fixed size pool of OpenCL command queues. @@ -111,6 +113,26 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource return excecutor.invokeAll(wrapper, timeout, unit); } + /** + * Submits all tasks for immediate execution (blocking) until a result can be returned. + * All other unfinished but started tasks are cancelled. + * @see ExecutorService#invokeAny(java.util.Collection) + */ + public <R> R invokeAny(Collection<? extends CLTask<? super C, R>> tasks) throws InterruptedException, ExecutionException { + List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks); + return excecutor.invokeAny(wrapper); + } + + /** + * Submits all tasks for immediate execution (blocking) until a result can be returned. + * All other unfinished but started tasks are cancelled. + * @see ExecutorService#invokeAny(java.util.Collection, long, java.util.concurrent.TimeUnit) + */ + public <R> R invokeAny(Collection<? extends CLTask<? super C, R>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks); + return excecutor.invokeAny(wrapper, timeout, unit); + } + private <R> List<TaskWrapper<C, R>> wrapTasks(Collection<? extends CLTask<? super C, R>> tasks) { List<TaskWrapper<C, R>> wrapper = new ArrayList<TaskWrapper<C, R>>(tasks.size()); for (CLTask<? super C, R> task : tasks) { @@ -221,6 +243,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource this.index = 0; } + @Override public synchronized Thread newThread(Runnable runnable) { SecurityManager sm = System.getSecurityManager(); @@ -253,6 +276,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource this.mode = mode; } + @Override public R call() throws Exception { CLQueueContext context = ((QueueThread)Thread.currentThread()).context; R result = task.execute((C)context); diff --git a/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java index 81d34907..68ca33f9 100644 --- a/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java +++ b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java @@ -78,6 +78,7 @@ public class CLMultiContextTest { this.data = buffer; } + @Override public Buffer execute(CLSimpleQueueContext qc) { CLCommandQueue queue = qc.getQueue(); @@ -134,27 +135,36 @@ public class CLMultiContextTest { out.println("invoking "+tasks.size()+" tasks on "+pool.getSize()+" queues"); // blocking invoke - pool.invokeAll(tasks); + List<Future<Buffer>> results = pool.invokeAll(tasks); + assertNotNull(results); checkBuffer(1, data); // submit blocking emediatly for (CLTestTask task : tasks) { - pool.submit(task).get(); + Buffer ret = pool.submit(task).get(); + assertNotNull(ret); } checkBuffer(2, data); // submitAll using futures List<Future<Buffer>> futures = pool.submitAll(tasks); for (Future<Buffer> future : futures) { - future.get(); + Buffer ret = future.get(); + assertNotNull(ret); } checkBuffer(3, data); // switching contexts using different program factory = CLQueueContextFactory.createSimple(programSource.replaceAll("\\+\\+", "--")); pool.switchContext(factory); - pool.invokeAll(tasks); + List<Future<Buffer>> results2 = pool.invokeAll(tasks); + assertNotNull(results2); checkBuffer(2, data); + + // submit any + Buffer buffer = pool.invokeAny(tasks); + assertNotNull(buffer); + checkContains(1, data); pool.release(); }finally{ @@ -169,4 +179,14 @@ public class CLMultiContextTest { data.rewind(); } + private void checkContains(int expected, IntBuffer data) { + while(data.hasRemaining()) { + if(expected == data.get()){ + data.rewind(); + return; + } + } + fail(); + } + } |