diff options
author | Michael Bien <[email protected]> | 2011-08-03 16:57:50 +0200 |
---|---|---|
committer | Michael Bien <[email protected]> | 2011-08-03 16:57:50 +0200 |
commit | 9eb658932571d9d35bbd05b1527ffbb261adb7af (patch) | |
tree | a7a245b050d05fa3066134f1e0fba9b191036f96 /src/com/jogamp/opencl | |
parent | f8008024c3541f224705e2c293d68ffa2eafb347 (diff) |
initial import of CLForkJoinPool, a JOCL variant of the Java 7 ForkJoinPool + rudimentary test.
Diffstat (limited to 'src/com/jogamp/opencl')
-rw-r--r-- | src/com/jogamp/opencl/util/concurrent/CLForkJoinPool.java | 200 | ||||
-rw-r--r-- | src/com/jogamp/opencl/util/concurrent/CLRecursiveTask.java | 78 |
2 files changed, 278 insertions, 0 deletions
diff --git a/src/com/jogamp/opencl/util/concurrent/CLForkJoinPool.java b/src/com/jogamp/opencl/util/concurrent/CLForkJoinPool.java new file mode 100644 index 00000000..bf253442 --- /dev/null +++ b/src/com/jogamp/opencl/util/concurrent/CLForkJoinPool.java @@ -0,0 +1,200 @@ +/* + * Copyright (c) 2011, Michael Bien + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are + * permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this list of + * conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, this list + * of conditions and the following disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND + * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* + * Created on Tuesday, August 02 2011 01:53 + */ +package com.jogamp.opencl.util.concurrent; + +import com.jogamp.opencl.CLCommandQueue; +import com.jogamp.opencl.CLDevice; +import com.jogamp.opencl.util.CLMultiContext; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinWorkerThread; +import java.util.concurrent.Future; + +/** + * JOCL implementation of the {@link ForkJoinPool}. + * @author Michael Bien + */ +public class CLForkJoinPool extends CLAbstractExecutorService { + + private CLForkJoinPool(ExecutorService executor, List<CLCommandQueue> queues) { + super(executor, queues); + } + + public static CLForkJoinPool create(CLMultiContext mc, CLCommandQueue.Mode... modes) { + return create(mc.getDevices(), modes); + } + + public static CLForkJoinPool create(Collection<? extends CLDevice> devices, CLCommandQueue.Mode... modes) { + List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(devices.size()); + for (CLDevice device : devices) { + queues.add(device.createCommandQueue(modes)); + } + return create(queues); + } + + public static CLForkJoinPool create(Collection<CLCommandQueue> queues) { + + List<CLCommandQueue> list = new ArrayList<CLCommandQueue>(queues); + + CLThreadFactory factory = new CLThreadFactory(list); + int size = list.size(); + + ExecutorService service = new ForkJoinPool(size, factory, null, false); + return new CLForkJoinPool(service, list); + } + + /** + * Performs the given task, returning its result upon completion. + * @see ForkJoinPool#invoke(java.util.concurrent.ForkJoinTask) + */ + public <R> R invoke(CLRecursiveTask<? extends CLQueueContext, R> task) { + // shortcut, prevents redundant wrapping + return getExcecutor().invoke(task); + } + + /** + * Submits this task to the pool for execution returning its {@link Future}. + * @see ForkJoinPool#submit(java.util.concurrent.ForkJoinTask) + */ + public <R> Future<R> submit(CLRecursiveTask<? extends CLQueueContext, R> task) { + // shortcut, prevents redundant wrapping + return getExcecutor().submit(task); + } + + @Override + ForkJoinPool getExcecutor() { + return (ForkJoinPool) super.getExcecutor(); + } + + /** + * Returns an estimate of the total number of tasks stolen from + * one thread's work queue by another. + */ + public long getStealCount() { + return getExcecutor().getStealCount(); + } + + /** + * Returns an estimate of the number of tasks submitted to this + * pool that have not yet begun executing. This method may take + * time proportional to the number of submissions. + */ + public int getQueuedSubmissionCount() { + return getExcecutor().getQueuedSubmissionCount(); + } + + /** + * Returns an estimate of the total number of tasks currently held + * in queues by worker threads (but not including tasks submitted + * to the pool that have not begun executing). This value is only + * an approximation, obtained by iterating across all threads in + * the pool. This method may be useful for tuning task + * granularities. + */ + public long getQueuedTaskCount() { + return getExcecutor().getQueuedTaskCount(); + } + + /** + * Returns {@code true} if there are any tasks submitted to this + * pool that have not yet begun executing. + */ + public boolean hasQueuedSubmissions() { + return getExcecutor().hasQueuedSubmissions(); + } + + /** + * Returns {@code true} if all worker threads are currently idle. + * An idle worker is one that cannot obtain a task to execute + * because none are available to steal from other threads, and + * there are no pending submissions to the pool. This method is + * conservative; it might not return {@code true} immediately upon + * idleness of all threads, but will eventually become true if + * threads remain inactive. + */ + public boolean isQuiescent() { + return getExcecutor().isQuiescent(); + } + + private static class CLThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory { + + private int index = 0; + private final List<CLCommandQueue> queues; + + private CLThreadFactory(List<CLCommandQueue> queues) { + this.queues = queues; + } + + @Override + public synchronized ForkJoinWorkerThread newThread(ForkJoinPool pool) { + return new ForkJoinQueueWorkerThread(pool, queues.get(index++)); + } + + } + + final static class ForkJoinQueueWorkerThread extends ForkJoinWorkerThread implements CommandQueueThread { + + private final CLCommandQueue queue; + private final Map<Object, CLQueueContext> contextMap; + + public ForkJoinQueueWorkerThread(ForkJoinPool pool, CLCommandQueue queue) { + super(pool); + this.queue = queue; + this.contextMap = new HashMap<Object, CLQueueContext>(); + } + + @Override + public void run() { + super.run(); + //release threadlocal contexts + queue.finish(); + for (CLQueueContext context : contextMap.values()) { + context.release(); + } + } + + @Override + public Map<Object, CLQueueContext> getContextMap() { + return contextMap; + } + + @Override + public CLCommandQueue getQueue() { + return queue; + } + } + + +} diff --git a/src/com/jogamp/opencl/util/concurrent/CLRecursiveTask.java b/src/com/jogamp/opencl/util/concurrent/CLRecursiveTask.java new file mode 100644 index 00000000..216a527e --- /dev/null +++ b/src/com/jogamp/opencl/util/concurrent/CLRecursiveTask.java @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2011, Michael Bien + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are + * permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this list of + * conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, this list + * of conditions and the following disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND + * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* + * Created on Tuesday, August 02 2011 02:02 + */ +package com.jogamp.opencl.util.concurrent; + +import com.jogamp.opencl.CLCommandQueue; +import com.jogamp.opencl.util.concurrent.CLAbstractExecutorService.CommandQueueThread; +import java.util.concurrent.RecursiveTask; + +/** + * A recursive decomposable task executed on a {@link CLCommandQueue}. + * @see RecursiveTask + * @author Michael Bien + */ +public abstract class CLRecursiveTask<C extends CLQueueContext, R> extends RecursiveTask<R> implements CLPoolable<C, R> { + + @Override + protected final R compute() { + + CommandQueueThread thread = (CommandQueueThread)Thread.currentThread(); + + final Object key = getContextKey(); + + CLQueueContext context = thread.getContextMap().get(key); + if(context == null) { + context = createQueueContext(thread.getQueue()); + thread.getContextMap().put(key, context); + } + + @SuppressWarnings("unchecked") + CLPoolable<CLQueueContext, R> task = (CLPoolable<CLQueueContext, R>) this; + + R result = task.execute(context); + + // TODO: currently only the root task supports finish actions +// if(mode.equals(FinishAction.FLUSH)) { +// context.queue.flush(); +// }else if(mode.equals(FinishAction.FINISH)) { +// context.queue.finish(); +// } + return result; + } + + /** + * Returns the context key for this task. Default implementation returns {@link #getClass()}. + */ + @Override + public Object getContextKey() { + return getClass(); + } + +} |