summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--nbproject/project.properties4
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLForkJoinPool.java200
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLRecursiveTask.java78
-rw-r--r--test/com/jogamp/opencl/util/concurrent/CLForkJoinTest.java110
4 files changed, 391 insertions, 1 deletions
diff --git a/nbproject/project.properties b/nbproject/project.properties
index 0214d3dd..1078020a 100644
--- a/nbproject/project.properties
+++ b/nbproject/project.properties
@@ -30,8 +30,8 @@ debug.test.classpath=\
dist.dir=dist
dist.jar=${dist.dir}/jocl.jar
dist.javadoc.dir=${dist.dir}/javadoc/jocl/javadoc
+mkdist.disabled=false
endorsed.classpath=
-excludes=
file.reference.junit-4.8.1.jar=lib/junit_4/junit-4.8.1.jar
file.reference.gluegen-rt.jar=${gluegen.root}/${rootrel.build}/gluegen-rt.jar
@@ -40,6 +40,7 @@ file.reference.jogl.all.jar=${jogl.root}/${rootrel.build}/jogl/jogl.all.jar
file.reference.nativewindow.all.jar=${jogl.root}/${rootrel.build}/nativewindow/nativewindow.all.jar
file.reference.newt.all.jar=${jogl.root}/${rootrel.build}/newt/newt.all.jar
+excludes=${src.excludes}
includes=**
jar.compress=false
javac.classpath=\
@@ -110,6 +111,7 @@ jocl_base_version=0.9
#force enable jdk7 features
#enable.jdk7.features=true
+src.excludes=**/concurrent/CLForkJoin*,**/concurrent/CLRecursive*
#crosscompile=true
diff --git a/src/com/jogamp/opencl/util/concurrent/CLForkJoinPool.java b/src/com/jogamp/opencl/util/concurrent/CLForkJoinPool.java
new file mode 100644
index 00000000..bf253442
--- /dev/null
+++ b/src/com/jogamp/opencl/util/concurrent/CLForkJoinPool.java
@@ -0,0 +1,200 @@
+/*
+ * Copyright (c) 2011, Michael Bien
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without modification, are
+ * permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this list of
+ * conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice, this list
+ * of conditions and the following disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
+ * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/*
+ * Created on Tuesday, August 02 2011 01:53
+ */
+package com.jogamp.opencl.util.concurrent;
+
+import com.jogamp.opencl.CLCommandQueue;
+import com.jogamp.opencl.CLDevice;
+import com.jogamp.opencl.util.CLMultiContext;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinWorkerThread;
+import java.util.concurrent.Future;
+
+/**
+ * JOCL implementation of the {@link ForkJoinPool}.
+ * @author Michael Bien
+ */
+public class CLForkJoinPool extends CLAbstractExecutorService {
+
+ private CLForkJoinPool(ExecutorService executor, List<CLCommandQueue> queues) {
+ super(executor, queues);
+ }
+
+ public static CLForkJoinPool create(CLMultiContext mc, CLCommandQueue.Mode... modes) {
+ return create(mc.getDevices(), modes);
+ }
+
+ public static CLForkJoinPool create(Collection<? extends CLDevice> devices, CLCommandQueue.Mode... modes) {
+ List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(devices.size());
+ for (CLDevice device : devices) {
+ queues.add(device.createCommandQueue(modes));
+ }
+ return create(queues);
+ }
+
+ public static CLForkJoinPool create(Collection<CLCommandQueue> queues) {
+
+ List<CLCommandQueue> list = new ArrayList<CLCommandQueue>(queues);
+
+ CLThreadFactory factory = new CLThreadFactory(list);
+ int size = list.size();
+
+ ExecutorService service = new ForkJoinPool(size, factory, null, false);
+ return new CLForkJoinPool(service, list);
+ }
+
+ /**
+ * Performs the given task, returning its result upon completion.
+ * @see ForkJoinPool#invoke(java.util.concurrent.ForkJoinTask)
+ */
+ public <R> R invoke(CLRecursiveTask<? extends CLQueueContext, R> task) {
+ // shortcut, prevents redundant wrapping
+ return getExcecutor().invoke(task);
+ }
+
+ /**
+ * Submits this task to the pool for execution returning its {@link Future}.
+ * @see ForkJoinPool#submit(java.util.concurrent.ForkJoinTask)
+ */
+ public <R> Future<R> submit(CLRecursiveTask<? extends CLQueueContext, R> task) {
+ // shortcut, prevents redundant wrapping
+ return getExcecutor().submit(task);
+ }
+
+ @Override
+ ForkJoinPool getExcecutor() {
+ return (ForkJoinPool) super.getExcecutor();
+ }
+
+ /**
+ * Returns an estimate of the total number of tasks stolen from
+ * one thread's work queue by another.
+ */
+ public long getStealCount() {
+ return getExcecutor().getStealCount();
+ }
+
+ /**
+ * Returns an estimate of the number of tasks submitted to this
+ * pool that have not yet begun executing. This method may take
+ * time proportional to the number of submissions.
+ */
+ public int getQueuedSubmissionCount() {
+ return getExcecutor().getQueuedSubmissionCount();
+ }
+
+ /**
+ * Returns an estimate of the total number of tasks currently held
+ * in queues by worker threads (but not including tasks submitted
+ * to the pool that have not begun executing). This value is only
+ * an approximation, obtained by iterating across all threads in
+ * the pool. This method may be useful for tuning task
+ * granularities.
+ */
+ public long getQueuedTaskCount() {
+ return getExcecutor().getQueuedTaskCount();
+ }
+
+ /**
+ * Returns {@code true} if there are any tasks submitted to this
+ * pool that have not yet begun executing.
+ */
+ public boolean hasQueuedSubmissions() {
+ return getExcecutor().hasQueuedSubmissions();
+ }
+
+ /**
+ * Returns {@code true} if all worker threads are currently idle.
+ * An idle worker is one that cannot obtain a task to execute
+ * because none are available to steal from other threads, and
+ * there are no pending submissions to the pool. This method is
+ * conservative; it might not return {@code true} immediately upon
+ * idleness of all threads, but will eventually become true if
+ * threads remain inactive.
+ */
+ public boolean isQuiescent() {
+ return getExcecutor().isQuiescent();
+ }
+
+ private static class CLThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
+
+ private int index = 0;
+ private final List<CLCommandQueue> queues;
+
+ private CLThreadFactory(List<CLCommandQueue> queues) {
+ this.queues = queues;
+ }
+
+ @Override
+ public synchronized ForkJoinWorkerThread newThread(ForkJoinPool pool) {
+ return new ForkJoinQueueWorkerThread(pool, queues.get(index++));
+ }
+
+ }
+
+ final static class ForkJoinQueueWorkerThread extends ForkJoinWorkerThread implements CommandQueueThread {
+
+ private final CLCommandQueue queue;
+ private final Map<Object, CLQueueContext> contextMap;
+
+ public ForkJoinQueueWorkerThread(ForkJoinPool pool, CLCommandQueue queue) {
+ super(pool);
+ this.queue = queue;
+ this.contextMap = new HashMap<Object, CLQueueContext>();
+ }
+
+ @Override
+ public void run() {
+ super.run();
+ //release threadlocal contexts
+ queue.finish();
+ for (CLQueueContext context : contextMap.values()) {
+ context.release();
+ }
+ }
+
+ @Override
+ public Map<Object, CLQueueContext> getContextMap() {
+ return contextMap;
+ }
+
+ @Override
+ public CLCommandQueue getQueue() {
+ return queue;
+ }
+ }
+
+
+}
diff --git a/src/com/jogamp/opencl/util/concurrent/CLRecursiveTask.java b/src/com/jogamp/opencl/util/concurrent/CLRecursiveTask.java
new file mode 100644
index 00000000..216a527e
--- /dev/null
+++ b/src/com/jogamp/opencl/util/concurrent/CLRecursiveTask.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2011, Michael Bien
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without modification, are
+ * permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this list of
+ * conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice, this list
+ * of conditions and the following disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
+ * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/*
+ * Created on Tuesday, August 02 2011 02:02
+ */
+package com.jogamp.opencl.util.concurrent;
+
+import com.jogamp.opencl.CLCommandQueue;
+import com.jogamp.opencl.util.concurrent.CLAbstractExecutorService.CommandQueueThread;
+import java.util.concurrent.RecursiveTask;
+
+/**
+ * A recursive decomposable task executed on a {@link CLCommandQueue}.
+ * @see RecursiveTask
+ * @author Michael Bien
+ */
+public abstract class CLRecursiveTask<C extends CLQueueContext, R> extends RecursiveTask<R> implements CLPoolable<C, R> {
+
+ @Override
+ protected final R compute() {
+
+ CommandQueueThread thread = (CommandQueueThread)Thread.currentThread();
+
+ final Object key = getContextKey();
+
+ CLQueueContext context = thread.getContextMap().get(key);
+ if(context == null) {
+ context = createQueueContext(thread.getQueue());
+ thread.getContextMap().put(key, context);
+ }
+
+ @SuppressWarnings("unchecked")
+ CLPoolable<CLQueueContext, R> task = (CLPoolable<CLQueueContext, R>) this;
+
+ R result = task.execute(context);
+
+ // TODO: currently only the root task supports finish actions
+// if(mode.equals(FinishAction.FLUSH)) {
+// context.queue.flush();
+// }else if(mode.equals(FinishAction.FINISH)) {
+// context.queue.finish();
+// }
+ return result;
+ }
+
+ /**
+ * Returns the context key for this task. Default implementation returns {@link #getClass()}.
+ */
+ @Override
+ public Object getContextKey() {
+ return getClass();
+ }
+
+}
diff --git a/test/com/jogamp/opencl/util/concurrent/CLForkJoinTest.java b/test/com/jogamp/opencl/util/concurrent/CLForkJoinTest.java
new file mode 100644
index 00000000..bfcff6ab
--- /dev/null
+++ b/test/com/jogamp/opencl/util/concurrent/CLForkJoinTest.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2011, Michael Bien
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without modification, are
+ * permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this list of
+ * conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice, this list
+ * of conditions and the following disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
+ * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/*
+ * Created on Tuesday, August 02 2011 22:53
+ */
+package com.jogamp.opencl.util.concurrent;
+
+import com.jogamp.opencl.CLCommandQueue;
+import com.jogamp.opencl.CLPlatform;
+import com.jogamp.opencl.util.CLMultiContext;
+import com.jogamp.opencl.util.concurrent.CLQueueContext.CLSingleProgramQueueContext;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static java.lang.System.*;
+
+/**
+ *
+ * @author Michael Bien
+ */
+public class CLForkJoinTest {
+
+ private class LogicTest extends CLRecursiveTask<CLQueueContext.CLSingleProgramQueueContext, Integer> {
+
+ private final int size;
+
+ public LogicTest(int size) {
+ this.size = size;
+ }
+
+ @Override
+ public CLSingleProgramQueueContext createQueueContext(CLCommandQueue queue) {
+ return new CLSingleProgramQueueContext(queue, "kernel void noop(void){}\n");
+ }
+
+ @Override
+ public Integer execute(CLSingleProgramQueueContext context) {
+
+ assertNotNull(context);
+ assertTrue(context instanceof CLSingleProgramQueueContext);
+
+// out.println(Thread.currentThread());
+
+ if(size > 8) {
+
+ LogicTest task2 = new LogicTest(size/2);
+ task2.fork();
+
+ LogicTest task1 = new LogicTest(size/2);
+
+ return task1.compute() + task2.join();
+ }else{
+ return size;
+ }
+
+ }
+
+ }
+
+ @Test
+ public void forkJoinTest() throws InterruptedException, ExecutionException {
+
+ CLMultiContext mc = CLMultiContext.create(CLPlatform.listCLPlatforms());
+
+ try{
+
+ CLForkJoinPool pool = CLForkJoinPool.create(mc);
+
+ final int size = 64;
+ LogicTest task = new LogicTest(size);
+ Future<Integer> future = pool.submit(task);
+ assertNotNull(future);
+ assertEquals(size, (int)future.get());
+
+ pool.release();
+
+ }finally{
+ mc.release();
+ }
+
+ }
+
+
+}