diff options
Diffstat (limited to 'src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java')
-rw-r--r-- | src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java | 82 |
1 files changed, 47 insertions, 35 deletions
diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java index 92828e95..ee6dc86b 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java +++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java @@ -18,7 +18,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; /** - * A multithreaded pool of OpenCL command queues. + * A multithreaded fixed size pool of OpenCL command queues. * It serves as a multiplexer distributing tasks over N queues. * The usage of this pool is similar to {@link ExecutorService} but it uses {@link CLTask}s * instead of {@link Callable}s. @@ -26,29 +26,37 @@ import java.util.concurrent.ThreadFactory; */ public class CLCommandQueuePool implements CLResource { - private final List<CLCommandQueue> queues; - private final ExecutorService excecutor; + private final List<CLQueueContext> contexts; + private ExecutorService excecutor; private FinishAction finishAction = FinishAction.DO_NOTHING; - private CLCommandQueuePool(Collection<CLCommandQueue> queues) { - this.queues = Collections.unmodifiableList(new ArrayList<CLCommandQueue>(queues)); - this.excecutor = Executors.newFixedThreadPool(queues.size(), new QueueThreadFactory(this.queues)); + private CLCommandQueuePool(Collection<CLQueueContext> contexts) { + this.contexts = Collections.unmodifiableList(new ArrayList<CLQueueContext>(contexts)); + initExecutor(); } - public static CLCommandQueuePool create(CLMultiContext mc, CLCommandQueue.Mode... modes) { - return create(mc.getDevices(), modes); + private void initExecutor() { + this.excecutor = Executors.newFixedThreadPool(contexts.size(), new QueueThreadFactory(contexts)); } - public static CLCommandQueuePool create(Collection<CLDevice> devices, CLCommandQueue.Mode... modes) { + public static CLCommandQueuePool create(CLQueueContextFactory factory, CLMultiContext mc, CLCommandQueue.Mode... modes) { + return create(factory, mc.getDevices(), modes); + } + + public static CLCommandQueuePool create(CLQueueContextFactory factory, Collection<CLDevice> devices, CLCommandQueue.Mode... modes) { List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(devices.size()); for (CLDevice device : devices) { queues.add(device.createCommandQueue(modes)); } - return create(queues); + return create(factory, queues); } - public static CLCommandQueuePool create(Collection<CLCommandQueue> queues) { - return new CLCommandQueuePool(queues); + public static CLCommandQueuePool create(CLQueueContextFactory factory, Collection<CLCommandQueue> queues) { + List<CLQueueContext> contexts = new ArrayList<CLQueueContext>(queues.size()); + for (CLCommandQueue queue : queues) { + contexts.add(factory.setup(queue, null)); + } + return new CLCommandQueuePool(contexts); } public <R> Future<R> submit(CLTask<R> task) { @@ -66,18 +74,18 @@ public class CLCommandQueuePool implements CLResource { /** * Calls {@link CLCommandQueue#flush()} on all queues. */ - public void flush() { - for (CLCommandQueue queue : queues) { - queue.flush(); + public void flushQueues() { + for (CLQueueContext context : contexts) { + context.queue.flush(); } } /** * Calls {@link CLCommandQueue#finish()} on all queues. */ - public void finish() { - for (CLCommandQueue queue : queues) { - queue.finish(); + public void finishQueues() { + for (CLQueueContext context : contexts) { + context.queue.finish(); } } @@ -85,16 +93,20 @@ public class CLCommandQueuePool implements CLResource { * Releases all queues. */ public void release() { - for (CLCommandQueue queue : queues) { - queue.finish().release(); - } excecutor.shutdown(); + for (CLQueueContext context : contexts) { + context.queue.finish().release(); + } } /** * Returns the command queues used in this pool. */ public List<CLCommandQueue> getQueues() { + List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(contexts.size()); + for (CLQueueContext context : contexts) { + queues.add(context.queue); + } return queues; } @@ -102,7 +114,7 @@ public class CLCommandQueuePool implements CLResource { * Returns the size of this pool (number of command queues). */ public int getSize() { - return queues.size(); + return contexts.size(); } public FinishAction getFinishAction() { @@ -115,31 +127,31 @@ public class CLCommandQueuePool implements CLResource { @Override public String toString() { - return getClass().getSimpleName()+" [queues: "+queues.size()+" on finish: "+finishAction+"]"; + return getClass().getSimpleName()+" [queues: "+contexts.size()+" on finish: "+finishAction+"]"; } private static class QueueThreadFactory implements ThreadFactory { - private final List<CLCommandQueue> queues; + private final List<CLQueueContext> context; private int index; - private QueueThreadFactory(List<CLCommandQueue> queues) { - this.queues = queues; + private QueueThreadFactory(List<CLQueueContext> queues) { + this.context = queues; this.index = 0; } public synchronized Thread newThread(Runnable r) { - CLCommandQueue queue = queues.get(index); + CLQueueContext queue = context.get(index); return new QueueThread(queue, index++); } } private static class QueueThread extends Thread { - private final CLCommandQueue queue; - public QueueThread(CLCommandQueue queue, int index) { - super("queue-worker-thread-"+index+"["+queue+"]"); - this.queue = queue; + private final CLQueueContext context; + public QueueThread(CLQueueContext context, int index) { + super("queue-worker-thread-"+index+"["+context+"]"); + this.context = context; this.setDaemon(true); } } @@ -155,12 +167,12 @@ public class CLCommandQueuePool implements CLResource { } public T call() throws Exception { - CLCommandQueue queue = ((QueueThread)Thread.currentThread()).queue; - T result = task.run(queue); + CLQueueContext context = ((QueueThread)Thread.currentThread()).context; + T result = task.run(context); if(mode.equals(FinishAction.FLUSH)) { - queue.flush(); + context.queue.flush(); }else if(mode.equals(FinishAction.FINISH)) { - queue.finish(); + context.queue.finish(); } return result; } |