/** * Copyright 2013 JogAmp Community. All rights reserved. * * Redistribution and use in source and binary forms, with or without modification, are * permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, this list of * conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright notice, this list * of conditions and the following disclaimer in the documentation and/or other materials * provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY JogAmp Community ``AS IS'' AND ANY EXPRESS OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JogAmp Community OR * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * The views and conclusions contained in the software and documentation are those of the * authors and should not be interpreted as representing official policies, either expressed * or implied, of JogAmp Community. */ package com.jogamp.common.util; import java.io.PrintStream; /** * Simple implementation of {@link Ringbuffer}, * exposing lock-free * {@link #get() get*(..)} and {@link #put(Object) put*(..)} methods. *
* Implementation utilizes the Always Keep One Slot Open,
* hence implementation maintains an internal array of capacity
plus one!
*
* Implementation is thread safe if: *
* Following methods utilize global synchronization: *
* Characteristics: *
Empty | writePos == readPos | size == 0 |
Full | writePos == readPos - 1 | size == capacity |
* Integer[] source = new Integer[10]; * // fill source with content .. * Ringbuffer* *rb = new LFRingbuffer (source, new Ringbuffer.AllocEmptyArray () { * public Integer[] newArray(int size) { * return new Integer[size]; * } } ); *
* {@link #isFull()} returns true on the newly created full ring buffer. *
*
* Implementation will allocate an internal array with size of array copyFrom
plus one,
* and copy all elements from array copyFrom
into the internal array.
*
copyFrom
is null
*/
public LFRingbuffer(T[] copyFrom, AllocEmptyArraycapacity
.
* * Example for a 10 element Integer array: *
* Ringbuffer* *rb = new LFRingbuffer (10, new Ringbuffer.AllocEmptyArray () { * public Integer[] newArray(int size) { * return new Integer[size]; * } } ); *
* {@link #isEmpty()} returns true on the newly created empty ring buffer. *
*
* Implementation will allocate an internal array of size capacity
plus one.
*
* Implementation advances the read position and returns the element at it, if not empty. *
*/ @Override public final T getBlocking() throws InterruptedException { return getImpl(true, false); } @Override public final T peek() { try { return getImpl(false, true); } catch (InterruptedException ie) { throw new RuntimeException(ie); } } @Override public final T peekBlocking() throws InterruptedException { return getImpl(true, true); } private final T getImpl(boolean blocking, boolean peek) throws InterruptedException { int localReadPos = readPos; if( localReadPos == writePos ) { if( blocking ) { synchronized( syncRead ) { while( localReadPos == writePos ) { syncRead.wait(); } } } else { return null; } } localReadPos = (localReadPos + 1) % capacityPlusOne; final T r = array[localReadPos]; if( !peek ) { array[localReadPos] = null; synchronized ( syncWrite ) { size--; readPos = localReadPos; syncWrite.notifyAll(); // notify waiting putter } } return r; } /** * {@inheritDoc} ** Implementation advances the write position and stores the given element at it, if not full. *
*/ @Override public final boolean put(T e) { try { return putImpl(e, false, false); } catch (InterruptedException ie) { throw new RuntimeException(ie); } } /** * {@inheritDoc} ** Implementation advances the write position and stores the given element at it, if not full. *
*/ @Override public final void putBlocking(T e) throws InterruptedException { if( !putImpl(e, false, true) ) { throw new InternalError("Blocking put failed: "+this); } } /** * {@inheritDoc} ** Implementation advances the write position and keeps the element at it, if not full. *
*/ @Override public final boolean putSame(boolean blocking) throws InterruptedException { return putImpl(null, true, blocking); } private final boolean putImpl(T e, boolean sameRef, boolean blocking) throws InterruptedException { int localWritePos = writePos; localWritePos = (localWritePos + 1) % capacityPlusOne; if( localWritePos == readPos ) { if( blocking ) { synchronized( syncWrite ) { while( localWritePos == readPos ) { syncWrite.wait(); } } } else { return false; } } if( !sameRef ) { array[localWritePos] = e; } synchronized ( syncRead ) { size++; writePos = localWritePos; syncRead.notifyAll(); // notify waiting getter } return true; } @Override public final void waitForFreeSlots(int count) throws InterruptedException { synchronized ( syncRead ) { if( capacityPlusOne - 1 - size < count ) { while( capacityPlusOne - 1 - size < count ) { syncRead.wait(); } } } } @Override public void growBuffer(T[] newElements, int amount, AllocEmptyArray