diff options
author | Michael Bien <[email protected]> | 2011-05-04 22:39:18 +0200 |
---|---|---|
committer | Michael Bien <[email protected]> | 2011-05-04 22:39:18 +0200 |
commit | ba3c4f8e28235c1e0780a88d7cd087abfaddc61b (patch) | |
tree | 6a5f2fef4d3a0609c29b9510d6c8e2ec62f4b044 /src/com/jogamp/opencl/util/concurrent | |
parent | 3a20670487663cfbadea480de6e0322c3055afcf (diff) |
added finish action and several utility methods, WIP.
Diffstat (limited to 'src/com/jogamp/opencl/util/concurrent')
-rw-r--r-- | src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java | 76 |
1 files changed, 68 insertions, 8 deletions
diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java index ef788d61..205f0393 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java +++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java @@ -20,20 +20,23 @@ import java.util.concurrent.ThreadFactory; /** * A multithreaded pool of OpenCL command queues. * It serves as a multiplexer distributing tasks over N queues. + * The usage of this pool is similar to {@link ExecutorService} but it uses {@link CLTask}s + * instead of {@link Callable}s. * @author Michael Bien */ public class CLCommandQueuePool implements CLResource { private final List<CLCommandQueue> queues; private final ExecutorService excecutor; + private FinishAction finishAction = FinishAction.DO_NOTHING; private CLCommandQueuePool(Collection<CLCommandQueue> queues) { this.queues = Collections.unmodifiableList(new ArrayList<CLCommandQueue>(queues)); this.excecutor = Executors.newFixedThreadPool(queues.size(), new QueueThreadFactory(this.queues)); } - public static CLCommandQueuePool create(CLMultiContext mc) { - return create(mc.getDevices()); + public static CLCommandQueuePool create(CLMultiContext mc, CLCommandQueue.Mode... modes) { + return create(mc.getDevices(), modes); } public static CLCommandQueuePool create(Collection<CLDevice> devices, CLCommandQueue.Mode... modes) { @@ -48,8 +51,16 @@ public class CLCommandQueuePool implements CLResource { return new CLCommandQueuePool(queues); } - public <T> Future<T> submit(CLTask<T> task) { - return excecutor.submit(new TaskWrapper(task)); + public <R> Future<R> submit(CLTask<R> task) { + return excecutor.submit(new TaskWrapper(task, finishAction)); + } + + public <R> List<Future<R>> invokeAll(Collection<CLTask<R>> tasks) throws InterruptedException { + List<TaskWrapper<R>> wrapper = new ArrayList<TaskWrapper<R>>(tasks.size()); + for (CLTask<R> task : tasks) { + wrapper.add(new TaskWrapper<R>(task, finishAction)); + } + return excecutor.invokeAll(wrapper); } /** @@ -77,15 +88,34 @@ public class CLCommandQueuePool implements CLResource { for (CLCommandQueue queue : queues) { queue.finish().release(); } + excecutor.shutdown(); } + /** + * Returns the command queues used in this pool. + */ public List<CLCommandQueue> getQueues() { return queues; } + /** + * Returns the size of this pool (number of command queues). + */ + public int getSize() { + return queues.size(); + } + + public FinishAction getFinishAction() { + return finishAction; + } + + public void setFinishAction(FinishAction action) { + this.finishAction = action; + } + @Override public String toString() { - return getClass().getSimpleName()+" [queues: "+queues.size()+"]"; + return getClass().getSimpleName()+" [queues: "+queues.size()+" on finish: "+finishAction+"]"; } private static class QueueThreadFactory implements ThreadFactory { @@ -115,16 +145,46 @@ public class CLCommandQueuePool implements CLResource { private static class TaskWrapper<T> implements Callable<T> { private final CLTask<T> task; + private final FinishAction mode; - public TaskWrapper(CLTask<T> task) { + public TaskWrapper(CLTask<T> task, FinishAction mode) { this.task = task; + this.mode = mode; } public T call() throws Exception { - QueueThread thread = (QueueThread) Thread.currentThread(); - return task.run(thread.queue); + CLCommandQueue queue = ((QueueThread)Thread.currentThread()).queue; + T result = task.run(queue); + if(mode.equals(FinishAction.FLUSH)) { + queue.flush(); + }else if(mode.equals(FinishAction.FINISH)) { + queue.finish(); + } + return result; } } + /** + * The action executed after a task completes. + */ + public enum FinishAction { + + /** + * Does nothing, the task is responsible to make sure all computations + * have finished when the task finishes + */ + DO_NOTHING, + + /** + * Flushes the queue on task completion. + */ + FLUSH, + + /** + * Finishes the queue on task completion. + */ + FINISH + } + } |