diff options
author | Michael Bien <[email protected]> | 2011-07-05 23:39:46 +0200 |
---|---|---|
committer | Michael Bien <[email protected]> | 2011-07-05 23:39:46 +0200 |
commit | b7390e0694f747ac51b6ad26b05b1a2a11df0361 (patch) | |
tree | fea96f2b01d1fb319880b152f07c83c333f50f9a /src/com | |
parent | 3b5da206886d9c73cbed643733ceee71d9416178 (diff) |
- initial import of CLTaskCompletionService.
- fixed pool shutdown behavior. It will now wait till all started tasks finish to be able to release the queue context.
Diffstat (limited to 'src/com')
-rw-r--r-- | src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java | 29 | ||||
-rw-r--r-- | src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java | 76 |
2 files changed, 98 insertions, 7 deletions
diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java index 31aeec47..e601cc7d 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java +++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java @@ -80,7 +80,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource * @see ExecutorService#submit(java.util.concurrent.Callable) */ public <R> Future<R> submit(CLTask<? super C, R> task) { - return excecutor.submit(new TaskWrapper(task, finishAction)); + return excecutor.submit(wrapTask(task)); } /** @@ -133,6 +133,10 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource return excecutor.invokeAny(wrapper, timeout, unit); } + <R> TaskWrapper<C, R> wrapTask(CLTask<? super C, R> task) { + return new TaskWrapper(task, finishAction); + } + private <R> List<TaskWrapper<C, R>> wrapTasks(Collection<? extends CLTask<? super C, R>> tasks) { List<TaskWrapper<C, R>> wrapper = new ArrayList<TaskWrapper<C, R>>(tasks.size()); for (CLTask<? super C, R> task : tasks) { @@ -178,7 +182,8 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource } /** - * Releases all queues. + * Releases the queue context, all queues including a shutdown of the internal threadpool. + * The call will block until all currently executing tasks have finished, no new tasks are started. */ @Override public void release() { @@ -186,13 +191,23 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource throw new RuntimeException(getClass().getSimpleName()+" already released"); } released = true; - excecutor.shutdown(); - for (CLQueueContext context : contexts) { - context.queue.finish().release(); - context.release(); + excecutor.shutdownNow(); + try { + excecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + }finally{ + for (CLQueueContext context : contexts) { + context.queue.finish().release(); + context.release(); + } } } + ExecutorService getExcecutor() { + return excecutor; + } + /** * Returns the command queues used in this pool. */ @@ -271,7 +286,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource private final CLTask<? super C, R> task; private final FinishAction mode; - public TaskWrapper(CLTask<? super C, R> task, FinishAction mode) { + private TaskWrapper(CLTask<? super C, R> task, FinishAction mode) { this.task = task; this.mode = mode; } diff --git a/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java b/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java new file mode 100644 index 00000000..d1d26824 --- /dev/null +++ b/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java @@ -0,0 +1,76 @@ +/* + * Created on Tuesday, July 05 2011 00:26 + */ +package com.jogamp.opencl.util.concurrent; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * A {@link CompletionService} for {@link CLTask}s executed in a {@link CLCommandQueuePool}. + * It simplifies asynchronous execution of tasks with the same result type in a potentially shared pool. + * @see CompletionService + * @author Michael Bien + */ +public class CLTaskCompletionService<C extends CLQueueContext, R> { + + private final ExecutorCompletionService<R> service; + private final CLCommandQueuePool pool; + + /** + * Creates an CLTaskCompletionService using the supplied pool for base + * task execution and a LinkedBlockingQueue with the capacity of {@link Integer#MAX_VALUE} + * as a completion queue. + */ + public CLTaskCompletionService(CLCommandQueuePool<C> pool) { + this.service = new ExecutorCompletionService<R>(pool.getExcecutor()); + this.pool = pool; + } + + /** + * Creates an CLTaskCompletionService using the supplied pool for base + * task execution the supplied queue as its completion queue. + */ + public CLTaskCompletionService(CLCommandQueuePool<C> pool, BlockingQueue queue) { + this.service = new ExecutorCompletionService<R>(pool.getExcecutor(), queue); + this.pool = pool; + } + + /** + * Submits a CLTask for execution and returns a Future representing the pending + * results of the task. Upon completion, this task may be taken or polled. + * @see CompletionService#submit(java.util.concurrent.Callable) + */ + public Future<R> submit(CLTask<? super C, R> task) { + return service.submit(pool.wrapTask(task)); + } + + /** + * Retrieves and removes the Future representing the next completed task, waiting if none are yet present. + * @see CompletionService#take() + */ + public Future<R> take() throws InterruptedException { + return service.take(); + } + + /** + * Retrieves and removes the Future representing the next completed task or null if none are present. + * @see CompletionService#poll() + */ + public Future<R> poll() { + return service.poll(); + } + + /** + * Retrieves and removes the Future representing the next completed task, waiting if necessary + * up to the specified wait time if none are yet present. + * @see CompletionService#poll(long, java.util.concurrent.TimeUnit) + */ + public Future<R> poll(long timeout, TimeUnit unit) throws InterruptedException { + return service.poll(timeout, unit); + } + +} |