From 30475c6bbeb9a5d48899b281ead8bb305679028d Mon Sep 17 00:00:00 2001 From: Sven Gothel Date: Thu, 22 Aug 2013 23:39:58 +0200 Subject: Add Ringbuffer interface an 2 implementations, synchronized (locking) SyncedRingbuffer and lock-free LFRingbuffer. SyncedRingbuffer is moved from JOGL to GlueGen, and generalized w/ common interface Ringbuffer to allow testing diff. implementations. - Added Ringbuffer.AllocEmptyArray factory interface, allowing to pass a constructor to construct the generic array. - Added functionality is growBuffer(..), allowing to either grow a full or empty buffer, using Ringbuffer.AllocEmptyArray. - Removed explicit 'clearRef' at get*(..), always clear the taken reference for better interface generalization. - Added LFRingbuffer, exposing lock-free get*(..) and put*(..) methods using the 'Always Keep One Slot Open' pattern using the read/write index as barriers only. - Ctor's copy an optional passed user array into the internal array, utilizing Ringbuffer.AllocEmptyArray. - Added unit tests. --- src/java/com/jogamp/common/util/LFRingbuffer.java | 401 +++++++++++++++++++++ src/java/com/jogamp/common/util/Ringbuffer.java | 214 +++++++++++ .../com/jogamp/common/util/SyncedRingbuffer.java | 381 ++++++++++++++++++++ 3 files changed, 996 insertions(+) create mode 100644 src/java/com/jogamp/common/util/LFRingbuffer.java create mode 100644 src/java/com/jogamp/common/util/Ringbuffer.java create mode 100644 src/java/com/jogamp/common/util/SyncedRingbuffer.java (limited to 'src/java/com/jogamp') diff --git a/src/java/com/jogamp/common/util/LFRingbuffer.java b/src/java/com/jogamp/common/util/LFRingbuffer.java new file mode 100644 index 0000000..a6b441a --- /dev/null +++ b/src/java/com/jogamp/common/util/LFRingbuffer.java @@ -0,0 +1,401 @@ +/** + * Copyright 2013 JogAmp Community. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are + * permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this list of + * conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, this list + * of conditions and the following disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY JogAmp Community ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND + * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JogAmp Community OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are those of the + * authors and should not be interpreted as representing official policies, either expressed + * or implied, of JogAmp Community. + */ + +package com.jogamp.common.util; + +import java.io.PrintStream; + +/** + * Simple implementation of {@link Ringbuffer}, + * exposing lock-free + * {@link #get() get*(..)} and {@link #put(Object) put*(..)} methods. + *

+ * Implementation utilizes the Always Keep One Slot Open, + * hence implementation maintains an internal array of capacity plus one! + *

+ *

+ * Implementation is thread safe if: + *

+ *

+ *

+ * Following methods utilize global synchronization: + *

+ * User needs to synchronize above methods w/ the lock-free + * w/ {@link #get() get*(..)} and {@link #put(Object) put*(..)} methods, + * e.g. by controlling their threads before invoking the above. + *

+ *

+ * Characteristics: + *

+ * + * + * + *
EmptywritePos == readPossize == 0
FullwritePos == readPos - 1size == capacity
+ *

+ */ +public class LFRingbuffer implements Ringbuffer { + + private final Object syncRead = new Object(); + private final Object syncWrite = new Object(); + private final Object syncGlobal = new Object(); + private /* final */ volatile T[] array; // not final due to grow + private /* final */ volatile int capacityPlusOne; // not final due to grow + private volatile int readPos; + private volatile int writePos; + private volatile int size; + + public final String toString() { + return "LFRingbuffer[filled "+size+" / "+(capacityPlusOne-1)+", writePos "+writePos+", readPos "+readPos+"]"; + } + + public final void dump(PrintStream stream, String prefix) { + stream.println(prefix+" "+toString()+" {"); + for(int i=0; i + * Example for a 10 element Integer array: + *
+     *  Integer[] source = new Integer[10];
+     *  // fill source with content ..
+     *  Ringbuffer rb = new LFRingbuffer(source, new Ringbuffer.AllocEmptyArray() {
+     *      public Integer[] newArray(int size) {
+     *          return new Integer[size];
+     *      } } );
+     * 
+ *

+ *

+ * {@link #isFull()} returns true on the newly created full ring buffer. + *

+ *

+ * Implementation will allocate an internal array with size of array copyFrom plus one, + * and copy all elements from array copyFrom into the internal array. + *

+ * @param copyFrom mandatory source array determining ring buffer's net {@link #capacity()} and initial content. + * @param allocEmptyArray implementation hook to allocate a new empty array of generic type T + * @throws IllegalArgumentException if copyFrom is null + */ + public LFRingbuffer(T[] copyFrom, AllocEmptyArray allocEmptyArray) throws IllegalArgumentException { + capacityPlusOne = copyFrom.length + 1; + array = allocEmptyArray.newArray(capacityPlusOne); + resetImpl(true, copyFrom); + } + + /** + * Create an empty ring buffer instance w/ the given net capacity. + *

+ * Example for a 10 element Integer array: + *

+     *  Ringbuffer rb = new LFRingbuffer(10, new Ringbuffer.AllocEmptyArray() {
+     *      public Integer[] newArray(int size) {
+     *          return new Integer[size];
+     *      } } );
+     * 
+ *

+ *

+ * {@link #isEmpty()} returns true on the newly created empty ring buffer. + *

+ *

+ * Implementation will allocate an internal array of size capacity plus one. + *

+ * @param capacity the initial net capacity of the ring buffer + * @param allocEmptyArray implementation hook to allocate a new empty array of generic type T + */ + public LFRingbuffer(int capacity, AllocEmptyArray allocEmptyArray) { + capacityPlusOne = capacity+1; + array = allocEmptyArray.newArray(capacityPlusOne); + resetImpl(false, null); + } + + @Override + public final T[] getInternalArray() { return array; } + + @Override + public final int capacity() { return capacityPlusOne-1; } + + @Override + public final void clear() { + synchronized ( syncGlobal ) { + resetImpl(false, null); + for(int i=0; i + * Implementation advances the read position and returns the element at it, if not empty. + *

+ */ + @Override + public final T get() { + try { + return getImpl(false, false); + } catch (InterruptedException ie) { throw new RuntimeException(ie); } + } + + /** + * {@inheritDoc} + *

+ * Implementation advances the read position and returns the element at it, if not empty. + *

+ */ + @Override + public final T getBlocking() throws InterruptedException { + return getImpl(true, false); + } + + @Override + public final T peek() { + try { + return getImpl(false, true); + } catch (InterruptedException ie) { throw new RuntimeException(ie); } + } + @Override + public final T peekBlocking() throws InterruptedException { + return getImpl(true, true); + } + + private final T getImpl(boolean blocking, boolean peek) throws InterruptedException { + int localReadPos = readPos; + if( localReadPos == writePos ) { + if( blocking ) { + synchronized( syncRead ) { + while( localReadPos == writePos ) { + syncRead.wait(); + } + } + } else { + return null; + } + } + localReadPos = (localReadPos + 1) % capacityPlusOne; + final T r = array[localReadPos]; + if( !peek ) { + array[localReadPos] = null; + synchronized ( syncWrite ) { + size--; + readPos = localReadPos; + syncWrite.notifyAll(); // notify waiting putter + } + } + return r; + } + + /** + * {@inheritDoc} + *

+ * Implementation advances the write position and stores the given element at it, if not full. + *

+ */ + @Override + public final boolean put(T e) { + try { + return putImpl(e, false, false); + } catch (InterruptedException ie) { throw new RuntimeException(ie); } + } + + /** + * {@inheritDoc} + *

+ * Implementation advances the write position and stores the given element at it, if not full. + *

+ */ + @Override + public final void putBlocking(T e) throws InterruptedException { + if( !putImpl(e, false, true) ) { + throw new InternalError("Blocking put failed: "+this); + } + } + + /** + * {@inheritDoc} + *

+ * Implementation advances the write position and keeps the element at it, if not full. + *

+ */ + @Override + public final boolean putSame(boolean blocking) throws InterruptedException { + return putImpl(null, true, blocking); + } + + private final boolean putImpl(T e, boolean sameRef, boolean blocking) throws InterruptedException { + int localWritePos = writePos; + localWritePos = (localWritePos + 1) % capacityPlusOne; + if( localWritePos == readPos ) { + if( blocking ) { + synchronized( syncWrite ) { + while( localWritePos == readPos ) { + syncWrite.wait(); + } + } + } else { + return false; + } + } + if( !sameRef ) { + array[localWritePos] = e; + } + synchronized ( syncRead ) { + size++; + writePos = localWritePos; + syncRead.notifyAll(); // notify waiting getter + } + return true; + } + + + @Override + public final void waitForFreeSlots(int count) throws InterruptedException { + synchronized ( syncRead ) { + if( capacityPlusOne - 1 - size < count ) { + while( capacityPlusOne - 1 - size < count ) { + syncRead.wait(); + } + } + } + } + + @Override + public void growBuffer(T[] newElements, int amount, AllocEmptyArray allocEmptyArray) throws IllegalStateException, IllegalArgumentException { + synchronized( syncGlobal ) { + final boolean isFull = capacityPlusOne - 1 == size; + final boolean isEmpty = 0 == size; + if( !isFull && !isEmpty ) { + throw new IllegalStateException("Buffer neither full nor empty: "+this); + } + if( isEmpty ) { + if( readPos != writePos ) { + throw new InternalError("R/W pos not equal at empty: "+this); + } + } else /* isFull */ { + final int wp1 = ( writePos + 1 ) % capacityPlusOne; + if( wp1 != readPos ) { + throw new InternalError("R != W+1 pos at full: "+this); + } + } + if( null != newElements && amount < newElements.length ) { + throw new IllegalArgumentException("amount "+amount+" < newElements "+newElements.length); + } + final int newCapacity = capacityPlusOne + amount; + final T[] oldArray = array; + final T[] newArray = allocEmptyArray.newArray(newCapacity); + + if( isFull ) { + // writePos == readPos - 1 + readPos = ( writePos + 1 + amount ) % newCapacity; // warp readPos to the end of the new data location + + if(writePos >= 0) { + System.arraycopy(oldArray, 0, newArray, 0, writePos+1); + } + if( null != newElements && newElements.length > 0 ) { + System.arraycopy(newElements, 0, newArray, writePos+1, newElements.length); + } + final int tail = capacityPlusOne-1-writePos; + if( tail > 0 ) { + System.arraycopy(oldArray, writePos+1, newArray, readPos, tail); + } + } else /* if ( isEmpty ) */ { + // writePos == readPos + writePos += amount; // warp writePos to the end of the new data location + + if( readPos >= 0 ) { + System.arraycopy(oldArray, 0, newArray, 0, readPos+1); + } + if( null != newElements && newElements.length > 0 ) { + System.arraycopy(newElements, 0, newArray, readPos+1, newElements.length); + } + final int tail = capacityPlusOne-1-readPos; + if( tail > 0 ) { + System.arraycopy(oldArray, readPos+1, newArray, writePos+1, tail); + } + size = amount; + } + + capacityPlusOne = newCapacity; + array = newArray; + } + } +} \ No newline at end of file diff --git a/src/java/com/jogamp/common/util/Ringbuffer.java b/src/java/com/jogamp/common/util/Ringbuffer.java new file mode 100644 index 0000000..733a235 --- /dev/null +++ b/src/java/com/jogamp/common/util/Ringbuffer.java @@ -0,0 +1,214 @@ +/** + * Copyright 2013 JogAmp Community. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are + * permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this list of + * conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, this list + * of conditions and the following disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY JogAmp Community ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND + * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JogAmp Community OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are those of the + * authors and should not be interpreted as representing official policies, either expressed + * or implied, of JogAmp Community. + */ +package com.jogamp.common.util; + +import java.io.PrintStream; + +/** + * Ring buffer interface, a.k.a circular buffer. + *

+ * Caller can chose whether to block until get / put is able to proceed or not. + *

+ *

+ * Caller can chose whether to pass an empty array and clear references at get, + * or using a preset array for circular access of same objects. + *

+ *

+ * Synchronization and hence thread safety details belong to the implementation. + *

+ */ +public interface Ringbuffer { + + /** + * Implementation hook for {@link #growBuffer(Object[], int, AllocEmptyArray)} + * to pass an implementation of {@link #newArray(int)}. + * @param type of array + */ + public static interface AllocEmptyArray { + /** + * Returns a new allocated empty array of generic type T with given size. + */ + public T[] newArray(int size); + } + + /** Returns a short string representation incl. size/capacity and internal r/w index (impl. dependent). */ + public String toString(); + + /** Debug functionality - Dumps the contents of the internal array. */ + public void dump(PrintStream stream, String prefix); + + /** + * Returns the internal array as-is, i.e. w/o a copy. + *

+ * The layout and size of the internal array is implementation dependent. + *

+ *

+ * Users shall not modify or rely on the returned array. + *

+ * @deprecated This method should not be required + */ + public T[] getInternalArray(); + + /** Returns the net capacity of this ring buffer. */ + public int capacity(); + + /** + * Resets the read and write position according to an empty ring buffer + * and set all ring buffer slots to null. + *

+ * {@link #isEmpty()} will return true after calling this method. + *

+ */ + public void clear(); + + /** + * Resets the read and write position according to a full ring buffer + * and fill all slots w/ elements of array copyFrom. + *

+ * Array's copyFrom elements will be copied into the internal array, + * hence it's length must be equal to {@link #capacity()}. + *

+ * @param copyFrom Mandatory array w/ length {@link #capacity()} to be copied into the internal array. + * @throws IllegalArgumentException if copyFrom is null. + * @throws IllegalArgumentException if copyFrom's length is different from {@link #capacity()}. + */ + public void resetFull(T[] copyFrom) throws IllegalArgumentException; + + /** Returns the number of elements in this ring buffer. */ + public int size(); + + /** Returns the number of free slots available to put. */ + public int getFreeSlots(); + + /** Returns true if this ring buffer is empty, otherwise false. */ + public boolean isEmpty(); + + /** Returns true if this ring buffer is full, otherwise false. */ + public boolean isFull(); + + /** + * Dequeues the oldest enqueued element if available, otherwise null. + *

+ * The returned ring buffer slot will be set to null to release the reference + * and move ownership to the caller. + *

+ *

+ * Method is non blocking and returns immediately;. + *

+ * @return the oldest put element if available, otherwise null. + */ + public T get(); + + /** + * Dequeues the oldest enqueued element. + *

+ * The returned ring buffer slot will be set to null to release the reference + * and move ownership to the caller. + *

+ *

+ * Methods blocks until an element becomes available via put. + *

+ * @return the oldest put element + * @throws InterruptedException + */ + public T getBlocking() throws InterruptedException; + + /** + * Peeks the next element at the read position w/o modifying pointer, nor blocking. + * @return null if empty, otherwise the element which would be read next. + */ + public T peek(); + + /** + * Peeks the next element at the read position w/o modifying pointer, but w/ blocking. + * @return null if empty, otherwise the element which would be read next. + */ + public T peekBlocking() throws InterruptedException; + + /** + * Enqueues the given element. + *

+ * Returns true if successful, otherwise false in case buffer is full. + *

+ *

+ * Method is non blocking and returns immediately;. + *

+ */ + public boolean put(T e); + + /** + * Enqueues the given element. + *

+ * Method blocks until a free slot becomes available via get. + *

+ * @throws InterruptedException + */ + public void putBlocking(T e) throws InterruptedException; + + /** + * Enqueues the same element at it's write position, if not full. + *

+ * Returns true if successful, otherwise false in case buffer is full. + *

+ *

+ * If blocking is true, method blocks until a free slot becomes available via get. + *

+ * @param blocking if true, wait until a free slot becomes available via get. + * @throws InterruptedException + */ + public boolean putSame(boolean blocking) throws InterruptedException; + + /** + * Blocks until at least count free slots become available. + * @throws InterruptedException + */ + public void waitForFreeSlots(int count) throws InterruptedException; + + /** + * Grows a full or empty ring buffer, increasing it's capacity about the amount. + *

+ * Growing an empty ring buffer increases it's size about the amount, i.e. renders it not empty. + * The new elements are inserted at the read position, able to be read out via {@link #get()} etc. + *

+ *

+ * Growing a full ring buffer leaves the size intact, i.e. renders it not full. + * The new elements are inserted at the write position, able to be written to via {@link #put(Object)} etc. + *

+ * + * @param newElements array of new empty elements the buffer shall grow about, maybe null. + * If not null, array size must be <= amount + * @param amount the amount of elements the buffer shall grow about + * @param allocEmptyArray implementation hook to allocate a new empty array of generic type T + * @throws IllegalStateException if buffer is neither full nor empty + * @throws IllegalArgumentException if newElements is given but is > amount + */ + public void growBuffer(T[] newElements, int amount, + AllocEmptyArray allocEmptyArray) throws IllegalStateException, + IllegalArgumentException; + +} \ No newline at end of file diff --git a/src/java/com/jogamp/common/util/SyncedRingbuffer.java b/src/java/com/jogamp/common/util/SyncedRingbuffer.java new file mode 100644 index 0000000..b8ddbd6 --- /dev/null +++ b/src/java/com/jogamp/common/util/SyncedRingbuffer.java @@ -0,0 +1,381 @@ +/** + * Copyright 2013 JogAmp Community. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are + * permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this list of + * conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, this list + * of conditions and the following disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY JogAmp Community ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND + * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JogAmp Community OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are those of the + * authors and should not be interpreted as representing official policies, either expressed + * or implied, of JogAmp Community. + */ + +package com.jogamp.common.util; + +import java.io.PrintStream; + +/** + * Simple synchronized implementation of {@link Ringbuffer}. + *

+ * All methods utilize global synchronization. + *

+ *

+ * Characteristics: + *

    + *
  • Read position points to the next read element.
  • + *
  • Write position points to the next write element.
  • + *
+ * + * + * + *
EmptywritePos == readPossize == 0
FullwritePos == readPossize == capacity
+ *

+ */ +public class SyncedRingbuffer implements Ringbuffer { + + private final Object syncGlobal = new Object(); + private /* final */ T[] array; // not final due to grow + private /* final */ int capacity; // not final due to grow + private int readPos; + private int writePos; + private int size; + + @Override + public final String toString() { + return "SyncedRingbuffer[filled "+size+" / "+capacity+", writePos "+writePos+", readPos "+readPos+"]"; + } + + @Override + public final void dump(PrintStream stream, String prefix) { + stream.println(prefix+" "+toString()+" {"); + for(int i=0; i + * Example for a 10 element Integer array: + *
+     *  Integer[] source = new Integer[10];
+     *  // fill source with content ..
+     *  Ringbuffer rb = new SyncedRingbuffer(source, new Ringbuffer.AllocEmptyArray() {
+     *      public Integer[] newArray(int size) {
+     *          return new Integer[size];
+     *      } } );
+     * 
+ *

+ *

+ * {@link #isFull()} returns true on the newly created full ring buffer. + *

+ *

+ * Implementation will allocate an internal array with size of array copyFrom + * and copy all elements from array copyFrom into the internal array. + *

+ * @param copyFrom mandatory source array determining ring buffer's net {@link #capacity()} and initial content. + * @param allocEmptyArray implementation hook to allocate a new empty array of generic type T + * @throws IllegalArgumentException if copyFrom is null + */ + public SyncedRingbuffer(T[] copyFrom, AllocEmptyArray allocEmptyArray) throws IllegalArgumentException { + capacity = copyFrom.length; + array = allocEmptyArray.newArray(capacity); + resetImpl(true, copyFrom); + } + + /** + * Create an empty ring buffer instance w/ the given net capacity. + *

+ * Example for a 10 element Integer array: + *

+     *  Ringbuffer rb = new SyncedRingbuffer(10, new Ringbuffer.AllocEmptyArray() {
+     *      public Integer[] newArray(int size) {
+     *          return new Integer[size];
+     *      } } );
+     * 
+ *

+ *

+ * {@link #isEmpty()} returns true on the newly created empty ring buffer. + *

+ *

+ * Implementation will allocate an internal array of size capacity. + *

+ * @param capacity the initial net capacity of the ring buffer + * @param allocEmptyArray implementation hook to allocate a new empty array of generic type T + */ + public SyncedRingbuffer(int capacity, AllocEmptyArray allocEmptyArray) { + this.capacity = capacity; + this.array = allocEmptyArray.newArray(capacity); + resetImpl(false, null); + } + + @Override + public final T[] getInternalArray() { return array; } + + @Override + public final int capacity() { return capacity; } + + /** + * {@inheritDoc} + *

+ * Implementation sets read and write position to zero. + *

+ */ + @Override + public final void clear() { + synchronized ( syncGlobal ) { + resetImpl(false, null); + for(int i=0; i + * Implementation returns the element at the current read position and advances it, if not empty. + *

+ */ + @Override + public final T get() { + try { + return getImpl(false, false); + } catch (InterruptedException ie) { throw new RuntimeException(ie); } + } + + /** + * {@inheritDoc} + *

+ * Implementation returns the element at the current read position and advances it, if not empty. + *

+ */ + @Override + public final T getBlocking() throws InterruptedException { + return getImpl(true, false); + } + + @Override + public final T peek() { + try { + return getImpl(false, true); + } catch (InterruptedException ie) { throw new RuntimeException(ie); } + } + @Override + public final T peekBlocking() throws InterruptedException { + return getImpl(true, true); + } + + private final T getImpl(boolean blocking, boolean peek) throws InterruptedException { + synchronized( syncGlobal ) { + if( 0 == size ) { + if( blocking ) { + while( 0 == size ) { + syncGlobal.wait(); + } + } else { + return null; + } + } + final int localReadPos = readPos; + final T r = array[localReadPos]; + if( !peek ) { + array[localReadPos] = null; + size--; + readPos = (localReadPos + 1) % capacity; + syncGlobal.notifyAll(); // notify waiting putter + } + return r; + } + } + + /** + * {@inheritDoc} + *

+ * Implementation stores the element at the current write position and advances it, if not full. + *

+ */ + @Override + public final boolean put(T e) { + try { + return putImpl(e, false, false); + } catch (InterruptedException ie) { throw new RuntimeException(ie); } + } + + /** + * {@inheritDoc} + *

+ * Implementation stores the element at the current write position and advances it, if not full. + *

+ */ + @Override + public final void putBlocking(T e) throws InterruptedException { + if( !putImpl(e, false, true) ) { + throw new InternalError("Blocking put failed: "+this); + } + } + + /** + * {@inheritDoc} + *

+ * Implementation keeps the element at the current write position and advances it, if not full. + *

+ */ + @Override + public final boolean putSame(boolean blocking) throws InterruptedException { + return putImpl(null, true, blocking); + } + + private final boolean putImpl(T e, boolean sameRef, boolean blocking) throws InterruptedException { + synchronized( syncGlobal ) { + if( capacity == size ) { + if( blocking ) { + while( capacity == size ) { + syncGlobal.wait(); + } + } else { + return false; + } + } + final int localWritePos = writePos; + if( !sameRef ) { + array[localWritePos] = e; + } + size++; + writePos = (localWritePos + 1) % capacity; + syncGlobal.notifyAll(); // notify waiting getter + return true; + } + } + + @Override + public final void waitForFreeSlots(int count) throws InterruptedException { + synchronized ( syncGlobal ) { + if( capacity - size < count ) { + while( capacity - size < count ) { + syncGlobal.wait(); + } + } + } + } + + @Override + public void growBuffer(T[] newElements, int amount, AllocEmptyArray allocEmptyArray) throws IllegalStateException, IllegalArgumentException { + synchronized ( syncGlobal ) { + final boolean isFull = capacity == size; + final boolean isEmpty = 0 == size; + if( !isFull && !isEmpty ) { + throw new IllegalStateException("Buffer neither full nor empty: "+this); + } + if( readPos != writePos ) { + throw new InternalError("R/W pos not equal: "+this); + } + if( null != newElements && amount < newElements.length ) { + throw new IllegalArgumentException("amount "+amount+" < newElements "+newElements.length); + } + final int newCapacity = capacity + amount; + final T[] oldArray = array; + final T[] newArray = allocEmptyArray.newArray(newCapacity); + + if( isFull ) { + // writePos == readPos + readPos += amount; // warp readPos to the end of the new data location + + if(writePos > 0) { + System.arraycopy(oldArray, 0, newArray, 0, writePos); + } + if( null != newElements && newElements.length > 0 ) { + System.arraycopy(newElements, 0, newArray, writePos, newElements.length); + } + final int tail = capacity-writePos; + if( tail > 0 ) { + System.arraycopy(oldArray, writePos, newArray, readPos, tail); + } + } else /* if ( isEmpty ) */ { + // writePos == readPos + writePos += amount; // warp writePos to the end of the new data location + + if( readPos > 0 ) { + System.arraycopy(oldArray, 0, newArray, 0, readPos); + } + if( null != newElements && newElements.length > 0 ) { + System.arraycopy(newElements, 0, newArray, readPos, newElements.length); + } + final int tail = capacity-readPos; + if( tail > 0 ) { + System.arraycopy(oldArray, readPos, newArray, writePos, tail); + } + size = amount; + } + + capacity = newCapacity; + array = newArray; + } + } +} -- cgit v1.2.3