aboutsummaryrefslogtreecommitdiffstats
path: root/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java')
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java95
1 files changed, 48 insertions, 47 deletions
diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
index a1d376ab..dcd052cf 100644
--- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
+++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
@@ -31,23 +31,23 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
private FinishAction finishAction = FinishAction.DO_NOTHING;
private boolean released;
- private CLCommandQueuePool(CLQueueContextFactory<C> factory, Collection<CLCommandQueue> queues) {
+ private CLCommandQueuePool(final CLQueueContextFactory<C> factory, final Collection<CLCommandQueue> queues) {
this.contexts = initContexts(queues, factory);
initExecutor();
}
- private List<CLQueueContext> initContexts(Collection<CLCommandQueue> queues, CLQueueContextFactory<C> factory) {
- List<CLQueueContext> newContexts = new ArrayList<CLQueueContext>(queues.size());
-
+ private List<CLQueueContext> initContexts(final Collection<CLCommandQueue> queues, final CLQueueContextFactory<C> factory) {
+ final List<CLQueueContext> newContexts = new ArrayList<CLQueueContext>(queues.size());
+
int index = 0;
- for (CLCommandQueue queue : queues) {
-
+ for (final CLCommandQueue queue : queues) {
+
CLQueueContext old = null;
if(this.contexts != null && !this.contexts.isEmpty()) {
old = this.contexts.get(index++);
old.release();
}
-
+
newContexts.add(factory.setup(queue, old));
}
return newContexts;
@@ -57,19 +57,19 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
this.excecutor = Executors.newFixedThreadPool(contexts.size(), new QueueThreadFactory(contexts));
}
- public static <C extends CLQueueContext> CLCommandQueuePool<C> create(CLQueueContextFactory<C> factory, CLMultiContext mc, CLCommandQueue.Mode... modes) {
+ public static <C extends CLQueueContext> CLCommandQueuePool<C> create(final CLQueueContextFactory<C> factory, final CLMultiContext mc, final CLCommandQueue.Mode... modes) {
return create(factory, mc.getDevices(), modes);
}
- public static <C extends CLQueueContext> CLCommandQueuePool<C> create(CLQueueContextFactory<C> factory, Collection<CLDevice> devices, CLCommandQueue.Mode... modes) {
- List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(devices.size());
- for (CLDevice device : devices) {
+ public static <C extends CLQueueContext> CLCommandQueuePool<C> create(final CLQueueContextFactory<C> factory, final Collection<CLDevice> devices, final CLCommandQueue.Mode... modes) {
+ final List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(devices.size());
+ for (final CLDevice device : devices) {
queues.add(device.createCommandQueue(modes));
}
return create(factory, queues);
}
- public static <C extends CLQueueContext> CLCommandQueuePool<C> create(CLQueueContextFactory<C> factory, Collection<CLCommandQueue> queues) {
+ public static <C extends CLQueueContext> CLCommandQueuePool<C> create(final CLQueueContextFactory<C> factory, final Collection<CLCommandQueue> queues) {
return new CLCommandQueuePool<C>(factory, queues);
}
@@ -77,7 +77,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
* Submits this task to the pool for execution returning its {@link Future}.
* @see ExecutorService#submit(java.util.concurrent.Callable)
*/
- public <R> Future<R> submit(CLTask<? super C, R> task) {
+ public <R> Future<R> submit(final CLTask<? super C, R> task) {
return excecutor.submit(new TaskWrapper<C,R>(task, finishAction));
}
@@ -85,9 +85,9 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
* Submits all tasks to the pool for execution and returns their {@link Future}.
* Calls {@link #submit(com.jogamp.opencl.util.concurrent.CLTask)} for every task.
*/
- public <R> List<Future<R>> submitAll(Collection<? extends CLTask<? super C, R>> tasks) {
- List<Future<R>> futures = new ArrayList<Future<R>>(tasks.size());
- for (CLTask<? super C, R> task : tasks) {
+ public <R> List<Future<R>> submitAll(final Collection<? extends CLTask<? super C, R>> tasks) {
+ final List<Future<R>> futures = new ArrayList<Future<R>>(tasks.size());
+ for (final CLTask<? super C, R> task : tasks) {
futures.add(submit(task));
}
return futures;
@@ -95,10 +95,10 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
/**
* Submits all tasks to the pool for immediate execution (blocking) and returns their {@link Future} holding the result.
- * @see ExecutorService#invokeAll(java.util.Collection)
+ * @see ExecutorService#invokeAll(java.util.Collection)
*/
- public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? super C, R>> tasks) throws InterruptedException {
- List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks);
+ public <R> List<Future<R>> invokeAll(final Collection<? extends CLTask<? super C, R>> tasks) throws InterruptedException {
+ final List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks);
return excecutor.invokeAll(wrapper);
}
@@ -106,14 +106,14 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
* Submits all tasks to the pool for immediate execution (blocking) and returns their {@link Future} holding the result.
* @see ExecutorService#invokeAll(java.util.Collection, long, java.util.concurrent.TimeUnit)
*/
- public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? super C, R>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
- List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks);
+ public <R> List<Future<R>> invokeAll(final Collection<? extends CLTask<? super C, R>> tasks, final long timeout, final TimeUnit unit) throws InterruptedException {
+ final List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks);
return excecutor.invokeAll(wrapper, timeout, unit);
}
- private <R> List<TaskWrapper<C, R>> wrapTasks(Collection<? extends CLTask<? super C, R>> tasks) {
- List<TaskWrapper<C, R>> wrapper = new ArrayList<TaskWrapper<C, R>>(tasks.size());
- for (CLTask<? super C, R> task : tasks) {
+ private <R> List<TaskWrapper<C, R>> wrapTasks(final Collection<? extends CLTask<? super C, R>> tasks) {
+ final List<TaskWrapper<C, R>> wrapper = new ArrayList<TaskWrapper<C, R>>(tasks.size());
+ for (final CLTask<? super C, R> task : tasks) {
if(task == null) {
throw new NullPointerException("at least one task was null");
}
@@ -121,17 +121,17 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
}
return wrapper;
}
-
+
/**
* Switches the context of all queues - this operation can be expensive.
* Blocks until all tasks finish and sets up a new context for all queues.
* @return this
*/
- public CLCommandQueuePool<C> switchContext(CLQueueContextFactory<C> factory) {
-
+ public CLCommandQueuePool<C> switchContext(final CLQueueContextFactory<C> factory) {
+
excecutor.shutdown();
finishQueues(); // just to be sure
-
+
contexts = initContexts(getQueues(), factory);
initExecutor();
return this;
@@ -141,7 +141,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
* Calls {@link CLCommandQueue#flush()} on all queues.
*/
public void flushQueues() {
- for (CLQueueContext context : contexts) {
+ for (final CLQueueContext context : contexts) {
context.queue.flush();
}
}
@@ -150,7 +150,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
* Calls {@link CLCommandQueue#finish()} on all queues.
*/
public void finishQueues() {
- for (CLQueueContext context : contexts) {
+ for (final CLQueueContext context : contexts) {
context.queue.finish();
}
}
@@ -165,7 +165,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
}
released = true;
excecutor.shutdown();
- for (CLQueueContext context : contexts) {
+ for (final CLQueueContext context : contexts) {
context.queue.finish().release();
context.release();
}
@@ -175,8 +175,8 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
* Returns the command queues used in this pool.
*/
public List<CLCommandQueue> getQueues() {
- List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(contexts.size());
- for (CLQueueContext context : contexts) {
+ final List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(contexts.size());
+ for (final CLQueueContext context : contexts) {
queues.add(context.queue);
}
return queues;
@@ -202,7 +202,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
* Sets the action which is run after every completed task.
* This is mainly intended for debugging, default value is {@link FinishAction#DO_NOTHING}.
*/
- public void setFinishAction(FinishAction action) {
+ public void setFinishAction(final FinishAction action) {
this.finishAction = action;
}
@@ -216,28 +216,28 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
private final List<CLQueueContext> context;
private int index;
- private QueueThreadFactory(List<CLQueueContext> queues) {
+ private QueueThreadFactory(final List<CLQueueContext> queues) {
this.context = queues;
this.index = 0;
}
- public synchronized Thread newThread(Runnable runnable) {
+ public synchronized Thread newThread(final Runnable runnable) {
- SecurityManager sm = System.getSecurityManager();
- ThreadGroup group = (sm != null) ? sm.getThreadGroup() : Thread.currentThread().getThreadGroup();
+ final SecurityManager sm = System.getSecurityManager();
+ final ThreadGroup group = (sm != null) ? sm.getThreadGroup() : Thread.currentThread().getThreadGroup();
- CLQueueContext queue = context.get(index);
- QueueThread thread = new QueueThread(group, runnable, queue, index++);
+ final CLQueueContext queue = context.get(index);
+ final QueueThread thread = new QueueThread(group, runnable, queue, index++);
thread.setDaemon(true);
-
+
return thread;
}
}
-
+
private static class QueueThread extends Thread {
private final CLQueueContext context;
- public QueueThread(ThreadGroup group, Runnable runnable, CLQueueContext context, int index) {
+ public QueueThread(final ThreadGroup group, final Runnable runnable, final CLQueueContext context, final int index) {
super(group, runnable, "queue-worker-thread-"+index+"["+context+"]");
this.context = context;
}
@@ -247,17 +247,18 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
private final CLTask<? super C, R> task;
private final FinishAction mode;
-
- public TaskWrapper(CLTask<? super C, R> task, FinishAction mode) {
+
+ public TaskWrapper(final CLTask<? super C, R> task, final FinishAction mode) {
this.task = task;
this.mode = mode;
}
public R call() throws Exception {
- CLQueueContext context = ((QueueThread)Thread.currentThread()).context;
+ final CLQueueContext context = ((QueueThread)Thread.currentThread()).context;
// we make sure to only wrap tasks on the correct kind of thread, so this
// shouldn't fail (trying to genericize QueueThread properly becomes tricky)
@SuppressWarnings("unchecked")
+ final
R result = task.execute((C)context);
if(mode.equals(FinishAction.FLUSH)) {
context.queue.flush();
@@ -284,7 +285,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
* Flushes the queue on task completion.
*/
FLUSH,
-
+
/**
* Finishes the queue on task completion.
*/