aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichael Bien <[email protected]>2011-05-04 16:26:46 +0200
committerMichael Bien <[email protected]>2011-05-04 16:26:46 +0200
commit3a20670487663cfbadea480de6e0322c3055afcf (patch)
treed0c29209ca14aa635c194aaff9c5a578f9d55f4f
parent6fcf15f11e2a982b480855fbc75e430e5f2b9ad6 (diff)
initial import of CLCommandQueuePool and CLTask.
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java130
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLTask.java19
-rw-r--r--test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java (renamed from test/com/jogamp/opencl/CLMultiContextTest.java)5
3 files changed, 153 insertions, 1 deletions
diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
new file mode 100644
index 00000000..ef788d61
--- /dev/null
+++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
@@ -0,0 +1,130 @@
+/*
+ * Created on Tuesday, May 03 2011
+ */
+package com.jogamp.opencl.util.concurrent;
+
+import com.jogamp.opencl.CLCommandQueue;
+import com.jogamp.opencl.CLDevice;
+import com.jogamp.opencl.CLResource;
+import com.jogamp.opencl.util.CLMultiContext;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * A multithreaded pool of OpenCL command queues.
+ * It serves as a multiplexer distributing tasks over N queues.
+ * @author Michael Bien
+ */
+public class CLCommandQueuePool implements CLResource {
+
+ private final List<CLCommandQueue> queues;
+ private final ExecutorService excecutor;
+
+ private CLCommandQueuePool(Collection<CLCommandQueue> queues) {
+ this.queues = Collections.unmodifiableList(new ArrayList<CLCommandQueue>(queues));
+ this.excecutor = Executors.newFixedThreadPool(queues.size(), new QueueThreadFactory(this.queues));
+ }
+
+ public static CLCommandQueuePool create(CLMultiContext mc) {
+ return create(mc.getDevices());
+ }
+
+ public static CLCommandQueuePool create(Collection<CLDevice> devices, CLCommandQueue.Mode... modes) {
+ List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(devices.size());
+ for (CLDevice device : devices) {
+ queues.add(device.createCommandQueue(modes));
+ }
+ return create(queues);
+ }
+
+ public static CLCommandQueuePool create(Collection<CLCommandQueue> queues) {
+ return new CLCommandQueuePool(queues);
+ }
+
+ public <T> Future<T> submit(CLTask<T> task) {
+ return excecutor.submit(new TaskWrapper(task));
+ }
+
+ /**
+ * Calls {@link CLCommandQueue#flush()} on all queues.
+ */
+ public void flush() {
+ for (CLCommandQueue queue : queues) {
+ queue.flush();
+ }
+ }
+
+ /**
+ * Calls {@link CLCommandQueue#finish()} on all queues.
+ */
+ public void finish() {
+ for (CLCommandQueue queue : queues) {
+ queue.finish();
+ }
+ }
+
+ /**
+ * Releases all queues.
+ */
+ public void release() {
+ for (CLCommandQueue queue : queues) {
+ queue.finish().release();
+ }
+ }
+
+ public List<CLCommandQueue> getQueues() {
+ return queues;
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName()+" [queues: "+queues.size()+"]";
+ }
+
+ private static class QueueThreadFactory implements ThreadFactory {
+
+ private final List<CLCommandQueue> queues;
+ private int index;
+
+ private QueueThreadFactory(List<CLCommandQueue> queues) {
+ this.queues = queues;
+ this.index = 0;
+ }
+
+ public synchronized Thread newThread(Runnable r) {
+ CLCommandQueue queue = queues.get(index++);
+ return new QueueThread(queue);
+ }
+
+ }
+
+ private static class QueueThread extends Thread {
+ private final CLCommandQueue queue;
+ public QueueThread(CLCommandQueue queue) {
+ this.queue = queue;
+ }
+ }
+
+ private static class TaskWrapper<T> implements Callable<T> {
+
+ private final CLTask<T> task;
+
+ public TaskWrapper(CLTask<T> task) {
+ this.task = task;
+ }
+
+ public T call() throws Exception {
+ QueueThread thread = (QueueThread) Thread.currentThread();
+ return task.run(thread.queue);
+ }
+
+ }
+
+}
diff --git a/src/com/jogamp/opencl/util/concurrent/CLTask.java b/src/com/jogamp/opencl/util/concurrent/CLTask.java
new file mode 100644
index 00000000..ebecb936
--- /dev/null
+++ b/src/com/jogamp/opencl/util/concurrent/CLTask.java
@@ -0,0 +1,19 @@
+/*
+ * Created on Tuesday, May 03 2011 18:09
+ */
+package com.jogamp.opencl.util.concurrent;
+
+import com.jogamp.opencl.CLCommandQueue;
+
+/**
+ * A task executed on a command queue.
+ * @author Michael Bien
+ */
+public interface CLTask<R> {
+
+ /**
+ * Runs the task on a queue and returns its result.
+ */
+ R run(CLCommandQueue queue);
+
+}
diff --git a/test/com/jogamp/opencl/CLMultiContextTest.java b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java
index 20274601..8e96dafa 100644
--- a/test/com/jogamp/opencl/CLMultiContextTest.java
+++ b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java
@@ -1,8 +1,11 @@
/*
* Created on Tuesday, May 03 2011
*/
-package com.jogamp.opencl;
+package com.jogamp.opencl.util.concurrent;
+import com.jogamp.opencl.CLContext;
+import com.jogamp.opencl.CLDevice;
+import com.jogamp.opencl.CLPlatform;
import org.junit.Rule;
import org.junit.rules.MethodRule;
import org.junit.rules.Timeout;