summaryrefslogtreecommitdiffstats
path: root/src/com
diff options
context:
space:
mode:
authorMichael Bien <[email protected]>2011-07-05 23:39:46 +0200
committerMichael Bien <[email protected]>2011-07-05 23:39:46 +0200
commitb7390e0694f747ac51b6ad26b05b1a2a11df0361 (patch)
treefea96f2b01d1fb319880b152f07c83c333f50f9a /src/com
parent3b5da206886d9c73cbed643733ceee71d9416178 (diff)
- initial import of CLTaskCompletionService.
- fixed pool shutdown behavior. It will now wait till all started tasks finish to be able to release the queue context.
Diffstat (limited to 'src/com')
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java29
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java76
2 files changed, 98 insertions, 7 deletions
diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
index 31aeec47..e601cc7d 100644
--- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
+++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
@@ -80,7 +80,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
* @see ExecutorService#submit(java.util.concurrent.Callable)
*/
public <R> Future<R> submit(CLTask<? super C, R> task) {
- return excecutor.submit(new TaskWrapper(task, finishAction));
+ return excecutor.submit(wrapTask(task));
}
/**
@@ -133,6 +133,10 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
return excecutor.invokeAny(wrapper, timeout, unit);
}
+ <R> TaskWrapper<C, R> wrapTask(CLTask<? super C, R> task) {
+ return new TaskWrapper(task, finishAction);
+ }
+
private <R> List<TaskWrapper<C, R>> wrapTasks(Collection<? extends CLTask<? super C, R>> tasks) {
List<TaskWrapper<C, R>> wrapper = new ArrayList<TaskWrapper<C, R>>(tasks.size());
for (CLTask<? super C, R> task : tasks) {
@@ -178,7 +182,8 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
}
/**
- * Releases all queues.
+ * Releases the queue context, all queues including a shutdown of the internal threadpool.
+ * The call will block until all currently executing tasks have finished, no new tasks are started.
*/
@Override
public void release() {
@@ -186,13 +191,23 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
throw new RuntimeException(getClass().getSimpleName()+" already released");
}
released = true;
- excecutor.shutdown();
- for (CLQueueContext context : contexts) {
- context.queue.finish().release();
- context.release();
+ excecutor.shutdownNow();
+ try {
+ excecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }finally{
+ for (CLQueueContext context : contexts) {
+ context.queue.finish().release();
+ context.release();
+ }
}
}
+ ExecutorService getExcecutor() {
+ return excecutor;
+ }
+
/**
* Returns the command queues used in this pool.
*/
@@ -271,7 +286,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
private final CLTask<? super C, R> task;
private final FinishAction mode;
- public TaskWrapper(CLTask<? super C, R> task, FinishAction mode) {
+ private TaskWrapper(CLTask<? super C, R> task, FinishAction mode) {
this.task = task;
this.mode = mode;
}
diff --git a/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java b/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java
new file mode 100644
index 00000000..d1d26824
--- /dev/null
+++ b/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java
@@ -0,0 +1,76 @@
+/*
+ * Created on Tuesday, July 05 2011 00:26
+ */
+package com.jogamp.opencl.util.concurrent;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link CompletionService} for {@link CLTask}s executed in a {@link CLCommandQueuePool}.
+ * It simplifies asynchronous execution of tasks with the same result type in a potentially shared pool.
+ * @see CompletionService
+ * @author Michael Bien
+ */
+public class CLTaskCompletionService<C extends CLQueueContext, R> {
+
+ private final ExecutorCompletionService<R> service;
+ private final CLCommandQueuePool pool;
+
+ /**
+ * Creates an CLTaskCompletionService using the supplied pool for base
+ * task execution and a LinkedBlockingQueue with the capacity of {@link Integer#MAX_VALUE}
+ * as a completion queue.
+ */
+ public CLTaskCompletionService(CLCommandQueuePool<C> pool) {
+ this.service = new ExecutorCompletionService<R>(pool.getExcecutor());
+ this.pool = pool;
+ }
+
+ /**
+ * Creates an CLTaskCompletionService using the supplied pool for base
+ * task execution the supplied queue as its completion queue.
+ */
+ public CLTaskCompletionService(CLCommandQueuePool<C> pool, BlockingQueue queue) {
+ this.service = new ExecutorCompletionService<R>(pool.getExcecutor(), queue);
+ this.pool = pool;
+ }
+
+ /**
+ * Submits a CLTask for execution and returns a Future representing the pending
+ * results of the task. Upon completion, this task may be taken or polled.
+ * @see CompletionService#submit(java.util.concurrent.Callable)
+ */
+ public Future<R> submit(CLTask<? super C, R> task) {
+ return service.submit(pool.wrapTask(task));
+ }
+
+ /**
+ * Retrieves and removes the Future representing the next completed task, waiting if none are yet present.
+ * @see CompletionService#take()
+ */
+ public Future<R> take() throws InterruptedException {
+ return service.take();
+ }
+
+ /**
+ * Retrieves and removes the Future representing the next completed task or null if none are present.
+ * @see CompletionService#poll()
+ */
+ public Future<R> poll() {
+ return service.poll();
+ }
+
+ /**
+ * Retrieves and removes the Future representing the next completed task, waiting if necessary
+ * up to the specified wait time if none are yet present.
+ * @see CompletionService#poll(long, java.util.concurrent.TimeUnit)
+ */
+ public Future<R> poll(long timeout, TimeUnit unit) throws InterruptedException {
+ return service.poll(timeout, unit);
+ }
+
+}