From 30475c6bbeb9a5d48899b281ead8bb305679028d Mon Sep 17 00:00:00 2001
From: Sven Gothel
+ * Implementation utilizes the Always Keep One Slot Open,
+ * hence implementation maintains an internal array of
+ * Implementation is thread safe if:
+ * capacity
plus one!
+ *
+ *
+ *
+ * Following methods utilize global synchronization: + *
+ * Characteristics: + *
Empty | writePos == readPos | size == 0 |
Full | writePos == readPos - 1 | size == capacity |
+ * Integer[] source = new Integer[10]; + * // fill source with content .. + * Ringbuffer+ * + *rb = new LFRingbuffer (source, new Ringbuffer.AllocEmptyArray () { + * public Integer[] newArray(int size) { + * return new Integer[size]; + * } } ); + *
+ * {@link #isFull()} returns true on the newly created full ring buffer. + *
+ *
+ * Implementation will allocate an internal array with size of array copyFrom
plus one,
+ * and copy all elements from array copyFrom
into the internal array.
+ *
copyFrom
is null
+ */
+ public LFRingbuffer(T[] copyFrom, AllocEmptyArraycapacity
.
+ * + * Example for a 10 element Integer array: + *
+ * Ringbuffer+ * + *rb = new LFRingbuffer (10, new Ringbuffer.AllocEmptyArray () { + * public Integer[] newArray(int size) { + * return new Integer[size]; + * } } ); + *
+ * {@link #isEmpty()} returns true on the newly created empty ring buffer. + *
+ *
+ * Implementation will allocate an internal array of size capacity
plus one.
+ *
+ * Implementation advances the read position and returns the element at it, if not empty. + *
+ */ + @Override + public final T getBlocking() throws InterruptedException { + return getImpl(true, false); + } + + @Override + public final T peek() { + try { + return getImpl(false, true); + } catch (InterruptedException ie) { throw new RuntimeException(ie); } + } + @Override + public final T peekBlocking() throws InterruptedException { + return getImpl(true, true); + } + + private final T getImpl(boolean blocking, boolean peek) throws InterruptedException { + int localReadPos = readPos; + if( localReadPos == writePos ) { + if( blocking ) { + synchronized( syncRead ) { + while( localReadPos == writePos ) { + syncRead.wait(); + } + } + } else { + return null; + } + } + localReadPos = (localReadPos + 1) % capacityPlusOne; + final T r = array[localReadPos]; + if( !peek ) { + array[localReadPos] = null; + synchronized ( syncWrite ) { + size--; + readPos = localReadPos; + syncWrite.notifyAll(); // notify waiting putter + } + } + return r; + } + + /** + * {@inheritDoc} + *+ * Implementation advances the write position and stores the given element at it, if not full. + *
+ */ + @Override + public final boolean put(T e) { + try { + return putImpl(e, false, false); + } catch (InterruptedException ie) { throw new RuntimeException(ie); } + } + + /** + * {@inheritDoc} + *+ * Implementation advances the write position and stores the given element at it, if not full. + *
+ */ + @Override + public final void putBlocking(T e) throws InterruptedException { + if( !putImpl(e, false, true) ) { + throw new InternalError("Blocking put failed: "+this); + } + } + + /** + * {@inheritDoc} + *+ * Implementation advances the write position and keeps the element at it, if not full. + *
+ */ + @Override + public final boolean putSame(boolean blocking) throws InterruptedException { + return putImpl(null, true, blocking); + } + + private final boolean putImpl(T e, boolean sameRef, boolean blocking) throws InterruptedException { + int localWritePos = writePos; + localWritePos = (localWritePos + 1) % capacityPlusOne; + if( localWritePos == readPos ) { + if( blocking ) { + synchronized( syncWrite ) { + while( localWritePos == readPos ) { + syncWrite.wait(); + } + } + } else { + return false; + } + } + if( !sameRef ) { + array[localWritePos] = e; + } + synchronized ( syncRead ) { + size++; + writePos = localWritePos; + syncRead.notifyAll(); // notify waiting getter + } + return true; + } + + + @Override + public final void waitForFreeSlots(int count) throws InterruptedException { + synchronized ( syncRead ) { + if( capacityPlusOne - 1 - size < count ) { + while( capacityPlusOne - 1 - size < count ) { + syncRead.wait(); + } + } + } + } + + @Override + public void growBuffer(T[] newElements, int amount, AllocEmptyArray+ * Caller can chose whether to block until get / put is able to proceed or not. + *
+ *+ * Caller can chose whether to pass an empty array and clear references at get, + * or using a preset array for circular access of same objects. + *
+ *+ * Synchronization and hence thread safety details belong to the implementation. + *
+ */ +public interface Ringbuffer+ * The layout and size of the internal array is implementation dependent. + *
+ *+ * Users shall not modify or rely on the returned array. + *
+ * @deprecated This method should not be required + */ + public T[] getInternalArray(); + + /** Returns the net capacity of this ring buffer. */ + public int capacity(); + + /** + * Resets the read and write position according to an empty ring buffer + * and set all ring buffer slots tonull
.
+ *
+ * {@link #isEmpty()} will return true
after calling this method.
+ *
copyFrom
.
+ *
+ * Array's copyFrom
elements will be copied into the internal array,
+ * hence it's length must be equal to {@link #capacity()}.
+ *
copyFrom
is null
.
+ * @throws IllegalArgumentException if copyFrom
's length is different from {@link #capacity()}.
+ */
+ public void resetFull(T[] copyFrom) throws IllegalArgumentException;
+
+ /** Returns the number of elements in this ring buffer. */
+ public int size();
+
+ /** Returns the number of free slots available to put. */
+ public int getFreeSlots();
+
+ /** Returns true if this ring buffer is empty, otherwise false. */
+ public boolean isEmpty();
+
+ /** Returns true if this ring buffer is full, otherwise false. */
+ public boolean isFull();
+
+ /**
+ * Dequeues the oldest enqueued element if available, otherwise null.
+ *
+ * The returned ring buffer slot will be set to null
to release the reference
+ * and move ownership to the caller.
+ *
+ * Method is non blocking and returns immediately;. + *
+ * @return the oldest put element if available, otherwise null. + */ + public T get(); + + /** + * Dequeues the oldest enqueued element. + *
+ * The returned ring buffer slot will be set to null
to release the reference
+ * and move ownership to the caller.
+ *
+ * Methods blocks until an element becomes available via put. + *
+ * @return the oldest put element + * @throws InterruptedException + */ + public T getBlocking() throws InterruptedException; + + /** + * Peeks the next element at the read position w/o modifying pointer, nor blocking. + * @returnnull
if empty, otherwise the element which would be read next.
+ */
+ public T peek();
+
+ /**
+ * Peeks the next element at the read position w/o modifying pointer, but w/ blocking.
+ * @return null
if empty, otherwise the element which would be read next.
+ */
+ public T peekBlocking() throws InterruptedException;
+
+ /**
+ * Enqueues the given element.
+ * + * Returns true if successful, otherwise false in case buffer is full. + *
+ *+ * Method is non blocking and returns immediately;. + *
+ */ + public boolean put(T e); + + /** + * Enqueues the given element. + *+ * Method blocks until a free slot becomes available via get. + *
+ * @throws InterruptedException + */ + public void putBlocking(T e) throws InterruptedException; + + /** + * Enqueues the same element at it's write position, if not full. + *+ * Returns true if successful, otherwise false in case buffer is full. + *
+ *
+ * If blocking
is true, method blocks until a free slot becomes available via get.
+ *
count
free slots become available.
+ * @throws InterruptedException
+ */
+ public void waitForFreeSlots(int count) throws InterruptedException;
+
+ /**
+ * Grows a full or empty ring buffer, increasing it's capacity about the amount.
+ * + * Growing an empty ring buffer increases it's size about the amount, i.e. renders it not empty. + * The new elements are inserted at the read position, able to be read out via {@link #get()} etc. + *
+ *+ * Growing a full ring buffer leaves the size intact, i.e. renders it not full. + * The new elements are inserted at the write position, able to be written to via {@link #put(Object)} etc. + *
+ * + * @param newElements array of new empty elements the buffer shall grow about, maybenull
.
+ * If not null
, array size must be <= amount
+ * @param amount the amount of elements the buffer shall grow about
+ * @param allocEmptyArray implementation hook to allocate a new empty array of generic type T
+ * @throws IllegalStateException if buffer is neither full nor empty
+ * @throws IllegalArgumentException if newElements is given but is > amount
+ */
+ public void growBuffer(T[] newElements, int amount,
+ AllocEmptyArray+ * All methods utilize global synchronization. + *
+ *+ * Characteristics: + *
Empty | writePos == readPos | size == 0 |
Full | writePos == readPos | size == capacity |
+ * Integer[] source = new Integer[10]; + * // fill source with content .. + * Ringbuffer+ * + *rb = new SyncedRingbuffer (source, new Ringbuffer.AllocEmptyArray () { + * public Integer[] newArray(int size) { + * return new Integer[size]; + * } } ); + *
+ * {@link #isFull()} returns true on the newly created full ring buffer. + *
+ *
+ * Implementation will allocate an internal array with size of array copyFrom
+ * and copy all elements from array copyFrom
into the internal array.
+ *
copyFrom
is null
+ */
+ public SyncedRingbuffer(T[] copyFrom, AllocEmptyArraycapacity
.
+ * + * Example for a 10 element Integer array: + *
+ * Ringbuffer+ * + *rb = new SyncedRingbuffer (10, new Ringbuffer.AllocEmptyArray () { + * public Integer[] newArray(int size) { + * return new Integer[size]; + * } } ); + *
+ * {@link #isEmpty()} returns true on the newly created empty ring buffer. + *
+ *
+ * Implementation will allocate an internal array of size capacity
.
+ *
+ * Implementation sets read and write position to zero. + *
+ */ + @Override + public final void clear() { + synchronized ( syncGlobal ) { + resetImpl(false, null); + for(int i=0; i+ * Implementation returns the element at the current read position and advances it, if not empty. + *
+ */ + @Override + public final T getBlocking() throws InterruptedException { + return getImpl(true, false); + } + + @Override + public final T peek() { + try { + return getImpl(false, true); + } catch (InterruptedException ie) { throw new RuntimeException(ie); } + } + @Override + public final T peekBlocking() throws InterruptedException { + return getImpl(true, true); + } + + private final T getImpl(boolean blocking, boolean peek) throws InterruptedException { + synchronized( syncGlobal ) { + if( 0 == size ) { + if( blocking ) { + while( 0 == size ) { + syncGlobal.wait(); + } + } else { + return null; + } + } + final int localReadPos = readPos; + final T r = array[localReadPos]; + if( !peek ) { + array[localReadPos] = null; + size--; + readPos = (localReadPos + 1) % capacity; + syncGlobal.notifyAll(); // notify waiting putter + } + return r; + } + } + + /** + * {@inheritDoc} + *+ * Implementation stores the element at the current write position and advances it, if not full. + *
+ */ + @Override + public final boolean put(T e) { + try { + return putImpl(e, false, false); + } catch (InterruptedException ie) { throw new RuntimeException(ie); } + } + + /** + * {@inheritDoc} + *+ * Implementation stores the element at the current write position and advances it, if not full. + *
+ */ + @Override + public final void putBlocking(T e) throws InterruptedException { + if( !putImpl(e, false, true) ) { + throw new InternalError("Blocking put failed: "+this); + } + } + + /** + * {@inheritDoc} + *+ * Implementation keeps the element at the current write position and advances it, if not full. + *
+ */ + @Override + public final boolean putSame(boolean blocking) throws InterruptedException { + return putImpl(null, true, blocking); + } + + private final boolean putImpl(T e, boolean sameRef, boolean blocking) throws InterruptedException { + synchronized( syncGlobal ) { + if( capacity == size ) { + if( blocking ) { + while( capacity == size ) { + syncGlobal.wait(); + } + } else { + return false; + } + } + final int localWritePos = writePos; + if( !sameRef ) { + array[localWritePos] = e; + } + size++; + writePos = (localWritePos + 1) % capacity; + syncGlobal.notifyAll(); // notify waiting getter + return true; + } + } + + @Override + public final void waitForFreeSlots(int count) throws InterruptedException { + synchronized ( syncGlobal ) { + if( capacity - size < count ) { + while( capacity - size < count ) { + syncGlobal.wait(); + } + } + } + } + + @Override + public void growBuffer(T[] newElements, int amount, AllocEmptyArray