diff options
author | Michael Bien <[email protected]> | 2011-07-11 19:36:33 +0200 |
---|---|---|
committer | Michael Bien <[email protected]> | 2011-07-11 19:36:33 +0200 |
commit | 6bd00879eec56c2753d84708f551557a2684904b (patch) | |
tree | af16bbc18053dade601f202b85c6efbb08f6a0a7 | |
parent | 29deee58472b1c475955718db7b1246fbb1df9d6 (diff) |
redesigned CLCommandQueuePool.
6 files changed, 147 insertions, 174 deletions
diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java index eac3dc13..12bfba82 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java +++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java @@ -9,7 +9,10 @@ import com.jogamp.opencl.CLResource; import com.jogamp.opencl.util.CLMultiContext; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -30,63 +33,46 @@ import java.util.concurrent.TimeoutException; * instead of {@link Callable}s and provides a per-queue context for resource sharing across all tasks of one queue. * @author Michael Bien */ -public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource { +public class CLCommandQueuePool implements CLResource { - private List<CLQueueContext> contexts; private ThreadPoolExecutor excecutor; private FinishAction finishAction = FinishAction.DO_NOTHING; private boolean released; + private final List<CLCommandQueue> queues; - private CLCommandQueuePool(CLQueueContextFactory factory, Collection<CLCommandQueue> queues) { - this.contexts = initContexts(queues, factory); + private CLCommandQueuePool(Collection<CLCommandQueue> queues) { + this.queues = new ArrayList<CLCommandQueue>(queues); initExecutor(); } - private List<CLQueueContext> initContexts(Collection<CLCommandQueue> queues, CLQueueContextFactory factory) { - List<CLQueueContext> newContexts = new ArrayList<CLQueueContext>(queues.size()); - - int index = 0; - for (CLCommandQueue queue : queues) { - - CLQueueContext old = null; - if(this.contexts != null && !this.contexts.isEmpty()) { - old = this.contexts.get(index++); - old.release(); - } - - newContexts.add(factory.setup(queue, old)); - } - return newContexts; - } - private void initExecutor() { BlockingQueue<Runnable> queue = new LinkedBlockingDeque<Runnable>(); - QueueThreadFactory factory = new QueueThreadFactory(contexts); - int size = contexts.size(); + QueueThreadFactory factory = new QueueThreadFactory(queues); + int size = queues.size(); this.excecutor = new CLThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, queue, factory); } - public static <C extends CLQueueContext> CLCommandQueuePool<C> create(CLQueueContextFactory<C> factory, CLMultiContext mc, CLCommandQueue.Mode... modes) { - return create(factory, mc.getDevices(), modes); + public static CLCommandQueuePool create(CLMultiContext mc, CLCommandQueue.Mode... modes) { + return create(mc.getDevices(), modes); } - public static <C extends CLQueueContext> CLCommandQueuePool<C> create(CLQueueContextFactory<C> factory, Collection<CLDevice> devices, CLCommandQueue.Mode... modes) { + public static CLCommandQueuePool create(Collection<CLDevice> devices, CLCommandQueue.Mode... modes) { List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(devices.size()); for (CLDevice device : devices) { queues.add(device.createCommandQueue(modes)); } - return create(factory, queues); + return create(queues); } - public static <C extends CLQueueContext> CLCommandQueuePool create(CLQueueContextFactory<C> factory, Collection<CLCommandQueue> queues) { - return new CLCommandQueuePool(factory, queues); + public static CLCommandQueuePool create(Collection<CLCommandQueue> queues) { + return new CLCommandQueuePool(queues); } /** * Submits this task to the pool for execution returning its {@link Future}. * @see ExecutorService#submit(java.util.concurrent.Callable) */ - public <R> Future<R> submit(CLTask<? super C, R> task) { + public <R> Future<R> submit(CLTask<? extends CLQueueContext, R> task) { return excecutor.submit(wrapTask(task)); } @@ -94,9 +80,9 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource * Submits all tasks to the pool for execution and returns their {@link Future}. * Calls {@link #submit(com.jogamp.opencl.util.concurrent.CLTask)} for every task. */ - public <R> List<Future<R>> submitAll(Collection<? extends CLTask<? super C, R>> tasks) { + public <R> List<Future<R>> submitAll(Collection<? extends CLTask<? extends CLQueueContext, R>> tasks) { List<Future<R>> futures = new ArrayList<Future<R>>(tasks.size()); - for (CLTask<? super C, R> task : tasks) { + for (CLTask<? extends CLQueueContext, R> task : tasks) { futures.add(submit(task)); } return futures; @@ -106,8 +92,8 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource * Submits all tasks to the pool for immediate execution (blocking) and returns their {@link Future} holding the result. * @see ExecutorService#invokeAll(java.util.Collection) */ - public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? super C, R>> tasks) throws InterruptedException { - List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks); + public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? extends CLQueueContext, R>> tasks) throws InterruptedException { + List<TaskWrapper<R>> wrapper = wrapTasks(tasks); return excecutor.invokeAll(wrapper); } @@ -115,8 +101,8 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource * Submits all tasks to the pool for immediate execution (blocking) and returns their {@link Future} holding the result. * @see ExecutorService#invokeAll(java.util.Collection, long, java.util.concurrent.TimeUnit) */ - public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? super C, R>> tasks, long timeout, TimeUnit unit) throws InterruptedException { - List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks); + public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? extends CLQueueContext, R>> tasks, long timeout, TimeUnit unit) throws InterruptedException { + List<TaskWrapper<R>> wrapper = wrapTasks(tasks); return excecutor.invokeAll(wrapper, timeout, unit); } @@ -125,13 +111,13 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource * All other unfinished but started tasks are cancelled. * @see ExecutorService#invokeAny(java.util.Collection) */ - public <R> R invokeAny(Collection<? extends CLTask<? super C, R>> tasks) throws InterruptedException, ExecutionException { - List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks); + public <R> R invokeAny(Collection<? extends CLTask<? extends CLQueueContext, R>> tasks) throws InterruptedException, ExecutionException { + List<TaskWrapper<R>> wrapper = wrapTasks(tasks); return excecutor.invokeAny(wrapper); } - /*public*/ CLTask<? super C, ?> takeCLTask() throws InterruptedException { - return ((CLFutureTask<? super C, ?>)excecutor.getQueue().take()).getCLTask(); + /*public*/ CLTask<? extends CLQueueContext, ?> takeCLTask() throws InterruptedException { + return ((CLFutureTask<?>)excecutor.getQueue().take()).getCLTask(); } /** @@ -139,47 +125,32 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource * All other unfinished but started tasks are cancelled. * @see ExecutorService#invokeAny(java.util.Collection, long, java.util.concurrent.TimeUnit) */ - public <R> R invokeAny(Collection<? extends CLTask<? super C, R>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks); + public <R> R invokeAny(Collection<? extends CLTask<? super CLQueueContext, R>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + List<TaskWrapper<R>> wrapper = wrapTasks(tasks); return excecutor.invokeAny(wrapper, timeout, unit); } - <R> TaskWrapper<C, R> wrapTask(CLTask<? super C, R> task) { + <R> TaskWrapper<R> wrapTask(CLTask<? extends CLQueueContext, R> task) { return new TaskWrapper(task, finishAction); } - private <R> List<TaskWrapper<C, R>> wrapTasks(Collection<? extends CLTask<? super C, R>> tasks) { - List<TaskWrapper<C, R>> wrapper = new ArrayList<TaskWrapper<C, R>>(tasks.size()); - for (CLTask<? super C, R> task : tasks) { + private <R> List<TaskWrapper<R>> wrapTasks(Collection<? extends CLTask<? extends CLQueueContext, R>> tasks) { + List<TaskWrapper<R>> wrapper = new ArrayList<TaskWrapper<R>>(tasks.size()); + for (CLTask<? extends CLQueueContext, R> task : tasks) { if(task == null) { throw new NullPointerException("at least one task was null"); } - wrapper.add(new TaskWrapper<C, R>(task, finishAction)); + wrapper.add(new TaskWrapper<R>((CLTask<CLQueueContext, R>)task, finishAction)); } return wrapper; } - - /** - * Switches the context of all queues - this operation can be expensive. - * Blocks until all tasks finish and sets up a new context for all queues. - * @return this - */ - public <C extends CLQueueContext> CLCommandQueuePool switchContext(CLQueueContextFactory<C> factory) { - - excecutor.shutdown(); - finishQueues(); // just to be sure - - contexts = initContexts(getQueues(), factory); - initExecutor(); - return this; - } /** * Calls {@link CLCommandQueue#flush()} on all queues. */ public void flushQueues() { - for (CLQueueContext context : contexts) { - context.queue.flush(); + for (CLCommandQueue queue : queues) { + queue.flush(); } } @@ -187,8 +158,8 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource * Calls {@link CLCommandQueue#finish()} on all queues. */ public void finishQueues() { - for (CLQueueContext context : contexts) { - context.queue.finish(); + for (CLCommandQueue queue : queues) { + queue.finish(); } } @@ -202,16 +173,11 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource throw new RuntimeException(getClass().getSimpleName()+" already released"); } released = true; - excecutor.shutdownNow(); + excecutor.shutdownNow(); // threads will cleanup CL resources on exit try { excecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); } catch (InterruptedException ex) { throw new RuntimeException(ex); - }finally{ - for (CLQueueContext context : contexts) { - context.queue.finish().release(); - context.release(); - } } } @@ -223,18 +189,14 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource * Returns the command queues used in this pool. */ public List<CLCommandQueue> getQueues() { - List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(contexts.size()); - for (CLQueueContext context : contexts) { - queues.add(context.queue); - } - return queues; + return Collections.unmodifiableList(queues); } /** * Returns the size of this pool (number of command queues). */ public int getPoolSize() { - return contexts.size(); + return queues.size(); } /** @@ -284,16 +246,16 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource @Override public String toString() { - return getClass().getSimpleName()+" [queues: "+contexts.size()+" on finish: "+finishAction+"]"; + return getClass().getSimpleName()+" [queues: "+getPoolSize()+" on finish: "+finishAction+"]"; } private static class QueueThreadFactory implements ThreadFactory { - private final List<CLQueueContext> context; + private final List<CLCommandQueue> queues; private int index; - private QueueThreadFactory(List<CLQueueContext> queues) { - this.context = queues; + private QueueThreadFactory(List<CLCommandQueue> queues) { + this.queues = queues; this.index = 0; } @@ -303,7 +265,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource SecurityManager sm = System.getSecurityManager(); ThreadGroup group = (sm != null) ? sm.getThreadGroup() : Thread.currentThread().getThreadGroup(); - CLQueueContext queue = context.get(index); + CLCommandQueue queue = queues.get(index); QueueThread thread = new QueueThread(group, runnable, queue, index++); thread.setDaemon(true); @@ -313,27 +275,52 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource } private static class QueueThread extends Thread { - private final CLQueueContext context; - public QueueThread(ThreadGroup group, Runnable runnable, CLQueueContext context, int index) { - super(group, runnable, "queue-worker-thread-"+index+"["+context+"]"); - this.context = context; + + private final CLCommandQueue queue; + private final Map<Object, CLQueueContext> contextMap; + + public QueueThread(ThreadGroup group, Runnable runnable, CLCommandQueue queue, int index) { + super(group, runnable, "queue-worker-thread-"+index+"["+queue+"]"); + this.queue = queue; + this.contextMap = new HashMap<Object, CLQueueContext>(); + } + + @Override + public void run() { + super.run(); + //release threadlocal contexts + queue.finish(); + for (CLQueueContext context : contextMap.values()) { + context.release(); + } } + } - private static class TaskWrapper<C extends CLQueueContext, R> implements Callable<R> { + private static class TaskWrapper<R> implements Callable<R> { - private final CLTask<? super C, R> task; + private final CLTask<CLQueueContext, R> task; private final FinishAction mode; - private TaskWrapper(CLTask<? super C, R> task, FinishAction mode) { + private TaskWrapper(CLTask<CLQueueContext, R> task, FinishAction mode) { this.task = task; this.mode = mode; } @Override public R call() throws Exception { - CLQueueContext context = ((QueueThread)Thread.currentThread()).context; - R result = task.execute((C)context); + + QueueThread thread = (QueueThread)Thread.currentThread(); + + final Object key = task.getContextKey(); + + CLQueueContext context = thread.contextMap.get(key); + if(context == null) { + context = task.createQueueContext(thread.queue); + thread.contextMap.put(key, context); + } + + R result = task.execute(context); if(mode.equals(FinishAction.FLUSH)) { context.queue.flush(); }else if(mode.equals(FinishAction.FINISH)) { @@ -344,16 +331,16 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource } - private static class CLFutureTask<C extends CLQueueContext, R> extends FutureTask<R> { + private static class CLFutureTask<R> extends FutureTask<R> { - private final TaskWrapper<C, R> wrapper; + private final TaskWrapper<R> wrapper; - public CLFutureTask(TaskWrapper<C, R> wrapper) { + public CLFutureTask(TaskWrapper<R> wrapper) { super(wrapper); this.wrapper = wrapper; } - public CLTask<? super C, R> getCLTask() { + public CLTask<? extends CLQueueContext, R> getCLTask() { return wrapper.task; } @@ -366,9 +353,9 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource } @Override - protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { - TaskWrapper<CLQueueContext, T> wrapper = (TaskWrapper<CLQueueContext, T>)callable; - return new CLFutureTask<CLQueueContext, T>(wrapper); + protected <R> RunnableFuture<R> newTaskFor(Callable<R> callable) { + TaskWrapper<R> wrapper = (TaskWrapper<R>)callable; + return new CLFutureTask<R>(wrapper); } } diff --git a/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java b/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java index 9f92b9a3..93c2d226 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java +++ b/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java @@ -40,17 +40,21 @@ public abstract class CLQueueContext implements CLResource { * A simple queue context holding a precompiled program and its kernels. * @author Michael Bien */ - public static class CLSimpleQueueContext extends CLQueueContext { + public static class CLSingleProgramQueueContext extends CLQueueContext { public final CLProgram program; public final Map<String, CLKernel> kernels; - public CLSimpleQueueContext(CLCommandQueue queue, CLProgram program) { + public CLSingleProgramQueueContext(CLCommandQueue queue, CLProgram program) { super(queue); this.program = program; this.kernels = program.createCLKernels(); } + public CLSingleProgramQueueContext(CLCommandQueue queue, String... source) { + this(queue, queue.getContext().createProgram(source).build()); + } + public Map<String, CLKernel> getKernels() { return kernels; } @@ -65,7 +69,11 @@ public abstract class CLQueueContext implements CLResource { @Override public void release() { - program.release(); + synchronized(program) { + if(!program.isReleased()) { + program.release(); + } + } } @Override diff --git a/src/com/jogamp/opencl/util/concurrent/CLQueueContextFactory.java b/src/com/jogamp/opencl/util/concurrent/CLQueueContextFactory.java deleted file mode 100644 index 58f389bf..00000000 --- a/src/com/jogamp/opencl/util/concurrent/CLQueueContextFactory.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Created onSaturday, May 07 2011 00:40 - */ -package com.jogamp.opencl.util.concurrent; - -import com.jogamp.opencl.CLCommandQueue; -import com.jogamp.opencl.CLProgram; -import com.jogamp.opencl.util.concurrent.CLQueueContext.CLSimpleQueueContext; - -/** - * Creates {@link CLQueueContext}s. - * @author Michael Bien - */ -public abstract class CLQueueContextFactory<C extends CLQueueContext> { - - /** - * Creates a new queue context for the given queue. - * @param old the old context or null. - */ - public abstract C setup(CLCommandQueue queue, CLQueueContext old); - - - /** - * Creates a simple context factory producing single program contexts. - * @param source sourcecode of a OpenCL program. - */ - public static CLSimpleContextFactory createSimple(String source) { - return new CLSimpleContextFactory(source); - } - - /** - * Creates {@link CLSimpleQueueContext}s containing a precompiled program. - * @author Michael Bien - */ - public static class CLSimpleContextFactory extends CLQueueContextFactory<CLSimpleQueueContext> { - - private final String source; - - public CLSimpleContextFactory(String source) { - this.source = source; - } - - @Override - public CLSimpleQueueContext setup(CLCommandQueue queue, CLQueueContext old) { - CLProgram program = queue.getContext().createProgram(source).build(queue.getDevice()); - return new CLSimpleQueueContext(queue, program); - } - - } - -} diff --git a/src/com/jogamp/opencl/util/concurrent/CLTask.java b/src/com/jogamp/opencl/util/concurrent/CLTask.java index 0cfd24a5..04d433c8 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLTask.java +++ b/src/com/jogamp/opencl/util/concurrent/CLTask.java @@ -3,16 +3,35 @@ */ package com.jogamp.opencl.util.concurrent; +import com.jogamp.opencl.CLCommandQueue; + /** * A task executed on a command queue. * @author Michael Bien */ -public interface CLTask<C extends CLQueueContext, R> { +public abstract class CLTask<C extends CLQueueContext, R> { + + + /** + * Creates a CLQueueContext for this task. A context may contain static resources + * like OpenCL program binaries or pre allocated buffers. A context can be used by an group + * of tasks identified by a common context key ({@link #getContextKey()}). This method + * won't be called if a context was already created by an previously executed task with the + * same context key as this task. + */ + public abstract C createQueueContext(CLCommandQueue queue); + + /** + * Returns the context key for this task. Default implementation returns {@link #getClass()}. + */ + public Object getContextKey() { + return getClass(); + } /** * Runs the task on a queue and returns a result. */ - R execute(C context); + public abstract R execute(C context); } diff --git a/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java b/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java index d1d26824..630ee1c7 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java +++ b/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java @@ -15,7 +15,7 @@ import java.util.concurrent.TimeUnit; * @see CompletionService * @author Michael Bien */ -public class CLTaskCompletionService<C extends CLQueueContext, R> { +public class CLTaskCompletionService<R> { private final ExecutorCompletionService<R> service; private final CLCommandQueuePool pool; @@ -25,7 +25,7 @@ public class CLTaskCompletionService<C extends CLQueueContext, R> { * task execution and a LinkedBlockingQueue with the capacity of {@link Integer#MAX_VALUE} * as a completion queue. */ - public CLTaskCompletionService(CLCommandQueuePool<C> pool) { + public CLTaskCompletionService(CLCommandQueuePool pool) { this.service = new ExecutorCompletionService<R>(pool.getExcecutor()); this.pool = pool; } @@ -34,7 +34,7 @@ public class CLTaskCompletionService<C extends CLQueueContext, R> { * Creates an CLTaskCompletionService using the supplied pool for base * task execution the supplied queue as its completion queue. */ - public CLTaskCompletionService(CLCommandQueuePool<C> pool, BlockingQueue queue) { + public CLTaskCompletionService(CLCommandQueuePool pool, BlockingQueue queue) { this.service = new ExecutorCompletionService<R>(pool.getExcecutor(), queue); this.pool = pool; } @@ -44,7 +44,7 @@ public class CLTaskCompletionService<C extends CLQueueContext, R> { * results of the task. Upon completion, this task may be taken or polled. * @see CompletionService#submit(java.util.concurrent.Callable) */ - public Future<R> submit(CLTask<? super C, R> task) { + public Future<R> submit(CLTask<? extends CLQueueContext, R> task) { return service.submit(pool.wrapTask(task)); } diff --git a/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java index 1b2575f5..48425d5e 100644 --- a/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java +++ b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java @@ -10,8 +10,7 @@ import com.jogamp.opencl.CLContext; import com.jogamp.opencl.CLDevice; import com.jogamp.opencl.CLKernel; import com.jogamp.opencl.CLPlatform; -import com.jogamp.opencl.util.concurrent.CLQueueContext.CLSimpleQueueContext; -import com.jogamp.opencl.util.concurrent.CLQueueContextFactory.CLSimpleContextFactory; +import com.jogamp.opencl.util.concurrent.CLQueueContext.CLSingleProgramQueueContext; import java.nio.IntBuffer; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -69,16 +68,23 @@ public class CLMultiContextTest { + " array[index]++; \n" + "} \n"; - private final class CLTestTask implements CLTask<CLSimpleQueueContext, IntBuffer> { + private final class CLTestTask extends CLTask<CLSingleProgramQueueContext, IntBuffer> { private final IntBuffer data; + private final String source; - public CLTestTask(IntBuffer buffer) { + public CLTestTask(String source, IntBuffer buffer) { this.data = buffer; + this.source = source; } @Override - public IntBuffer execute(CLSimpleQueueContext qc) { + public CLSingleProgramQueueContext createQueueContext(CLCommandQueue queue) { + return new CLSingleProgramQueueContext(queue, source); + } + + @Override + public IntBuffer execute(CLSingleProgramQueueContext qc) { CLCommandQueue queue = qc.getQueue(); CLContext context = qc.getCLContext(); @@ -110,14 +116,19 @@ public class CLMultiContextTest { return data; } + @Override + public Object getContextKey() { + return source.hashCode(); + } + } - private List<CLTestTask> createTasks(IntBuffer data, int taskCount, int slice) { + private List<CLTestTask> createTasks(String source, IntBuffer data, int taskCount, int slice) { List<CLTestTask> tasks = new ArrayList<CLTestTask>(taskCount); for (int i = 0; i < taskCount; i++) { IntBuffer subBuffer = Buffers.slice(data, i*slice, slice); assertEquals(slice, subBuffer.capacity()); - tasks.add(new CLTestTask(subBuffer)); + tasks.add(new CLTestTask(source, subBuffer)); } return tasks; } @@ -129,8 +140,7 @@ public class CLMultiContextTest { try { - CLSimpleContextFactory factory = CLQueueContextFactory.createSimple(programSource); - CLCommandQueuePool<CLSimpleQueueContext> pool = CLCommandQueuePool.create(factory, mc); + CLCommandQueuePool pool = CLCommandQueuePool.create(mc); assertTrue(pool.getPoolSize() > 0); @@ -139,7 +149,7 @@ public class CLMultiContextTest { final int taskCount = pool.getPoolSize() * tasksPerQueue; IntBuffer data = Buffers.newDirectIntBuffer(slice*taskCount); - List<CLTestTask> tasks = createTasks(data, taskCount, slice); + List<CLTestTask> tasks = createTasks(programSource, data, taskCount, slice); out.println("invoking "+tasks.size()+" tasks on "+pool.getPoolSize()+" queues"); @@ -166,8 +176,8 @@ public class CLMultiContextTest { checkBuffer(3, data); // switching contexts using different program - factory = CLQueueContextFactory.createSimple(programSource.replaceAll("\\+\\+", "--")); - pool.switchContext(factory); + final String decrementProgramSource = programSource.replaceAll("\\+\\+", "--"); + tasks = createTasks(decrementProgramSource, data, taskCount, slice); List<Future<IntBuffer>> results2 = pool.invokeAll(tasks); assertNotNull(results2); checkBuffer(2, data); @@ -176,7 +186,7 @@ public class CLMultiContextTest { // we wait only for completion of a subset of tasks. // submit any data = Buffers.newDirectIntBuffer(slice*taskCount); - tasks = createTasks(data, taskCount, slice); + tasks = createTasks(decrementProgramSource, data, taskCount, slice); IntBuffer ret1 = pool.invokeAny(tasks); assertNotNull(ret1); @@ -185,9 +195,9 @@ public class CLMultiContextTest { // completionservice take/any test data = Buffers.newDirectIntBuffer(slice*taskCount); - tasks = createTasks(data, taskCount, slice); + tasks = createTasks(decrementProgramSource, data, taskCount, slice); - CLTaskCompletionService<CLSimpleQueueContext, IntBuffer> service = new CLTaskCompletionService(pool); + CLTaskCompletionService<IntBuffer> service = new CLTaskCompletionService(pool); for (CLTestTask task : tasks) { service.submit(task); } |