summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java185
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLQueueContext.java14
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLQueueContextFactory.java51
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLTask.java23
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java8
-rw-r--r--test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java40
6 files changed, 147 insertions, 174 deletions
diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
index eac3dc13..12bfba82 100644
--- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
+++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
@@ -9,7 +9,10 @@ import com.jogamp.opencl.CLResource;
import com.jogamp.opencl.util.CLMultiContext;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -30,63 +33,46 @@ import java.util.concurrent.TimeoutException;
* instead of {@link Callable}s and provides a per-queue context for resource sharing across all tasks of one queue.
* @author Michael Bien
*/
-public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource {
+public class CLCommandQueuePool implements CLResource {
- private List<CLQueueContext> contexts;
private ThreadPoolExecutor excecutor;
private FinishAction finishAction = FinishAction.DO_NOTHING;
private boolean released;
+ private final List<CLCommandQueue> queues;
- private CLCommandQueuePool(CLQueueContextFactory factory, Collection<CLCommandQueue> queues) {
- this.contexts = initContexts(queues, factory);
+ private CLCommandQueuePool(Collection<CLCommandQueue> queues) {
+ this.queues = new ArrayList<CLCommandQueue>(queues);
initExecutor();
}
- private List<CLQueueContext> initContexts(Collection<CLCommandQueue> queues, CLQueueContextFactory factory) {
- List<CLQueueContext> newContexts = new ArrayList<CLQueueContext>(queues.size());
-
- int index = 0;
- for (CLCommandQueue queue : queues) {
-
- CLQueueContext old = null;
- if(this.contexts != null && !this.contexts.isEmpty()) {
- old = this.contexts.get(index++);
- old.release();
- }
-
- newContexts.add(factory.setup(queue, old));
- }
- return newContexts;
- }
-
private void initExecutor() {
BlockingQueue<Runnable> queue = new LinkedBlockingDeque<Runnable>();
- QueueThreadFactory factory = new QueueThreadFactory(contexts);
- int size = contexts.size();
+ QueueThreadFactory factory = new QueueThreadFactory(queues);
+ int size = queues.size();
this.excecutor = new CLThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, queue, factory);
}
- public static <C extends CLQueueContext> CLCommandQueuePool<C> create(CLQueueContextFactory<C> factory, CLMultiContext mc, CLCommandQueue.Mode... modes) {
- return create(factory, mc.getDevices(), modes);
+ public static CLCommandQueuePool create(CLMultiContext mc, CLCommandQueue.Mode... modes) {
+ return create(mc.getDevices(), modes);
}
- public static <C extends CLQueueContext> CLCommandQueuePool<C> create(CLQueueContextFactory<C> factory, Collection<CLDevice> devices, CLCommandQueue.Mode... modes) {
+ public static CLCommandQueuePool create(Collection<CLDevice> devices, CLCommandQueue.Mode... modes) {
List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(devices.size());
for (CLDevice device : devices) {
queues.add(device.createCommandQueue(modes));
}
- return create(factory, queues);
+ return create(queues);
}
- public static <C extends CLQueueContext> CLCommandQueuePool create(CLQueueContextFactory<C> factory, Collection<CLCommandQueue> queues) {
- return new CLCommandQueuePool(factory, queues);
+ public static CLCommandQueuePool create(Collection<CLCommandQueue> queues) {
+ return new CLCommandQueuePool(queues);
}
/**
* Submits this task to the pool for execution returning its {@link Future}.
* @see ExecutorService#submit(java.util.concurrent.Callable)
*/
- public <R> Future<R> submit(CLTask<? super C, R> task) {
+ public <R> Future<R> submit(CLTask<? extends CLQueueContext, R> task) {
return excecutor.submit(wrapTask(task));
}
@@ -94,9 +80,9 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
* Submits all tasks to the pool for execution and returns their {@link Future}.
* Calls {@link #submit(com.jogamp.opencl.util.concurrent.CLTask)} for every task.
*/
- public <R> List<Future<R>> submitAll(Collection<? extends CLTask<? super C, R>> tasks) {
+ public <R> List<Future<R>> submitAll(Collection<? extends CLTask<? extends CLQueueContext, R>> tasks) {
List<Future<R>> futures = new ArrayList<Future<R>>(tasks.size());
- for (CLTask<? super C, R> task : tasks) {
+ for (CLTask<? extends CLQueueContext, R> task : tasks) {
futures.add(submit(task));
}
return futures;
@@ -106,8 +92,8 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
* Submits all tasks to the pool for immediate execution (blocking) and returns their {@link Future} holding the result.
* @see ExecutorService#invokeAll(java.util.Collection)
*/
- public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? super C, R>> tasks) throws InterruptedException {
- List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks);
+ public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? extends CLQueueContext, R>> tasks) throws InterruptedException {
+ List<TaskWrapper<R>> wrapper = wrapTasks(tasks);
return excecutor.invokeAll(wrapper);
}
@@ -115,8 +101,8 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
* Submits all tasks to the pool for immediate execution (blocking) and returns their {@link Future} holding the result.
* @see ExecutorService#invokeAll(java.util.Collection, long, java.util.concurrent.TimeUnit)
*/
- public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? super C, R>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
- List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks);
+ public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? extends CLQueueContext, R>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+ List<TaskWrapper<R>> wrapper = wrapTasks(tasks);
return excecutor.invokeAll(wrapper, timeout, unit);
}
@@ -125,13 +111,13 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
* All other unfinished but started tasks are cancelled.
* @see ExecutorService#invokeAny(java.util.Collection)
*/
- public <R> R invokeAny(Collection<? extends CLTask<? super C, R>> tasks) throws InterruptedException, ExecutionException {
- List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks);
+ public <R> R invokeAny(Collection<? extends CLTask<? extends CLQueueContext, R>> tasks) throws InterruptedException, ExecutionException {
+ List<TaskWrapper<R>> wrapper = wrapTasks(tasks);
return excecutor.invokeAny(wrapper);
}
- /*public*/ CLTask<? super C, ?> takeCLTask() throws InterruptedException {
- return ((CLFutureTask<? super C, ?>)excecutor.getQueue().take()).getCLTask();
+ /*public*/ CLTask<? extends CLQueueContext, ?> takeCLTask() throws InterruptedException {
+ return ((CLFutureTask<?>)excecutor.getQueue().take()).getCLTask();
}
/**
@@ -139,47 +125,32 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
* All other unfinished but started tasks are cancelled.
* @see ExecutorService#invokeAny(java.util.Collection, long, java.util.concurrent.TimeUnit)
*/
- public <R> R invokeAny(Collection<? extends CLTask<? super C, R>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks);
+ public <R> R invokeAny(Collection<? extends CLTask<? super CLQueueContext, R>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ List<TaskWrapper<R>> wrapper = wrapTasks(tasks);
return excecutor.invokeAny(wrapper, timeout, unit);
}
- <R> TaskWrapper<C, R> wrapTask(CLTask<? super C, R> task) {
+ <R> TaskWrapper<R> wrapTask(CLTask<? extends CLQueueContext, R> task) {
return new TaskWrapper(task, finishAction);
}
- private <R> List<TaskWrapper<C, R>> wrapTasks(Collection<? extends CLTask<? super C, R>> tasks) {
- List<TaskWrapper<C, R>> wrapper = new ArrayList<TaskWrapper<C, R>>(tasks.size());
- for (CLTask<? super C, R> task : tasks) {
+ private <R> List<TaskWrapper<R>> wrapTasks(Collection<? extends CLTask<? extends CLQueueContext, R>> tasks) {
+ List<TaskWrapper<R>> wrapper = new ArrayList<TaskWrapper<R>>(tasks.size());
+ for (CLTask<? extends CLQueueContext, R> task : tasks) {
if(task == null) {
throw new NullPointerException("at least one task was null");
}
- wrapper.add(new TaskWrapper<C, R>(task, finishAction));
+ wrapper.add(new TaskWrapper<R>((CLTask<CLQueueContext, R>)task, finishAction));
}
return wrapper;
}
-
- /**
- * Switches the context of all queues - this operation can be expensive.
- * Blocks until all tasks finish and sets up a new context for all queues.
- * @return this
- */
- public <C extends CLQueueContext> CLCommandQueuePool switchContext(CLQueueContextFactory<C> factory) {
-
- excecutor.shutdown();
- finishQueues(); // just to be sure
-
- contexts = initContexts(getQueues(), factory);
- initExecutor();
- return this;
- }
/**
* Calls {@link CLCommandQueue#flush()} on all queues.
*/
public void flushQueues() {
- for (CLQueueContext context : contexts) {
- context.queue.flush();
+ for (CLCommandQueue queue : queues) {
+ queue.flush();
}
}
@@ -187,8 +158,8 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
* Calls {@link CLCommandQueue#finish()} on all queues.
*/
public void finishQueues() {
- for (CLQueueContext context : contexts) {
- context.queue.finish();
+ for (CLCommandQueue queue : queues) {
+ queue.finish();
}
}
@@ -202,16 +173,11 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
throw new RuntimeException(getClass().getSimpleName()+" already released");
}
released = true;
- excecutor.shutdownNow();
+ excecutor.shutdownNow(); // threads will cleanup CL resources on exit
try {
excecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
- }finally{
- for (CLQueueContext context : contexts) {
- context.queue.finish().release();
- context.release();
- }
}
}
@@ -223,18 +189,14 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
* Returns the command queues used in this pool.
*/
public List<CLCommandQueue> getQueues() {
- List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(contexts.size());
- for (CLQueueContext context : contexts) {
- queues.add(context.queue);
- }
- return queues;
+ return Collections.unmodifiableList(queues);
}
/**
* Returns the size of this pool (number of command queues).
*/
public int getPoolSize() {
- return contexts.size();
+ return queues.size();
}
/**
@@ -284,16 +246,16 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
@Override
public String toString() {
- return getClass().getSimpleName()+" [queues: "+contexts.size()+" on finish: "+finishAction+"]";
+ return getClass().getSimpleName()+" [queues: "+getPoolSize()+" on finish: "+finishAction+"]";
}
private static class QueueThreadFactory implements ThreadFactory {
- private final List<CLQueueContext> context;
+ private final List<CLCommandQueue> queues;
private int index;
- private QueueThreadFactory(List<CLQueueContext> queues) {
- this.context = queues;
+ private QueueThreadFactory(List<CLCommandQueue> queues) {
+ this.queues = queues;
this.index = 0;
}
@@ -303,7 +265,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
SecurityManager sm = System.getSecurityManager();
ThreadGroup group = (sm != null) ? sm.getThreadGroup() : Thread.currentThread().getThreadGroup();
- CLQueueContext queue = context.get(index);
+ CLCommandQueue queue = queues.get(index);
QueueThread thread = new QueueThread(group, runnable, queue, index++);
thread.setDaemon(true);
@@ -313,27 +275,52 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
}
private static class QueueThread extends Thread {
- private final CLQueueContext context;
- public QueueThread(ThreadGroup group, Runnable runnable, CLQueueContext context, int index) {
- super(group, runnable, "queue-worker-thread-"+index+"["+context+"]");
- this.context = context;
+
+ private final CLCommandQueue queue;
+ private final Map<Object, CLQueueContext> contextMap;
+
+ public QueueThread(ThreadGroup group, Runnable runnable, CLCommandQueue queue, int index) {
+ super(group, runnable, "queue-worker-thread-"+index+"["+queue+"]");
+ this.queue = queue;
+ this.contextMap = new HashMap<Object, CLQueueContext>();
+ }
+
+ @Override
+ public void run() {
+ super.run();
+ //release threadlocal contexts
+ queue.finish();
+ for (CLQueueContext context : contextMap.values()) {
+ context.release();
+ }
}
+
}
- private static class TaskWrapper<C extends CLQueueContext, R> implements Callable<R> {
+ private static class TaskWrapper<R> implements Callable<R> {
- private final CLTask<? super C, R> task;
+ private final CLTask<CLQueueContext, R> task;
private final FinishAction mode;
- private TaskWrapper(CLTask<? super C, R> task, FinishAction mode) {
+ private TaskWrapper(CLTask<CLQueueContext, R> task, FinishAction mode) {
this.task = task;
this.mode = mode;
}
@Override
public R call() throws Exception {
- CLQueueContext context = ((QueueThread)Thread.currentThread()).context;
- R result = task.execute((C)context);
+
+ QueueThread thread = (QueueThread)Thread.currentThread();
+
+ final Object key = task.getContextKey();
+
+ CLQueueContext context = thread.contextMap.get(key);
+ if(context == null) {
+ context = task.createQueueContext(thread.queue);
+ thread.contextMap.put(key, context);
+ }
+
+ R result = task.execute(context);
if(mode.equals(FinishAction.FLUSH)) {
context.queue.flush();
}else if(mode.equals(FinishAction.FINISH)) {
@@ -344,16 +331,16 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
}
- private static class CLFutureTask<C extends CLQueueContext, R> extends FutureTask<R> {
+ private static class CLFutureTask<R> extends FutureTask<R> {
- private final TaskWrapper<C, R> wrapper;
+ private final TaskWrapper<R> wrapper;
- public CLFutureTask(TaskWrapper<C, R> wrapper) {
+ public CLFutureTask(TaskWrapper<R> wrapper) {
super(wrapper);
this.wrapper = wrapper;
}
- public CLTask<? super C, R> getCLTask() {
+ public CLTask<? extends CLQueueContext, R> getCLTask() {
return wrapper.task;
}
@@ -366,9 +353,9 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource
}
@Override
- protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
- TaskWrapper<CLQueueContext, T> wrapper = (TaskWrapper<CLQueueContext, T>)callable;
- return new CLFutureTask<CLQueueContext, T>(wrapper);
+ protected <R> RunnableFuture<R> newTaskFor(Callable<R> callable) {
+ TaskWrapper<R> wrapper = (TaskWrapper<R>)callable;
+ return new CLFutureTask<R>(wrapper);
}
}
diff --git a/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java b/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java
index 9f92b9a3..93c2d226 100644
--- a/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java
+++ b/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java
@@ -40,17 +40,21 @@ public abstract class CLQueueContext implements CLResource {
* A simple queue context holding a precompiled program and its kernels.
* @author Michael Bien
*/
- public static class CLSimpleQueueContext extends CLQueueContext {
+ public static class CLSingleProgramQueueContext extends CLQueueContext {
public final CLProgram program;
public final Map<String, CLKernel> kernels;
- public CLSimpleQueueContext(CLCommandQueue queue, CLProgram program) {
+ public CLSingleProgramQueueContext(CLCommandQueue queue, CLProgram program) {
super(queue);
this.program = program;
this.kernels = program.createCLKernels();
}
+ public CLSingleProgramQueueContext(CLCommandQueue queue, String... source) {
+ this(queue, queue.getContext().createProgram(source).build());
+ }
+
public Map<String, CLKernel> getKernels() {
return kernels;
}
@@ -65,7 +69,11 @@ public abstract class CLQueueContext implements CLResource {
@Override
public void release() {
- program.release();
+ synchronized(program) {
+ if(!program.isReleased()) {
+ program.release();
+ }
+ }
}
@Override
diff --git a/src/com/jogamp/opencl/util/concurrent/CLQueueContextFactory.java b/src/com/jogamp/opencl/util/concurrent/CLQueueContextFactory.java
deleted file mode 100644
index 58f389bf..00000000
--- a/src/com/jogamp/opencl/util/concurrent/CLQueueContextFactory.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Created onSaturday, May 07 2011 00:40
- */
-package com.jogamp.opencl.util.concurrent;
-
-import com.jogamp.opencl.CLCommandQueue;
-import com.jogamp.opencl.CLProgram;
-import com.jogamp.opencl.util.concurrent.CLQueueContext.CLSimpleQueueContext;
-
-/**
- * Creates {@link CLQueueContext}s.
- * @author Michael Bien
- */
-public abstract class CLQueueContextFactory<C extends CLQueueContext> {
-
- /**
- * Creates a new queue context for the given queue.
- * @param old the old context or null.
- */
- public abstract C setup(CLCommandQueue queue, CLQueueContext old);
-
-
- /**
- * Creates a simple context factory producing single program contexts.
- * @param source sourcecode of a OpenCL program.
- */
- public static CLSimpleContextFactory createSimple(String source) {
- return new CLSimpleContextFactory(source);
- }
-
- /**
- * Creates {@link CLSimpleQueueContext}s containing a precompiled program.
- * @author Michael Bien
- */
- public static class CLSimpleContextFactory extends CLQueueContextFactory<CLSimpleQueueContext> {
-
- private final String source;
-
- public CLSimpleContextFactory(String source) {
- this.source = source;
- }
-
- @Override
- public CLSimpleQueueContext setup(CLCommandQueue queue, CLQueueContext old) {
- CLProgram program = queue.getContext().createProgram(source).build(queue.getDevice());
- return new CLSimpleQueueContext(queue, program);
- }
-
- }
-
-}
diff --git a/src/com/jogamp/opencl/util/concurrent/CLTask.java b/src/com/jogamp/opencl/util/concurrent/CLTask.java
index 0cfd24a5..04d433c8 100644
--- a/src/com/jogamp/opencl/util/concurrent/CLTask.java
+++ b/src/com/jogamp/opencl/util/concurrent/CLTask.java
@@ -3,16 +3,35 @@
*/
package com.jogamp.opencl.util.concurrent;
+import com.jogamp.opencl.CLCommandQueue;
+
/**
* A task executed on a command queue.
* @author Michael Bien
*/
-public interface CLTask<C extends CLQueueContext, R> {
+public abstract class CLTask<C extends CLQueueContext, R> {
+
+
+ /**
+ * Creates a CLQueueContext for this task. A context may contain static resources
+ * like OpenCL program binaries or pre allocated buffers. A context can be used by an group
+ * of tasks identified by a common context key ({@link #getContextKey()}). This method
+ * won't be called if a context was already created by an previously executed task with the
+ * same context key as this task.
+ */
+ public abstract C createQueueContext(CLCommandQueue queue);
+
+ /**
+ * Returns the context key for this task. Default implementation returns {@link #getClass()}.
+ */
+ public Object getContextKey() {
+ return getClass();
+ }
/**
* Runs the task on a queue and returns a result.
*/
- R execute(C context);
+ public abstract R execute(C context);
}
diff --git a/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java b/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java
index d1d26824..630ee1c7 100644
--- a/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java
+++ b/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java
@@ -15,7 +15,7 @@ import java.util.concurrent.TimeUnit;
* @see CompletionService
* @author Michael Bien
*/
-public class CLTaskCompletionService<C extends CLQueueContext, R> {
+public class CLTaskCompletionService<R> {
private final ExecutorCompletionService<R> service;
private final CLCommandQueuePool pool;
@@ -25,7 +25,7 @@ public class CLTaskCompletionService<C extends CLQueueContext, R> {
* task execution and a LinkedBlockingQueue with the capacity of {@link Integer#MAX_VALUE}
* as a completion queue.
*/
- public CLTaskCompletionService(CLCommandQueuePool<C> pool) {
+ public CLTaskCompletionService(CLCommandQueuePool pool) {
this.service = new ExecutorCompletionService<R>(pool.getExcecutor());
this.pool = pool;
}
@@ -34,7 +34,7 @@ public class CLTaskCompletionService<C extends CLQueueContext, R> {
* Creates an CLTaskCompletionService using the supplied pool for base
* task execution the supplied queue as its completion queue.
*/
- public CLTaskCompletionService(CLCommandQueuePool<C> pool, BlockingQueue queue) {
+ public CLTaskCompletionService(CLCommandQueuePool pool, BlockingQueue queue) {
this.service = new ExecutorCompletionService<R>(pool.getExcecutor(), queue);
this.pool = pool;
}
@@ -44,7 +44,7 @@ public class CLTaskCompletionService<C extends CLQueueContext, R> {
* results of the task. Upon completion, this task may be taken or polled.
* @see CompletionService#submit(java.util.concurrent.Callable)
*/
- public Future<R> submit(CLTask<? super C, R> task) {
+ public Future<R> submit(CLTask<? extends CLQueueContext, R> task) {
return service.submit(pool.wrapTask(task));
}
diff --git a/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java
index 1b2575f5..48425d5e 100644
--- a/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java
+++ b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java
@@ -10,8 +10,7 @@ import com.jogamp.opencl.CLContext;
import com.jogamp.opencl.CLDevice;
import com.jogamp.opencl.CLKernel;
import com.jogamp.opencl.CLPlatform;
-import com.jogamp.opencl.util.concurrent.CLQueueContext.CLSimpleQueueContext;
-import com.jogamp.opencl.util.concurrent.CLQueueContextFactory.CLSimpleContextFactory;
+import com.jogamp.opencl.util.concurrent.CLQueueContext.CLSingleProgramQueueContext;
import java.nio.IntBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -69,16 +68,23 @@ public class CLMultiContextTest {
+ " array[index]++; \n"
+ "} \n";
- private final class CLTestTask implements CLTask<CLSimpleQueueContext, IntBuffer> {
+ private final class CLTestTask extends CLTask<CLSingleProgramQueueContext, IntBuffer> {
private final IntBuffer data;
+ private final String source;
- public CLTestTask(IntBuffer buffer) {
+ public CLTestTask(String source, IntBuffer buffer) {
this.data = buffer;
+ this.source = source;
}
@Override
- public IntBuffer execute(CLSimpleQueueContext qc) {
+ public CLSingleProgramQueueContext createQueueContext(CLCommandQueue queue) {
+ return new CLSingleProgramQueueContext(queue, source);
+ }
+
+ @Override
+ public IntBuffer execute(CLSingleProgramQueueContext qc) {
CLCommandQueue queue = qc.getQueue();
CLContext context = qc.getCLContext();
@@ -110,14 +116,19 @@ public class CLMultiContextTest {
return data;
}
+ @Override
+ public Object getContextKey() {
+ return source.hashCode();
+ }
+
}
- private List<CLTestTask> createTasks(IntBuffer data, int taskCount, int slice) {
+ private List<CLTestTask> createTasks(String source, IntBuffer data, int taskCount, int slice) {
List<CLTestTask> tasks = new ArrayList<CLTestTask>(taskCount);
for (int i = 0; i < taskCount; i++) {
IntBuffer subBuffer = Buffers.slice(data, i*slice, slice);
assertEquals(slice, subBuffer.capacity());
- tasks.add(new CLTestTask(subBuffer));
+ tasks.add(new CLTestTask(source, subBuffer));
}
return tasks;
}
@@ -129,8 +140,7 @@ public class CLMultiContextTest {
try {
- CLSimpleContextFactory factory = CLQueueContextFactory.createSimple(programSource);
- CLCommandQueuePool<CLSimpleQueueContext> pool = CLCommandQueuePool.create(factory, mc);
+ CLCommandQueuePool pool = CLCommandQueuePool.create(mc);
assertTrue(pool.getPoolSize() > 0);
@@ -139,7 +149,7 @@ public class CLMultiContextTest {
final int taskCount = pool.getPoolSize() * tasksPerQueue;
IntBuffer data = Buffers.newDirectIntBuffer(slice*taskCount);
- List<CLTestTask> tasks = createTasks(data, taskCount, slice);
+ List<CLTestTask> tasks = createTasks(programSource, data, taskCount, slice);
out.println("invoking "+tasks.size()+" tasks on "+pool.getPoolSize()+" queues");
@@ -166,8 +176,8 @@ public class CLMultiContextTest {
checkBuffer(3, data);
// switching contexts using different program
- factory = CLQueueContextFactory.createSimple(programSource.replaceAll("\\+\\+", "--"));
- pool.switchContext(factory);
+ final String decrementProgramSource = programSource.replaceAll("\\+\\+", "--");
+ tasks = createTasks(decrementProgramSource, data, taskCount, slice);
List<Future<IntBuffer>> results2 = pool.invokeAll(tasks);
assertNotNull(results2);
checkBuffer(2, data);
@@ -176,7 +186,7 @@ public class CLMultiContextTest {
// we wait only for completion of a subset of tasks.
// submit any
data = Buffers.newDirectIntBuffer(slice*taskCount);
- tasks = createTasks(data, taskCount, slice);
+ tasks = createTasks(decrementProgramSource, data, taskCount, slice);
IntBuffer ret1 = pool.invokeAny(tasks);
assertNotNull(ret1);
@@ -185,9 +195,9 @@ public class CLMultiContextTest {
// completionservice take/any test
data = Buffers.newDirectIntBuffer(slice*taskCount);
- tasks = createTasks(data, taskCount, slice);
+ tasks = createTasks(decrementProgramSource, data, taskCount, slice);
- CLTaskCompletionService<CLSimpleQueueContext, IntBuffer> service = new CLTaskCompletionService(pool);
+ CLTaskCompletionService<IntBuffer> service = new CLTaskCompletionService(pool);
for (CLTestTask task : tasks) {
service.submit(task);
}