diff options
-rw-r--r-- | nbproject/project.properties | 4 | ||||
-rw-r--r-- | src/com/jogamp/opencl/util/concurrent/CLForkJoinPool.java | 200 | ||||
-rw-r--r-- | src/com/jogamp/opencl/util/concurrent/CLRecursiveTask.java | 78 | ||||
-rw-r--r-- | test/com/jogamp/opencl/util/concurrent/CLForkJoinTest.java | 110 |
4 files changed, 391 insertions, 1 deletions
diff --git a/nbproject/project.properties b/nbproject/project.properties index 0214d3dd..1078020a 100644 --- a/nbproject/project.properties +++ b/nbproject/project.properties @@ -30,8 +30,8 @@ debug.test.classpath=\ dist.dir=dist dist.jar=${dist.dir}/jocl.jar dist.javadoc.dir=${dist.dir}/javadoc/jocl/javadoc +mkdist.disabled=false endorsed.classpath= -excludes= file.reference.junit-4.8.1.jar=lib/junit_4/junit-4.8.1.jar file.reference.gluegen-rt.jar=${gluegen.root}/${rootrel.build}/gluegen-rt.jar @@ -40,6 +40,7 @@ file.reference.jogl.all.jar=${jogl.root}/${rootrel.build}/jogl/jogl.all.jar file.reference.nativewindow.all.jar=${jogl.root}/${rootrel.build}/nativewindow/nativewindow.all.jar file.reference.newt.all.jar=${jogl.root}/${rootrel.build}/newt/newt.all.jar +excludes=${src.excludes} includes=** jar.compress=false javac.classpath=\ @@ -110,6 +111,7 @@ jocl_base_version=0.9 #force enable jdk7 features #enable.jdk7.features=true +src.excludes=**/concurrent/CLForkJoin*,**/concurrent/CLRecursive* #crosscompile=true diff --git a/src/com/jogamp/opencl/util/concurrent/CLForkJoinPool.java b/src/com/jogamp/opencl/util/concurrent/CLForkJoinPool.java new file mode 100644 index 00000000..bf253442 --- /dev/null +++ b/src/com/jogamp/opencl/util/concurrent/CLForkJoinPool.java @@ -0,0 +1,200 @@ +/* + * Copyright (c) 2011, Michael Bien + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are + * permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this list of + * conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, this list + * of conditions and the following disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND + * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* + * Created on Tuesday, August 02 2011 01:53 + */ +package com.jogamp.opencl.util.concurrent; + +import com.jogamp.opencl.CLCommandQueue; +import com.jogamp.opencl.CLDevice; +import com.jogamp.opencl.util.CLMultiContext; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinWorkerThread; +import java.util.concurrent.Future; + +/** + * JOCL implementation of the {@link ForkJoinPool}. + * @author Michael Bien + */ +public class CLForkJoinPool extends CLAbstractExecutorService { + + private CLForkJoinPool(ExecutorService executor, List<CLCommandQueue> queues) { + super(executor, queues); + } + + public static CLForkJoinPool create(CLMultiContext mc, CLCommandQueue.Mode... modes) { + return create(mc.getDevices(), modes); + } + + public static CLForkJoinPool create(Collection<? extends CLDevice> devices, CLCommandQueue.Mode... modes) { + List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(devices.size()); + for (CLDevice device : devices) { + queues.add(device.createCommandQueue(modes)); + } + return create(queues); + } + + public static CLForkJoinPool create(Collection<CLCommandQueue> queues) { + + List<CLCommandQueue> list = new ArrayList<CLCommandQueue>(queues); + + CLThreadFactory factory = new CLThreadFactory(list); + int size = list.size(); + + ExecutorService service = new ForkJoinPool(size, factory, null, false); + return new CLForkJoinPool(service, list); + } + + /** + * Performs the given task, returning its result upon completion. + * @see ForkJoinPool#invoke(java.util.concurrent.ForkJoinTask) + */ + public <R> R invoke(CLRecursiveTask<? extends CLQueueContext, R> task) { + // shortcut, prevents redundant wrapping + return getExcecutor().invoke(task); + } + + /** + * Submits this task to the pool for execution returning its {@link Future}. + * @see ForkJoinPool#submit(java.util.concurrent.ForkJoinTask) + */ + public <R> Future<R> submit(CLRecursiveTask<? extends CLQueueContext, R> task) { + // shortcut, prevents redundant wrapping + return getExcecutor().submit(task); + } + + @Override + ForkJoinPool getExcecutor() { + return (ForkJoinPool) super.getExcecutor(); + } + + /** + * Returns an estimate of the total number of tasks stolen from + * one thread's work queue by another. + */ + public long getStealCount() { + return getExcecutor().getStealCount(); + } + + /** + * Returns an estimate of the number of tasks submitted to this + * pool that have not yet begun executing. This method may take + * time proportional to the number of submissions. + */ + public int getQueuedSubmissionCount() { + return getExcecutor().getQueuedSubmissionCount(); + } + + /** + * Returns an estimate of the total number of tasks currently held + * in queues by worker threads (but not including tasks submitted + * to the pool that have not begun executing). This value is only + * an approximation, obtained by iterating across all threads in + * the pool. This method may be useful for tuning task + * granularities. + */ + public long getQueuedTaskCount() { + return getExcecutor().getQueuedTaskCount(); + } + + /** + * Returns {@code true} if there are any tasks submitted to this + * pool that have not yet begun executing. + */ + public boolean hasQueuedSubmissions() { + return getExcecutor().hasQueuedSubmissions(); + } + + /** + * Returns {@code true} if all worker threads are currently idle. + * An idle worker is one that cannot obtain a task to execute + * because none are available to steal from other threads, and + * there are no pending submissions to the pool. This method is + * conservative; it might not return {@code true} immediately upon + * idleness of all threads, but will eventually become true if + * threads remain inactive. + */ + public boolean isQuiescent() { + return getExcecutor().isQuiescent(); + } + + private static class CLThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory { + + private int index = 0; + private final List<CLCommandQueue> queues; + + private CLThreadFactory(List<CLCommandQueue> queues) { + this.queues = queues; + } + + @Override + public synchronized ForkJoinWorkerThread newThread(ForkJoinPool pool) { + return new ForkJoinQueueWorkerThread(pool, queues.get(index++)); + } + + } + + final static class ForkJoinQueueWorkerThread extends ForkJoinWorkerThread implements CommandQueueThread { + + private final CLCommandQueue queue; + private final Map<Object, CLQueueContext> contextMap; + + public ForkJoinQueueWorkerThread(ForkJoinPool pool, CLCommandQueue queue) { + super(pool); + this.queue = queue; + this.contextMap = new HashMap<Object, CLQueueContext>(); + } + + @Override + public void run() { + super.run(); + //release threadlocal contexts + queue.finish(); + for (CLQueueContext context : contextMap.values()) { + context.release(); + } + } + + @Override + public Map<Object, CLQueueContext> getContextMap() { + return contextMap; + } + + @Override + public CLCommandQueue getQueue() { + return queue; + } + } + + +} diff --git a/src/com/jogamp/opencl/util/concurrent/CLRecursiveTask.java b/src/com/jogamp/opencl/util/concurrent/CLRecursiveTask.java new file mode 100644 index 00000000..216a527e --- /dev/null +++ b/src/com/jogamp/opencl/util/concurrent/CLRecursiveTask.java @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2011, Michael Bien + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are + * permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this list of + * conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, this list + * of conditions and the following disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND + * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* + * Created on Tuesday, August 02 2011 02:02 + */ +package com.jogamp.opencl.util.concurrent; + +import com.jogamp.opencl.CLCommandQueue; +import com.jogamp.opencl.util.concurrent.CLAbstractExecutorService.CommandQueueThread; +import java.util.concurrent.RecursiveTask; + +/** + * A recursive decomposable task executed on a {@link CLCommandQueue}. + * @see RecursiveTask + * @author Michael Bien + */ +public abstract class CLRecursiveTask<C extends CLQueueContext, R> extends RecursiveTask<R> implements CLPoolable<C, R> { + + @Override + protected final R compute() { + + CommandQueueThread thread = (CommandQueueThread)Thread.currentThread(); + + final Object key = getContextKey(); + + CLQueueContext context = thread.getContextMap().get(key); + if(context == null) { + context = createQueueContext(thread.getQueue()); + thread.getContextMap().put(key, context); + } + + @SuppressWarnings("unchecked") + CLPoolable<CLQueueContext, R> task = (CLPoolable<CLQueueContext, R>) this; + + R result = task.execute(context); + + // TODO: currently only the root task supports finish actions +// if(mode.equals(FinishAction.FLUSH)) { +// context.queue.flush(); +// }else if(mode.equals(FinishAction.FINISH)) { +// context.queue.finish(); +// } + return result; + } + + /** + * Returns the context key for this task. Default implementation returns {@link #getClass()}. + */ + @Override + public Object getContextKey() { + return getClass(); + } + +} diff --git a/test/com/jogamp/opencl/util/concurrent/CLForkJoinTest.java b/test/com/jogamp/opencl/util/concurrent/CLForkJoinTest.java new file mode 100644 index 00000000..bfcff6ab --- /dev/null +++ b/test/com/jogamp/opencl/util/concurrent/CLForkJoinTest.java @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2011, Michael Bien + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are + * permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this list of + * conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, this list + * of conditions and the following disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND + * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* + * Created on Tuesday, August 02 2011 22:53 + */ +package com.jogamp.opencl.util.concurrent; + +import com.jogamp.opencl.CLCommandQueue; +import com.jogamp.opencl.CLPlatform; +import com.jogamp.opencl.util.CLMultiContext; +import com.jogamp.opencl.util.concurrent.CLQueueContext.CLSingleProgramQueueContext; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import org.junit.Test; + +import static org.junit.Assert.*; +import static java.lang.System.*; + +/** + * + * @author Michael Bien + */ +public class CLForkJoinTest { + + private class LogicTest extends CLRecursiveTask<CLQueueContext.CLSingleProgramQueueContext, Integer> { + + private final int size; + + public LogicTest(int size) { + this.size = size; + } + + @Override + public CLSingleProgramQueueContext createQueueContext(CLCommandQueue queue) { + return new CLSingleProgramQueueContext(queue, "kernel void noop(void){}\n"); + } + + @Override + public Integer execute(CLSingleProgramQueueContext context) { + + assertNotNull(context); + assertTrue(context instanceof CLSingleProgramQueueContext); + +// out.println(Thread.currentThread()); + + if(size > 8) { + + LogicTest task2 = new LogicTest(size/2); + task2.fork(); + + LogicTest task1 = new LogicTest(size/2); + + return task1.compute() + task2.join(); + }else{ + return size; + } + + } + + } + + @Test + public void forkJoinTest() throws InterruptedException, ExecutionException { + + CLMultiContext mc = CLMultiContext.create(CLPlatform.listCLPlatforms()); + + try{ + + CLForkJoinPool pool = CLForkJoinPool.create(mc); + + final int size = 64; + LogicTest task = new LogicTest(size); + Future<Integer> future = pool.submit(task); + assertNotNull(future); + assertEquals(size, (int)future.get()); + + pool.release(); + + }finally{ + mc.release(); + } + + } + + +} |