diff options
Diffstat (limited to 'src/com')
-rw-r--r-- | src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java | 130 | ||||
-rw-r--r-- | src/com/jogamp/opencl/util/concurrent/CLTask.java | 19 |
2 files changed, 149 insertions, 0 deletions
diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java new file mode 100644 index 00000000..ef788d61 --- /dev/null +++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java @@ -0,0 +1,130 @@ +/* + * Created on Tuesday, May 03 2011 + */ +package com.jogamp.opencl.util.concurrent; + +import com.jogamp.opencl.CLCommandQueue; +import com.jogamp.opencl.CLDevice; +import com.jogamp.opencl.CLResource; +import com.jogamp.opencl.util.CLMultiContext; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; + +/** + * A multithreaded pool of OpenCL command queues. + * It serves as a multiplexer distributing tasks over N queues. + * @author Michael Bien + */ +public class CLCommandQueuePool implements CLResource { + + private final List<CLCommandQueue> queues; + private final ExecutorService excecutor; + + private CLCommandQueuePool(Collection<CLCommandQueue> queues) { + this.queues = Collections.unmodifiableList(new ArrayList<CLCommandQueue>(queues)); + this.excecutor = Executors.newFixedThreadPool(queues.size(), new QueueThreadFactory(this.queues)); + } + + public static CLCommandQueuePool create(CLMultiContext mc) { + return create(mc.getDevices()); + } + + public static CLCommandQueuePool create(Collection<CLDevice> devices, CLCommandQueue.Mode... modes) { + List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(devices.size()); + for (CLDevice device : devices) { + queues.add(device.createCommandQueue(modes)); + } + return create(queues); + } + + public static CLCommandQueuePool create(Collection<CLCommandQueue> queues) { + return new CLCommandQueuePool(queues); + } + + public <T> Future<T> submit(CLTask<T> task) { + return excecutor.submit(new TaskWrapper(task)); + } + + /** + * Calls {@link CLCommandQueue#flush()} on all queues. + */ + public void flush() { + for (CLCommandQueue queue : queues) { + queue.flush(); + } + } + + /** + * Calls {@link CLCommandQueue#finish()} on all queues. + */ + public void finish() { + for (CLCommandQueue queue : queues) { + queue.finish(); + } + } + + /** + * Releases all queues. + */ + public void release() { + for (CLCommandQueue queue : queues) { + queue.finish().release(); + } + } + + public List<CLCommandQueue> getQueues() { + return queues; + } + + @Override + public String toString() { + return getClass().getSimpleName()+" [queues: "+queues.size()+"]"; + } + + private static class QueueThreadFactory implements ThreadFactory { + + private final List<CLCommandQueue> queues; + private int index; + + private QueueThreadFactory(List<CLCommandQueue> queues) { + this.queues = queues; + this.index = 0; + } + + public synchronized Thread newThread(Runnable r) { + CLCommandQueue queue = queues.get(index++); + return new QueueThread(queue); + } + + } + + private static class QueueThread extends Thread { + private final CLCommandQueue queue; + public QueueThread(CLCommandQueue queue) { + this.queue = queue; + } + } + + private static class TaskWrapper<T> implements Callable<T> { + + private final CLTask<T> task; + + public TaskWrapper(CLTask<T> task) { + this.task = task; + } + + public T call() throws Exception { + QueueThread thread = (QueueThread) Thread.currentThread(); + return task.run(thread.queue); + } + + } + +} diff --git a/src/com/jogamp/opencl/util/concurrent/CLTask.java b/src/com/jogamp/opencl/util/concurrent/CLTask.java new file mode 100644 index 00000000..ebecb936 --- /dev/null +++ b/src/com/jogamp/opencl/util/concurrent/CLTask.java @@ -0,0 +1,19 @@ +/* + * Created on Tuesday, May 03 2011 18:09 + */ +package com.jogamp.opencl.util.concurrent; + +import com.jogamp.opencl.CLCommandQueue; + +/** + * A task executed on a command queue. + * @author Michael Bien + */ +public interface CLTask<R> { + + /** + * Runs the task on a queue and returns its result. + */ + R run(CLCommandQueue queue); + +} |