summaryrefslogtreecommitdiffstats
path: root/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java')
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java82
1 files changed, 47 insertions, 35 deletions
diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
index 92828e95..ee6dc86b 100644
--- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
+++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
@@ -18,7 +18,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
/**
- * A multithreaded pool of OpenCL command queues.
+ * A multithreaded fixed size pool of OpenCL command queues.
* It serves as a multiplexer distributing tasks over N queues.
* The usage of this pool is similar to {@link ExecutorService} but it uses {@link CLTask}s
* instead of {@link Callable}s.
@@ -26,29 +26,37 @@ import java.util.concurrent.ThreadFactory;
*/
public class CLCommandQueuePool implements CLResource {
- private final List<CLCommandQueue> queues;
- private final ExecutorService excecutor;
+ private final List<CLQueueContext> contexts;
+ private ExecutorService excecutor;
private FinishAction finishAction = FinishAction.DO_NOTHING;
- private CLCommandQueuePool(Collection<CLCommandQueue> queues) {
- this.queues = Collections.unmodifiableList(new ArrayList<CLCommandQueue>(queues));
- this.excecutor = Executors.newFixedThreadPool(queues.size(), new QueueThreadFactory(this.queues));
+ private CLCommandQueuePool(Collection<CLQueueContext> contexts) {
+ this.contexts = Collections.unmodifiableList(new ArrayList<CLQueueContext>(contexts));
+ initExecutor();
}
- public static CLCommandQueuePool create(CLMultiContext mc, CLCommandQueue.Mode... modes) {
- return create(mc.getDevices(), modes);
+ private void initExecutor() {
+ this.excecutor = Executors.newFixedThreadPool(contexts.size(), new QueueThreadFactory(contexts));
}
- public static CLCommandQueuePool create(Collection<CLDevice> devices, CLCommandQueue.Mode... modes) {
+ public static CLCommandQueuePool create(CLQueueContextFactory factory, CLMultiContext mc, CLCommandQueue.Mode... modes) {
+ return create(factory, mc.getDevices(), modes);
+ }
+
+ public static CLCommandQueuePool create(CLQueueContextFactory factory, Collection<CLDevice> devices, CLCommandQueue.Mode... modes) {
List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(devices.size());
for (CLDevice device : devices) {
queues.add(device.createCommandQueue(modes));
}
- return create(queues);
+ return create(factory, queues);
}
- public static CLCommandQueuePool create(Collection<CLCommandQueue> queues) {
- return new CLCommandQueuePool(queues);
+ public static CLCommandQueuePool create(CLQueueContextFactory factory, Collection<CLCommandQueue> queues) {
+ List<CLQueueContext> contexts = new ArrayList<CLQueueContext>(queues.size());
+ for (CLCommandQueue queue : queues) {
+ contexts.add(factory.setup(queue, null));
+ }
+ return new CLCommandQueuePool(contexts);
}
public <R> Future<R> submit(CLTask<R> task) {
@@ -66,18 +74,18 @@ public class CLCommandQueuePool implements CLResource {
/**
* Calls {@link CLCommandQueue#flush()} on all queues.
*/
- public void flush() {
- for (CLCommandQueue queue : queues) {
- queue.flush();
+ public void flushQueues() {
+ for (CLQueueContext context : contexts) {
+ context.queue.flush();
}
}
/**
* Calls {@link CLCommandQueue#finish()} on all queues.
*/
- public void finish() {
- for (CLCommandQueue queue : queues) {
- queue.finish();
+ public void finishQueues() {
+ for (CLQueueContext context : contexts) {
+ context.queue.finish();
}
}
@@ -85,16 +93,20 @@ public class CLCommandQueuePool implements CLResource {
* Releases all queues.
*/
public void release() {
- for (CLCommandQueue queue : queues) {
- queue.finish().release();
- }
excecutor.shutdown();
+ for (CLQueueContext context : contexts) {
+ context.queue.finish().release();
+ }
}
/**
* Returns the command queues used in this pool.
*/
public List<CLCommandQueue> getQueues() {
+ List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(contexts.size());
+ for (CLQueueContext context : contexts) {
+ queues.add(context.queue);
+ }
return queues;
}
@@ -102,7 +114,7 @@ public class CLCommandQueuePool implements CLResource {
* Returns the size of this pool (number of command queues).
*/
public int getSize() {
- return queues.size();
+ return contexts.size();
}
public FinishAction getFinishAction() {
@@ -115,31 +127,31 @@ public class CLCommandQueuePool implements CLResource {
@Override
public String toString() {
- return getClass().getSimpleName()+" [queues: "+queues.size()+" on finish: "+finishAction+"]";
+ return getClass().getSimpleName()+" [queues: "+contexts.size()+" on finish: "+finishAction+"]";
}
private static class QueueThreadFactory implements ThreadFactory {
- private final List<CLCommandQueue> queues;
+ private final List<CLQueueContext> context;
private int index;
- private QueueThreadFactory(List<CLCommandQueue> queues) {
- this.queues = queues;
+ private QueueThreadFactory(List<CLQueueContext> queues) {
+ this.context = queues;
this.index = 0;
}
public synchronized Thread newThread(Runnable r) {
- CLCommandQueue queue = queues.get(index);
+ CLQueueContext queue = context.get(index);
return new QueueThread(queue, index++);
}
}
private static class QueueThread extends Thread {
- private final CLCommandQueue queue;
- public QueueThread(CLCommandQueue queue, int index) {
- super("queue-worker-thread-"+index+"["+queue+"]");
- this.queue = queue;
+ private final CLQueueContext context;
+ public QueueThread(CLQueueContext context, int index) {
+ super("queue-worker-thread-"+index+"["+context+"]");
+ this.context = context;
this.setDaemon(true);
}
}
@@ -155,12 +167,12 @@ public class CLCommandQueuePool implements CLResource {
}
public T call() throws Exception {
- CLCommandQueue queue = ((QueueThread)Thread.currentThread()).queue;
- T result = task.run(queue);
+ CLQueueContext context = ((QueueThread)Thread.currentThread()).context;
+ T result = task.run(context);
if(mode.equals(FinishAction.FLUSH)) {
- queue.flush();
+ context.queue.flush();
}else if(mode.equals(FinishAction.FINISH)) {
- queue.finish();
+ context.queue.finish();
}
return result;
}