aboutsummaryrefslogtreecommitdiffstats
path: root/src/com/jogamp/opencl/util/concurrent
diff options
context:
space:
mode:
authorMichael Bien <[email protected]>2011-05-09 03:00:55 +0200
committerMichael Bien <[email protected]>2011-05-09 03:00:55 +0200
commitc59bc50229181ab9cb0e5012d7bb5caf2faa781f (patch)
tree62230d2d14861c14814d6bfcc98b7ee2e7c170fc /src/com/jogamp/opencl/util/concurrent
parentdedded707fc70fda3e40cf963d208202f8d6c42b (diff)
concurrent utils bugfixes and improvements.
- more utility methods - generics fixes - basic junit test for CLCommandQueuePool - javadoc and argument validation
Diffstat (limited to 'src/com/jogamp/opencl/util/concurrent')
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java69
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLQueueContext.java9
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLTask.java4
3 files changed, 60 insertions, 22 deletions
diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
index b80f09e6..a6bbe4d0 100644
--- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
+++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
@@ -15,6 +15,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
/**
* A multithreaded fixed size pool of OpenCL command queues.
@@ -23,7 +24,7 @@ import java.util.concurrent.ThreadFactory;
* instead of {@link Callable}s.
* @author Michael Bien
*/
-public class CLCommandQueuePool implements CLResource {
+public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource {
private List<CLQueueContext> contexts;
private ExecutorService excecutor;
@@ -55,11 +56,11 @@ public class CLCommandQueuePool implements CLResource {
this.excecutor = Executors.newFixedThreadPool(contexts.size(), new QueueThreadFactory(contexts));
}
- public static CLCommandQueuePool create(CLQueueContextFactory factory, CLMultiContext mc, CLCommandQueue.Mode... modes) {
+ public static <C extends CLQueueContext> CLCommandQueuePool<C> create(CLQueueContextFactory<C> factory, CLMultiContext mc, CLCommandQueue.Mode... modes) {
return create(factory, mc.getDevices(), modes);
}
- public static CLCommandQueuePool create(CLQueueContextFactory factory, Collection<CLDevice> devices, CLCommandQueue.Mode... modes) {
+ public static <C extends CLQueueContext> CLCommandQueuePool<C> create(CLQueueContextFactory<C> factory, Collection<CLDevice> devices, CLCommandQueue.Mode... modes) {
List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(devices.size());
for (CLDevice device : devices) {
queues.add(device.createCommandQueue(modes));
@@ -67,21 +68,43 @@ public class CLCommandQueuePool implements CLResource {
return create(factory, queues);
}
- public static CLCommandQueuePool create(CLQueueContextFactory factory, Collection<CLCommandQueue> queues) {
+ public static <C extends CLQueueContext> CLCommandQueuePool create(CLQueueContextFactory<C> factory, Collection<CLCommandQueue> queues) {
return new CLCommandQueuePool(factory, queues);
}
- public <R> Future<R> submit(CLTask<R> task) {
+ /**
+ * @see ExecutorService#submit(java.util.concurrent.Callable)
+ */
+ public <R> Future<R> submit(CLTask<? extends C, R> task) {
return excecutor.submit(new TaskWrapper(task, finishAction));
}
- public <R> List<Future<R>> invokeAll(Collection<CLTask<R>> tasks) throws InterruptedException {
- List<TaskWrapper<R>> wrapper = new ArrayList<TaskWrapper<R>>(tasks.size());
- for (CLTask<R> task : tasks) {
- wrapper.add(new TaskWrapper<R>(task, finishAction));
- }
+ /**
+ * @see ExecutorService#invokeAll(java.util.Collection)
+ */
+ public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? super C, R>> tasks) throws InterruptedException {
+ List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks);
return excecutor.invokeAll(wrapper);
}
+
+ /**
+ * @see ExecutorService#invokeAll(java.util.Collection, long, java.util.concurrent.TimeUnit)
+ */
+ public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? super C, R>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+ List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks);
+ return excecutor.invokeAll(wrapper, timeout, unit);
+ }
+
+ private <R> List<TaskWrapper<C, R>> wrapTasks(Collection<? extends CLTask<? super C, R>> tasks) {
+ List<TaskWrapper<C, R>> wrapper = new ArrayList<TaskWrapper<C, R>>(tasks.size());
+ for (CLTask<? super C, R> task : tasks) {
+ if(task == null) {
+ throw new NullPointerException("at least one task was null");
+ }
+ wrapper.add(new TaskWrapper<C, R>(task, finishAction));
+ }
+ return wrapper;
+ }
/**
* Switches the context of all queues - this operation can be expensive.
@@ -171,35 +194,41 @@ public class CLCommandQueuePool implements CLResource {
this.index = 0;
}
- public synchronized Thread newThread(Runnable r) {
+ public synchronized Thread newThread(Runnable runnable) {
+
+ SecurityManager sm = System.getSecurityManager();
+ ThreadGroup group = (sm != null)? sm.getThreadGroup() : Thread.currentThread().getThreadGroup();
+
CLQueueContext queue = context.get(index);
- return new QueueThread(queue, index++);
+ QueueThread thread = new QueueThread(group, runnable, queue, index++);
+ thread.setDaemon(true);
+
+ return thread;
}
}
private static class QueueThread extends Thread {
private final CLQueueContext context;
- public QueueThread(CLQueueContext context, int index) {
- super("queue-worker-thread-"+index+"["+context+"]");
+ public QueueThread(ThreadGroup group, Runnable runnable, CLQueueContext context, int index) {
+ super(group, runnable, "queue-worker-thread-"+index+"["+context+"]");
this.context = context;
- this.setDaemon(true);
}
}
- private static class TaskWrapper<T> implements Callable<T> {
+ private static class TaskWrapper<C extends CLQueueContext, R> implements Callable<R> {
- private final CLTask<T> task;
+ private final CLTask<? super C, R> task;
private final FinishAction mode;
- public TaskWrapper(CLTask<T> task, FinishAction mode) {
+ public TaskWrapper(CLTask<? super C, R> task, FinishAction mode) {
this.task = task;
this.mode = mode;
}
- public T call() throws Exception {
+ public R call() throws Exception {
CLQueueContext context = ((QueueThread)Thread.currentThread()).context;
- T result = task.run(context);
+ R result = task.execute((C)context);
if(mode.equals(FinishAction.FLUSH)) {
context.queue.flush();
}else if(mode.equals(FinishAction.FINISH)) {
diff --git a/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java b/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java
index 3956f93d..11b86889 100644
--- a/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java
+++ b/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java
@@ -4,6 +4,7 @@
package com.jogamp.opencl.util.concurrent;
import com.jogamp.opencl.CLCommandQueue;
+import com.jogamp.opencl.CLContext;
import com.jogamp.opencl.CLKernel;
import com.jogamp.opencl.CLProgram;
import com.jogamp.opencl.CLResource;
@@ -24,6 +25,10 @@ public abstract class CLQueueContext implements CLResource {
return queue;
}
+ public CLContext getCLContext() {
+ return queue.getContext();
+ }
+
public static class CLSimpleQueueContext extends CLQueueContext {
public final CLProgram program;
@@ -39,6 +44,10 @@ public abstract class CLQueueContext implements CLResource {
return kernels;
}
+ public CLKernel getKernel(String name) {
+ return kernels.get(name);
+ }
+
public CLProgram getProgram() {
return program;
}
diff --git a/src/com/jogamp/opencl/util/concurrent/CLTask.java b/src/com/jogamp/opencl/util/concurrent/CLTask.java
index ff0f7614..0cfd24a5 100644
--- a/src/com/jogamp/opencl/util/concurrent/CLTask.java
+++ b/src/com/jogamp/opencl/util/concurrent/CLTask.java
@@ -8,11 +8,11 @@ package com.jogamp.opencl.util.concurrent;
* A task executed on a command queue.
* @author Michael Bien
*/
-public interface CLTask<R> {
+public interface CLTask<C extends CLQueueContext, R> {
/**
* Runs the task on a queue and returns a result.
*/
- R run(CLQueueContext context);
+ R execute(C context);
}