/**
* Copyright 2014 JogAmp Community. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are
* permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice, this list of
* conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice, this list
* of conditions and the following disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY JogAmp Community ``AS IS'' AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
* FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JogAmp Community OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* The views and conclusions contained in the software and documentation are those of the
* authors and should not be interpreted as representing official policies, either expressed
* or implied, of JogAmp Community.
*/
package com.jogamp.common.nio;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.io.RandomAccessFile;
import java.lang.ref.WeakReference;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.security.AccessController;
import java.security.PrivilegedAction;
import jogamp.common.Debug;
import com.jogamp.common.os.Platform;
/**
* An {@link InputStream} implementation based on an underlying {@link MappedByteBuffer}
* supporting {@link #markSupported() mark}.
*
* Intended to be utilized with a {@link MappedByteBuffer memory-mapped} {@link FileChannel#map(MapMode, long, long) FileChannel}
* beyond its size limitation of {@link Integer#MAX_VALUE}.
*
* @since 2.3.0
*/
public class MappedByteBufferInputStream extends InputStream {
/**
* Strategy deciding what happens to an already mapped buffer slice
* once the stream advances to the next slice.
*/
public static enum CacheMode {
/**
* Keep all previously mapped, lazily cached buffer slices alive. Useful for hopping readers,
* i.e. random access via {@link MappedByteBufferInputStream#position(long) position(p)}
* or {@link MappedByteBufferInputStream#reset() reset()}.
* <p>
* Note that without flushing, the platform may fail memory mapping
* due to virtual address space exhaustion.
* In such a case an {@link OutOfMemoryError} may be thrown directly,
* or encapsulated as {@link IOException#getCause() the cause}
* of a thrown {@link IOException}.
* </p>
*/
FLUSH_NONE,
/**
* Soft flush the previous lazily cached buffer slice when caching the next buffer slice.
* Useful for sequential forward readers, as well as for hopping readers like {@link #FLUSH_NONE}
* in case of relatively short periods between hopping across slices.
* <p>
* The implementation clears the strong buffer slice reference
* while preserving a {@link WeakReference}, which allows the slice to be resurrected
* if it has not yet been {@link System#gc() garbage collected}.
* </p>
* <p>
* This is the default.
* </p>
*/
FLUSH_PRE_SOFT,
/**
* Hard flush the previous lazily cached buffer slice when caching the next buffer slice.
* Useful for sequential forward readers.
* <p>
* Besides clearing the buffer slice reference,
* the implementation attempts to hard flush the mapped buffer
* using a {@code sun.misc.Cleaner} by reflection.
* If such a method does not exist or does not work, the implementation falls back to {@link #FLUSH_PRE_SOFT}.
* </p>
*/
FLUSH_PRE_HARD
};
/**
* File resize interface allowing a file to change its size,
* e.g. via {@link RandomAccessFile#setLength(long)}.
* <p>
* An implementation is invoked by {@link MappedByteBufferInputStream#setLength(long)}
* when the requested size differs from the current {@link FileChannel} size.
* </p>
*/
public static interface FileResizeOp {
/**
* Changes the size of the underlying file.
*
* @param newSize the new file size
* @throws IOException if file size change is not supported or any other I/O error occurs
*/
void setLength(final long newSize) throws IOException;
}
/**
* Default {@link FileResizeOp} used while no user supplied operation is set:
* it rejects every resize request with an {@link IOException}.
*/
private static final FileResizeOp NoFileResize = new FileResizeOp() {
@Override
public void setLength(final long newSize) throws IOException {
// Resizing is unsupported unless the user provides a real FileResizeOp.
throw new IOException("file size change not supported");
}
};
/**
* Default slice shift, i.e. the slice size in bytes is {@code 1L << shift}.
* <p>
* The value is chosen by the static initializer:
* </p>
* <ul>
* <li>{@link Platform#is32Bit() 32-bit platforms}: {@code 29} = 512 MiB</li>
* <li>all other platforms: {@code 30} = 1024 MiB</li>
* </ul>
* <p>
* In case the default uses up too much address space, one may choose other values:
* </p>
* <ul>
* <li>{@code 29} = 512 MiB</li>
* <li>{@code 28} = 256 MiB</li>
* <li>{@code 27} = 128 MiB</li>
* <li>{@code 26} = 64 MiB</li>
* </ul>
*/
public static final int DEFAULT_SLICE_SHIFT;
/** Debug flag, set by the static initializer from {@code Debug.debug("ByteBufferInputStream")}. */
static final boolean DEBUG;
static {
    // Initialize the Platform singleton prior to querying its properties.
    Platform.initSingleton();
    // 32-bit platforms get 512 MiB slices (shift 29), all others 1024 MiB slices (shift 30).
    DEFAULT_SLICE_SHIFT = Platform.is32Bit() ? 29 : 30;
    DEBUG = Debug.debug("ByteBufferInputStream");
}
/** Power-of-two exponent of the slice size, i.e. a slice spans {@code 1L << sliceShift} bytes. */
private final int sliceShift;
/** The file channel whose content is mapped slice by slice. */
private final FileChannel fc;
/** Map mode passed to {@link FileChannel#map(MapMode, long, long)} for every slice. */
private final FileChannel.MapMode mmode;
/** Resize operation used by {@link #setLength(long)}; {@link #NoFileResize} unless replaced. */
private FileResizeOp fileResizeOp = NoFileResize;
/** Number of slices covering {@link #totalSize}. */
private int sliceCount;
/** Strongly referenced slices; a {@code null} entry is mapped (or resurrected) on demand by {@link #slice(int)}. */
ByteBuffer[] slices;
/** Weak references to soft-flushed slices, consulted by {@link #slice(int)} before re-mapping. */
private WeakReference[] slices2GC;
/** Total size in bytes covered by this stream. */
private long totalSize;
/** Reference count of open users of this instance; zero means closed, see {@link #checkOpen()}. */
private int refCount;
/** Reflected method invoked on a buffer to obtain its cleaner object, see {@link #cleanBuffer(ByteBuffer)}. */
private Method mbbCleaner;
/** Reflected method invoked on the cleaner object to release the buffer, see {@link #cleanBuffer(ByteBuffer)}. */
private Method cClean;
/** Guards the one-time cleaner lookup triggered by {@link #cleanBuffer(ByteBuffer)} - presumably set by initCleaner, TODO confirm. */
private boolean cleanerInit;
/** Whether the reflected cleaner is usable; reset to {@code false} once an invocation fails. */
private boolean hasCleaner;
/** Current cache mode; downgraded from FLUSH_PRE_HARD to FLUSH_PRE_SOFT if hard cleaning fails. */
private CacheMode cmode;
/** Index of the slice currently being read. */
int currSlice;
/** Marked position; initialized to {@code -1} - presumably meaning "no mark set", verify against mark()/reset(). */
private long mark;
/**
 * Prints the internal state of this stream for debugging purposes.
 * <p>
 * I/O errors raised while querying the channel size or the current position
 * are printed via {@link Throwable#printStackTrace()} and the affected values are reported as zero.
 * Position and remaining bytes are only queried while the reference count is positive.
 * </p>
 *
 * @param prefix text prepended to every emitted line
 * @param out destination of the dump
 */
public final void dbgDump(final String prefix, final PrintStream out) {
    long channelSize = 0;
    try {
        channelSize = fc.size();
    } catch (final IOException ioe) {
        ioe.printStackTrace();
    }

    long currentPos = 0;
    long bytesLeft = 0;
    if( refCount > 0 ) {
        try {
            currentPos = position();
            bytesLeft = totalSize - currentPos;
        } catch (final IOException ioe) {
            ioe.printStackTrace();
        }
    }

    final int allocatedSlices;
    if( slices == null ) {
        allocatedSlices = 0;
    } else {
        allocatedSlices = slices.length;
    }

    out.println(prefix+" refCount "+refCount+", fcSize "+channelSize+", totalSize "+totalSize);
    out.println(prefix+" position "+currentPos+", remaining "+bytesLeft);
    out.println(prefix+" mmode "+mmode+", cmode "+cmode+", fileResizeOp "+fileResizeOp);
    out.println(prefix+" slice "+currSlice+" / "+sliceCount+" ("+allocatedSlices+")");
    out.println(prefix+" sliceShift "+sliceShift+" -> "+(1L << sliceShift));
}
/**
* Package-private constructor performing the actual initialization.
* The public four-argument constructor delegates to it.
*
* @param fc the file channel to be mapped lazily
* @param mmode the map mode used for every slice
* @param cmode the caching mode
* @param sliceShift the pow2 slice size exponent, i.e. slice size is {@code 1L << sliceShift}
* @param totalSize the total size in bytes, must not be negative
* @param currSlice index of the initial slice, which is fetched via {@link #slice(int)} and positioned at zero
* @throws IllegalArgumentException if {@code totalSize} is negative
* @throws IOException if {@link #notifyLengthChange(long)} or fetching the initial slice fails
*/
MappedByteBufferInputStream(final FileChannel fc, final FileChannel.MapMode mmode, final CacheMode cmode,
final int sliceShift, final long totalSize, final int currSlice) throws IOException {
this.sliceShift = sliceShift;
this.fc = fc;
this.mmode = mmode;
if( 0 > totalSize ) {
throw new IllegalArgumentException("Negative size "+totalSize);
}
// Force a size mismatch so that notifyLengthChange(..) below
// does not take its "unchanged size" early return and sets up the slice arrays.
this.totalSize = -1;
this.sliceCount = 0;
notifyLengthChange(totalSize);
// A fresh instance starts with a single reference; the cleaner is looked up lazily on first use.
this.refCount = 1;
this.cleanerInit = false;
this.hasCleaner = false;
this.cmode = cmode;
this.currSlice = currSlice;
this.mark = -1;
// Fetch the initial slice (mapping it if needed) and start reading at its beginning.
slice(currSlice).position(0);
}
/**
* Creates a new instance using the given {@link FileChannel}.
* <p>
* The {@link MappedByteBuffer} slices will be mapped lazily at first usage.
* The total size is taken from {@link FileChannel#size()} and reading starts at slice zero.
* </p>
*
* @param fileChannel the file channel to be mapped lazily.
* @param mmode the map mode, default is {@link FileChannel.MapMode#READ_ONLY}.
* @param cmode the caching mode, default is {@link CacheMode#FLUSH_PRE_SOFT}.
* @param sliceShift the pow2 slice size, default is {@link #DEFAULT_SLICE_SHIFT}.
* @throws IOException if querying the file channel's size or setting up the initial buffer slice fails
*/
public MappedByteBufferInputStream(final FileChannel fileChannel,
final FileChannel.MapMode mmode,
final CacheMode cmode,
final int sliceShift) throws IOException {
this(fileChannel, mmode, cmode, sliceShift, fileChannel.size(), 0);
}
/**
* Creates a new instance using the given {@link FileChannel},
* given mapping-mode, given cache-mode and the {@link #DEFAULT_SLICE_SHIFT}.
* <p>
* The {@link MappedByteBuffer} slices will be mapped lazily at first usage.
* </p>
*
* @param fileChannel the file channel to be used.
* @param mmode the map mode, default is {@link FileChannel.MapMode#READ_ONLY}.
* @param cmode the caching mode, default is {@link CacheMode#FLUSH_PRE_SOFT}.
* @throws IOException if querying the file channel's size or setting up the initial buffer slice fails
*/
public MappedByteBufferInputStream(final FileChannel fileChannel, final FileChannel.MapMode mmode, final CacheMode cmode) throws IOException {
this(fileChannel, mmode, cmode, DEFAULT_SLICE_SHIFT);
}
/**
* Creates a new instance using the given {@link FileChannel},
* {@link FileChannel.MapMode#READ_ONLY read-only} mapping mode, {@link CacheMode#FLUSH_PRE_SOFT}
* and the {@link #DEFAULT_SLICE_SHIFT}.
* <p>
* The {@link MappedByteBuffer} slices will be mapped {@link FileChannel.MapMode#READ_ONLY} lazily at first usage.
* </p>
*
* @param fileChannel the file channel to be used.
* @throws IOException if querying the file channel's size or setting up the initial buffer slice fails
*/
public MappedByteBufferInputStream(final FileChannel fileChannel) throws IOException {
this(fileChannel, FileChannel.MapMode.READ_ONLY, CacheMode.FLUSH_PRE_SOFT, DEFAULT_SLICE_SHIFT);
}
/**
 * Verifies that this stream is still in use, i.e. its reference count is not zero.
 *
 * @throws IOException if the reference count is zero, i.e. the stream has been closed
 */
final synchronized void checkOpen() throws IOException {
    final boolean closed = ( refCount == 0 );
    if( closed ) {
        throw new IOException("stream closed");
    }
}
@Override
public final synchronized void close() throws IOException {
if( 0 < refCount ) {
refCount--;
if( 0 == refCount ) {
for(int i=0; i
* User must have a {@link FileResizeOp} {@link #setFileResizeOp(FileResizeOp) registered} before.
*
* @param newTotalSize the new total size
* @throws IOException if no {@link FileResizeOp} has been {@link #setFileResizeOp(FileResizeOp) registered}
* or if a buffer slice operation failed
*/
public final synchronized void setLength(final long newTotalSize) throws IOException {
    // Only bother the registered resize operation if the channel's size actually differs.
    final boolean channelSizeDiffers = newTotalSize != fc.size();
    if( channelSizeDiffers ) {
        fileResizeOp.setLength(newTotalSize);
    }
    // Always sync this instance's slices and state with the requested size.
    notifyLengthChange(newTotalSize);
}
/**
* Notifies this instance that the underlying {@link FileChannel}'s size has changed
* and adjusts this instance's buffer slices and state accordingly.
*
* Should be called by user API when aware of such event.
*
* @param newTotalSize the new total size
* @throws IOException if a buffer slice operation failed
*/
public final synchronized void notifyLengthChange(final long newTotalSize) throws IOException {
/* if( DEBUG ) {
System.err.println("notifyLengthChange.0: "+totalSize+" -> "+newTotalSize);
dbgDump("notifyLengthChange.0:", System.err);
} */
if( totalSize == newTotalSize ) {
// NOP
return;
} else if( 0 == newTotalSize ) {
// ZERO - ensure one entry avoiding NULL checks
if( null != slices ) {
for(int i=0; i[] newSlices2GC = new WeakReference[ 1 ];
slices2GC = newSlices2GC;
slices = new ByteBuffer[1];
slices[0] = ByteBuffer.allocate(0);
sliceCount = 0;
totalSize = 0;
mark = -1;
currSlice = 0;
} else {
final long prePosition = position();
final long sliceSize = 1L << sliceShift;
final int newSliceCount = (int)( ( newTotalSize + ( sliceSize - 1 ) ) / sliceSize );
@SuppressWarnings("unchecked")
final WeakReference[] newSlices2GC = new WeakReference[ newSliceCount ];
final MappedByteBuffer[] newSlices = new MappedByteBuffer[ newSliceCount ];
final int copySliceCount = Math.min(newSliceCount, sliceCount-1); // drop last (resize)
if( 0 < copySliceCount ) {
System.arraycopy(slices2GC, 0, newSlices2GC, 0, copySliceCount);
System.arraycopy(slices, 0, newSlices, 0, copySliceCount);
for(int i=copySliceCount; i clipped position (set currSlice and re-map/-pos buffer)
}
/* if( DEBUG ) {
System.err.println("notifyLengthChange.X: "+slices[currSlice]);
dbgDump("notifyLengthChange.X:", System.err);
} */
}
/**
 * Forces pending changes of the underlying {@link FileChannel} to storage
 * via {@link FileChannel#force(boolean) force(true)}.
 * <p>
 * This is a no-op if this stream has been opened {@link FileChannel.MapMode#READ_ONLY read-only}.
 * </p>
 *
 * @throws IOException if this stream has been {@link #close() closed}
 *                     or if forcing the channel fails.
 */
public final synchronized void flush() throws IOException {
    checkOpen();
    if( FileChannel.MapMode.READ_ONLY == mmode ) {
        // Nothing could have been written through a read-only mapping.
        return;
    }
    fc.force(true);
}
/**
 * Returns a new MappedByteBufferOutputStream instance sharing
 * all resources of this input stream, including all buffer slices.
 * <p>
 * The reference count of this instance is incremented for the returned stream.
 * </p>
 *
 * @param fileResizeOp the {@link FileResizeOp} to be used, {@code null} selects the
 *                     default operation which rejects any resize request
 * @throws IllegalStateException if attempting to set the {@link FileResizeOp} to a different value than before
 * @throws IOException if this instance was opened w/ {@link FileChannel.MapMode#READ_ONLY}
 *                     or if this stream has been {@link #close() closed}.
 */
public final synchronized MappedByteBufferOutputStream getOutputStream(final FileResizeOp fileResizeOp)
        throws IllegalStateException, IOException
{
    // The read-only check intentionally precedes the open check.
    if( mmode == FileChannel.MapMode.READ_ONLY ) {
        throw new IOException("FileChannel map-mode is read-only");
    }
    checkOpen();
    setFileResizeOp(fileResizeOp);
    refCount++;
    if( fileResizeOp != null ) {
        this.fileResizeOp = fileResizeOp;
    } else {
        this.fileResizeOp = NoFileResize;
    }
    return new MappedByteBufferOutputStream(this);
}
/**
 * Returns the buffer slice with index {@code i}, mapping it lazily if required.
 * <p>
 * Lookup order:
 * </p>
 * <ol>
 *   <li>the strongly referenced slice, if present,</li>
 *   <li>in {@link CacheMode#FLUSH_PRE_SOFT} mode, a soft-flushed slice which is still
 *       reachable through its {@link WeakReference}; it is promoted back to a strong reference,</li>
 *   <li>otherwise a new mapping of the {@link FileChannel} region covered by the slice.</li>
 * </ol>
 *
 * @param i the slice index
 * @return the buffer slice, never {@code null}
 * @throws IOException if mapping the file channel region fails
 */
final synchronized ByteBuffer slice(final int i) throws IOException {
    final ByteBuffer cached = slices[i];
    if( null != cached ) {
        return cached;
    }
    if( CacheMode.FLUSH_PRE_SOFT == cmode ) {
        // Wildcard reference plus explicit cast: type-safe regardless of how the
        // weak-reference array is parameterized, and free of raw-type assignments.
        final WeakReference<?> ref = slices2GC[i];
        if( null != ref ) {
            final ByteBuffer revived = (ByteBuffer) ref.get();
            // The weak entry is consumed either way: resurrected below or already collected.
            slices2GC[i] = null;
            if( null != revived ) {
                slices[i] = revived;
                return revived;
            }
        }
    }
    final long sliceSize = 1L << sliceShift;
    final long pos = (long)i << sliceShift;
    // The last slice may be shorter than a full slice.
    final ByteBuffer mapped = fc.map(mmode, pos, Math.min(sliceSize, totalSize - pos));
    slices[i] = mapped;
    return mapped;
}
/**
 * Advances to the next buffer slice, if any.
 * <p>
 * Unless the cache mode is {@link CacheMode#FLUSH_NONE}, the slice being left is flushed first.
 * The new current slice is positioned at its beginning.
 * </p>
 *
 * @return {@code true} if the stream advanced to the next slice,
 *         {@code false} if the current slice already is the last one
 * @throws IOException if flushing the previous or fetching the next slice fails
 */
final synchronized boolean nextSlice() throws IOException {
    final int lastSlice = sliceCount - 1;
    if( currSlice >= lastSlice ) {
        return false;
    }
    if( cmode != CacheMode.FLUSH_NONE ) {
        flushSlice(currSlice);
    }
    currSlice++;
    slice( currSlice ).position( 0 );
    return true;
}
/**
 * Flushes the buffer slice with index {@code i} according to the current cache mode.
 * <p>
 * The strong reference is always dropped. In {@link CacheMode#FLUSH_PRE_HARD} mode
 * the buffer is additionally cleaned; if cleaning fails, the cache mode is permanently
 * downgraded to {@link CacheMode#FLUSH_PRE_SOFT}. Unless the buffer has been cleaned,
 * a {@link WeakReference} is kept so {@link #slice(int)} may resurrect it.
 * </p>
 *
 * @param i the slice index
 * @throws IOException declared for callers; not raised by the current implementation
 */
private synchronized void flushSlice(final int i) throws IOException {
    final ByteBuffer s = slices[i];
    if( null == s ) {
        return; // not cached, nothing to flush
    }
    slices[i] = null; // GC a slice is enough
    if( CacheMode.FLUSH_PRE_HARD == cmode ) {
        if( cleanBuffer(s) ) {
            return; // hard-flushed, the buffer must not be resurrected
        }
        // Hard flush is unavailable: fall back to soft flushing from now on.
        cmode = CacheMode.FLUSH_PRE_SOFT;
    }
    // Explicit type argument instead of a raw WeakReference.
    slices2GC[i] = new WeakReference<ByteBuffer>(s);
}
/**
 * Drops the buffer slice with index {@code i} entirely:
 * a strongly referenced buffer is released and passed to {@link #cleanBuffer(ByteBuffer)},
 * and the weak reference entry is cleared in any case.
 *
 * @param i the slice index
 */
private synchronized void cleanSlice(final int i) {
    final ByteBuffer victim = slices[i];
    if( victim != null ) {
        slices[i] = null;
        cleanBuffer(victim);
    }
    // Also forget a soft-flushed buffer, so it cannot be resurrected anymore.
    slices2GC[i] = null;
}
/**
 * Attempts to release the given buffer by invoking its cleaner via reflection.
 * <p>
 * The reflective cleaner lookup is triggered on first use. Any failure while
 * invoking the cleaner disables further attempts.
 * </p>
 *
 * @param mbb the buffer to be cleaned
 * @return {@code true} if the cleaner has been invoked successfully, otherwise {@code false},
 *         i.e. if no cleaner is available, the buffer is not direct or the invocation failed
 */
private synchronized boolean cleanBuffer(final ByteBuffer mbb) {
    if( !cleanerInit ) {
        initCleaner(mbb);
    }
    final boolean cleanable = hasCleaner && mbb.isDirect();
    if( !cleanable ) {
        return false;
    }
    try {
        final Object cleaner = mbbCleaner.invoke(mbb);
        cClean.invoke(cleaner);
        return true;
    } catch(final Throwable t) {
        // Best effort only: never try the cleaner again after a failure.
        hasCleaner = false;
        if( DEBUG ) {
            System.err.println("Caught "+t.getMessage());
            t.printStackTrace();
        }
        return false;
    }
}
private synchronized void initCleaner(final ByteBuffer bb) {
final Method[] _mbbCleaner = { null };
final Method[] _cClean = { null };
AccessController.doPrivileged(new PrivilegedAction