diff options
author | Michael Bien <[email protected]> | 2010-01-14 17:38:34 +0100 |
---|---|---|
committer | Michael Bien <[email protected]> | 2010-01-14 17:38:34 +0100 |
commit | 9e650242da44a939e6a4c1e3c06d77c2e668a3e0 (patch) | |
tree | b0272560299430beb79915c9f701994bc40a10d5 | |
parent | 9343c3ef5829f74207a8d220cb3b082211b910f2 (diff) |
cleaned up NioDirectOnly list, added clSetKernelArg to list.
added experimental QueueBarrier for easy synchronization between multiple concurrent CLCommandQueues.
refactored CLCommandQueue, added putTask().
added another concurrency JUnit test.
-rw-r--r-- | resources/cl-common.cfg | 25 | ||||
-rw-r--r-- | src/com/mbien/opencl/CLCommandQueue.java | 56 | ||||
-rw-r--r-- | src/com/mbien/opencl/CLKernel.java | 18 | ||||
-rw-r--r-- | src/com/mbien/opencl/QueueBarrier.java | 48 | ||||
-rw-r--r-- | test/com/mbien/opencl/CLConcurrencyTest.java | 126 |
5 files changed, 229 insertions, 44 deletions
diff --git a/resources/cl-common.cfg b/resources/cl-common.cfg index 16b2017d..470c562d 100644 --- a/resources/cl-common.cfg +++ b/resources/cl-common.cfg @@ -29,30 +29,27 @@ Ignore CL_ULONG_MAX #enforce client side "good behavior" by generating direct-memory-only bindings for #performance critical functions. #NioDirectOnly __ALL__ +#command queue ops NioDirectOnly clEnqueueBarrier -NioDirectOnly clEnqueueMarker -NioDirectOnly clEnqueueNativeKernel -NioDirectOnly clEnqueueUnmapMemObject -NioDirectOnly clEnqueueWaitForEvents -NioDirectOnly clEnqueueWriteImage -NioDirectOnly clEnqueueReadBuffer -NioDirectOnly clEnqueueWriteBuffer -NioDirectOnly clEnqueueReadImage -NioDirectOnly clEnqueueWriteImage +NioDirectOnly clEnqueueCopyBuffer +NioDirectOnly clEnqueueCopyBufferToImage NioDirectOnly clEnqueueCopyImage NioDirectOnly clEnqueueCopyImageToBuffer -NioDirectOnly clEnqueueCopyBufferToImage NioDirectOnly clEnqueueMapBuffer NioDirectOnly clEnqueueMapImage -NioDirectOnly clEnqueueUnmapMemObject +NioDirectOnly clEnqueueMarker NioDirectOnly clEnqueueNDRangeKernel +NioDirectOnly clEnqueueReadBuffer +NioDirectOnly clEnqueueReadImage NioDirectOnly clEnqueueTask -NioDirectOnly clEnqueueNativeKernel -NioDirectOnly clEnqueueMarker +NioDirectOnly clEnqueueUnmapMemObject NioDirectOnly clEnqueueWaitForEvents -NioDirectOnly clEnqueueBarrier +NioDirectOnly clEnqueueWriteBuffer +NioDirectOnly clEnqueueWriteImage NioDirectOnly clEnqueueAcquireGLObjects NioDirectOnly clEnqueueReleaseGLObjects +#kernel ops +NioDirectOnly clSetKernelArg #common rename emitted struct accessors #struct cl_image_format diff --git a/src/com/mbien/opencl/CLCommandQueue.java b/src/com/mbien/opencl/CLCommandQueue.java index 38ec7274..4e88ff1d 100644 --- a/src/com/mbien/opencl/CLCommandQueue.java +++ b/src/com/mbien/opencl/CLCommandQueue.java @@ -48,11 +48,10 @@ public class CLCommandQueue implements CLResource { } public CLCommandQueue putWriteBuffer(CLBuffer<?> writeBuffer, boolean blockingRead) { - return putWriteBuffer(writeBuffer, null, blockingRead); + return putWriteBuffer(writeBuffer, blockingRead, null); } - public CLCommandQueue putWriteBuffer(CLBuffer<?> writeBuffer, CLEventList events, boolean blockingWrite) { - PointerBuffer pb = PointerBuffer.allocateDirect(2); + public CLCommandQueue putWriteBuffer(CLBuffer<?> writeBuffer, boolean blockingWrite, CLEventList events) { int ret = cl.clEnqueueWriteBuffer( ID, writeBuffer.ID, blockingWrite ? CL_TRUE : CL_FALSE, @@ -70,11 +69,11 @@ public class CLCommandQueue implements CLResource { } public CLCommandQueue putReadBuffer(CLBuffer<?> readBuffer, boolean blockingRead) { - putReadBuffer(readBuffer, null, blockingRead); + putReadBuffer(readBuffer, blockingRead, null); return this; } - public CLCommandQueue putReadBuffer(CLBuffer<?> readBuffer, CLEventList events, boolean blockingRead) { + public CLCommandQueue putReadBuffer(CLBuffer<?> readBuffer, boolean blockingRead, CLEventList events) { int ret = cl.clEnqueueReadBuffer( ID, readBuffer.ID, blockingRead ? CL_TRUE : CL_FALSE, @@ -150,11 +149,6 @@ public class CLCommandQueue implements CLResource { return this; } - public CLCommandQueue putTask() { - - return this; - } - public CLBuffer putMapBuffer() { return null; @@ -198,12 +192,35 @@ public class CLCommandQueue implements CLResource { return this; } + /** + * {@link #putTask} equivalent to calling + * {@link #put1DRangeKernel(CLKernel kernel, long globalWorkOffset, long globalWorkSize, long localWorkSize)} + * with globalWorkOffset = null, globalWorkSize set to 1, and localWorkSize set to 1. + */ + public CLCommandQueue putTask(CLKernel kernel) { + int ret = cl.clEnqueueTask(ID, kernel.ID, 0, null, null); + checkForError(ret, "can not enqueue Task"); + return this; + } + + /** + * @see #putTask(com.mbien.opencl.CLKernel) + */ + public CLCommandQueue putTask(CLKernel kernel, CLEventList events) { + int ret = cl.clEnqueueTask(ID, kernel.ID, 0, null, events==null ? null : events.IDs); + checkForError(ret, "can not enqueue Task"); + if(events != null) { + events.createEvent(context); + } + return this; + } + public CLCommandQueue put1DRangeKernel(CLKernel kernel, long globalWorkOffset, long globalWorkSize, long localWorkSize) { - this.put1DRangeKernel(kernel, null, globalWorkOffset, globalWorkSize, localWorkSize); + this.put1DRangeKernel(kernel, globalWorkOffset, globalWorkSize, localWorkSize, null); return this; } - public CLCommandQueue put1DRangeKernel(CLKernel kernel, CLEventList events, long globalWorkOffset, long globalWorkSize, long localWorkSize) { + public CLCommandQueue put1DRangeKernel(CLKernel kernel, long globalWorkOffset, long globalWorkSize, long localWorkSize, CLEventList events) { PointerBuffer globWO = null; PointerBuffer globWS = null; PointerBuffer locWS = null; @@ -218,25 +235,24 @@ public class CLCommandQueue implements CLResource { locWS = bufferC.put(1, localWorkSize).position(1); } - this.putNDRangeKernel(kernel, events, 1, globWO, globWS, locWS); + this.putNDRangeKernel(kernel, 1, globWO, globWS, locWS, events); return this; } public CLCommandQueue put2DRangeKernel(CLKernel kernel, long globalWorkOffsetX, long globalWorkOffsetY, long globalWorkSizeX, long globalWorkSizeY, long localWorkSizeX, long localWorkSizeY) { - this.put2DRangeKernel(kernel, null, + this.put2DRangeKernel(kernel, globalWorkOffsetX, globalWorkOffsetY, globalWorkSizeX, globalWorkSizeY, - localWorkSizeX, localWorkSizeY); + localWorkSizeX, localWorkSizeY, null); return this; } - public CLCommandQueue put2DRangeKernel(CLKernel kernel, CLEventList events, - long globalWorkOffsetX, long globalWorkOffsetY, + public CLCommandQueue put2DRangeKernel(CLKernel kernel, long globalWorkOffsetX, long globalWorkOffsetY, long globalWorkSizeX, long globalWorkSizeY, - long localWorkSizeX, long localWorkSizeY) { + long localWorkSizeX, long localWorkSizeY, CLEventList events) { PointerBuffer globalWorkOffset = null; PointerBuffer globalWorkSize = null; PointerBuffer localWorkSize = null; @@ -255,11 +271,11 @@ public class CLCommandQueue implements CLResource { } public CLCommandQueue putNDRangeKernel(CLKernel kernel, int workDimension, PointerBuffer globalWorkOffset, PointerBuffer globalWorkSize, PointerBuffer localWorkSize) { - this.putNDRangeKernel(kernel, null, workDimension, globalWorkOffset, globalWorkSize, localWorkSize); + this.putNDRangeKernel(kernel, workDimension, globalWorkOffset, globalWorkSize, localWorkSize, null); return this; } - public CLCommandQueue putNDRangeKernel(CLKernel kernel, CLEventList events, int workDimension, PointerBuffer globalWorkOffset, PointerBuffer globalWorkSize, PointerBuffer localWorkSize) { + public CLCommandQueue putNDRangeKernel(CLKernel kernel, int workDimension, PointerBuffer globalWorkOffset, PointerBuffer globalWorkSize, PointerBuffer localWorkSize, CLEventList events) { int ret = cl.clEnqueueNDRangeKernel( ID, kernel.ID, workDimension, diff --git a/src/com/mbien/opencl/CLKernel.java b/src/com/mbien/opencl/CLKernel.java index e7ac4b4d..2115a9f8 100644 --- a/src/com/mbien/opencl/CLKernel.java +++ b/src/com/mbien/opencl/CLKernel.java @@ -26,12 +26,15 @@ public class CLKernel implements CLResource { private final CLProgram program; private final CL cl; + private final ByteBuffer buffer; + private int argIndex; CLKernel(CLProgram program, long id) { this.ID = id; this.program = program; this.cl = program.context.cl; + this.buffer = BufferFactory.newDirectByteBuffer(8); long[] longArray = new long[1]; @@ -136,19 +139,19 @@ public class CLKernel implements CLResource { } private final Buffer wrap(float value) { - return BufferFactory.newDirectByteBuffer(4).putFloat(value).rewind(); + return buffer.putFloat(value).rewind(); } private final Buffer wrap(double value) { - return BufferFactory.newDirectByteBuffer(8).putDouble(value).rewind(); + return buffer.putDouble(value).rewind(); } private final Buffer wrap(int value) { - return BufferFactory.newDirectByteBuffer(4).putInt(value).rewind(); + return buffer.putInt(value).rewind(); } private final Buffer wrap(long value) { - return BufferFactory.newDirectByteBuffer(8).putLong(value).rewind(); + return buffer.putLong(value).rewind(); } public CLKernel rewind() { @@ -196,5 +199,12 @@ public class CLKernel implements CLResource { hash = 43 * hash + (this.program != null ? this.program.hashCode() : 0); return hash; } + + CLKernel copy() { + int[] err = new int[1]; + long newID = cl.clCreateKernel(program.ID, name, err, 0); + checkForError(err[0], "can not copy kernel"); + return new CLKernel(program, newID); + } } diff --git a/src/com/mbien/opencl/QueueBarrier.java b/src/com/mbien/opencl/QueueBarrier.java new file mode 100644 index 00000000..247ede7a --- /dev/null +++ b/src/com/mbien/opencl/QueueBarrier.java @@ -0,0 +1,48 @@ +package com.mbien.opencl; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * + * @author Michael Bien + */ +public class QueueBarrier { + + private final CountDownLatch latch; + + public QueueBarrier(int queueCount) { + this.latch = new CountDownLatch(queueCount); + } + + /** + * Blocks the current Thread until the given events on the CLCommandQueue occurred. + * This method may be invoked concurrently without synchronization on the QueueBarrier object + * as long each Thread passes a distinct CLCommandQueue as parameter to this method. + */ + public QueueBarrier waitFor(CLCommandQueue queue, CLEventList events) { + queue.putWaitForEvents(events); + latch.countDown(); + return this; + } + + /** + * Blocks until all Threads which called {@link #waitFor} + * continue excecution. + * This method blocks only once, all subsequent calls are ignored. + */ + public QueueBarrier await() throws InterruptedException { + latch.await(); + return this; + } + /** + * @see {@link #await()} + * @param timeout the maximum time to wait + * @param unit the time unit of the {@code timeout} argument + */ + public QueueBarrier await(long timeout, TimeUnit unit) throws InterruptedException { + latch.await(timeout, unit); + return this; + } + +} diff --git a/test/com/mbien/opencl/CLConcurrencyTest.java b/test/com/mbien/opencl/CLConcurrencyTest.java index 057b57ee..dfaa4e49 100644 --- a/test/com/mbien/opencl/CLConcurrencyTest.java +++ b/test/com/mbien/opencl/CLConcurrencyTest.java @@ -1,7 +1,6 @@ package com.mbien.opencl; import com.mbien.opencl.CLBuffer.Mem; -import com.mbien.opencl.CLCommandQueue.Mode; import java.io.IOException; import java.nio.ByteBuffer; import org.junit.Test; @@ -45,9 +44,8 @@ public class CLConcurrencyTest { assertEquals(0, events.size()); - // asynchronous write of data to GPU device, blocking read later to get the computed results back. - queue.putWriteBuffer(clBufferA, events, false) // write A - .putWriteBuffer(clBufferB, events, false); // write B + queue.putWriteBuffer(clBufferA, false, events) // write A + .putWriteBuffer(clBufferB, false, events); // write B assertEquals(2, events.size()); queue.putWaitForEvents(events); @@ -56,10 +54,10 @@ public class CLConcurrencyTest { assertEquals(0, events.size()); vectorAddKernel.setArgs(clBufferA, clBufferB, clBufferC); // C = A+B - queue.put1DRangeKernel(vectorAddKernel, events, 0, elements, 256); + queue.put1DRangeKernel(vectorAddKernel, 0, elements, 256, events); vectorAddKernel.setArgs(clBufferA, clBufferB, clBufferD); // D = A+B - queue.put1DRangeKernel(vectorAddKernel, events, 0, elements, 256); + queue.put1DRangeKernel(vectorAddKernel, 0, elements, 256, events); assertEquals(2, events.size()); queue.putWaitForEvent(events, 0) @@ -80,4 +78,120 @@ public class CLConcurrencyTest { } + @Test + public void concurrencyTest() throws IOException, InterruptedException { + + out.println(" - - - queue synchronisation test - - - "); + + final int elements = ONE_MB/SIZEOF_INT * 10; // 20MB per buffer + + CLContext context = CLContext.create(); + + CLDevice[] devices = context.getCLDevices(); + + if(devices.length < 2) { + out.println("aborting test... need at least 2 devices"); + context.release(); + return; + } + + final CLBuffer<ByteBuffer> clBufferC = context.createByteBuffer(elements*SIZEOF_INT, Mem.READ_ONLY); + final CLBuffer<ByteBuffer> clBufferD = context.createByteBuffer(elements*SIZEOF_INT, Mem.READ_ONLY); + + final CLBuffer<ByteBuffer> clBufferA1 = context.createByteBuffer(elements*SIZEOF_INT, Mem.READ_ONLY); + final CLBuffer<ByteBuffer> clBufferB1 = context.createByteBuffer(elements*SIZEOF_INT, Mem.READ_ONLY); + final CLBuffer<ByteBuffer> clBufferA2 = context.createByteBuffer(elements*SIZEOF_INT, Mem.READ_ONLY); + final CLBuffer<ByteBuffer> clBufferB2 = context.createByteBuffer(elements*SIZEOF_INT, Mem.READ_ONLY); + + CLProgram program = context.createProgram(getClass().getResourceAsStream("testkernels.cl")).build(); + + final CLKernel vectorAddKernel1 = program.getCLKernel("VectorAddGM") + .setArg(3, elements); + + //TODO introduce public api for cloning/creating kernels + final CLKernel vectorAddKernel2 = vectorAddKernel1.copy() + .setArg(3, elements); + + + int secondDevice = devices.length > 1 ? 1 : 0; + + final CLCommandQueue queue1 = devices[0 ].createCommandQueue(); + final CLCommandQueue queue2 = devices[secondDevice].createCommandQueue(); + + if(secondDevice > 0) + System.out.println("using two devices"); + + final QueueBarrier barrier = new QueueBarrier(2); + + Thread thread1 = new Thread("C") { + + @Override + public void run() { + + fillBuffer(clBufferA1.buffer, 12345); + fillBuffer(clBufferB1.buffer, 67890); + +// System.out.println("C buffer"); + queue1.putWriteBuffer(clBufferA1, false) // write A + .putWriteBuffer(clBufferB1, true); // write B + +// System.out.println("C args"); + vectorAddKernel1.setArgs(clBufferA1, clBufferB1, clBufferC); // C = A+B + +// System.out.println("C kernels"); + CLEventList events1 = new CLEventList(2); + queue1.put1DRangeKernel(vectorAddKernel1, 0, elements, 256, events1) + .putReadBuffer(clBufferC, false, events1); + + barrier.waitFor(queue1, events1); + + } + + }; + + Thread thread2 = new Thread("D") { + + @Override + public void run() { + + fillBuffer(clBufferA2.buffer, 12345); + fillBuffer(clBufferB2.buffer, 67890); + +// System.out.println("D buffer"); + queue2.putWriteBuffer(clBufferA2, false) // write A + .putWriteBuffer(clBufferB2, true); // write B + +// System.out.println("D args"); + vectorAddKernel2.setArgs(clBufferA2, clBufferB2, clBufferD); // D = A+B + +// System.out.println("D kernels"); + CLEventList events2 = new CLEventList(2); + queue2.put1DRangeKernel(vectorAddKernel2, 0, elements, 256, events2) + .putReadBuffer(clBufferD, false, events2); + + barrier.waitFor(queue2, events2); + + } + + }; + + out.println("starting threads"); + thread1.start(); + thread2.start(); + barrier.await(); + out.println("done"); + + checkIfEqual(clBufferC.buffer, clBufferD.buffer, elements); + + context.release(); + +// vectorAddKernel2.release(); + + out.println("results are valid"); + + } + + + + }
\ No newline at end of file |