From b7390e0694f747ac51b6ad26b05b1a2a11df0361 Mon Sep 17 00:00:00 2001 From: Michael Bien Date: Tue, 5 Jul 2011 23:39:46 +0200 Subject: - initial import of CLTaskCompletionService. - fixed pool shutdown behavior. It will now wait till all started tasks finish to be able to release the queue context. --- .../opencl/util/concurrent/CLCommandQueuePool.java | 29 +++++++-- .../util/concurrent/CLTaskCompletionService.java | 76 ++++++++++++++++++++++ 2 files changed, 98 insertions(+), 7 deletions(-) create mode 100644 src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java (limited to 'src/com') diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java index 31aeec47..e601cc7d 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java +++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java @@ -80,7 +80,7 @@ public class CLCommandQueuePool implements CLResource * @see ExecutorService#submit(java.util.concurrent.Callable) */ public Future submit(CLTask task) { - return excecutor.submit(new TaskWrapper(task, finishAction)); + return excecutor.submit(wrapTask(task)); } /** @@ -133,6 +133,10 @@ public class CLCommandQueuePool implements CLResource return excecutor.invokeAny(wrapper, timeout, unit); } + TaskWrapper wrapTask(CLTask task) { + return new TaskWrapper(task, finishAction); + } + private List> wrapTasks(Collection> tasks) { List> wrapper = new ArrayList>(tasks.size()); for (CLTask task : tasks) { @@ -178,7 +182,8 @@ public class CLCommandQueuePool implements CLResource } /** - * Releases all queues. + * Releases the queue context, all queues including a shutdown of the internal threadpool. + * The call will block until all currently executing tasks have finished, no new tasks are started. */ @Override public void release() { @@ -186,13 +191,23 @@ public class CLCommandQueuePool implements CLResource throw new RuntimeException(getClass().getSimpleName()+" already released"); } released = true; - excecutor.shutdown(); - for (CLQueueContext context : contexts) { - context.queue.finish().release(); - context.release(); + excecutor.shutdownNow(); + try { + excecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + }finally{ + for (CLQueueContext context : contexts) { + context.queue.finish().release(); + context.release(); + } } } + ExecutorService getExcecutor() { + return excecutor; + } + /** * Returns the command queues used in this pool. */ @@ -271,7 +286,7 @@ public class CLCommandQueuePool implements CLResource private final CLTask task; private final FinishAction mode; - public TaskWrapper(CLTask task, FinishAction mode) { + private TaskWrapper(CLTask task, FinishAction mode) { this.task = task; this.mode = mode; } diff --git a/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java b/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java new file mode 100644 index 00000000..d1d26824 --- /dev/null +++ b/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java @@ -0,0 +1,76 @@ +/* + * Created on Tuesday, July 05 2011 00:26 + */ +package com.jogamp.opencl.util.concurrent; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * A {@link CompletionService} for {@link CLTask}s executed in a {@link CLCommandQueuePool}. + * It simplifies asynchronous execution of tasks with the same result type in a potentially shared pool. + * @see CompletionService + * @author Michael Bien + */ +public class CLTaskCompletionService { + + private final ExecutorCompletionService service; + private final CLCommandQueuePool pool; + + /** + * Creates an CLTaskCompletionService using the supplied pool for base + * task execution and a LinkedBlockingQueue with the capacity of {@link Integer#MAX_VALUE} + * as a completion queue. + */ + public CLTaskCompletionService(CLCommandQueuePool pool) { + this.service = new ExecutorCompletionService(pool.getExcecutor()); + this.pool = pool; + } + + /** + * Creates an CLTaskCompletionService using the supplied pool for base + * task execution the supplied queue as its completion queue. + */ + public CLTaskCompletionService(CLCommandQueuePool pool, BlockingQueue queue) { + this.service = new ExecutorCompletionService(pool.getExcecutor(), queue); + this.pool = pool; + } + + /** + * Submits a CLTask for execution and returns a Future representing the pending + * results of the task. Upon completion, this task may be taken or polled. + * @see CompletionService#submit(java.util.concurrent.Callable) + */ + public Future submit(CLTask task) { + return service.submit(pool.wrapTask(task)); + } + + /** + * Retrieves and removes the Future representing the next completed task, waiting if none are yet present. + * @see CompletionService#take() + */ + public Future take() throws InterruptedException { + return service.take(); + } + + /** + * Retrieves and removes the Future representing the next completed task or null if none are present. + * @see CompletionService#poll() + */ + public Future poll() { + return service.poll(); + } + + /** + * Retrieves and removes the Future representing the next completed task, waiting if necessary + * up to the specified wait time if none are yet present. + * @see CompletionService#poll(long, java.util.concurrent.TimeUnit) + */ + public Future poll(long timeout, TimeUnit unit) throws InterruptedException { + return service.poll(timeout, unit); + } + +} -- cgit v1.2.3