summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichael Bien <[email protected]>2011-07-05 23:39:46 +0200
committerMichael Bien <[email protected]>2011-07-05 23:39:46 +0200
commitb7390e0694f747ac51b6ad26b05b1a2a11df0361 (patch)
treefea96f2b01d1fb319880b152f07c83c333f50f9a
parent3b5da206886d9c73cbed643733ceee71d9416178 (diff)
- initial import of CLTaskCompletionService.
- fixed pool shutdown behavior. It will now wait till all started tasks finish to be able to release the queue context.
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java29
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java76
-rw-r--r--test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java87
3 files changed, 160 insertions, 32 deletions
diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
index 31aeec47..e601cc7d 100644
--- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
+++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
@@ -80,7 +80,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
* @see ExecutorService#submit(java.util.concurrent.Callable)
*/
public <R> Future<R> submit(CLTask<? super C, R> task) {
- return excecutor.submit(new TaskWrapper(task, finishAction));
+ return excecutor.submit(wrapTask(task));
}
/**
@@ -133,6 +133,10 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
return excecutor.invokeAny(wrapper, timeout, unit);
}
+ <R> TaskWrapper<C, R> wrapTask(CLTask<? super C, R> task) {
+ return new TaskWrapper(task, finishAction);
+ }
+
private <R> List<TaskWrapper<C, R>> wrapTasks(Collection<? extends CLTask<? super C, R>> tasks) {
List<TaskWrapper<C, R>> wrapper = new ArrayList<TaskWrapper<C, R>>(tasks.size());
for (CLTask<? super C, R> task : tasks) {
@@ -178,7 +182,8 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
}
/**
- * Releases all queues.
+ * Releases the queue context, all queues including a shutdown of the internal threadpool.
+ * The call will block until all currently executing tasks have finished, no new tasks are started.
*/
@Override
public void release() {
@@ -186,13 +191,23 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
throw new RuntimeException(getClass().getSimpleName()+" already released");
}
released = true;
- excecutor.shutdown();
- for (CLQueueContext context : contexts) {
- context.queue.finish().release();
- context.release();
+ excecutor.shutdownNow();
+ try {
+ excecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }finally{
+ for (CLQueueContext context : contexts) {
+ context.queue.finish().release();
+ context.release();
+ }
}
}
+ ExecutorService getExcecutor() {
+ return excecutor;
+ }
+
/**
* Returns the command queues used in this pool.
*/
@@ -271,7 +286,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
private final CLTask<? super C, R> task;
private final FinishAction mode;
- public TaskWrapper(CLTask<? super C, R> task, FinishAction mode) {
+ private TaskWrapper(CLTask<? super C, R> task, FinishAction mode) {
this.task = task;
this.mode = mode;
}
diff --git a/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java b/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java
new file mode 100644
index 00000000..d1d26824
--- /dev/null
+++ b/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java
@@ -0,0 +1,76 @@
+/*
+ * Created on Tuesday, July 05 2011 00:26
+ */
+package com.jogamp.opencl.util.concurrent;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link CompletionService} for {@link CLTask}s executed in a {@link CLCommandQueuePool}.
+ * It simplifies asynchronous execution of tasks with the same result type in a potentially shared pool.
+ * @see CompletionService
+ * @author Michael Bien
+ */
+public class CLTaskCompletionService<C extends CLQueueContext, R> {
+
+ private final ExecutorCompletionService<R> service;
+ private final CLCommandQueuePool pool;
+
+ /**
+ * Creates an CLTaskCompletionService using the supplied pool for base
+ * task execution and a LinkedBlockingQueue with the capacity of {@link Integer#MAX_VALUE}
+ * as a completion queue.
+ */
+ public CLTaskCompletionService(CLCommandQueuePool<C> pool) {
+ this.service = new ExecutorCompletionService<R>(pool.getExcecutor());
+ this.pool = pool;
+ }
+
+ /**
+ * Creates an CLTaskCompletionService using the supplied pool for base
+ * task execution the supplied queue as its completion queue.
+ */
+ public CLTaskCompletionService(CLCommandQueuePool<C> pool, BlockingQueue queue) {
+ this.service = new ExecutorCompletionService<R>(pool.getExcecutor(), queue);
+ this.pool = pool;
+ }
+
+ /**
+ * Submits a CLTask for execution and returns a Future representing the pending
+ * results of the task. Upon completion, this task may be taken or polled.
+ * @see CompletionService#submit(java.util.concurrent.Callable)
+ */
+ public Future<R> submit(CLTask<? super C, R> task) {
+ return service.submit(pool.wrapTask(task));
+ }
+
+ /**
+ * Retrieves and removes the Future representing the next completed task, waiting if none are yet present.
+ * @see CompletionService#take()
+ */
+ public Future<R> take() throws InterruptedException {
+ return service.take();
+ }
+
+ /**
+ * Retrieves and removes the Future representing the next completed task or null if none are present.
+ * @see CompletionService#poll()
+ */
+ public Future<R> poll() {
+ return service.poll();
+ }
+
+ /**
+ * Retrieves and removes the Future representing the next completed task, waiting if necessary
+ * up to the specified wait time if none are yet present.
+ * @see CompletionService#poll(long, java.util.concurrent.TimeUnit)
+ */
+ public Future<R> poll(long timeout, TimeUnit unit) throws InterruptedException {
+ return service.poll(timeout, unit);
+ }
+
+}
diff --git a/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java
index 68ca33f9..6e5d5e99 100644
--- a/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java
+++ b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java
@@ -19,7 +19,6 @@ import org.junit.Rule;
import org.junit.rules.MethodRule;
import org.junit.rules.Timeout;
import com.jogamp.opencl.util.CLMultiContext;
-import java.nio.Buffer;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
@@ -70,29 +69,36 @@ public class CLMultiContextTest {
+ " array[index]++; \n"
+ "} \n";
- private final class CLTestTask implements CLTask<CLSimpleQueueContext, Buffer> {
+ private final class CLTestTask implements CLTask<CLSimpleQueueContext, IntBuffer> {
- private final Buffer data;
+ private final IntBuffer data;
- public CLTestTask(Buffer buffer) {
+ public CLTestTask(IntBuffer buffer) {
this.data = buffer;
}
@Override
- public Buffer execute(CLSimpleQueueContext qc) {
+ public IntBuffer execute(CLSimpleQueueContext qc) {
CLCommandQueue queue = qc.getQueue();
CLContext context = qc.getCLContext();
CLKernel kernel = qc.getKernel("compute");
- CLBuffer<Buffer> buffer = null;
+// System.out.println(Thread.currentThread().getName()+" / "+queue);
+ assertFalse(qc.isReleased());
+ assertFalse(queue.isReleased());
+ assertFalse(context.isReleased());
+ assertFalse(kernel.isReleased());
+
+ CLBuffer<IntBuffer> buffer = null;
try{
+
buffer = context.createBuffer(data);
int gws = buffer.getCLCapacity();
kernel.putArg(buffer).putArg(gws).rewind();
- queue.putWriteBuffer(buffer, true);
+ queue.putWriteBuffer(buffer, false);
queue.put1DRangeKernel(kernel, 0, gws, 0);
queue.putReadBuffer(buffer, true);
}finally{
@@ -106,6 +112,16 @@ public class CLMultiContextTest {
}
+ private List<CLTestTask> createTasks(IntBuffer data, int taskCount, int slice) {
+ List<CLTestTask> tasks = new ArrayList<CLTestTask>(taskCount);
+ for (int i = 0; i < taskCount; i++) {
+ IntBuffer subBuffer = Buffers.slice(data, i*slice, slice);
+ assertEquals(slice, subBuffer.capacity());
+ tasks.add(new CLTestTask(subBuffer));
+ }
+ return tasks;
+ }
+
@Test
public void commandQueuePoolTest() throws InterruptedException, ExecutionException {
@@ -123,48 +139,62 @@ public class CLMultiContextTest {
final int taskCount = pool.getSize() * tasksPerQueue;
IntBuffer data = Buffers.newDirectIntBuffer(slice*taskCount);
-
- List<CLTestTask> tasks = new ArrayList<CLTestTask>(taskCount);
-
- for (int i = 0; i < taskCount; i++) {
- IntBuffer subBuffer = Buffers.slice(data, i*slice, slice);
- assertEquals(slice, subBuffer.capacity());
- tasks.add(new CLTestTask(subBuffer));
- }
+ List<CLTestTask> tasks = createTasks(data, taskCount, slice);
out.println("invoking "+tasks.size()+" tasks on "+pool.getSize()+" queues");
// blocking invoke
- List<Future<Buffer>> results = pool.invokeAll(tasks);
+ List<Future<IntBuffer>> results = pool.invokeAll(tasks);
assertNotNull(results);
checkBuffer(1, data);
// submit blocking emediatly
for (CLTestTask task : tasks) {
- Buffer ret = pool.submit(task).get();
+ IntBuffer ret = pool.submit(task).get();
assertNotNull(ret);
+ checkBuffer(2, ret);
}
checkBuffer(2, data);
// submitAll using futures
- List<Future<Buffer>> futures = pool.submitAll(tasks);
- for (Future<Buffer> future : futures) {
- Buffer ret = future.get();
+ List<Future<IntBuffer>> futures = pool.submitAll(tasks);
+ for (Future<IntBuffer> future : futures) {
+ IntBuffer ret = future.get();
assertNotNull(ret);
+ checkBuffer(3, ret);
}
checkBuffer(3, data);
// switching contexts using different program
factory = CLQueueContextFactory.createSimple(programSource.replaceAll("\\+\\+", "--"));
pool.switchContext(factory);
- List<Future<Buffer>> results2 = pool.invokeAll(tasks);
+ List<Future<IntBuffer>> results2 = pool.invokeAll(tasks);
assertNotNull(results2);
checkBuffer(2, data);
-
+
+ // Note: we have to make sure that we don't resubmit old tasks at this point since
+ // we wait only for completion of a subset of tasks.
// submit any
- Buffer buffer = pool.invokeAny(tasks);
- assertNotNull(buffer);
- checkContains(1, data);
+ data = Buffers.newDirectIntBuffer(slice*taskCount);
+ tasks = createTasks(data, taskCount, slice);
+
+ IntBuffer ret1 = pool.invokeAny(tasks);
+ assertNotNull(ret1);
+ checkBuffer(-1, ret1);
+ checkContains(-1, data);
+
+ // completionservice take/any test
+ data = Buffers.newDirectIntBuffer(slice*taskCount);
+ tasks = createTasks(data, taskCount, slice);
+
+ CLTaskCompletionService<CLSimpleQueueContext, IntBuffer> service = new CLTaskCompletionService(pool);
+ for (CLTestTask task : tasks) {
+ service.submit(task);
+ }
+ IntBuffer ret2 = service.take().get();
+ assertNotNull(ret2);
+ checkBuffer(-1, ret2);
+ checkContains(-1, data);
pool.release();
}finally{
@@ -189,4 +219,11 @@ public class CLMultiContextTest {
fail();
}
+// @Test
+ public void loadTest() throws InterruptedException, ExecutionException {
+ for (int i = 0; i < 40; i++) {
+ commandQueuePoolTest();
+ }
+ }
+
}