aboutsummaryrefslogtreecommitdiffstats
path: root/src/java/com
diff options
context:
space:
mode:
authorSven Gothel <[email protected]>2013-08-22 23:39:58 +0200
committerSven Gothel <[email protected]>2013-08-22 23:39:58 +0200
commit30475c6bbeb9a5d48899b281ead8bb305679028d (patch)
treeb3c0875107448e13d0715bc9fa935f6f169b78a0 /src/java/com
parent77687335f7fae3727c902c678b9525e6f4631da1 (diff)
Add Ringbuffer interface an 2 implementations, synchronized (locking) SyncedRingbuffer and lock-free LFRingbuffer.
SyncedRingbuffer is moved from JOGL to GlueGen, and generalized w/ common interface Ringbuffer to allow testing diff. implementations. - Added Ringbuffer.AllocEmptyArray factory interface, allowing to pass a constructor to construct the generic array. - Added functionality is growBuffer(..), allowing to either grow a full or empty buffer, using Ringbuffer.AllocEmptyArray. - Removed explicit 'clearRef' at get*(..), always clear the taken reference for better interface generalization. - Added LFRingbuffer, exposing lock-free get*(..) and put*(..) methods using the 'Always Keep One Slot Open' pattern using the read/write index as barriers only. - Ctor's copy an optional passed user array into the internal array, utilizing Ringbuffer.AllocEmptyArray. - Added unit tests.
Diffstat (limited to 'src/java/com')
-rw-r--r--src/java/com/jogamp/common/util/LFRingbuffer.java401
-rw-r--r--src/java/com/jogamp/common/util/Ringbuffer.java214
-rw-r--r--src/java/com/jogamp/common/util/SyncedRingbuffer.java381
3 files changed, 996 insertions, 0 deletions
diff --git a/src/java/com/jogamp/common/util/LFRingbuffer.java b/src/java/com/jogamp/common/util/LFRingbuffer.java
new file mode 100644
index 0000000..a6b441a
--- /dev/null
+++ b/src/java/com/jogamp/common/util/LFRingbuffer.java
@@ -0,0 +1,401 @@
+/**
+ * Copyright 2013 JogAmp Community. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without modification, are
+ * permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this list of
+ * conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice, this list
+ * of conditions and the following disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY JogAmp Community ``AS IS'' AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
+ * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JogAmp Community OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and documentation are those of the
+ * authors and should not be interpreted as representing official policies, either expressed
+ * or implied, of JogAmp Community.
+ */
+
+package com.jogamp.common.util;
+
+import java.io.PrintStream;
+
+/**
+ * Simple implementation of {@link Ringbuffer},
+ * exposing <i>lock-free</i>
+ * {@link #get() get*(..)} and {@link #put(Object) put*(..)} methods.
+ * <p>
+ * Implementation utilizes the <i>Always Keep One Slot Open</i>,
+ * hence implementation maintains an internal array of <code>capacity</code> <i>plus one</i>!
+ * </p>
+ * <p>
+ * Implementation is thread safe if:
+ * <ul>
+ * <li>{@link #get() get*(..)} operations are performed from one thread only.</li>
+ * <li>{@link #put(Object) put*(..)} operations are performed from one thread only.</li>
+ * <li>{@link #get() get*(..)} and {@link #put(Object) put*(..)} thread may be the same.</li>
+ * </ul>
+ * </p>
+ * <p>
+ * Following methods utilize global synchronization:
+ * <ul>
+ * <li>{@link #resetFull(Object[])}</li>
+ * <li>{@link #clear()}</li>
+ * <li>{@link #growBuffer(Object[], int, AllocEmptyArray)}</li>
+ * </ul>
+ * User needs to synchronize above methods w/ the lock-free
+ * w/ {@link #get() get*(..)} and {@link #put(Object) put*(..)} methods,
+ * e.g. by controlling their threads before invoking the above.
+ * </p>
+ * <p>
+ * Characteristics:
+ * <ul>
+ * <li>Read position points to the last read element.</li>
+ * <li>Write position points to the last written element.</li>
+ * </ul>
+ * <table border="1">
+ * <tr><td>Empty</td><td>writePos == readPos</td><td>size == 0</td></tr>
+ * <tr><td>Full</td><td>writePos == readPos - 1</td><td>size == capacity</td></tr>
+ * </table>
+ * </p>
+ */
+public class LFRingbuffer<T> implements Ringbuffer<T> {
+
+ private final Object syncRead = new Object();
+ private final Object syncWrite = new Object();
+ private final Object syncGlobal = new Object();
+ private /* final */ volatile T[] array; // not final due to grow
+ private /* final */ volatile int capacityPlusOne; // not final due to grow
+ private volatile int readPos;
+ private volatile int writePos;
+ private volatile int size;
+
+ public final String toString() {
+ return "LFRingbuffer<?>[filled "+size+" / "+(capacityPlusOne-1)+", writePos "+writePos+", readPos "+readPos+"]";
+ }
+
+ public final void dump(PrintStream stream, String prefix) {
+ stream.println(prefix+" "+toString()+" {");
+ for(int i=0; i<capacityPlusOne; i++) {
+ stream.println("\t["+i+"]: "+array[i]);
+ }
+ stream.println("}");
+ }
+
+ /**
+ * Create a full ring buffer instance w/ the given array's net capacity and content.
+ * <p>
+ * Example for a 10 element Integer array:
+ * <pre>
+ * Integer[] source = new Integer[10];
+ * // fill source with content ..
+ * Ringbuffer<Integer> rb = new LFRingbuffer<Integer>(source, new Ringbuffer.AllocEmptyArray<Integer>() {
+ * public Integer[] newArray(int size) {
+ * return new Integer[size];
+ * } } );
+ * </pre>
+ * </p>
+ * <p>
+ * {@link #isFull()} returns true on the newly created full ring buffer.
+ * </p>
+ * <p>
+ * Implementation will allocate an internal array with size of array <code>copyFrom</code> <i>plus one</i>,
+ * and copy all elements from array <code>copyFrom</code> into the internal array.
+ * </p>
+ * @param copyFrom mandatory source array determining ring buffer's net {@link #capacity()} and initial content.
+ * @param allocEmptyArray implementation hook to allocate a new empty array of generic type T
+ * @throws IllegalArgumentException if <code>copyFrom</code> is <code>null</code>
+ */
+ public LFRingbuffer(T[] copyFrom, AllocEmptyArray<T> allocEmptyArray) throws IllegalArgumentException {
+ capacityPlusOne = copyFrom.length + 1;
+ array = allocEmptyArray.newArray(capacityPlusOne);
+ resetImpl(true, copyFrom);
+ }
+
+ /**
+ * Create an empty ring buffer instance w/ the given net <code>capacity</code>.
+ * <p>
+ * Example for a 10 element Integer array:
+ * <pre>
+ * Ringbuffer<Integer> rb = new LFRingbuffer<Integer>(10, new Ringbuffer.AllocEmptyArray<Integer>() {
+ * public Integer[] newArray(int size) {
+ * return new Integer[size];
+ * } } );
+ * </pre>
+ * </p>
+ * <p>
+ * {@link #isEmpty()} returns true on the newly created empty ring buffer.
+ * </p>
+ * <p>
+ * Implementation will allocate an internal array of size <code>capacity</code> <i>plus one</i>.
+ * </p>
+ * @param capacity the initial net capacity of the ring buffer
+ * @param allocEmptyArray implementation hook to allocate a new empty array of generic type T
+ */
+ public LFRingbuffer(int capacity, AllocEmptyArray<T> allocEmptyArray) {
+ capacityPlusOne = capacity+1;
+ array = allocEmptyArray.newArray(capacityPlusOne);
+ resetImpl(false, null);
+ }
+
+ @Override
+ public final T[] getInternalArray() { return array; }
+
+ @Override
+ public final int capacity() { return capacityPlusOne-1; }
+
+ @Override
+ public final void clear() {
+ synchronized ( syncGlobal ) {
+ resetImpl(false, null);
+ for(int i=0; i<capacityPlusOne; i++) {
+ this.array[i] = null;
+ }
+ }
+ }
+
+ @Override
+ public final void resetFull(T[] copyFrom) throws IllegalArgumentException {
+ resetImpl(true, copyFrom);
+ }
+
+ private final void resetImpl(boolean full, T[] copyFrom) throws IllegalArgumentException {
+ synchronized ( syncGlobal ) {
+ if( null != copyFrom ) {
+ if( copyFrom.length != capacityPlusOne-1 ) {
+ throw new IllegalArgumentException("copyFrom array length "+copyFrom.length+" != capacity "+this);
+ }
+ System.arraycopy(copyFrom, 0, array, 0, copyFrom.length);
+ array[capacityPlusOne-1] = null; // null 'plus-one' field!
+ } else if( full ) {
+ throw new IllegalArgumentException("copyFrom array is null");
+ }
+ readPos = capacityPlusOne - 1;
+ if( full ) {
+ writePos = readPos - 1;
+ size = capacityPlusOne - 1;
+ } else {
+ writePos = readPos;
+ size = 0;
+ }
+ }
+ }
+
+ @Override
+ public final int size() { return size; }
+
+ @Override
+ public final int getFreeSlots() { return capacityPlusOne - 1 - size; }
+
+ @Override
+ public final boolean isEmpty() { return 0 == size; }
+
+ @Override
+ public final boolean isFull() { return capacityPlusOne - 1 == size; }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Implementation advances the read position and returns the element at it, if not empty.
+ * </p>
+ */
+ @Override
+ public final T get() {
+ try {
+ return getImpl(false, false);
+ } catch (InterruptedException ie) { throw new RuntimeException(ie); }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Implementation advances the read position and returns the element at it, if not empty.
+ * </p>
+ */
+ @Override
+ public final T getBlocking() throws InterruptedException {
+ return getImpl(true, false);
+ }
+
+ @Override
+ public final T peek() {
+ try {
+ return getImpl(false, true);
+ } catch (InterruptedException ie) { throw new RuntimeException(ie); }
+ }
+ @Override
+ public final T peekBlocking() throws InterruptedException {
+ return getImpl(true, true);
+ }
+
+ private final T getImpl(boolean blocking, boolean peek) throws InterruptedException {
+ int localReadPos = readPos;
+ if( localReadPos == writePos ) {
+ if( blocking ) {
+ synchronized( syncRead ) {
+ while( localReadPos == writePos ) {
+ syncRead.wait();
+ }
+ }
+ } else {
+ return null;
+ }
+ }
+ localReadPos = (localReadPos + 1) % capacityPlusOne;
+ final T r = array[localReadPos];
+ if( !peek ) {
+ array[localReadPos] = null;
+ synchronized ( syncWrite ) {
+ size--;
+ readPos = localReadPos;
+ syncWrite.notifyAll(); // notify waiting putter
+ }
+ }
+ return r;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Implementation advances the write position and stores the given element at it, if not full.
+ * </p>
+ */
+ @Override
+ public final boolean put(T e) {
+ try {
+ return putImpl(e, false, false);
+ } catch (InterruptedException ie) { throw new RuntimeException(ie); }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Implementation advances the write position and stores the given element at it, if not full.
+ * </p>
+ */
+ @Override
+ public final void putBlocking(T e) throws InterruptedException {
+ if( !putImpl(e, false, true) ) {
+ throw new InternalError("Blocking put failed: "+this);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Implementation advances the write position and keeps the element at it, if not full.
+ * </p>
+ */
+ @Override
+ public final boolean putSame(boolean blocking) throws InterruptedException {
+ return putImpl(null, true, blocking);
+ }
+
+ private final boolean putImpl(T e, boolean sameRef, boolean blocking) throws InterruptedException {
+ int localWritePos = writePos;
+ localWritePos = (localWritePos + 1) % capacityPlusOne;
+ if( localWritePos == readPos ) {
+ if( blocking ) {
+ synchronized( syncWrite ) {
+ while( localWritePos == readPos ) {
+ syncWrite.wait();
+ }
+ }
+ } else {
+ return false;
+ }
+ }
+ if( !sameRef ) {
+ array[localWritePos] = e;
+ }
+ synchronized ( syncRead ) {
+ size++;
+ writePos = localWritePos;
+ syncRead.notifyAll(); // notify waiting getter
+ }
+ return true;
+ }
+
+
+ @Override
+ public final void waitForFreeSlots(int count) throws InterruptedException {
+ synchronized ( syncRead ) {
+ if( capacityPlusOne - 1 - size < count ) {
+ while( capacityPlusOne - 1 - size < count ) {
+ syncRead.wait();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void growBuffer(T[] newElements, int amount, AllocEmptyArray<T> allocEmptyArray) throws IllegalStateException, IllegalArgumentException {
+ synchronized( syncGlobal ) {
+ final boolean isFull = capacityPlusOne - 1 == size;
+ final boolean isEmpty = 0 == size;
+ if( !isFull && !isEmpty ) {
+ throw new IllegalStateException("Buffer neither full nor empty: "+this);
+ }
+ if( isEmpty ) {
+ if( readPos != writePos ) {
+ throw new InternalError("R/W pos not equal at empty: "+this);
+ }
+ } else /* isFull */ {
+ final int wp1 = ( writePos + 1 ) % capacityPlusOne;
+ if( wp1 != readPos ) {
+ throw new InternalError("R != W+1 pos at full: "+this);
+ }
+ }
+ if( null != newElements && amount < newElements.length ) {
+ throw new IllegalArgumentException("amount "+amount+" < newElements "+newElements.length);
+ }
+ final int newCapacity = capacityPlusOne + amount;
+ final T[] oldArray = array;
+ final T[] newArray = allocEmptyArray.newArray(newCapacity);
+
+ if( isFull ) {
+ // writePos == readPos - 1
+ readPos = ( writePos + 1 + amount ) % newCapacity; // warp readPos to the end of the new data location
+
+ if(writePos >= 0) {
+ System.arraycopy(oldArray, 0, newArray, 0, writePos+1);
+ }
+ if( null != newElements && newElements.length > 0 ) {
+ System.arraycopy(newElements, 0, newArray, writePos+1, newElements.length);
+ }
+ final int tail = capacityPlusOne-1-writePos;
+ if( tail > 0 ) {
+ System.arraycopy(oldArray, writePos+1, newArray, readPos, tail);
+ }
+ } else /* if ( isEmpty ) */ {
+ // writePos == readPos
+ writePos += amount; // warp writePos to the end of the new data location
+
+ if( readPos >= 0 ) {
+ System.arraycopy(oldArray, 0, newArray, 0, readPos+1);
+ }
+ if( null != newElements && newElements.length > 0 ) {
+ System.arraycopy(newElements, 0, newArray, readPos+1, newElements.length);
+ }
+ final int tail = capacityPlusOne-1-readPos;
+ if( tail > 0 ) {
+ System.arraycopy(oldArray, readPos+1, newArray, writePos+1, tail);
+ }
+ size = amount;
+ }
+
+ capacityPlusOne = newCapacity;
+ array = newArray;
+ }
+ }
+} \ No newline at end of file
diff --git a/src/java/com/jogamp/common/util/Ringbuffer.java b/src/java/com/jogamp/common/util/Ringbuffer.java
new file mode 100644
index 0000000..733a235
--- /dev/null
+++ b/src/java/com/jogamp/common/util/Ringbuffer.java
@@ -0,0 +1,214 @@
+/**
+ * Copyright 2013 JogAmp Community. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without modification, are
+ * permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this list of
+ * conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice, this list
+ * of conditions and the following disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY JogAmp Community ``AS IS'' AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
+ * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JogAmp Community OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and documentation are those of the
+ * authors and should not be interpreted as representing official policies, either expressed
+ * or implied, of JogAmp Community.
+ */
+package com.jogamp.common.util;
+
+import java.io.PrintStream;
+
+/**
+ * Ring buffer interface, a.k.a circular buffer.
+ * <p>
+ * Caller can chose whether to block until get / put is able to proceed or not.
+ * </p>
+ * <p>
+ * Caller can chose whether to pass an empty array and clear references at get,
+ * or using a preset array for circular access of same objects.
+ * </p>
+ * <p>
+ * Synchronization and hence thread safety details belong to the implementation.
+ * </p>
+ */
+public interface Ringbuffer<T> {
+
+ /**
+ * Implementation hook for {@link #growBuffer(Object[], int, AllocEmptyArray)}
+ * to pass an implementation of {@link #newArray(int)}.
+ * @param <T> type of array
+ */
+ public static interface AllocEmptyArray<T> {
+ /**
+ * Returns a new allocated empty array of generic type T with given size.
+ */
+ public T[] newArray(int size);
+ }
+
+ /** Returns a short string representation incl. size/capacity and internal r/w index (impl. dependent). */
+ public String toString();
+
+ /** Debug functionality - Dumps the contents of the internal array. */
+ public void dump(PrintStream stream, String prefix);
+
+ /**
+ * Returns the internal array as-is, i.e. w/o a copy.
+ * <p>
+ * The layout and size of the internal array is implementation dependent.
+ * </p>
+ * <p>
+ * Users shall not modify or rely on the returned array.
+ * </p>
+ * @deprecated This method should not be required
+ */
+ public T[] getInternalArray();
+
+ /** Returns the net capacity of this ring buffer. */
+ public int capacity();
+
+ /**
+ * Resets the read and write position according to an empty ring buffer
+ * and set all ring buffer slots to <code>null</code>.
+ * <p>
+ * {@link #isEmpty()} will return <code>true</code> after calling this method.
+ * </p>
+ */
+ public void clear();
+
+ /**
+ * Resets the read and write position according to a full ring buffer
+ * and fill all slots w/ elements of array <code>copyFrom</code>.
+ * <p>
+ * Array's <code>copyFrom</code> elements will be copied into the internal array,
+ * hence it's length must be equal to {@link #capacity()}.
+ * </p>
+ * @param copyFrom Mandatory array w/ length {@link #capacity()} to be copied into the internal array.
+ * @throws IllegalArgumentException if <code>copyFrom</code> is <code>null</code>.
+ * @throws IllegalArgumentException if <code>copyFrom</code>'s length is different from {@link #capacity()}.
+ */
+ public void resetFull(T[] copyFrom) throws IllegalArgumentException;
+
+ /** Returns the number of elements in this ring buffer. */
+ public int size();
+
+ /** Returns the number of free slots available to put. */
+ public int getFreeSlots();
+
+ /** Returns true if this ring buffer is empty, otherwise false. */
+ public boolean isEmpty();
+
+ /** Returns true if this ring buffer is full, otherwise false. */
+ public boolean isFull();
+
+ /**
+ * Dequeues the oldest enqueued element if available, otherwise null.
+ * <p>
+ * The returned ring buffer slot will be set to <code>null</code> to release the reference
+ * and move ownership to the caller.
+ * </p>
+ * <p>
+ * Method is non blocking and returns immediately;.
+ * </p>
+ * @return the oldest put element if available, otherwise null.
+ */
+ public T get();
+
+ /**
+ * Dequeues the oldest enqueued element.
+ * <p>
+ * The returned ring buffer slot will be set to <code>null</code> to release the reference
+ * and move ownership to the caller.
+ * </p>
+ * <p>
+ * Methods blocks until an element becomes available via put.
+ * </p>
+ * @return the oldest put element
+ * @throws InterruptedException
+ */
+ public T getBlocking() throws InterruptedException;
+
+ /**
+ * Peeks the next element at the read position w/o modifying pointer, nor blocking.
+ * @return <code>null</code> if empty, otherwise the element which would be read next.
+ */
+ public T peek();
+
+ /**
+ * Peeks the next element at the read position w/o modifying pointer, but w/ blocking.
+ * @return <code>null</code> if empty, otherwise the element which would be read next.
+ */
+ public T peekBlocking() throws InterruptedException;
+
+ /**
+ * Enqueues the given element.
+ * <p>
+ * Returns true if successful, otherwise false in case buffer is full.
+ * </p>
+ * <p>
+ * Method is non blocking and returns immediately;.
+ * </p>
+ */
+ public boolean put(T e);
+
+ /**
+ * Enqueues the given element.
+ * <p>
+ * Method blocks until a free slot becomes available via get.
+ * </p>
+ * @throws InterruptedException
+ */
+ public void putBlocking(T e) throws InterruptedException;
+
+ /**
+ * Enqueues the same element at it's write position, if not full.
+ * <p>
+ * Returns true if successful, otherwise false in case buffer is full.
+ * </p>
+ * <p>
+ * If <code>blocking</code> is true, method blocks until a free slot becomes available via get.
+ * </p>
+ * @param blocking if true, wait until a free slot becomes available via get.
+ * @throws InterruptedException
+ */
+ public boolean putSame(boolean blocking) throws InterruptedException;
+
+ /**
+ * Blocks until at least <code>count</code> free slots become available.
+ * @throws InterruptedException
+ */
+ public void waitForFreeSlots(int count) throws InterruptedException;
+
+ /**
+ * Grows a full or empty ring buffer, increasing it's capacity about the amount.
+ * <p>
+ * Growing an empty ring buffer increases it's size about the amount, i.e. renders it not empty.
+ * The new elements are inserted at the read position, able to be read out via {@link #get()} etc.
+ * </p>
+ * <p>
+ * Growing a full ring buffer leaves the size intact, i.e. renders it not full.
+ * The new elements are inserted at the write position, able to be written to via {@link #put(Object)} etc.
+ * </p>
+ *
+ * @param newElements array of new empty elements the buffer shall grow about, maybe <code>null</code>.
+ * If not <code>null</code>, array size must be <= <code>amount</code>
+ * @param amount the amount of elements the buffer shall grow about
+ * @param allocEmptyArray implementation hook to allocate a new empty array of generic type T
+ * @throws IllegalStateException if buffer is neither full nor empty
+ * @throws IllegalArgumentException if newElements is given but is > amount
+ */
+ public void growBuffer(T[] newElements, int amount,
+ AllocEmptyArray<T> allocEmptyArray) throws IllegalStateException,
+ IllegalArgumentException;
+
+} \ No newline at end of file
diff --git a/src/java/com/jogamp/common/util/SyncedRingbuffer.java b/src/java/com/jogamp/common/util/SyncedRingbuffer.java
new file mode 100644
index 0000000..b8ddbd6
--- /dev/null
+++ b/src/java/com/jogamp/common/util/SyncedRingbuffer.java
@@ -0,0 +1,381 @@
+/**
+ * Copyright 2013 JogAmp Community. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without modification, are
+ * permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this list of
+ * conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice, this list
+ * of conditions and the following disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY JogAmp Community ``AS IS'' AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
+ * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JogAmp Community OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and documentation are those of the
+ * authors and should not be interpreted as representing official policies, either expressed
+ * or implied, of JogAmp Community.
+ */
+
+package com.jogamp.common.util;
+
+import java.io.PrintStream;
+
+/**
+ * Simple synchronized implementation of {@link Ringbuffer}.
+ * <p>
+ * All methods utilize global synchronization.
+ * </p>
+ * <p>
+ * Characteristics:
+ * <ul>
+ * <li>Read position points to the next read element.</li>
+ * <li>Write position points to the next write element.</li>
+ * </ul>
+ * <table border="1">
+ * <tr><td>Empty</td><td>writePos == readPos</td><td>size == 0</td></tr>
+ * <tr><td>Full</td><td>writePos == readPos</td><td>size == capacity</td></tr>
+ * </table>
+ * </p>
+ */
+public class SyncedRingbuffer<T> implements Ringbuffer<T> {
+
+ private final Object syncGlobal = new Object();
+ private /* final */ T[] array; // not final due to grow
+ private /* final */ int capacity; // not final due to grow
+ private int readPos;
+ private int writePos;
+ private int size;
+
+ @Override
+ public final String toString() {
+ return "SyncedRingbuffer<?>[filled "+size+" / "+capacity+", writePos "+writePos+", readPos "+readPos+"]";
+ }
+
+ @Override
+ public final void dump(PrintStream stream, String prefix) {
+ stream.println(prefix+" "+toString()+" {");
+ for(int i=0; i<capacity; i++) {
+ stream.println("\t["+i+"]: "+array[i]);
+ }
+ stream.println("}");
+ }
+
+ /**
+ * Create a full ring buffer instance w/ the given array's net capacity and content.
+ * <p>
+ * Example for a 10 element Integer array:
+ * <pre>
+ * Integer[] source = new Integer[10];
+ * // fill source with content ..
+ * Ringbuffer<Integer> rb = new SyncedRingbuffer<Integer>(source, new Ringbuffer.AllocEmptyArray<Integer>() {
+ * public Integer[] newArray(int size) {
+ * return new Integer[size];
+ * } } );
+ * </pre>
+ * </p>
+ * <p>
+ * {@link #isFull()} returns true on the newly created full ring buffer.
+ * </p>
+ * <p>
+ * Implementation will allocate an internal array with size of array <code>copyFrom</code>
+ * and copy all elements from array <code>copyFrom</code> into the internal array.
+ * </p>
+ * @param copyFrom mandatory source array determining ring buffer's net {@link #capacity()} and initial content.
+ * @param allocEmptyArray implementation hook to allocate a new empty array of generic type T
+ * @throws IllegalArgumentException if <code>copyFrom</code> is <code>null</code>
+ */
+ public SyncedRingbuffer(T[] copyFrom, AllocEmptyArray<T> allocEmptyArray) throws IllegalArgumentException {
+ capacity = copyFrom.length;
+ array = allocEmptyArray.newArray(capacity);
+ resetImpl(true, copyFrom);
+ }
+
+ /**
+ * Create an empty ring buffer instance w/ the given net <code>capacity</code>.
+ * <p>
+ * Example for a 10 element Integer array:
+ * <pre>
+ * Ringbuffer<Integer> rb = new SyncedRingbuffer<Integer>(10, new Ringbuffer.AllocEmptyArray<Integer>() {
+ * public Integer[] newArray(int size) {
+ * return new Integer[size];
+ * } } );
+ * </pre>
+ * </p>
+ * <p>
+ * {@link #isEmpty()} returns true on the newly created empty ring buffer.
+ * </p>
+ * <p>
+ * Implementation will allocate an internal array of size <code>capacity</code>.
+ * </p>
+ * @param capacity the initial net capacity of the ring buffer
+ * @param allocEmptyArray implementation hook to allocate a new empty array of generic type T
+ */
+ public SyncedRingbuffer(int capacity, AllocEmptyArray<T> allocEmptyArray) {
+ this.capacity = capacity;
+ this.array = allocEmptyArray.newArray(capacity);
+ resetImpl(false, null);
+ }
+
+ @Override
+ public final T[] getInternalArray() { return array; }
+
+ @Override
+ public final int capacity() { return capacity; }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Implementation sets read and write position to zero.
+ * </p>
+ */
+ @Override
+ public final void clear() {
+ synchronized ( syncGlobal ) {
+ resetImpl(false, null);
+ for(int i=0; i<capacity; i++) {
+ this.array[i] = null;
+ }
+ }
+ }
+
+ @Override
+ public final void resetFull(T[] copyFrom) throws IllegalArgumentException {
+ resetImpl(true, copyFrom);
+ }
+
+ private final void resetImpl(boolean full, T[] copyFrom) throws IllegalArgumentException {
+ synchronized ( syncGlobal ) {
+ if( null != copyFrom ) {
+ if( copyFrom.length != capacity() ) {
+ throw new IllegalArgumentException("copyFrom array length "+copyFrom.length+" != capacity "+this);
+ }
+ System.arraycopy(copyFrom, 0, array, 0, copyFrom.length);
+ } else if( full ) {
+ throw new IllegalArgumentException("copyFrom array is null");
+ }
+ readPos = 0;
+ writePos = 0;
+ size = full ? capacity : 0;
+ }
+ }
+
+ @Override
+ public final int size() {
+ synchronized ( syncGlobal ) {
+ return size;
+ }
+ }
+
+ @Override
+ public final int getFreeSlots() {
+ synchronized ( syncGlobal ) {
+ return capacity - size;
+ }
+ }
+
+ @Override
+ public final boolean isEmpty() {
+ synchronized ( syncGlobal ) {
+ return 0 == size;
+ }
+ }
+
+ @Override
+ public final boolean isFull() {
+ synchronized ( syncGlobal ) {
+ return capacity == size;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Implementation returns the element at the current read position and advances it, if not empty.
+ * </p>
+ */
+ @Override
+ public final T get() {
+ try {
+ return getImpl(false, false);
+ } catch (InterruptedException ie) { throw new RuntimeException(ie); }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Implementation returns the element at the current read position and advances it, if not empty.
+ * </p>
+ */
+ @Override
+ public final T getBlocking() throws InterruptedException {
+ return getImpl(true, false);
+ }
+
+ @Override
+ public final T peek() {
+ try {
+ return getImpl(false, true);
+ } catch (InterruptedException ie) { throw new RuntimeException(ie); }
+ }
+ @Override
+ public final T peekBlocking() throws InterruptedException {
+ return getImpl(true, true);
+ }
+
+ private final T getImpl(boolean blocking, boolean peek) throws InterruptedException {
+ synchronized( syncGlobal ) {
+ if( 0 == size ) {
+ if( blocking ) {
+ while( 0 == size ) {
+ syncGlobal.wait();
+ }
+ } else {
+ return null;
+ }
+ }
+ final int localReadPos = readPos;
+ final T r = array[localReadPos];
+ if( !peek ) {
+ array[localReadPos] = null;
+ size--;
+ readPos = (localReadPos + 1) % capacity;
+ syncGlobal.notifyAll(); // notify waiting putter
+ }
+ return r;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Implementation stores the element at the current write position and advances it, if not full.
+ * </p>
+ */
+ @Override
+ public final boolean put(T e) {
+ try {
+ return putImpl(e, false, false);
+ } catch (InterruptedException ie) { throw new RuntimeException(ie); }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Implementation stores the element at the current write position and advances it, if not full.
+ * </p>
+ */
+ @Override
+ public final void putBlocking(T e) throws InterruptedException {
+ if( !putImpl(e, false, true) ) {
+ throw new InternalError("Blocking put failed: "+this);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Implementation keeps the element at the current write position and advances it, if not full.
+ * </p>
+ */
+ @Override
+ public final boolean putSame(boolean blocking) throws InterruptedException {
+ return putImpl(null, true, blocking);
+ }
+
+ private final boolean putImpl(T e, boolean sameRef, boolean blocking) throws InterruptedException {
+ synchronized( syncGlobal ) {
+ if( capacity == size ) {
+ if( blocking ) {
+ while( capacity == size ) {
+ syncGlobal.wait();
+ }
+ } else {
+ return false;
+ }
+ }
+ final int localWritePos = writePos;
+ if( !sameRef ) {
+ array[localWritePos] = e;
+ }
+ size++;
+ writePos = (localWritePos + 1) % capacity;
+ syncGlobal.notifyAll(); // notify waiting getter
+ return true;
+ }
+ }
+
+ @Override
+ public final void waitForFreeSlots(int count) throws InterruptedException {
+ synchronized ( syncGlobal ) {
+ if( capacity - size < count ) {
+ while( capacity - size < count ) {
+ syncGlobal.wait();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void growBuffer(T[] newElements, int amount, AllocEmptyArray<T> allocEmptyArray) throws IllegalStateException, IllegalArgumentException {
+ synchronized ( syncGlobal ) {
+ final boolean isFull = capacity == size;
+ final boolean isEmpty = 0 == size;
+ if( !isFull && !isEmpty ) {
+ throw new IllegalStateException("Buffer neither full nor empty: "+this);
+ }
+ if( readPos != writePos ) {
+ throw new InternalError("R/W pos not equal: "+this);
+ }
+ if( null != newElements && amount < newElements.length ) {
+ throw new IllegalArgumentException("amount "+amount+" < newElements "+newElements.length);
+ }
+ final int newCapacity = capacity + amount;
+ final T[] oldArray = array;
+ final T[] newArray = allocEmptyArray.newArray(newCapacity);
+
+ if( isFull ) {
+ // writePos == readPos
+ readPos += amount; // warp readPos to the end of the new data location
+
+ if(writePos > 0) {
+ System.arraycopy(oldArray, 0, newArray, 0, writePos);
+ }
+ if( null != newElements && newElements.length > 0 ) {
+ System.arraycopy(newElements, 0, newArray, writePos, newElements.length);
+ }
+ final int tail = capacity-writePos;
+ if( tail > 0 ) {
+ System.arraycopy(oldArray, writePos, newArray, readPos, tail);
+ }
+ } else /* if ( isEmpty ) */ {
+ // writePos == readPos
+ writePos += amount; // warp writePos to the end of the new data location
+
+ if( readPos > 0 ) {
+ System.arraycopy(oldArray, 0, newArray, 0, readPos);
+ }
+ if( null != newElements && newElements.length > 0 ) {
+ System.arraycopy(newElements, 0, newArray, readPos, newElements.length);
+ }
+ final int tail = capacity-readPos;
+ if( tail > 0 ) {
+ System.arraycopy(oldArray, readPos, newArray, writePos, tail);
+ }
+ size = amount;
+ }
+
+ capacity = newCapacity;
+ array = newArray;
+ }
+ }
+}