summaryrefslogtreecommitdiffstats
path: root/src/com
diff options
context:
space:
mode:
Diffstat (limited to 'src/com')
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java76
1 files changed, 68 insertions, 8 deletions
diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
index ef788d61..205f0393 100644
--- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
+++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
@@ -20,20 +20,23 @@ import java.util.concurrent.ThreadFactory;
/**
* A multithreaded pool of OpenCL command queues.
* It serves as a multiplexer distributing tasks over N queues.
+ * The usage of this pool is similar to {@link ExecutorService} but it uses {@link CLTask}s
+ * instead of {@link Callable}s.
* @author Michael Bien
*/
public class CLCommandQueuePool implements CLResource {
private final List<CLCommandQueue> queues;
private final ExecutorService excecutor;
+ private FinishAction finishAction = FinishAction.DO_NOTHING;
private CLCommandQueuePool(Collection<CLCommandQueue> queues) {
this.queues = Collections.unmodifiableList(new ArrayList<CLCommandQueue>(queues));
this.excecutor = Executors.newFixedThreadPool(queues.size(), new QueueThreadFactory(this.queues));
}
- public static CLCommandQueuePool create(CLMultiContext mc) {
- return create(mc.getDevices());
+ public static CLCommandQueuePool create(CLMultiContext mc, CLCommandQueue.Mode... modes) {
+ return create(mc.getDevices(), modes);
}
public static CLCommandQueuePool create(Collection<CLDevice> devices, CLCommandQueue.Mode... modes) {
@@ -48,8 +51,16 @@ public class CLCommandQueuePool implements CLResource {
return new CLCommandQueuePool(queues);
}
- public <T> Future<T> submit(CLTask<T> task) {
- return excecutor.submit(new TaskWrapper(task));
+ public <R> Future<R> submit(CLTask<R> task) {
+ return excecutor.submit(new TaskWrapper(task, finishAction));
+ }
+
+ public <R> List<Future<R>> invokeAll(Collection<CLTask<R>> tasks) throws InterruptedException {
+ List<TaskWrapper<R>> wrapper = new ArrayList<TaskWrapper<R>>(tasks.size());
+ for (CLTask<R> task : tasks) {
+ wrapper.add(new TaskWrapper<R>(task, finishAction));
+ }
+ return excecutor.invokeAll(wrapper);
}
/**
@@ -77,15 +88,34 @@ public class CLCommandQueuePool implements CLResource {
for (CLCommandQueue queue : queues) {
queue.finish().release();
}
+ excecutor.shutdown();
}
+ /**
+ * Returns the command queues used in this pool.
+ */
public List<CLCommandQueue> getQueues() {
return queues;
}
+ /**
+ * Returns the size of this pool (number of command queues).
+ */
+ public int getSize() {
+ return queues.size();
+ }
+
+ public FinishAction getFinishAction() {
+ return finishAction;
+ }
+
+ public void setFinishAction(FinishAction action) {
+ this.finishAction = action;
+ }
+
@Override
public String toString() {
- return getClass().getSimpleName()+" [queues: "+queues.size()+"]";
+ return getClass().getSimpleName()+" [queues: "+queues.size()+" on finish: "+finishAction+"]";
}
private static class QueueThreadFactory implements ThreadFactory {
@@ -115,16 +145,46 @@ public class CLCommandQueuePool implements CLResource {
private static class TaskWrapper<T> implements Callable<T> {
private final CLTask<T> task;
+ private final FinishAction mode;
- public TaskWrapper(CLTask<T> task) {
+ public TaskWrapper(CLTask<T> task, FinishAction mode) {
this.task = task;
+ this.mode = mode;
}
public T call() throws Exception {
- QueueThread thread = (QueueThread) Thread.currentThread();
- return task.run(thread.queue);
+ CLCommandQueue queue = ((QueueThread)Thread.currentThread()).queue;
+ T result = task.run(queue);
+ if(mode.equals(FinishAction.FLUSH)) {
+ queue.flush();
+ }else if(mode.equals(FinishAction.FINISH)) {
+ queue.finish();
+ }
+ return result;
}
}
+ /**
+ * The action executed after a task completes.
+ */
+ public enum FinishAction {
+
+ /**
+ * Does nothing, the task is responsible to make sure all computations
+ * have finished when the task finishes
+ */
+ DO_NOTHING,
+
+ /**
+ * Flushes the queue on task completion.
+ */
+ FLUSH,
+
+ /**
+ * Finishes the queue on task completion.
+ */
+ FINISH
+ }
+
}