diff options
Diffstat (limited to 'src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java')
-rw-r--r-- | src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java | 95 |
1 files changed, 48 insertions, 47 deletions
diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java index a1d376ab..dcd052cf 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java +++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java @@ -31,23 +31,23 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource private FinishAction finishAction = FinishAction.DO_NOTHING; private boolean released; - private CLCommandQueuePool(CLQueueContextFactory<C> factory, Collection<CLCommandQueue> queues) { + private CLCommandQueuePool(final CLQueueContextFactory<C> factory, final Collection<CLCommandQueue> queues) { this.contexts = initContexts(queues, factory); initExecutor(); } - private List<CLQueueContext> initContexts(Collection<CLCommandQueue> queues, CLQueueContextFactory<C> factory) { - List<CLQueueContext> newContexts = new ArrayList<CLQueueContext>(queues.size()); - + private List<CLQueueContext> initContexts(final Collection<CLCommandQueue> queues, final CLQueueContextFactory<C> factory) { + final List<CLQueueContext> newContexts = new ArrayList<CLQueueContext>(queues.size()); + int index = 0; - for (CLCommandQueue queue : queues) { - + for (final CLCommandQueue queue : queues) { + CLQueueContext old = null; if(this.contexts != null && !this.contexts.isEmpty()) { old = this.contexts.get(index++); old.release(); } - + newContexts.add(factory.setup(queue, old)); } return newContexts; @@ -57,19 +57,19 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource this.excecutor = Executors.newFixedThreadPool(contexts.size(), new QueueThreadFactory(contexts)); } - public static <C extends CLQueueContext> CLCommandQueuePool<C> create(CLQueueContextFactory<C> factory, CLMultiContext mc, CLCommandQueue.Mode... modes) { + public static <C extends CLQueueContext> CLCommandQueuePool<C> create(final CLQueueContextFactory<C> factory, final CLMultiContext mc, final CLCommandQueue.Mode... modes) { return create(factory, mc.getDevices(), modes); } - public static <C extends CLQueueContext> CLCommandQueuePool<C> create(CLQueueContextFactory<C> factory, Collection<CLDevice> devices, CLCommandQueue.Mode... modes) { - List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(devices.size()); - for (CLDevice device : devices) { + public static <C extends CLQueueContext> CLCommandQueuePool<C> create(final CLQueueContextFactory<C> factory, final Collection<CLDevice> devices, final CLCommandQueue.Mode... modes) { + final List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(devices.size()); + for (final CLDevice device : devices) { queues.add(device.createCommandQueue(modes)); } return create(factory, queues); } - public static <C extends CLQueueContext> CLCommandQueuePool<C> create(CLQueueContextFactory<C> factory, Collection<CLCommandQueue> queues) { + public static <C extends CLQueueContext> CLCommandQueuePool<C> create(final CLQueueContextFactory<C> factory, final Collection<CLCommandQueue> queues) { return new CLCommandQueuePool<C>(factory, queues); } @@ -77,7 +77,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource * Submits this task to the pool for execution returning its {@link Future}. * @see ExecutorService#submit(java.util.concurrent.Callable) */ - public <R> Future<R> submit(CLTask<? super C, R> task) { + public <R> Future<R> submit(final CLTask<? super C, R> task) { return excecutor.submit(new TaskWrapper<C,R>(task, finishAction)); } @@ -85,9 +85,9 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource * Submits all tasks to the pool for execution and returns their {@link Future}. * Calls {@link #submit(com.jogamp.opencl.util.concurrent.CLTask)} for every task. */ - public <R> List<Future<R>> submitAll(Collection<? extends CLTask<? super C, R>> tasks) { - List<Future<R>> futures = new ArrayList<Future<R>>(tasks.size()); - for (CLTask<? super C, R> task : tasks) { + public <R> List<Future<R>> submitAll(final Collection<? extends CLTask<? super C, R>> tasks) { + final List<Future<R>> futures = new ArrayList<Future<R>>(tasks.size()); + for (final CLTask<? super C, R> task : tasks) { futures.add(submit(task)); } return futures; @@ -95,10 +95,10 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource /** * Submits all tasks to the pool for immediate execution (blocking) and returns their {@link Future} holding the result. - * @see ExecutorService#invokeAll(java.util.Collection) + * @see ExecutorService#invokeAll(java.util.Collection) */ - public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? super C, R>> tasks) throws InterruptedException { - List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks); + public <R> List<Future<R>> invokeAll(final Collection<? extends CLTask<? super C, R>> tasks) throws InterruptedException { + final List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks); return excecutor.invokeAll(wrapper); } @@ -106,14 +106,14 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource * Submits all tasks to the pool for immediate execution (blocking) and returns their {@link Future} holding the result. * @see ExecutorService#invokeAll(java.util.Collection, long, java.util.concurrent.TimeUnit) */ - public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? super C, R>> tasks, long timeout, TimeUnit unit) throws InterruptedException { - List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks); + public <R> List<Future<R>> invokeAll(final Collection<? extends CLTask<? super C, R>> tasks, final long timeout, final TimeUnit unit) throws InterruptedException { + final List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks); return excecutor.invokeAll(wrapper, timeout, unit); } - private <R> List<TaskWrapper<C, R>> wrapTasks(Collection<? extends CLTask<? super C, R>> tasks) { - List<TaskWrapper<C, R>> wrapper = new ArrayList<TaskWrapper<C, R>>(tasks.size()); - for (CLTask<? super C, R> task : tasks) { + private <R> List<TaskWrapper<C, R>> wrapTasks(final Collection<? extends CLTask<? super C, R>> tasks) { + final List<TaskWrapper<C, R>> wrapper = new ArrayList<TaskWrapper<C, R>>(tasks.size()); + for (final CLTask<? super C, R> task : tasks) { if(task == null) { throw new NullPointerException("at least one task was null"); } @@ -121,17 +121,17 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource } return wrapper; } - + /** * Switches the context of all queues - this operation can be expensive. * Blocks until all tasks finish and sets up a new context for all queues. * @return this */ - public CLCommandQueuePool<C> switchContext(CLQueueContextFactory<C> factory) { - + public CLCommandQueuePool<C> switchContext(final CLQueueContextFactory<C> factory) { + excecutor.shutdown(); finishQueues(); // just to be sure - + contexts = initContexts(getQueues(), factory); initExecutor(); return this; @@ -141,7 +141,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource * Calls {@link CLCommandQueue#flush()} on all queues. */ public void flushQueues() { - for (CLQueueContext context : contexts) { + for (final CLQueueContext context : contexts) { context.queue.flush(); } } @@ -150,7 +150,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource * Calls {@link CLCommandQueue#finish()} on all queues. */ public void finishQueues() { - for (CLQueueContext context : contexts) { + for (final CLQueueContext context : contexts) { context.queue.finish(); } } @@ -165,7 +165,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource } released = true; excecutor.shutdown(); - for (CLQueueContext context : contexts) { + for (final CLQueueContext context : contexts) { context.queue.finish().release(); context.release(); } @@ -175,8 +175,8 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource * Returns the command queues used in this pool. */ public List<CLCommandQueue> getQueues() { - List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(contexts.size()); - for (CLQueueContext context : contexts) { + final List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(contexts.size()); + for (final CLQueueContext context : contexts) { queues.add(context.queue); } return queues; @@ -202,7 +202,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource * Sets the action which is run after every completed task. * This is mainly intended for debugging, default value is {@link FinishAction#DO_NOTHING}. */ - public void setFinishAction(FinishAction action) { + public void setFinishAction(final FinishAction action) { this.finishAction = action; } @@ -216,28 +216,28 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource private final List<CLQueueContext> context; private int index; - private QueueThreadFactory(List<CLQueueContext> queues) { + private QueueThreadFactory(final List<CLQueueContext> queues) { this.context = queues; this.index = 0; } - public synchronized Thread newThread(Runnable runnable) { + public synchronized Thread newThread(final Runnable runnable) { - SecurityManager sm = System.getSecurityManager(); - ThreadGroup group = (sm != null) ? sm.getThreadGroup() : Thread.currentThread().getThreadGroup(); + final SecurityManager sm = System.getSecurityManager(); + final ThreadGroup group = (sm != null) ? sm.getThreadGroup() : Thread.currentThread().getThreadGroup(); - CLQueueContext queue = context.get(index); - QueueThread thread = new QueueThread(group, runnable, queue, index++); + final CLQueueContext queue = context.get(index); + final QueueThread thread = new QueueThread(group, runnable, queue, index++); thread.setDaemon(true); - + return thread; } } - + private static class QueueThread extends Thread { private final CLQueueContext context; - public QueueThread(ThreadGroup group, Runnable runnable, CLQueueContext context, int index) { + public QueueThread(final ThreadGroup group, final Runnable runnable, final CLQueueContext context, final int index) { super(group, runnable, "queue-worker-thread-"+index+"["+context+"]"); this.context = context; } @@ -247,17 +247,18 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource private final CLTask<? super C, R> task; private final FinishAction mode; - - public TaskWrapper(CLTask<? super C, R> task, FinishAction mode) { + + public TaskWrapper(final CLTask<? super C, R> task, final FinishAction mode) { this.task = task; this.mode = mode; } public R call() throws Exception { - CLQueueContext context = ((QueueThread)Thread.currentThread()).context; + final CLQueueContext context = ((QueueThread)Thread.currentThread()).context; // we make sure to only wrap tasks on the correct kind of thread, so this // shouldn't fail (trying to genericize QueueThread properly becomes tricky) @SuppressWarnings("unchecked") + final R result = task.execute((C)context); if(mode.equals(FinishAction.FLUSH)) { context.queue.flush(); @@ -284,7 +285,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource * Flushes the queue on task completion. */ FLUSH, - + /** * Finishes the queue on task completion. */ |