summaryrefslogtreecommitdiffstats
path: root/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java')
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java69
1 files changed, 49 insertions, 20 deletions
diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
index b80f09e6..a6bbe4d0 100644
--- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
+++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
@@ -15,6 +15,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
/**
* A multithreaded fixed size pool of OpenCL command queues.
@@ -23,7 +24,7 @@ import java.util.concurrent.ThreadFactory;
* instead of {@link Callable}s.
* @author Michael Bien
*/
-public class CLCommandQueuePool implements CLResource {
+public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource {
private List<CLQueueContext> contexts;
private ExecutorService excecutor;
@@ -55,11 +56,11 @@ public class CLCommandQueuePool implements CLResource {
this.excecutor = Executors.newFixedThreadPool(contexts.size(), new QueueThreadFactory(contexts));
}
- public static CLCommandQueuePool create(CLQueueContextFactory factory, CLMultiContext mc, CLCommandQueue.Mode... modes) {
+ public static <C extends CLQueueContext> CLCommandQueuePool<C> create(CLQueueContextFactory<C> factory, CLMultiContext mc, CLCommandQueue.Mode... modes) {
return create(factory, mc.getDevices(), modes);
}
- public static CLCommandQueuePool create(CLQueueContextFactory factory, Collection<CLDevice> devices, CLCommandQueue.Mode... modes) {
+ public static <C extends CLQueueContext> CLCommandQueuePool<C> create(CLQueueContextFactory<C> factory, Collection<CLDevice> devices, CLCommandQueue.Mode... modes) {
List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(devices.size());
for (CLDevice device : devices) {
queues.add(device.createCommandQueue(modes));
@@ -67,21 +68,43 @@ public class CLCommandQueuePool implements CLResource {
return create(factory, queues);
}
- public static CLCommandQueuePool create(CLQueueContextFactory factory, Collection<CLCommandQueue> queues) {
+ public static <C extends CLQueueContext> CLCommandQueuePool create(CLQueueContextFactory<C> factory, Collection<CLCommandQueue> queues) {
return new CLCommandQueuePool(factory, queues);
}
- public <R> Future<R> submit(CLTask<R> task) {
+ /**
+ * @see ExecutorService#submit(java.util.concurrent.Callable)
+ */
+ public <R> Future<R> submit(CLTask<? extends C, R> task) {
return excecutor.submit(new TaskWrapper(task, finishAction));
}
- public <R> List<Future<R>> invokeAll(Collection<CLTask<R>> tasks) throws InterruptedException {
- List<TaskWrapper<R>> wrapper = new ArrayList<TaskWrapper<R>>(tasks.size());
- for (CLTask<R> task : tasks) {
- wrapper.add(new TaskWrapper<R>(task, finishAction));
- }
+ /**
+ * @see ExecutorService#invokeAll(java.util.Collection)
+ */
+ public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? super C, R>> tasks) throws InterruptedException {
+ List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks);
return excecutor.invokeAll(wrapper);
}
+
+ /**
+ * @see ExecutorService#invokeAll(java.util.Collection, long, java.util.concurrent.TimeUnit)
+ */
+ public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? super C, R>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+ List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks);
+ return excecutor.invokeAll(wrapper, timeout, unit);
+ }
+
+ private <R> List<TaskWrapper<C, R>> wrapTasks(Collection<? extends CLTask<? super C, R>> tasks) {
+ List<TaskWrapper<C, R>> wrapper = new ArrayList<TaskWrapper<C, R>>(tasks.size());
+ for (CLTask<? super C, R> task : tasks) {
+ if(task == null) {
+ throw new NullPointerException("at least one task was null");
+ }
+ wrapper.add(new TaskWrapper<C, R>(task, finishAction));
+ }
+ return wrapper;
+ }
/**
* Switches the context of all queues - this operation can be expensive.
@@ -171,35 +194,41 @@ public class CLCommandQueuePool implements CLResource {
this.index = 0;
}
- public synchronized Thread newThread(Runnable r) {
+ public synchronized Thread newThread(Runnable runnable) {
+
+ SecurityManager sm = System.getSecurityManager();
+ ThreadGroup group = (sm != null)? sm.getThreadGroup() : Thread.currentThread().getThreadGroup();
+
CLQueueContext queue = context.get(index);
- return new QueueThread(queue, index++);
+ QueueThread thread = new QueueThread(group, runnable, queue, index++);
+ thread.setDaemon(true);
+
+ return thread;
}
}
private static class QueueThread extends Thread {
private final CLQueueContext context;
- public QueueThread(CLQueueContext context, int index) {
- super("queue-worker-thread-"+index+"["+context+"]");
+ public QueueThread(ThreadGroup group, Runnable runnable, CLQueueContext context, int index) {
+ super(group, runnable, "queue-worker-thread-"+index+"["+context+"]");
this.context = context;
- this.setDaemon(true);
}
}
- private static class TaskWrapper<T> implements Callable<T> {
+ private static class TaskWrapper<C extends CLQueueContext, R> implements Callable<R> {
- private final CLTask<T> task;
+ private final CLTask<? super C, R> task;
private final FinishAction mode;
- public TaskWrapper(CLTask<T> task, FinishAction mode) {
+ public TaskWrapper(CLTask<? super C, R> task, FinishAction mode) {
this.task = task;
this.mode = mode;
}
- public T call() throws Exception {
+ public R call() throws Exception {
CLQueueContext context = ((QueueThread)Thread.currentThread()).context;
- T result = task.run(context);
+ R result = task.execute((C)context);
if(mode.equals(FinishAction.FLUSH)) {
context.queue.flush();
}else if(mode.equals(FinishAction.FINISH)) {