summaryrefslogtreecommitdiffstats
path: root/src/com
diff options
context:
space:
mode:
authorMichael Bien <[email protected]>2011-07-09 00:43:29 +0200
committerMichael Bien <[email protected]>2011-07-09 00:43:29 +0200
commit519cfb8a41e28e4d10e40496893a9aacf0bce6b1 (patch)
treea3b7812f29ab283afade071f6b8cd3ec42bb8709 /src/com
parent4fe7110357d2631960e23861a3221489d313c467 (diff)
changed impl to use an extended ThreadPoolExecutor directly.
Diffstat (limited to 'src/com')
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java78
1 files changed, 73 insertions, 5 deletions
diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
index e601cc7d..eac3dc13 100644
--- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
+++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
@@ -10,18 +10,22 @@ import com.jogamp.opencl.util.CLMultiContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* A multithreaded, fixed size pool of OpenCL command queues.
- * It serves as a multiplexer distributing tasks over N queues usually run on N devices.
+ * CLCommandQueuePool serves as a multiplexer distributing tasks over N queues usually connected to N devices.
* The usage of this pool is similar to {@link ExecutorService} but it uses {@link CLTask}s
* instead of {@link Callable}s and provides a per-queue context for resource sharing across all tasks of one queue.
* @author Michael Bien
@@ -29,7 +33,7 @@ import java.util.concurrent.TimeoutException;
public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource {
private List<CLQueueContext> contexts;
- private ExecutorService excecutor;
+ private ThreadPoolExecutor excecutor;
private FinishAction finishAction = FinishAction.DO_NOTHING;
private boolean released;
@@ -56,7 +60,10 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
}
private void initExecutor() {
- this.excecutor = Executors.newFixedThreadPool(contexts.size(), new QueueThreadFactory(contexts));
+ BlockingQueue<Runnable> queue = new LinkedBlockingDeque<Runnable>();
+ QueueThreadFactory factory = new QueueThreadFactory(contexts);
+ int size = contexts.size();
+ this.excecutor = new CLThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, queue, factory);
}
public static <C extends CLQueueContext> CLCommandQueuePool<C> create(CLQueueContextFactory<C> factory, CLMultiContext mc, CLCommandQueue.Mode... modes) {
@@ -123,6 +130,10 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
return excecutor.invokeAny(wrapper);
}
+ /*public*/ CLTask<? super C, ?> takeCLTask() throws InterruptedException {
+ return ((CLFutureTask<? super C, ?>)excecutor.getQueue().take()).getCLTask();
+ }
+
/**
* Submits all tasks for immediate execution (blocking) until a result can be returned.
* All other unfinished but started tasks are cancelled.
@@ -222,14 +233,42 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
/**
* Returns the size of this pool (number of command queues).
*/
- public int getSize() {
+ public int getPoolSize() {
return contexts.size();
}
+ /**
+ * Returns the action which is executed when a task finishes.
+ */
public FinishAction getFinishAction() {
return finishAction;
}
+ /**
+ * Returns the approximate total number of tasks that have ever been scheduled for execution.
+ * Because the states of tasks and threads may change dynamically during computation, the returned
+ * value is only an approximation.
+ */
+ public long getTaskCount() {
+ return excecutor.getTaskCount();
+ }
+
+ /**
+ * Returns the approximate total number of tasks that have completed execution.
+ * Because the states of tasks and threads may change dynamically during computation,
+ * the returned value is only an approximation, but one that does not ever decrease across successive calls.
+ */
+ public long getCompletedTaskCount() {
+ return excecutor.getCompletedTaskCount();
+ }
+
+ /**
+ * Returns the approximate number of queues that are actively executing tasks.
+ */
+ public int getActiveCount() {
+ return excecutor.getActiveCount();
+ }
+
@Override
public boolean isReleased() {
return released;
@@ -305,6 +344,35 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
}
+ private static class CLFutureTask<C extends CLQueueContext, R> extends FutureTask<R> {
+
+ private final TaskWrapper<C, R> wrapper;
+
+ public CLFutureTask(TaskWrapper<C, R> wrapper) {
+ super(wrapper);
+ this.wrapper = wrapper;
+ }
+
+ public CLTask<? super C, R> getCLTask() {
+ return wrapper.task;
+ }
+
+ }
+
+ private static class CLThreadPoolExecutor extends ThreadPoolExecutor {
+
+ public CLThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+ }
+
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+ TaskWrapper<CLQueueContext, T> wrapper = (TaskWrapper<CLQueueContext, T>)callable;
+ return new CLFutureTask<CLQueueContext, T>(wrapper);
+ }
+
+ }
+
/**
* The action executed after a task completes.
*/