diff options
author | Michael Bien <[email protected]> | 2011-07-05 23:39:46 +0200 |
---|---|---|
committer | Michael Bien <[email protected]> | 2011-07-05 23:39:46 +0200 |
commit | b7390e0694f747ac51b6ad26b05b1a2a11df0361 (patch) | |
tree | fea96f2b01d1fb319880b152f07c83c333f50f9a | |
parent | 3b5da206886d9c73cbed643733ceee71d9416178 (diff) |
- initial import of CLTaskCompletionService.
- fixed pool shutdown behavior. It will now wait till all started tasks finish to be able to release the queue context.
3 files changed, 160 insertions, 32 deletions
diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java index 31aeec47..e601cc7d 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java +++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java @@ -80,7 +80,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource * @see ExecutorService#submit(java.util.concurrent.Callable) */ public <R> Future<R> submit(CLTask<? super C, R> task) { - return excecutor.submit(new TaskWrapper(task, finishAction)); + return excecutor.submit(wrapTask(task)); } /** @@ -133,6 +133,10 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource return excecutor.invokeAny(wrapper, timeout, unit); } + <R> TaskWrapper<C, R> wrapTask(CLTask<? super C, R> task) { + return new TaskWrapper(task, finishAction); + } + private <R> List<TaskWrapper<C, R>> wrapTasks(Collection<? extends CLTask<? super C, R>> tasks) { List<TaskWrapper<C, R>> wrapper = new ArrayList<TaskWrapper<C, R>>(tasks.size()); for (CLTask<? super C, R> task : tasks) { @@ -178,7 +182,8 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource } /** - * Releases all queues. + * Releases the queue context, all queues including a shutdown of the internal threadpool. + * The call will block until all currently executing tasks have finished, no new tasks are started. */ @Override public void release() { @@ -186,13 +191,23 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource throw new RuntimeException(getClass().getSimpleName()+" already released"); } released = true; - excecutor.shutdown(); - for (CLQueueContext context : contexts) { - context.queue.finish().release(); - context.release(); + excecutor.shutdownNow(); + try { + excecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + }finally{ + for (CLQueueContext context : contexts) { + context.queue.finish().release(); + context.release(); + } } } + ExecutorService getExcecutor() { + return excecutor; + } + /** * Returns the command queues used in this pool. */ @@ -271,7 +286,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource private final CLTask<? super C, R> task; private final FinishAction mode; - public TaskWrapper(CLTask<? super C, R> task, FinishAction mode) { + private TaskWrapper(CLTask<? super C, R> task, FinishAction mode) { this.task = task; this.mode = mode; } diff --git a/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java b/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java new file mode 100644 index 00000000..d1d26824 --- /dev/null +++ b/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java @@ -0,0 +1,76 @@ +/* + * Created on Tuesday, July 05 2011 00:26 + */ +package com.jogamp.opencl.util.concurrent; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * A {@link CompletionService} for {@link CLTask}s executed in a {@link CLCommandQueuePool}. + * It simplifies asynchronous execution of tasks with the same result type in a potentially shared pool. + * @see CompletionService + * @author Michael Bien + */ +public class CLTaskCompletionService<C extends CLQueueContext, R> { + + private final ExecutorCompletionService<R> service; + private final CLCommandQueuePool pool; + + /** + * Creates an CLTaskCompletionService using the supplied pool for base + * task execution and a LinkedBlockingQueue with the capacity of {@link Integer#MAX_VALUE} + * as a completion queue. + */ + public CLTaskCompletionService(CLCommandQueuePool<C> pool) { + this.service = new ExecutorCompletionService<R>(pool.getExcecutor()); + this.pool = pool; + } + + /** + * Creates an CLTaskCompletionService using the supplied pool for base + * task execution the supplied queue as its completion queue. + */ + public CLTaskCompletionService(CLCommandQueuePool<C> pool, BlockingQueue queue) { + this.service = new ExecutorCompletionService<R>(pool.getExcecutor(), queue); + this.pool = pool; + } + + /** + * Submits a CLTask for execution and returns a Future representing the pending + * results of the task. Upon completion, this task may be taken or polled. + * @see CompletionService#submit(java.util.concurrent.Callable) + */ + public Future<R> submit(CLTask<? super C, R> task) { + return service.submit(pool.wrapTask(task)); + } + + /** + * Retrieves and removes the Future representing the next completed task, waiting if none are yet present. + * @see CompletionService#take() + */ + public Future<R> take() throws InterruptedException { + return service.take(); + } + + /** + * Retrieves and removes the Future representing the next completed task or null if none are present. + * @see CompletionService#poll() + */ + public Future<R> poll() { + return service.poll(); + } + + /** + * Retrieves and removes the Future representing the next completed task, waiting if necessary + * up to the specified wait time if none are yet present. + * @see CompletionService#poll(long, java.util.concurrent.TimeUnit) + */ + public Future<R> poll(long timeout, TimeUnit unit) throws InterruptedException { + return service.poll(timeout, unit); + } + +} diff --git a/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java index 68ca33f9..6e5d5e99 100644 --- a/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java +++ b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java @@ -19,7 +19,6 @@ import org.junit.Rule; import org.junit.rules.MethodRule; import org.junit.rules.Timeout; import com.jogamp.opencl.util.CLMultiContext; -import java.nio.Buffer; import java.util.ArrayList; import java.util.List; import org.junit.Test; @@ -70,29 +69,36 @@ public class CLMultiContextTest { + " array[index]++; \n" + "} \n"; - private final class CLTestTask implements CLTask<CLSimpleQueueContext, Buffer> { + private final class CLTestTask implements CLTask<CLSimpleQueueContext, IntBuffer> { - private final Buffer data; + private final IntBuffer data; - public CLTestTask(Buffer buffer) { + public CLTestTask(IntBuffer buffer) { this.data = buffer; } @Override - public Buffer execute(CLSimpleQueueContext qc) { + public IntBuffer execute(CLSimpleQueueContext qc) { CLCommandQueue queue = qc.getQueue(); CLContext context = qc.getCLContext(); CLKernel kernel = qc.getKernel("compute"); - CLBuffer<Buffer> buffer = null; +// System.out.println(Thread.currentThread().getName()+" / "+queue); + assertFalse(qc.isReleased()); + assertFalse(queue.isReleased()); + assertFalse(context.isReleased()); + assertFalse(kernel.isReleased()); + + CLBuffer<IntBuffer> buffer = null; try{ + buffer = context.createBuffer(data); int gws = buffer.getCLCapacity(); kernel.putArg(buffer).putArg(gws).rewind(); - queue.putWriteBuffer(buffer, true); + queue.putWriteBuffer(buffer, false); queue.put1DRangeKernel(kernel, 0, gws, 0); queue.putReadBuffer(buffer, true); }finally{ @@ -106,6 +112,16 @@ public class CLMultiContextTest { } + private List<CLTestTask> createTasks(IntBuffer data, int taskCount, int slice) { + List<CLTestTask> tasks = new ArrayList<CLTestTask>(taskCount); + for (int i = 0; i < taskCount; i++) { + IntBuffer subBuffer = Buffers.slice(data, i*slice, slice); + assertEquals(slice, subBuffer.capacity()); + tasks.add(new CLTestTask(subBuffer)); + } + return tasks; + } + @Test public void commandQueuePoolTest() throws InterruptedException, ExecutionException { @@ -123,48 +139,62 @@ public class CLMultiContextTest { final int taskCount = pool.getSize() * tasksPerQueue; IntBuffer data = Buffers.newDirectIntBuffer(slice*taskCount); - - List<CLTestTask> tasks = new ArrayList<CLTestTask>(taskCount); - - for (int i = 0; i < taskCount; i++) { - IntBuffer subBuffer = Buffers.slice(data, i*slice, slice); - assertEquals(slice, subBuffer.capacity()); - tasks.add(new CLTestTask(subBuffer)); - } + List<CLTestTask> tasks = createTasks(data, taskCount, slice); out.println("invoking "+tasks.size()+" tasks on "+pool.getSize()+" queues"); // blocking invoke - List<Future<Buffer>> results = pool.invokeAll(tasks); + List<Future<IntBuffer>> results = pool.invokeAll(tasks); assertNotNull(results); checkBuffer(1, data); // submit blocking emediatly for (CLTestTask task : tasks) { - Buffer ret = pool.submit(task).get(); + IntBuffer ret = pool.submit(task).get(); assertNotNull(ret); + checkBuffer(2, ret); } checkBuffer(2, data); // submitAll using futures - List<Future<Buffer>> futures = pool.submitAll(tasks); - for (Future<Buffer> future : futures) { - Buffer ret = future.get(); + List<Future<IntBuffer>> futures = pool.submitAll(tasks); + for (Future<IntBuffer> future : futures) { + IntBuffer ret = future.get(); assertNotNull(ret); + checkBuffer(3, ret); } checkBuffer(3, data); // switching contexts using different program factory = CLQueueContextFactory.createSimple(programSource.replaceAll("\\+\\+", "--")); pool.switchContext(factory); - List<Future<Buffer>> results2 = pool.invokeAll(tasks); + List<Future<IntBuffer>> results2 = pool.invokeAll(tasks); assertNotNull(results2); checkBuffer(2, data); - + + // Note: we have to make sure that we don't resubmit old tasks at this point since + // we wait only for completion of a subset of tasks. // submit any - Buffer buffer = pool.invokeAny(tasks); - assertNotNull(buffer); - checkContains(1, data); + data = Buffers.newDirectIntBuffer(slice*taskCount); + tasks = createTasks(data, taskCount, slice); + + IntBuffer ret1 = pool.invokeAny(tasks); + assertNotNull(ret1); + checkBuffer(-1, ret1); + checkContains(-1, data); + + // completionservice take/any test + data = Buffers.newDirectIntBuffer(slice*taskCount); + tasks = createTasks(data, taskCount, slice); + + CLTaskCompletionService<CLSimpleQueueContext, IntBuffer> service = new CLTaskCompletionService(pool); + for (CLTestTask task : tasks) { + service.submit(task); + } + IntBuffer ret2 = service.take().get(); + assertNotNull(ret2); + checkBuffer(-1, ret2); + checkContains(-1, data); pool.release(); }finally{ @@ -189,4 +219,11 @@ public class CLMultiContextTest { fail(); } +// @Test + public void loadTest() throws InterruptedException, ExecutionException { + for (int i = 0; i < 40; i++) { + commandQueuePoolTest(); + } + } + } |