summaryrefslogtreecommitdiffstats
path: root/src/com
diff options
context:
space:
mode:
Diffstat (limited to 'src/com')
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLForkJoinPool.java200
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLRecursiveTask.java78
2 files changed, 278 insertions, 0 deletions
diff --git a/src/com/jogamp/opencl/util/concurrent/CLForkJoinPool.java b/src/com/jogamp/opencl/util/concurrent/CLForkJoinPool.java
new file mode 100644
index 00000000..bf253442
--- /dev/null
+++ b/src/com/jogamp/opencl/util/concurrent/CLForkJoinPool.java
@@ -0,0 +1,200 @@
+/*
+ * Copyright (c) 2011, Michael Bien
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without modification, are
+ * permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this list of
+ * conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice, this list
+ * of conditions and the following disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
+ * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/*
+ * Created on Tuesday, August 02 2011 01:53
+ */
+package com.jogamp.opencl.util.concurrent;
+
+import com.jogamp.opencl.CLCommandQueue;
+import com.jogamp.opencl.CLDevice;
+import com.jogamp.opencl.util.CLMultiContext;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinWorkerThread;
+import java.util.concurrent.Future;
+
+/**
+ * JOCL implementation of the {@link ForkJoinPool}.
+ * @author Michael Bien
+ */
+public class CLForkJoinPool extends CLAbstractExecutorService {
+
+ private CLForkJoinPool(ExecutorService executor, List<CLCommandQueue> queues) {
+ super(executor, queues);
+ }
+
+ public static CLForkJoinPool create(CLMultiContext mc, CLCommandQueue.Mode... modes) {
+ return create(mc.getDevices(), modes);
+ }
+
+ public static CLForkJoinPool create(Collection<? extends CLDevice> devices, CLCommandQueue.Mode... modes) {
+ List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(devices.size());
+ for (CLDevice device : devices) {
+ queues.add(device.createCommandQueue(modes));
+ }
+ return create(queues);
+ }
+
+ public static CLForkJoinPool create(Collection<CLCommandQueue> queues) {
+
+ List<CLCommandQueue> list = new ArrayList<CLCommandQueue>(queues);
+
+ CLThreadFactory factory = new CLThreadFactory(list);
+ int size = list.size();
+
+ ExecutorService service = new ForkJoinPool(size, factory, null, false);
+ return new CLForkJoinPool(service, list);
+ }
+
+ /**
+ * Performs the given task, returning its result upon completion.
+ * @see ForkJoinPool#invoke(java.util.concurrent.ForkJoinTask)
+ */
+ public <R> R invoke(CLRecursiveTask<? extends CLQueueContext, R> task) {
+ // shortcut, prevents redundant wrapping
+ return getExcecutor().invoke(task);
+ }
+
+ /**
+ * Submits this task to the pool for execution returning its {@link Future}.
+ * @see ForkJoinPool#submit(java.util.concurrent.ForkJoinTask)
+ */
+ public <R> Future<R> submit(CLRecursiveTask<? extends CLQueueContext, R> task) {
+ // shortcut, prevents redundant wrapping
+ return getExcecutor().submit(task);
+ }
+
+ @Override
+ ForkJoinPool getExcecutor() {
+ return (ForkJoinPool) super.getExcecutor();
+ }
+
+ /**
+ * Returns an estimate of the total number of tasks stolen from
+ * one thread's work queue by another.
+ */
+ public long getStealCount() {
+ return getExcecutor().getStealCount();
+ }
+
+ /**
+ * Returns an estimate of the number of tasks submitted to this
+ * pool that have not yet begun executing. This method may take
+ * time proportional to the number of submissions.
+ */
+ public int getQueuedSubmissionCount() {
+ return getExcecutor().getQueuedSubmissionCount();
+ }
+
+ /**
+ * Returns an estimate of the total number of tasks currently held
+ * in queues by worker threads (but not including tasks submitted
+ * to the pool that have not begun executing). This value is only
+ * an approximation, obtained by iterating across all threads in
+ * the pool. This method may be useful for tuning task
+ * granularities.
+ */
+ public long getQueuedTaskCount() {
+ return getExcecutor().getQueuedTaskCount();
+ }
+
+ /**
+ * Returns {@code true} if there are any tasks submitted to this
+ * pool that have not yet begun executing.
+ */
+ public boolean hasQueuedSubmissions() {
+ return getExcecutor().hasQueuedSubmissions();
+ }
+
+ /**
+ * Returns {@code true} if all worker threads are currently idle.
+ * An idle worker is one that cannot obtain a task to execute
+ * because none are available to steal from other threads, and
+ * there are no pending submissions to the pool. This method is
+ * conservative; it might not return {@code true} immediately upon
+ * idleness of all threads, but will eventually become true if
+ * threads remain inactive.
+ */
+ public boolean isQuiescent() {
+ return getExcecutor().isQuiescent();
+ }
+
+ private static class CLThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
+
+ private int index = 0;
+ private final List<CLCommandQueue> queues;
+
+ private CLThreadFactory(List<CLCommandQueue> queues) {
+ this.queues = queues;
+ }
+
+ @Override
+ public synchronized ForkJoinWorkerThread newThread(ForkJoinPool pool) {
+ return new ForkJoinQueueWorkerThread(pool, queues.get(index++));
+ }
+
+ }
+
+ final static class ForkJoinQueueWorkerThread extends ForkJoinWorkerThread implements CommandQueueThread {
+
+ private final CLCommandQueue queue;
+ private final Map<Object, CLQueueContext> contextMap;
+
+ public ForkJoinQueueWorkerThread(ForkJoinPool pool, CLCommandQueue queue) {
+ super(pool);
+ this.queue = queue;
+ this.contextMap = new HashMap<Object, CLQueueContext>();
+ }
+
+ @Override
+ public void run() {
+ super.run();
+ //release threadlocal contexts
+ queue.finish();
+ for (CLQueueContext context : contextMap.values()) {
+ context.release();
+ }
+ }
+
+ @Override
+ public Map<Object, CLQueueContext> getContextMap() {
+ return contextMap;
+ }
+
+ @Override
+ public CLCommandQueue getQueue() {
+ return queue;
+ }
+ }
+
+
+}
diff --git a/src/com/jogamp/opencl/util/concurrent/CLRecursiveTask.java b/src/com/jogamp/opencl/util/concurrent/CLRecursiveTask.java
new file mode 100644
index 00000000..216a527e
--- /dev/null
+++ b/src/com/jogamp/opencl/util/concurrent/CLRecursiveTask.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2011, Michael Bien
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without modification, are
+ * permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this list of
+ * conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice, this list
+ * of conditions and the following disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
+ * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/*
+ * Created on Tuesday, August 02 2011 02:02
+ */
+package com.jogamp.opencl.util.concurrent;
+
+import com.jogamp.opencl.CLCommandQueue;
+import com.jogamp.opencl.util.concurrent.CLAbstractExecutorService.CommandQueueThread;
+import java.util.concurrent.RecursiveTask;
+
+/**
+ * A recursive decomposable task executed on a {@link CLCommandQueue}.
+ * @see RecursiveTask
+ * @author Michael Bien
+ */
+public abstract class CLRecursiveTask<C extends CLQueueContext, R> extends RecursiveTask<R> implements CLPoolable<C, R> {
+
+ @Override
+ protected final R compute() {
+
+ CommandQueueThread thread = (CommandQueueThread)Thread.currentThread();
+
+ final Object key = getContextKey();
+
+ CLQueueContext context = thread.getContextMap().get(key);
+ if(context == null) {
+ context = createQueueContext(thread.getQueue());
+ thread.getContextMap().put(key, context);
+ }
+
+ @SuppressWarnings("unchecked")
+ CLPoolable<CLQueueContext, R> task = (CLPoolable<CLQueueContext, R>) this;
+
+ R result = task.execute(context);
+
+ // TODO: currently only the root task supports finish actions
+// if(mode.equals(FinishAction.FLUSH)) {
+// context.queue.flush();
+// }else if(mode.equals(FinishAction.FINISH)) {
+// context.queue.finish();
+// }
+ return result;
+ }
+
+ /**
+ * Returns the context key for this task. Default implementation returns {@link #getClass()}.
+ */
+ @Override
+ public Object getContextKey() {
+ return getClass();
+ }
+
+}