diff options
author | Michael Bien <[email protected]> | 2011-05-09 03:00:55 +0200 |
---|---|---|
committer | Michael Bien <[email protected]> | 2011-05-09 03:00:55 +0200 |
commit | c59bc50229181ab9cb0e5012d7bb5caf2faa781f (patch) | |
tree | 62230d2d14861c14814d6bfcc98b7ee2e7c170fc /src/com | |
parent | dedded707fc70fda3e40cf963d208202f8d6c42b (diff) |
concurrent utils bugfixes and improvements.
- more utility methods
- generics fixes
- basic junit test for CLCommandQueuePool
- javadoc and argument validation
Diffstat (limited to 'src/com')
4 files changed, 71 insertions, 22 deletions
diff --git a/src/com/jogamp/opencl/util/CLMultiContext.java b/src/com/jogamp/opencl/util/CLMultiContext.java index f588fcef..f74c0a35 100644 --- a/src/com/jogamp/opencl/util/CLMultiContext.java +++ b/src/com/jogamp/opencl/util/CLMultiContext.java @@ -41,6 +41,13 @@ public class CLMultiContext implements CLResource { * Creates a multi context with all devices of the specified platforms and types. */ public static CLMultiContext create(CLPlatform[] platforms, CLDevice.Type... types) { + + if(platforms == null) { + throw new NullPointerException("platform list was null"); + }else if(platforms.length == 0) { + throw new IllegalArgumentException("platform list was empty"); + } + List<CLDevice> devices = new ArrayList<CLDevice>(); for (CLPlatform platform : platforms) { devices.addAll(asList(platform.listCLDevices(types))); @@ -54,6 +61,10 @@ public class CLMultiContext implements CLResource { */ public static CLMultiContext create(Collection<CLDevice> devices) { + if(devices.isEmpty()) { + throw new IllegalArgumentException("device list was empty"); + } + Map<CLPlatform, List<CLDevice>> platformDevicesMap = filterPlatformConflicts(devices); // create contexts diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java index b80f09e6..a6bbe4d0 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java +++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java @@ -15,6 +15,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; /** * A multithreaded fixed size pool of OpenCL command queues. @@ -23,7 +24,7 @@ import java.util.concurrent.ThreadFactory; * instead of {@link Callable}s. * @author Michael Bien */ -public class CLCommandQueuePool implements CLResource { +public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource { private List<CLQueueContext> contexts; private ExecutorService excecutor; @@ -55,11 +56,11 @@ public class CLCommandQueuePool implements CLResource { this.excecutor = Executors.newFixedThreadPool(contexts.size(), new QueueThreadFactory(contexts)); } - public static CLCommandQueuePool create(CLQueueContextFactory factory, CLMultiContext mc, CLCommandQueue.Mode... modes) { + public static <C extends CLQueueContext> CLCommandQueuePool<C> create(CLQueueContextFactory<C> factory, CLMultiContext mc, CLCommandQueue.Mode... modes) { return create(factory, mc.getDevices(), modes); } - public static CLCommandQueuePool create(CLQueueContextFactory factory, Collection<CLDevice> devices, CLCommandQueue.Mode... modes) { + public static <C extends CLQueueContext> CLCommandQueuePool<C> create(CLQueueContextFactory<C> factory, Collection<CLDevice> devices, CLCommandQueue.Mode... modes) { List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(devices.size()); for (CLDevice device : devices) { queues.add(device.createCommandQueue(modes)); @@ -67,21 +68,43 @@ public class CLCommandQueuePool implements CLResource { return create(factory, queues); } - public static CLCommandQueuePool create(CLQueueContextFactory factory, Collection<CLCommandQueue> queues) { + public static <C extends CLQueueContext> CLCommandQueuePool create(CLQueueContextFactory<C> factory, Collection<CLCommandQueue> queues) { return new CLCommandQueuePool(factory, queues); } - public <R> Future<R> submit(CLTask<R> task) { + /** + * @see ExecutorService#submit(java.util.concurrent.Callable) + */ + public <R> Future<R> submit(CLTask<? extends C, R> task) { return excecutor.submit(new TaskWrapper(task, finishAction)); } - public <R> List<Future<R>> invokeAll(Collection<CLTask<R>> tasks) throws InterruptedException { - List<TaskWrapper<R>> wrapper = new ArrayList<TaskWrapper<R>>(tasks.size()); - for (CLTask<R> task : tasks) { - wrapper.add(new TaskWrapper<R>(task, finishAction)); - } + /** + * @see ExecutorService#invokeAll(java.util.Collection) + */ + public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? super C, R>> tasks) throws InterruptedException { + List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks); return excecutor.invokeAll(wrapper); } + + /** + * @see ExecutorService#invokeAll(java.util.Collection, long, java.util.concurrent.TimeUnit) + */ + public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? super C, R>> tasks, long timeout, TimeUnit unit) throws InterruptedException { + List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks); + return excecutor.invokeAll(wrapper, timeout, unit); + } + + private <R> List<TaskWrapper<C, R>> wrapTasks(Collection<? extends CLTask<? super C, R>> tasks) { + List<TaskWrapper<C, R>> wrapper = new ArrayList<TaskWrapper<C, R>>(tasks.size()); + for (CLTask<? super C, R> task : tasks) { + if(task == null) { + throw new NullPointerException("at least one task was null"); + } + wrapper.add(new TaskWrapper<C, R>(task, finishAction)); + } + return wrapper; + } /** * Switches the context of all queues - this operation can be expensive. @@ -171,35 +194,41 @@ public class CLCommandQueuePool implements CLResource { this.index = 0; } - public synchronized Thread newThread(Runnable r) { + public synchronized Thread newThread(Runnable runnable) { + + SecurityManager sm = System.getSecurityManager(); + ThreadGroup group = (sm != null)? sm.getThreadGroup() : Thread.currentThread().getThreadGroup(); + CLQueueContext queue = context.get(index); - return new QueueThread(queue, index++); + QueueThread thread = new QueueThread(group, runnable, queue, index++); + thread.setDaemon(true); + + return thread; } } private static class QueueThread extends Thread { private final CLQueueContext context; - public QueueThread(CLQueueContext context, int index) { - super("queue-worker-thread-"+index+"["+context+"]"); + public QueueThread(ThreadGroup group, Runnable runnable, CLQueueContext context, int index) { + super(group, runnable, "queue-worker-thread-"+index+"["+context+"]"); this.context = context; - this.setDaemon(true); } } - private static class TaskWrapper<T> implements Callable<T> { + private static class TaskWrapper<C extends CLQueueContext, R> implements Callable<R> { - private final CLTask<T> task; + private final CLTask<? super C, R> task; private final FinishAction mode; - public TaskWrapper(CLTask<T> task, FinishAction mode) { + public TaskWrapper(CLTask<? super C, R> task, FinishAction mode) { this.task = task; this.mode = mode; } - public T call() throws Exception { + public R call() throws Exception { CLQueueContext context = ((QueueThread)Thread.currentThread()).context; - T result = task.run(context); + R result = task.execute((C)context); if(mode.equals(FinishAction.FLUSH)) { context.queue.flush(); }else if(mode.equals(FinishAction.FINISH)) { diff --git a/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java b/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java index 3956f93d..11b86889 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java +++ b/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java @@ -4,6 +4,7 @@ package com.jogamp.opencl.util.concurrent; import com.jogamp.opencl.CLCommandQueue; +import com.jogamp.opencl.CLContext; import com.jogamp.opencl.CLKernel; import com.jogamp.opencl.CLProgram; import com.jogamp.opencl.CLResource; @@ -24,6 +25,10 @@ public abstract class CLQueueContext implements CLResource { return queue; } + public CLContext getCLContext() { + return queue.getContext(); + } + public static class CLSimpleQueueContext extends CLQueueContext { public final CLProgram program; @@ -39,6 +44,10 @@ public abstract class CLQueueContext implements CLResource { return kernels; } + public CLKernel getKernel(String name) { + return kernels.get(name); + } + public CLProgram getProgram() { return program; } diff --git a/src/com/jogamp/opencl/util/concurrent/CLTask.java b/src/com/jogamp/opencl/util/concurrent/CLTask.java index ff0f7614..0cfd24a5 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLTask.java +++ b/src/com/jogamp/opencl/util/concurrent/CLTask.java @@ -8,11 +8,11 @@ package com.jogamp.opencl.util.concurrent; * A task executed on a command queue. * @author Michael Bien */ -public interface CLTask<R> { +public interface CLTask<C extends CLQueueContext, R> { /** * Runs the task on a queue and returns a result. */ - R run(CLQueueContext context); + R execute(C context); } |