diff options
author | Michael Bien <[email protected]> | 2011-07-09 00:43:29 +0200 |
---|---|---|
committer | Michael Bien <[email protected]> | 2011-07-09 00:43:29 +0200 |
commit | 519cfb8a41e28e4d10e40496893a9aacf0bce6b1 (patch) | |
tree | a3b7812f29ab283afade071f6b8cd3ec42bb8709 /src | |
parent | 4fe7110357d2631960e23861a3221489d313c467 (diff) |
changed impl to use an extended ThreadPoolExecutor directly.
Diffstat (limited to 'src')
-rw-r--r-- | src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java | 78 |
1 files changed, 73 insertions, 5 deletions
diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java index e601cc7d..eac3dc13 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java +++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java @@ -10,18 +10,22 @@ import com.jogamp.opencl.util.CLMultiContext; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.RunnableFuture; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * A multithreaded, fixed size pool of OpenCL command queues. - * It serves as a multiplexer distributing tasks over N queues usually run on N devices. + * CLCommandQueuePool serves as a multiplexer distributing tasks over N queues usually connected to N devices. * The usage of this pool is similar to {@link ExecutorService} but it uses {@link CLTask}s * instead of {@link Callable}s and provides a per-queue context for resource sharing across all tasks of one queue. * @author Michael Bien @@ -29,7 +33,7 @@ import java.util.concurrent.TimeoutException; public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource { private List<CLQueueContext> contexts; - private ExecutorService excecutor; + private ThreadPoolExecutor excecutor; private FinishAction finishAction = FinishAction.DO_NOTHING; private boolean released; @@ -56,7 +60,10 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource } private void initExecutor() { - this.excecutor = Executors.newFixedThreadPool(contexts.size(), new QueueThreadFactory(contexts)); + BlockingQueue<Runnable> queue = new LinkedBlockingDeque<Runnable>(); + QueueThreadFactory factory = new QueueThreadFactory(contexts); + int size = contexts.size(); + this.excecutor = new CLThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, queue, factory); } public static <C extends CLQueueContext> CLCommandQueuePool<C> create(CLQueueContextFactory<C> factory, CLMultiContext mc, CLCommandQueue.Mode... modes) { @@ -123,6 +130,10 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource return excecutor.invokeAny(wrapper); } + /*public*/ CLTask<? super C, ?> takeCLTask() throws InterruptedException { + return ((CLFutureTask<? super C, ?>)excecutor.getQueue().take()).getCLTask(); + } + /** * Submits all tasks for immediate execution (blocking) until a result can be returned. * All other unfinished but started tasks are cancelled. @@ -222,14 +233,42 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource /** * Returns the size of this pool (number of command queues). */ - public int getSize() { + public int getPoolSize() { return contexts.size(); } + /** + * Returns the action which is executed when a task finishes. + */ public FinishAction getFinishAction() { return finishAction; } + /** + * Returns the approximate total number of tasks that have ever been scheduled for execution. + * Because the states of tasks and threads may change dynamically during computation, the returned + * value is only an approximation. + */ + public long getTaskCount() { + return excecutor.getTaskCount(); + } + + /** + * Returns the approximate total number of tasks that have completed execution. + * Because the states of tasks and threads may change dynamically during computation, + * the returned value is only an approximation, but one that does not ever decrease across successive calls. + */ + public long getCompletedTaskCount() { + return excecutor.getCompletedTaskCount(); + } + + /** + * Returns the approximate number of queues that are actively executing tasks. + */ + public int getActiveCount() { + return excecutor.getActiveCount(); + } + @Override public boolean isReleased() { return released; @@ -305,6 +344,35 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource } + private static class CLFutureTask<C extends CLQueueContext, R> extends FutureTask<R> { + + private final TaskWrapper<C, R> wrapper; + + public CLFutureTask(TaskWrapper<C, R> wrapper) { + super(wrapper); + this.wrapper = wrapper; + } + + public CLTask<? super C, R> getCLTask() { + return wrapper.task; + } + + } + + private static class CLThreadPoolExecutor extends ThreadPoolExecutor { + + public CLThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + } + + @Override + protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { + TaskWrapper<CLQueueContext, T> wrapper = (TaskWrapper<CLQueueContext, T>)callable; + return new CLFutureTask<CLQueueContext, T>(wrapper); + } + + } + /** * The action executed after a task completes. */ |