diff options
author | Michael Bien <[email protected]> | 2011-08-03 16:50:06 +0200 |
---|---|---|
committer | Michael Bien <[email protected]> | 2011-08-03 16:50:06 +0200 |
commit | b709c6156393cdb48106e8db8626cbfc332c0541 (patch) | |
tree | 65ecc151ff6de65c2169145f64ce61cbd8505cfb | |
parent | c40dfd633ef0f841851ed64c2817a425148378b9 (diff) |
refactoring; extracted CLAbstractExecutorService and CLPoolable.
6 files changed, 375 insertions, 256 deletions
diff --git a/src/com/jogamp/opencl/util/concurrent/CLAbstractExecutorService.java b/src/com/jogamp/opencl/util/concurrent/CLAbstractExecutorService.java new file mode 100644 index 00000000..843e08e0 --- /dev/null +++ b/src/com/jogamp/opencl/util/concurrent/CLAbstractExecutorService.java @@ -0,0 +1,273 @@ +/* + * Copyright (c) 2011, Michael Bien + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are + * permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this list of + * conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, this list + * of conditions and the following disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND + * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* + * Created on Tuesday, August 02 2011 02:20 + */ +package com.jogamp.opencl.util.concurrent; + +import com.jogamp.opencl.CLCommandQueue; +import com.jogamp.opencl.CLResource; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Common superclass for Executor services supporting OpenCL driven tasks. + * @author Michael Bien + */ +public abstract class CLAbstractExecutorService implements CLResource { + + protected final ExecutorService excecutor; + protected final List<CLCommandQueue> queues; + + private FinishAction finishAction = FinishAction.DO_NOTHING; + private boolean released; + + protected CLAbstractExecutorService(ExecutorService executor, List<CLCommandQueue> queues) { + this.queues = queues; + this.excecutor = executor; + } + + + <R> TaskWrapper<R> wrapTask(CLPoolable<? extends CLQueueContext, R> task) { + return new TaskWrapper(task, finishAction); + } + + private <R> List<TaskWrapper<R>> wrapTasks(Collection<? extends CLPoolable<? extends CLQueueContext, R>> tasks) { + List<TaskWrapper<R>> wrapper = new ArrayList<TaskWrapper<R>>(tasks.size()); + for (CLPoolable<? extends CLQueueContext, R> task : tasks) { + if(task == null) { + throw new NullPointerException("at least one task was null"); + } + wrapper.add(new TaskWrapper<R>((CLPoolable<CLQueueContext, R>)task, finishAction)); + } + return wrapper; + } + + /** + * Submits all tasks to the pool for immediate execution (blocking) and returns their {@link Future} holding the result. + * @see ExecutorService#invokeAll(java.util.Collection) + */ + public <R> List<Future<R>> invokeAll(Collection<? extends CLPoolable<? extends CLQueueContext, R>> tasks) throws InterruptedException { + List<TaskWrapper<R>> wrapper = wrapTasks(tasks); + return excecutor.invokeAll(wrapper); + } + + /** + * Submits all tasks to the pool for immediate execution (blocking) and returns their {@link Future} holding the result. + * @see ExecutorService#invokeAll(java.util.Collection, long, java.util.concurrent.TimeUnit) + */ + public <R> List<Future<R>> invokeAll(Collection<? extends CLPoolable<? extends CLQueueContext, R>> tasks, long timeout, TimeUnit unit) throws InterruptedException { + List<TaskWrapper<R>> wrapper = wrapTasks(tasks); + return excecutor.invokeAll(wrapper, timeout, unit); + } + + /** + * Submits all tasks for immediate execution (blocking) until a result can be returned. + * All other unfinished but started tasks are cancelled. + * @see ExecutorService#invokeAny(java.util.Collection) + */ + public <R> R invokeAny(Collection<? extends CLPoolable<? extends CLQueueContext, R>> tasks) throws InterruptedException, ExecutionException { + List<TaskWrapper<R>> wrapper = wrapTasks(tasks); + return excecutor.invokeAny(wrapper); + } + + /** + * Submits all tasks for immediate execution (blocking) until a result can be returned. + * All other unfinished but started tasks are cancelled. + * @see ExecutorService#invokeAny(java.util.Collection, long, java.util.concurrent.TimeUnit) + */ + public <R> R invokeAny(Collection<? extends CLPoolable<? super CLQueueContext, R>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + List<TaskWrapper<R>> wrapper = wrapTasks(tasks); + return excecutor.invokeAny(wrapper, timeout, unit); + } + + /** + * Submits this task to the pool for execution returning its {@link Future}. + * @see ExecutorService#submit(java.util.concurrent.Callable) + */ + public <R> Future<R> submit(CLPoolable<? extends CLQueueContext, R> task) { + return excecutor.submit(wrapTask(task)); + } + + /** + * Submits all tasks to the pool for execution and returns their {@link Future}. + * Calls {@link #submit(com.jogamp.opencl.util.concurrent.CLPoolable)} for every task. + */ + public <R> List<Future<R>> submitAll(Collection<? extends CLPoolable<? extends CLQueueContext, R>> tasks) { + List<Future<R>> futures = new ArrayList<Future<R>>(tasks.size()); + for (CLPoolable<? extends CLQueueContext, R> task : tasks) { + futures.add(submit(task)); + } + return futures; + } + + /** + * Calls {@link CLCommandQueue#flush()} on all queues. + */ + public void flushQueues() { + for (CLCommandQueue queue : queues) { + queue.flush(); + } + } + + /** + * Calls {@link CLCommandQueue#finish()} on all queues. + */ + public void finishQueues() { + for (CLCommandQueue queue : queues) { + queue.finish(); + } + } + + /** + * Returns the command queues used in this pool. + */ + public List<CLCommandQueue> getQueues() { + return Collections.unmodifiableList(queues); + } + + /** + * Returns the size of this pool (number of command queues). + */ + public int getPoolSize() { + return queues.size(); + } + /** + * Returns the action which is executed when a task finishes. + */ + public FinishAction getFinishAction() { + return finishAction; + } + + ExecutorService getExcecutor() { + return excecutor; + } + + @Override + public boolean isReleased() { + return released; + } + + /** + * Sets the action which is run after every completed task. + * This is mainly intended for debugging, default value is {@link FinishAction#DO_NOTHING}. + */ + public void setFinishAction(FinishAction action) { + this.finishAction = action; + } + + /** + * Releases the queue context, all queues including a shutdown of the internal threadpool. + * The call will block until all currently executing tasks have finished, no new tasks are started. + */ + @Override + public void release() { + if (released) { + throw new RuntimeException(getClass().getSimpleName() + " already released"); + } + released = true; + excecutor.shutdownNow(); // threads will cleanup CL resources on exit + try { + excecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + + protected static interface CommandQueueThread { + + Map<Object, CLQueueContext> getContextMap(); + + CLCommandQueue getQueue(); + } + + protected static class TaskWrapper<R> implements Callable<R> { + + protected final CLPoolable<CLQueueContext, R> task; + private final FinishAction mode; + + private TaskWrapper(CLPoolable<CLQueueContext, R> task, FinishAction mode) { + this.task = task; + this.mode = mode; + } + + @Override + public R call() throws Exception { + + CommandQueueThread thread = (CommandQueueThread)Thread.currentThread(); + + final Object key = task.getContextKey(); + + CLQueueContext context = thread.getContextMap().get(key); + if(context == null) { + context = task.createQueueContext(thread.getQueue()); + thread.getContextMap().put(key, context); + } + + R result = task.execute(context); + if(mode.equals(FinishAction.FLUSH)) { + context.queue.flush(); + }else if(mode.equals(FinishAction.FINISH)) { + context.queue.finish(); + } + return result; + } + + } + + /** + * The action executed after a task completes. + */ + public enum FinishAction { + + /** + * Does nothing, the task is responsible to make sure all computations + * have finished when the task finishes + */ + DO_NOTHING, + + /** + * Flushes the queue on task completion. + */ + FLUSH, + + /** + * Finishes the queue on task completion. + */ + FINISH + } + +} diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java index beafcc93..f6fadd66 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java +++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java @@ -31,26 +31,21 @@ package com.jogamp.opencl.util.concurrent; import com.jogamp.opencl.CLCommandQueue; import com.jogamp.opencl.CLDevice; -import com.jogamp.opencl.CLResource; import com.jogamp.opencl.util.CLMultiContext; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.RunnableFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; /** * A multithreaded, fixed size pool of OpenCL command queues. @@ -59,23 +54,11 @@ import java.util.concurrent.TimeoutException; * instead of {@link Callable}s and provides a per-queue context for resource sharing across all tasks of one queue. * @author Michael Bien */ -public class CLCommandQueuePool implements CLResource { +public class CLCommandQueuePool extends CLAbstractExecutorService { - private ThreadPoolExecutor excecutor; - private FinishAction finishAction = FinishAction.DO_NOTHING; - private boolean released; - private final List<CLCommandQueue> queues; - private CLCommandQueuePool(Collection<CLCommandQueue> queues) { - this.queues = new ArrayList<CLCommandQueue>(queues); - initExecutor(); - } - - private void initExecutor() { - BlockingQueue<Runnable> queue = new LinkedBlockingDeque<Runnable>(); - QueueThreadFactory factory = new QueueThreadFactory(queues); - int size = queues.size(); - this.excecutor = new CLThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, queue, factory); + private CLCommandQueuePool(ExecutorService executor, List<CLCommandQueue> queues) { + super(executor, queues); } public static CLCommandQueuePool create(CLMultiContext mc, CLCommandQueue.Mode... modes) { @@ -91,145 +74,19 @@ public class CLCommandQueuePool implements CLResource { } public static CLCommandQueuePool create(Collection<CLCommandQueue> queues) { - return new CLCommandQueuePool(queues); - } - - /** - * Submits this task to the pool for execution returning its {@link Future}. - * @see ExecutorService#submit(java.util.concurrent.Callable) - */ - public <R> Future<R> submit(CLTask<? extends CLQueueContext, R> task) { - return excecutor.submit(wrapTask(task)); - } - - /** - * Submits all tasks to the pool for execution and returns their {@link Future}. - * Calls {@link #submit(com.jogamp.opencl.util.concurrent.CLTask)} for every task. - */ - public <R> List<Future<R>> submitAll(Collection<? extends CLTask<? extends CLQueueContext, R>> tasks) { - List<Future<R>> futures = new ArrayList<Future<R>>(tasks.size()); - for (CLTask<? extends CLQueueContext, R> task : tasks) { - futures.add(submit(task)); - } - return futures; - } - - /** - * Submits all tasks to the pool for immediate execution (blocking) and returns their {@link Future} holding the result. - * @see ExecutorService#invokeAll(java.util.Collection) - */ - public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? extends CLQueueContext, R>> tasks) throws InterruptedException { - List<TaskWrapper<R>> wrapper = wrapTasks(tasks); - return excecutor.invokeAll(wrapper); - } - - /** - * Submits all tasks to the pool for immediate execution (blocking) and returns their {@link Future} holding the result. - * @see ExecutorService#invokeAll(java.util.Collection, long, java.util.concurrent.TimeUnit) - */ - public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? extends CLQueueContext, R>> tasks, long timeout, TimeUnit unit) throws InterruptedException { - List<TaskWrapper<R>> wrapper = wrapTasks(tasks); - return excecutor.invokeAll(wrapper, timeout, unit); - } - - /** - * Submits all tasks for immediate execution (blocking) until a result can be returned. - * All other unfinished but started tasks are cancelled. - * @see ExecutorService#invokeAny(java.util.Collection) - */ - public <R> R invokeAny(Collection<? extends CLTask<? extends CLQueueContext, R>> tasks) throws InterruptedException, ExecutionException { - List<TaskWrapper<R>> wrapper = wrapTasks(tasks); - return excecutor.invokeAny(wrapper); - } - - /*public*/ CLTask<? extends CLQueueContext, ?> takeCLTask() throws InterruptedException { - return ((CLFutureTask<?>)excecutor.getQueue().take()).getCLTask(); - } - - /** - * Submits all tasks for immediate execution (blocking) until a result can be returned. - * All other unfinished but started tasks are cancelled. - * @see ExecutorService#invokeAny(java.util.Collection, long, java.util.concurrent.TimeUnit) - */ - public <R> R invokeAny(Collection<? extends CLTask<? super CLQueueContext, R>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - List<TaskWrapper<R>> wrapper = wrapTasks(tasks); - return excecutor.invokeAny(wrapper, timeout, unit); - } - - <R> TaskWrapper<R> wrapTask(CLTask<? extends CLQueueContext, R> task) { - return new TaskWrapper(task, finishAction); - } - - private <R> List<TaskWrapper<R>> wrapTasks(Collection<? extends CLTask<? extends CLQueueContext, R>> tasks) { - List<TaskWrapper<R>> wrapper = new ArrayList<TaskWrapper<R>>(tasks.size()); - for (CLTask<? extends CLQueueContext, R> task : tasks) { - if(task == null) { - throw new NullPointerException("at least one task was null"); - } - wrapper.add(new TaskWrapper<R>((CLTask<CLQueueContext, R>)task, finishAction)); - } - return wrapper; - } - - /** - * Calls {@link CLCommandQueue#flush()} on all queues. - */ - public void flushQueues() { - for (CLCommandQueue queue : queues) { - queue.flush(); - } - } - - /** - * Calls {@link CLCommandQueue#finish()} on all queues. - */ - public void finishQueues() { - for (CLCommandQueue queue : queues) { - queue.finish(); - } - } - - /** - * Releases the queue context, all queues including a shutdown of the internal threadpool. - * The call will block until all currently executing tasks have finished, no new tasks are started. - */ - @Override - public void release() { - if(released) { - throw new RuntimeException(getClass().getSimpleName()+" already released"); - } - released = true; - excecutor.shutdownNow(); // threads will cleanup CL resources on exit - try { - excecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); - } catch (InterruptedException ex) { - throw new RuntimeException(ex); - } - } - - ExecutorService getExcecutor() { - return excecutor; - } + + List<CLCommandQueue> list = new ArrayList<CLCommandQueue>(queues); - /** - * Returns the command queues used in this pool. - */ - public List<CLCommandQueue> getQueues() { - return Collections.unmodifiableList(queues); - } + BlockingQueue<Runnable> queue = new LinkedBlockingDeque<Runnable>(); + CommandQueuePoolThreadFactory factory = new CommandQueuePoolThreadFactory(list); + int size = list.size(); - /** - * Returns the size of this pool (number of command queues). - */ - public int getPoolSize() { - return queues.size(); + ExecutorService executor = new CLThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, queue, factory); + return new CLCommandQueuePool(executor, list); } - /** - * Returns the action which is executed when a task finishes. - */ - public FinishAction getFinishAction() { - return finishAction; + /*public*/ CLPoolable<? extends CLQueueContext, ?> takeCLTask() throws InterruptedException { + return ((CLFutureTask<?>)getExcecutor().getQueue().take()).getCLPoolable(); } /** @@ -238,7 +95,7 @@ public class CLCommandQueuePool implements CLResource { * value is only an approximation. */ public long getTaskCount() { - return excecutor.getTaskCount(); + return getExcecutor().getTaskCount(); } /** @@ -247,40 +104,32 @@ public class CLCommandQueuePool implements CLResource { * the returned value is only an approximation, but one that does not ever decrease across successive calls. */ public long getCompletedTaskCount() { - return excecutor.getCompletedTaskCount(); + return getExcecutor().getCompletedTaskCount(); } /** * Returns the approximate number of queues that are actively executing tasks. */ public int getActiveCount() { - return excecutor.getActiveCount(); + return getExcecutor().getActiveCount(); } @Override - public boolean isReleased() { - return released; - } - - /** - * Sets the action which is run after every completed task. - * This is mainly intended for debugging, default value is {@link FinishAction#DO_NOTHING}. - */ - public void setFinishAction(FinishAction action) { - this.finishAction = action; + ThreadPoolExecutor getExcecutor() { + return (ThreadPoolExecutor) excecutor; } @Override public String toString() { - return getClass().getSimpleName()+" [queues: "+getPoolSize()+" on finish: "+finishAction+"]"; + return getClass().getSimpleName()+" [queues: "+getPoolSize()+" on finish: "+getFinishAction()+"]"; } - private static class QueueThreadFactory implements ThreadFactory { + private static class CommandQueuePoolThreadFactory implements ThreadFactory { private final List<CLCommandQueue> queues; private int index; - private QueueThreadFactory(List<CLCommandQueue> queues) { + public CommandQueuePoolThreadFactory(List<CLCommandQueue> queues) { this.queues = queues; this.index = 0; } @@ -292,20 +141,20 @@ public class CLCommandQueuePool implements CLResource { ThreadGroup group = (sm != null) ? sm.getThreadGroup() : Thread.currentThread().getThreadGroup(); CLCommandQueue queue = queues.get(index); - QueueThread thread = new QueueThread(group, runnable, queue, index++); + Thread thread = new CommandQueuePoolThread(group, runnable, queue, index++); thread.setDaemon(true); - + return thread; } } - - private static class QueueThread extends Thread { + + private static class CommandQueuePoolThread extends Thread implements CommandQueueThread { private final CLCommandQueue queue; private final Map<Object, CLQueueContext> contextMap; - public QueueThread(ThreadGroup group, Runnable runnable, CLCommandQueue queue, int index) { + public CommandQueuePoolThread(ThreadGroup group, Runnable runnable, CLCommandQueue queue, int index) { super(group, runnable, "queue-worker-thread-"+index+"["+queue+"]"); this.queue = queue; this.contextMap = new HashMap<Object, CLQueueContext>(); @@ -321,38 +170,14 @@ public class CLCommandQueuePool implements CLResource { } } - } - - private static class TaskWrapper<R> implements Callable<R> { - - private final CLTask<CLQueueContext, R> task; - private final FinishAction mode; - - private TaskWrapper(CLTask<CLQueueContext, R> task, FinishAction mode) { - this.task = task; - this.mode = mode; + @Override + public CLCommandQueue getQueue() { + return queue; } @Override - public R call() throws Exception { - - QueueThread thread = (QueueThread)Thread.currentThread(); - - final Object key = task.getContextKey(); - - CLQueueContext context = thread.contextMap.get(key); - if(context == null) { - context = task.createQueueContext(thread.queue); - thread.contextMap.put(key, context); - } - - R result = task.execute(context); - if(mode.equals(FinishAction.FLUSH)) { - context.queue.flush(); - }else if(mode.equals(FinishAction.FINISH)) { - context.queue.finish(); - } - return result; + public Map<Object, CLQueueContext> getContextMap() { + return contextMap; } } @@ -366,12 +191,12 @@ public class CLCommandQueuePool implements CLResource { this.wrapper = wrapper; } - public CLTask<? extends CLQueueContext, R> getCLTask() { + public CLPoolable<? extends CLQueueContext, R> getCLPoolable() { return wrapper.task; } } - + private static class CLThreadPoolExecutor extends ThreadPoolExecutor { public CLThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { @@ -386,26 +211,4 @@ public class CLCommandQueuePool implements CLResource { } - /** - * The action executed after a task completes. - */ - public enum FinishAction { - - /** - * Does nothing, the task is responsible to make sure all computations - * have finished when the task finishes - */ - DO_NOTHING, - - /** - * Flushes the queue on task completion. - */ - FLUSH, - - /** - * Finishes the queue on task completion. - */ - FINISH - } - } diff --git a/src/com/jogamp/opencl/util/concurrent/CLPoolable.java b/src/com/jogamp/opencl/util/concurrent/CLPoolable.java new file mode 100644 index 00000000..6efd8dee --- /dev/null +++ b/src/com/jogamp/opencl/util/concurrent/CLPoolable.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2011, Michael Bien + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are + * permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this list of + * conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, this list + * of conditions and the following disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND + * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* + * Created on Tuesday, August 02 2011 21:51 + */ +package com.jogamp.opencl.util.concurrent; + +import com.jogamp.opencl.CLCommandQueue; + +/** + * A OpenCL task designed to be executed in a managed environment. + * @author Michael Bien + */ +public interface CLPoolable<C extends CLQueueContext, R> { + + /** + * Creates a CLQueueContext for this task. A context may contain static resources + * like OpenCL program binaries or pre allocated buffers. A context can be used by an group + * of tasks identified by a common context key ({@link #getContextKey()}). This method + * won't be called if a context was already created by an previously executed task having the + * same context key. + */ + C createQueueContext(CLCommandQueue queue); + + /** + * Runs the task on a queue and returns a result. + */ + R execute(C context); + + /** + * Returns the context key for this task. + */ + Object getContextKey(); + +} diff --git a/src/com/jogamp/opencl/util/concurrent/CLTask.java b/src/com/jogamp/opencl/util/concurrent/CLTask.java index 287dc32c..25f9fd4c 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLTask.java +++ b/src/com/jogamp/opencl/util/concurrent/CLTask.java @@ -29,35 +29,19 @@ */ package com.jogamp.opencl.util.concurrent; -import com.jogamp.opencl.CLCommandQueue; - /** * A task executed on a command queue. * @author Michael Bien */ -public abstract class CLTask<C extends CLQueueContext, R> { - - - /** - * Creates a CLQueueContext for this task. A context may contain static resources - * like OpenCL program binaries or pre allocated buffers. A context can be used by an group - * of tasks identified by a common context key ({@link #getContextKey()}). This method - * won't be called if a context was already created by an previously executed task having the - * same context key. - */ - public abstract C createQueueContext(CLCommandQueue queue); +public abstract class CLTask<C extends CLQueueContext, R> implements CLPoolable<C, R> { /** * Returns the context key for this task. Default implementation returns {@link #getClass()}. */ + @Override public Object getContextKey() { return getClass(); } - /** - * Runs the task on a queue and returns a result. - */ - public abstract R execute(C context); - } diff --git a/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java b/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java index fe0b7e1c..815ffc8b 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java +++ b/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java @@ -44,25 +44,25 @@ import java.util.concurrent.TimeUnit; public class CLTaskCompletionService<R> { private final ExecutorCompletionService<R> service; - private final CLCommandQueuePool pool; + private final CLAbstractExecutorService executor; /** * Creates an CLTaskCompletionService using the supplied pool for base * task execution and a LinkedBlockingQueue with the capacity of {@link Integer#MAX_VALUE} * as a completion queue. */ - public CLTaskCompletionService(CLCommandQueuePool pool) { - this.service = new ExecutorCompletionService<R>(pool.getExcecutor()); - this.pool = pool; + public CLTaskCompletionService(CLAbstractExecutorService executor) { + this.service = new ExecutorCompletionService<R>(executor.getExcecutor()); + this.executor = executor; } /** * Creates an CLTaskCompletionService using the supplied pool for base * task execution the supplied queue as its completion queue. */ - public CLTaskCompletionService(CLCommandQueuePool pool, BlockingQueue<Future<R>> queue) { + public CLTaskCompletionService(CLAbstractExecutorService pool, BlockingQueue<Future<R>> queue) { this.service = new ExecutorCompletionService<R>(pool.getExcecutor(), queue); - this.pool = pool; + this.executor = pool; } /** @@ -70,8 +70,8 @@ public class CLTaskCompletionService<R> { * results of the task. Upon completion, this task may be taken or polled. * @see CompletionService#submit(java.util.concurrent.Callable) */ - public Future<R> submit(CLTask<? extends CLQueueContext, R> task) { - return service.submit(pool.wrapTask(task)); + public Future<R> submit(CLPoolable<? extends CLQueueContext, R> task) { + return service.submit(executor.wrapTask(task)); } /** diff --git a/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java index 9690fcd1..a76f86fc 100644 --- a/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java +++ b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java @@ -223,7 +223,7 @@ public class CLMultiContextTest { data = Buffers.newDirectIntBuffer(slice*taskCount); tasks = createTasks(decrementProgramSource, data, taskCount, slice); - CLTaskCompletionService<IntBuffer> service = new CLTaskCompletionService(pool); + CLTaskCompletionService<IntBuffer> service = new CLTaskCompletionService<IntBuffer>(pool); for (CLTestTask task : tasks) { service.submit(task); } |