diff options
4 files changed, 59 insertions, 25 deletions
diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java index a6bbe4d0..9ea960ae 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java +++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java @@ -18,10 +18,10 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; /** - * A multithreaded fixed size pool of OpenCL command queues. - * It serves as a multiplexer distributing tasks over N queues. + * A multithreaded, fixed size pool of OpenCL command queues. + * It serves as a multiplexer distributing tasks over N queues usually run on N devices. * The usage of this pool is similar to {@link ExecutorService} but it uses {@link CLTask}s - * instead of {@link Callable}s. + * instead of {@link Callable}s and provides a per-queue context for resource sharing across all tasks of one queue. * @author Michael Bien */ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource { @@ -73,13 +73,27 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource } /** + * Submits this task to the pool for execution returning its {@link Future}. * @see ExecutorService#submit(java.util.concurrent.Callable) */ - public <R> Future<R> submit(CLTask<? extends C, R> task) { + public <R> Future<R> submit(CLTask<? super C, R> task) { return excecutor.submit(new TaskWrapper(task, finishAction)); } /** + * Submits all tasks to the pool for execution and returns their {@link Future}. + * Calls {@link #submit(com.jogamp.opencl.util.concurrent.CLTask)} for every task. + */ + public <R> List<Future<R>> submitAll(Collection<? extends CLTask<? super C, R>> tasks) { + List<Future<R>> futures = new ArrayList<Future<R>>(tasks.size()); + for (CLTask<? super C, R> task : tasks) { + futures.add(submit(task)); + } + return futures; + } + + /** + * Submits all tasks to the pool for immediate execution (blocking) and returns their {@link Future} holding the result. * @see ExecutorService#invokeAll(java.util.Collection) */ public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? super C, R>> tasks) throws InterruptedException { @@ -88,6 +102,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource } /** + * Submits all tasks to the pool for immediate execution (blocking) and returns their {@link Future} holding the result. * @see ExecutorService#invokeAll(java.util.Collection, long, java.util.concurrent.TimeUnit) */ public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? super C, R>> tasks, long timeout, TimeUnit unit) throws InterruptedException { @@ -109,6 +124,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource /** * Switches the context of all queues - this operation can be expensive. * Blocks until all tasks finish and sets up a new context for all queues. + * @return this */ public <C extends CLQueueContext> CLCommandQueuePool switchContext(CLQueueContextFactory<C> factory) { @@ -197,7 +213,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource public synchronized Thread newThread(Runnable runnable) { SecurityManager sm = System.getSecurityManager(); - ThreadGroup group = (sm != null)? sm.getThreadGroup() : Thread.currentThread().getThreadGroup(); + ThreadGroup group = (sm != null) ? sm.getThreadGroup() : Thread.currentThread().getThreadGroup(); CLQueueContext queue = context.get(index); QueueThread thread = new QueueThread(group, runnable, queue, index++); diff --git a/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java b/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java index 11b86889..3f89ad0e 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java +++ b/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java @@ -11,6 +11,13 @@ import com.jogamp.opencl.CLResource; import java.util.Map; /** + * Superclass for all per-queue contexts as used in {@link CLCommandQueuePool}s. + * A context will usually hold queue (and therefore often device) specific resources used + * in tasks of the same queue. + * <p> + * Possible candidates for those resources can be compiled CLPrograms, CLKernels + * or even pre allocated CLBuffers. + * </p> * @author Michael Bien */ public abstract class CLQueueContext implements CLResource { @@ -29,6 +36,10 @@ public abstract class CLQueueContext implements CLResource { return queue.getContext(); } + /** + * A simple queue context holding a precompiled program and its kernels. + * @author Michael Bien + */ public static class CLSimpleQueueContext extends CLQueueContext { public final CLProgram program; diff --git a/src/com/jogamp/opencl/util/concurrent/CLQueueContextFactory.java b/src/com/jogamp/opencl/util/concurrent/CLQueueContextFactory.java index 64fdfbcd..58f389bf 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLQueueContextFactory.java +++ b/src/com/jogamp/opencl/util/concurrent/CLQueueContextFactory.java @@ -5,9 +5,10 @@ package com.jogamp.opencl.util.concurrent; import com.jogamp.opencl.CLCommandQueue; import com.jogamp.opencl.CLProgram; +import com.jogamp.opencl.util.concurrent.CLQueueContext.CLSimpleQueueContext; /** - * + * Creates {@link CLQueueContext}s. * @author Michael Bien */ public abstract class CLQueueContextFactory<C extends CLQueueContext> { @@ -27,7 +28,11 @@ public abstract class CLQueueContextFactory<C extends CLQueueContext> { return new CLSimpleContextFactory(source); } - public static class CLSimpleContextFactory extends CLQueueContextFactory<CLQueueContext.CLSimpleQueueContext> { + /** + * Creates {@link CLSimpleQueueContext}s containing a precompiled program. + * @author Michael Bien + */ + public static class CLSimpleContextFactory extends CLQueueContextFactory<CLSimpleQueueContext> { private final String source; @@ -36,9 +41,9 @@ public abstract class CLQueueContextFactory<C extends CLQueueContext> { } @Override - public CLQueueContext.CLSimpleQueueContext setup(CLCommandQueue queue, CLQueueContext old) { + public CLSimpleQueueContext setup(CLCommandQueue queue, CLQueueContext old) { CLProgram program = queue.getContext().createProgram(source).build(queue.getDevice()); - return new CLQueueContext.CLSimpleQueueContext(queue, program); + return new CLSimpleQueueContext(queue, program); } } diff --git a/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java index e5bcb1c5..81d34907 100644 --- a/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java +++ b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java @@ -62,13 +62,13 @@ public class CLMultiContextTest { } private final static String programSource = - "kernel void increment(global int* array, int numElements) { \n" - + " int index = get_global_id(0); \n" - + " if (index >= numElements) { \n" - + " return; \n" - + " } \n" - + " array[index]++; \n" - + "} \n"; + "kernel void compute(global int* array, int numElements) { \n" + + " int index = get_global_id(0); \n" + + " if (index >= numElements) { \n" + + " return; \n" + + " } \n" + + " array[index]++; \n" + + "} \n"; private final class CLTestTask implements CLTask<CLSimpleQueueContext, Buffer> { @@ -82,7 +82,7 @@ public class CLMultiContextTest { CLCommandQueue queue = qc.getQueue(); CLContext context = qc.getCLContext(); - CLKernel kernel = qc.getKernel("increment"); + CLKernel kernel = qc.getKernel("compute"); CLBuffer<Buffer> buffer = null; try{ @@ -133,26 +133,28 @@ public class CLMultiContextTest { out.println("invoking "+tasks.size()+" tasks on "+pool.getSize()+" queues"); + // blocking invoke pool.invokeAll(tasks); checkBuffer(1, data); - + // submit blocking emediatly for (CLTestTask task : tasks) { pool.submit(task).get(); } checkBuffer(2, data); - - List<Future<Buffer>> futures = new ArrayList<Future<Buffer>>(taskCount); - for (CLTestTask task : tasks) { - futures.add(pool.submit(task)); - } + // submitAll using futures + List<Future<Buffer>> futures = pool.submitAll(tasks); for (Future<Buffer> future : futures) { future.get(); } checkBuffer(3, data); - -// pool.switchContext(factory); + + // switching contexts using different program + factory = CLQueueContextFactory.createSimple(programSource.replaceAll("\\+\\+", "--")); + pool.switchContext(factory); + pool.invokeAll(tasks); + checkBuffer(2, data); pool.release(); }finally{ |