/**
* Copyright 2014 JogAmp Community. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are
* permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice, this list of
* conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice, this list
* of conditions and the following disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY JogAmp Community ``AS IS'' AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
* FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JogAmp Community OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* The views and conclusions contained in the software and documentation are those of the
* authors and should not be interpreted as representing official policies, either expressed
* or implied, of JogAmp Community.
*/
package com.jogamp.common.nio;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.RandomAccessFile;
import java.lang.ref.WeakReference;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.security.AccessController;
import java.security.PrivilegedAction;
import jogamp.common.Debug;
import com.jogamp.common.os.Platform;
/**
* An {@link InputStream} implementation based on an underlying {@link FileChannel}'s memory mapped {@link ByteBuffer},
* {@link #markSupported() supporting} {@link #mark(int) mark} and {@link #reset()}.
*
* Implementation allows full memory mapped {@link ByteBuffer} coverage via {@link FileChannel#map(MapMode, long, long) FileChannel}
* beyond its size limitation of {@link Integer#MAX_VALUE} utilizing an array of {@link ByteBuffer} slices.
*
*
* Implementation further allows full random access via {@link #position()} and {@link #position(long)}
* and accessing the memory mapped {@link ByteBuffer} slices directly via {@link #currentSlice()} and {@link #nextSlice()}.
*
* @since 2.3.0
*/
public class MappedByteBufferInputStream extends InputStream {
/**
 * Policy describing what happens to an already mapped buffer slice
 * once the stream moves on to another slice.
 */
public enum CacheMode {
    /**
     * Keep all previous lazily cached buffer slices alive, useful for hopping readers,
     * i.e. random access via {@link MappedByteBufferInputStream#position(long) position(p)}
     * or {@link MappedByteBufferInputStream#reset() reset()}.
     * <p>
     * Note that without flushing, the platform may fail memory mapping
     * due to virtual address space exhaustion.
     * In such case an {@link OutOfMemoryError} may be thrown directly,
     * or encapsulated as the {@link IOException#getCause() the cause}
     * of a thrown {@link IOException}.
     * </p>
     */
    FLUSH_NONE,

    /**
     * Soft flush the previous lazily cached buffer slice when caching the next buffer slice,
     * useful for sequential forward readers, as well as for hopping readers like {@link #FLUSH_NONE}
     * in case of relatively short periods between hopping across slices.
     * <p>
     * Implementation clears the buffer slice reference
     * while preserving a {@link WeakReference} to allow its resurrection if not yet
     * {@link System#gc() garbage collected}.
     * </p>
     */
    FLUSH_PRE_SOFT,

    /**
     * Hard flush the previous lazily cached buffer slice when caching the next buffer slice,
     * useful for sequential forward readers.
     * <p>
     * Besides clearing the buffer slice reference,
     * implementation attempts to hard flush the mapped buffer
     * using a {@code sun.misc.Cleaner} by reflection.
     * In case such a method neither exists nor works, implementation falls back to {@link #FLUSH_PRE_SOFT}.
     * </p>
     * <p>
     * This is the default.
     * </p>
     */
    FLUSH_PRE_HARD
}
/**
 * File resize interface allowing a file to change its size,
 * e.g. via {@link RandomAccessFile#setLength(long)}.
 */
public interface FileResizeOp {
    /**
     * Changes the size of the underlying file.
     *
     * @param newSize the new file size
     * @throws IOException if file size change is not supported or any other I/O error occurs
     */
    void setLength(long newSize) throws IOException;
}
/**
 * Placeholder {@link FileResizeOp} used while no real resize operation is registered:
 * every request is rejected with an {@link IOException}.
 */
private static final FileResizeOp NoFileResize = new FileResizeOp() {
    @Override
    public void setLength(final long requestedSize) throws IOException {
        // No resize operation registered, hence resizing is refused unconditionally.
        throw new IOException("file size change not supported");
    }
};
/**
 * Default slice shift, i.e. {@code 1L << shift}, denoting the slice size:
 * <ul>
 *   <li>32-bit platforms: 29 -&gt; 512 MiB</li>
 *   <li>all other platforms: 30 -&gt; 1024 MiB</li>
 * </ul>
 * <p>
 * In case the default uses up too much address-space, one may choose other values:
 * <ul>
 *   <li>29 -&gt; 512 MiB</li>
 *   <li>28 -&gt; 256 MiB</li>
 *   <li>27 -&gt; 128 MiB</li>
 *   <li>26 -&gt; 64 MiB</li>
 * </ul>
 * </p>
 */
public static final int DEFAULT_SLICE_SHIFT;

/** Debug output switch, queried once via {@code Debug.debug("ByteBufferInputStream")}. */
static final boolean DEBUG;

static {
    // Platform must be initialized before its bitness can be queried.
    Platform.initSingleton();
    DEFAULT_SLICE_SHIFT = Platform.is32Bit() ? 29 : 30;
    DEBUG = Debug.debug("ByteBufferInputStream");
}
private final int sliceShift;
private final FileChannel fc;
private final FileChannel.MapMode mmode;
private FileResizeOp fileResizeOp = NoFileResize;
private int sliceCount;
private ByteBuffer[] slices;
private WeakReference[] slices2GC;
private long totalSize;
private int slicesEntries, slices2GCEntries;
private boolean synchronous;
private int refCount;
private Method mbbCleaner;
private Method cClean;
private boolean cleanerInit;
private boolean hasCleaner;
private CacheMode cmode;
private int sliceIdx;
private long mark;
final void dbgDump(final String prefix, final PrintStream out) {
int _slicesEntries = 0;
for(int i=0; i ref = slices2GC[i];
if( null != ref ) {
_slices2GCEntries++;
if( null != ref.get() ) {
_slices2GCAliveEntries++;
}
}
}
long fcSz = 0, pos = 0, rem = 0;
try {
fcSz = fc.size();
} catch (final IOException e) {
e.printStackTrace();
}
if( 0 < refCount ) {
try {
pos = position();
rem = totalSize - pos;
} catch (final IOException e) {
e.printStackTrace();
}
}
final int sliceCount2 = null != slices ? slices.length : 0;
out.println(prefix+" refCount "+refCount+", fcSize "+fcSz+", totalSize "+totalSize);
out.println(prefix+" position "+pos+", remaining "+rem);
out.println(prefix+" mmode "+mmode+", cmode "+cmode+", fileResizeOp "+fileResizeOp);
out.println(prefix+" slice "+sliceIdx+" / "+sliceCount+" ("+sliceCount2+"), synchronous "+synchronous);
out.println(prefix+" mapped "+slicesEntries+" / "+_slicesEntries);
out.println(prefix+" GC-queue "+slices2GCEntries+" / "+_slices2GCEntries+" (alive "+_slices2GCAliveEntries+")");
out.println(prefix+" sliceShift "+sliceShift+" -> "+(1L << sliceShift));
}
/**
 * Internal constructor taking the complete initial state.
 *
 * @param fc the file channel to be mapped
 * @param mmode the mapping mode
 * @param cmode the cache mode
 * @param sliceShift the pow2 slice size exponent
 * @param totalSize the total size to be covered, must not be negative
 * @param currSliceIdx index of the initially current slice
 * @throws IllegalArgumentException if {@code totalSize} is negative
 * @throws IOException if setting up or mapping the slices failed
 */
MappedByteBufferInputStream(final FileChannel fc, final FileChannel.MapMode mmode, final CacheMode cmode,
                            final int sliceShift, final long totalSize, final int currSliceIdx) throws IOException {
    if( totalSize < 0 ) {
        throw new IllegalArgumentException("Negative size "+totalSize);
    }
    this.fc = fc;
    this.mmode = mmode;
    this.sliceShift = sliceShift;

    // Start from an impossible size, so notifyLengthChange(..) below
    // cannot take its 'size unchanged' shortcut and sets up the slice arrays.
    this.totalSize = -1;
    this.sliceCount = 0;
    notifyLengthChange( totalSize );

    this.refCount = 1;
    this.cleanerInit = false;
    this.hasCleaner = false;
    this.cmode = cmode;
    this.sliceIdx = currSliceIdx;
    this.mark = -1;

    // Fetch the current slice and rewind it.
    currentSlice().position(0);
}
/**
 * Creates a new instance covering the whole given {@link FileChannel},
 * starting at its first slice.
 * <p>
 * The {@link ByteBuffer} slices will be mapped lazily at first usage.
 * </p>
 *
 * @param fileChannel the file channel to be mapped lazily.
 * @param mmode the map mode, default is {@link FileChannel.MapMode#READ_ONLY}.
 * @param cmode the caching mode, default is {@link CacheMode#FLUSH_PRE_HARD}.
 * @param sliceShift the pow2 slice size, default is {@link #DEFAULT_SLICE_SHIFT}.
 * @throws IOException if querying the channel's size or setting up the slices failed
 */
public MappedByteBufferInputStream(final FileChannel fileChannel, final FileChannel.MapMode mmode,
                                   final CacheMode cmode, final int sliceShift) throws IOException
{
    // Total size is the channel's current size, the first slice becomes the current one.
    this(fileChannel, mmode, cmode, sliceShift, fileChannel.size(), 0);
}
/**
 * Creates a new instance for the given {@link FileChannel}
 * with the given mapping-mode and cache-mode, using the {@link #DEFAULT_SLICE_SHIFT}.
 * <p>
 * The {@link ByteBuffer} slices will be mapped lazily at first usage.
 * </p>
 *
 * @param fileChannel the file channel to be used.
 * @param mmode the map mode, default is {@link FileChannel.MapMode#READ_ONLY}.
 * @param cmode the caching mode, default is {@link CacheMode#FLUSH_PRE_HARD}.
 * @throws IOException if querying the channel's size or setting up the slices failed
 */
public MappedByteBufferInputStream(final FileChannel fileChannel,
                                   final FileChannel.MapMode mmode,
                                   final CacheMode cmode) throws IOException
{
    // Whole channel, default slice size, starting at the first slice.
    this(fileChannel, mmode, cmode, DEFAULT_SLICE_SHIFT, fileChannel.size(), 0);
}
/**
 * Creates a new instance for the given {@link FileChannel} using all defaults, i.e.
 * {@link FileChannel.MapMode#READ_ONLY read-only} mapping mode, {@link CacheMode#FLUSH_PRE_HARD}
 * and the {@link #DEFAULT_SLICE_SHIFT}.
 * <p>
 * The {@link ByteBuffer} slices will be mapped {@link FileChannel.MapMode#READ_ONLY} lazily at first usage.
 * </p>
 *
 * @param fileChannel the file channel to be used.
 * @throws IOException if querying the channel's size or setting up the slices failed
 */
public MappedByteBufferInputStream(final FileChannel fileChannel) throws IOException {
    // The three-argument constructor supplies the DEFAULT_SLICE_SHIFT.
    this(fileChannel, FileChannel.MapMode.READ_ONLY, CacheMode.FLUSH_PRE_HARD);
}
/**
 * Switches synchronous mode on or off.
 * <p>
 * With synchronous mode enabled, mapped buffers will be {@link #flush(boolean) flushed}
 * when {@link #notifyLengthChange(long) resized}, written to or {@link #close() closing}
 * in {@link FileChannel.MapMode#READ_WRITE read-write} mapping mode.
 * </p>
 * <p>
 * With synchronous mode enabled, {@link FileChannel#force(boolean)} is issued
 * when {@link #setLength(long) resizing} or {@link #close() closing}
 * and not in {@link FileChannel.MapMode#READ_ONLY read-only} mapping mode.
 * </p>
 *
 * @param s {@code true} to enable synchronous mode, {@code false} to disable it
 */
public final synchronized void setSynchronous(final boolean s) {
    this.synchronous = s;
}
/**
 * Returns whether {@link #setSynchronous(boolean) synchronous mode} is currently enabled.
 *
 * @return {@code true} if synchronous mode is enabled, otherwise {@code false}
 */
public final synchronized boolean getSynchronous() {
    return this.synchronous;
}
/**
 * Validates that this stream is still open, i.e. its reference count is not zero.
 *
 * @throws IOException if this stream has been closed
 */
final synchronized void checkOpen() throws IOException {
    final boolean closed = ( 0 == refCount );
    if( closed ) {
        throw new IOException("stream closed");
    }
}
/**
 * Releases one reference to this stream.
 * <p>
 * Only when the last reference is released, all slices are cleaned (including
 * buffer synchronization), the channel is forced incl. meta-data if not mapped
 * read-only and finally closed. Further calls are a no-op.
 * </p>
 * <p>
 * The channel is closed and the stream state is reset even if cleaning or flushing
 * throws; such an exception is still propagated to the caller.
 * </p>
 *
 * @throws IOException if cleaning the slices, flushing or closing the channel failed
 */
@Override
public final synchronized void close() throws IOException {
    if( 0 >= refCount ) {
        return; // already closed
    }
    refCount--;
    if( 0 != refCount ) {
        return; // still in use, e.g. by a shared output stream
    }
    try {
        cleanAllSlices( true /* syncBuffer */ );
    } finally {
        try {
            flushImpl(true /* metaData */, false /* syncBuffer */);
        } finally {
            // Must run even if flushing failed: refCount is already zero,
            // hence there is no second chance to close the channel.
            try {
                fc.close();
            } finally {
                mark = -1;
                sliceIdx = -1;
                super.close();
            }
        }
    }
}
/**
 * Returns the mapping mode this instance has been created with.
 */
final FileChannel.MapMode getMapMode() {
    return this.mmode;
}
/**
 * Registers the {@link FileResizeOp} used by {@link #setLength(long)}.
 * <p>
 * Passing {@code null} selects the internal default, which rejects any resize.
 * Once an operation has been registered, only the very same instance may be passed again.
 * </p>
 *
 * @param fileResizeOp the new {@link FileResizeOp}.
 * @throws IllegalStateException if attempting to set the {@link FileResizeOp} to a different value than before
 */
public final synchronized void setFileResizeOp(final FileResizeOp fileResizeOp) throws IllegalStateException {
    final FileResizeOp registered = this.fileResizeOp;
    final boolean userOpRegistered = NoFileResize != registered;
    if( userOpRegistered && registered != fileResizeOp ) {
        throw new IllegalStateException("FileResizeOp already set, this value differs");
    }
    if( null == fileResizeOp ) {
        this.fileResizeOp = NoFileResize;
    } else {
        this.fileResizeOp = fileResizeOp;
    }
}
/**
 * Resizes the underlying {@link FileChannel} and adjusts this instance
 * {@link #notifyLengthChange(long) accordingly}.
 * <p>
 * User must have a {@link FileResizeOp} {@link #setFileResizeOp(FileResizeOp) registered} before.
 * </p>
 * <p>
 * Implementation calls {@link #notifyLengthChange(long)} after {@link FileResizeOp#setLength(long)}.
 * </p>
 *
 * @param newTotalSize the new total size
 * @throws IOException if no {@link FileResizeOp} has been {@link #setFileResizeOp(FileResizeOp) registered}
 *                     or if a buffer slice operation failed
 */
public final synchronized void setLength(final long newTotalSize) throws IOException {
    // Capture the position up front if the size changes to a non-zero value,
    // since slices may be cleaned below before the length change is applied.
    final boolean resizeToNonZero = 0 != newTotalSize && totalSize != newTotalSize;
    final long currentPosition = resizeToNonZero ? position() : -1L;

    if( fc.size() != newTotalSize ) {
        final boolean onWindows = Platform.OSType.WINDOWS == Platform.getOSType();
        if( onWindows ) {
            // On Windows, we have to close all mapped slices.
            // Otherwise we will receive:
            // java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open
            // at java.io.RandomAccessFile.setLength(Native Method)
            cleanAllSlices( synchronous );
        }
        fileResizeOp.setLength(newTotalSize);
        if( synchronous ) {
            // buffers will be synchronized in notifyLengthChangeImpl(..)
            flushImpl( true /* metaData */, false /* syncBuffer */);
        }
    }
    notifyLengthChangeImpl(newTotalSize, currentPosition);
}
/**
 * Informs this instance that the size of the underlying {@link FileChannel} has changed,
 * so its buffer slices and states get adjusted accordingly.
 * <p>
 * Should be called by user API when aware of such event.
 * </p>
 *
 * @param newTotalSize the new total size
 * @throws IOException if a buffer slice operation failed
 */
public final synchronized void notifyLengthChange(final long newTotalSize) throws IOException {
    // No position captured by the caller, signalled with a negative value.
    final long noCapturedPosition = -1L;
    notifyLengthChangeImpl(newTotalSize, noCapturedPosition);
}
/**
 * Adjusts the slice arrays and related state to a new total size.
 *
 * Three cases are distinguished:
 * - size unchanged: nothing is done,
 * - new size is zero: all slices are cleaned and a single empty heap buffer
 *   is installed as placeholder, while sliceCount, totalSize, mark and sliceIdx are reset,
 * - otherwise: the new slice count is the new size divided by the slice size, rounded up,
 *   and new slice arrays are allocated taking over the leading entries of the old ones.
 *
 * NOTE(review): the tail of the last case is damaged in this copy of the file,
 * see the note inside the method body.
 *
 * @param newTotalSize the new total size
 * @param currentPosition the position captured by the caller, or a negative value
 *                        if the position shall be queried here
 * @throws IOException if a buffer slice operation failed
 */
private final synchronized void notifyLengthChangeImpl(final long newTotalSize, final long currentPosition) throws IOException {
/* if( DEBUG ) {
System.err.println("notifyLengthChange.0: "+totalSize+" -> "+newTotalSize);
dbgDump("notifyLengthChange.0:", System.err);
} */
if( totalSize == newTotalSize ) {
// NOP
return;
} else if( 0 == newTotalSize ) {
// ZERO - ensure one entry avoiding NULL checks
cleanAllSlices( synchronous );
@SuppressWarnings("unchecked")
final WeakReference[] newSlices2GC = new WeakReference[ 1 ];
slices2GC = newSlices2GC;
slices = new ByteBuffer[1];
// Zero-length heap buffer as placeholder; sliceCount stays 0 although one array entry exists.
slices[0] = ByteBuffer.allocate(0);
sliceCount = 0;
totalSize = 0;
mark = -1;
sliceIdx = 0;
} else {
// Use the caller's captured position if given (non-negative), otherwise query it now.
final long prePosition = 0 <= currentPosition ? currentPosition : position();
final long sliceSize = 1L << sliceShift;
// Number of slices required to cover newTotalSize, rounded up.
final int newSliceCount = (int)( ( newTotalSize + ( sliceSize - 1 ) ) / sliceSize );
@SuppressWarnings("unchecked")
final WeakReference[] newSlices2GC = new WeakReference[ newSliceCount ];
final ByteBuffer[] newSlices = new ByteBuffer[ newSliceCount ];
final int copySliceCount = Math.min(newSliceCount, sliceCount-1); // drop last (resize)
if( 0 <= copySliceCount ) {
if( 0 < copySliceCount ) {
System.arraycopy(slices2GC, 0, newSlices2GC, 0, copySliceCount);
System.arraycopy(slices, 0, newSlices, 0, copySliceCount);
}
// NOTE(review): the following line is truncated in this copy of the file. Everything between
// the loop condition and the trailing comment text is missing, presumably the handling of the
// dropped slices, the installation of the new arrays and sizes and the clipped re-positioning.
// TODO restore from the upstream source.
for(int i=copySliceCount; i clipped position (set currSlice and re-map/-pos buffer)
}
/* if( DEBUG ) {
System.err.println("notifyLengthChange.X: "+slices[currSlice]);
dbgDump("notifyLengthChange.X:", System.err);
} */
}
/**
 * Similar to {@link OutputStream#flush()}, synchronizes all mapped buffers
 * with the storage via {@link MappedByteBuffer#force()}
 * and issues {@link FileChannel#force(boolean)} with the given {@code metaData} flag.
 * <p>
 * Nothing is written if this instance uses {@link FileChannel.MapMode#READ_ONLY read-only} mapping mode.
 * </p>
 *
 * @param metaData passed through to {@link FileChannel#force(boolean)}
 * @throws IOException if this stream has been {@link #close() closed}
 *                     or forcing the channel failed.
 */
public final synchronized void flush(final boolean metaData) throws IOException {
    checkOpen();
    final boolean syncBuffers = true;
    flushImpl(metaData, syncBuffers);
}
private final synchronized void flushImpl(final boolean metaData, final boolean syncBuffer) throws IOException {
if( FileChannel.MapMode.READ_ONLY != mmode ) {
if( syncBuffer && FileChannel.MapMode.READ_WRITE == mmode ) {
for(int i=0; i ref = slices2GC[i];
if( null != ref ) {
syncSlice(ref.get(), true);
}
}
}
fc.force(metaData);
}
}
/**
 * Creates a new MappedByteBufferOutputStream instance which shares
 * all resources of this input stream, including all buffer slices.
 * <p>
 * On success, the reference count of this stream is incremented.
 * </p>
 *
 * @param fileResizeOp the {@link FileResizeOp} handed to the new output stream
 * @throws IllegalStateException if attempting to set the {@link FileResizeOp} to a different value than before
 * @throws IOException if this instance was opened w/ {@link FileChannel.MapMode#READ_ONLY}
 *                     or if this stream has been {@link #close() closed}.
 */
public final synchronized MappedByteBufferOutputStream getOutputStream(final FileResizeOp fileResizeOp)
        throws IllegalStateException, IOException
{
    checkOpen();
    final MappedByteBufferOutputStream sharedOut = new MappedByteBufferOutputStream(this, fileResizeOp);
    // Count the new user only after its construction succeeded.
    refCount++;
    return sharedOut;
}
/**
 * Return the mapped {@link ByteBuffer} slice at the current {@link #position()}.
 * <p>
 * Due to the nature of using sliced buffers mapping the whole region,
 * user has to determine whether the returned buffer covers the desired region
 * and may fetch the {@link #nextSlice()} until satisfied.
 * It is also possible to repeat this operation after repositioning the stream via {@link #position(long)}
 * or {@link #skip(long)} to a position within the next block, similar to {@link #nextSlice()}.
 * </p>
 * <p>
 * The slice is mapped lazily: an already mapped slice is returned as is,
 * in {@link CacheMode#FLUSH_PRE_SOFT} a soft flushed slice is resurrected if still reachable,
 * otherwise the slice is mapped from the file channel.
 * </p>
 *
 * @throws IOException if a buffer slice operation failed.
 */
public final synchronized ByteBuffer currentSlice() throws IOException {
    final ByteBuffer cached = slices[sliceIdx];
    if( null != cached ) {
        return cached;
    }
    if( CacheMode.FLUSH_PRE_SOFT == cmode ) {
        final WeakReference<ByteBuffer> ref = slices2GC[sliceIdx];
        if( null != ref ) {
            final ByteBuffer revived = ref.get();
            // The weak entry is consumed in either case.
            slices2GC[sliceIdx] = null;
            slices2GCEntries--;
            if( null != revived ) {
                slices[sliceIdx] = revived;
                slicesEntries++;
                return revived;
            }
        }
    }
    // Map the slice; the last slice may be shorter than the full slice size.
    final long pos = (long)sliceIdx << sliceShift;
    final long len = Math.min(1L << sliceShift, totalSize - pos);
    final MappedByteBuffer mapped = fc.map(mmode, pos, len);
    slices[sliceIdx] = mapped;
    slicesEntries++;
    return mapped;
}
/**
 * Advances to the mapped {@link ByteBuffer} slice following the current {@link #position()}
 * and returns it, implicitly setting {@link #position(long)} to the start of the returned slice,
 * see {@link #currentSlice()}.
 * <p>
 * The slice left behind is flushed according to the cache mode.
 * </p>
 * <p>
 * If no subsequent slice is available, {@code null} is being returned.
 * </p>
 *
 * @throws IOException if a buffer slice operation failed.
 */
public final synchronized ByteBuffer nextSlice() throws IOException {
    final int lastSliceIdx = sliceCount - 1;
    if( sliceIdx >= lastSliceIdx ) {
        return null; // no subsequent slice
    }
    flushSlice(sliceIdx, synchronous);
    sliceIdx++;
    final ByteBuffer next = currentSlice();
    next.position( 0 );
    return next;
}
/**
 * Synchronizes the given slice if {@link #setSynchronous(boolean) synchronous mode} is enabled,
 * see {@link #syncSlice(ByteBuffer, boolean)}.
 *
 * @param s the slice, may be {@code null}
 * @throws IOException declared for callers, see {@link #syncSlice(ByteBuffer, boolean)}
 */
synchronized void syncSlice(final ByteBuffer s) throws IOException {
    final boolean syncBuffer = synchronous;
    syncSlice(s, syncBuffer);
}
/**
 * Synchronizes the given slice with the storage via {@link MappedByteBuffer#force()},
 * if requested, the slice is not {@code null} and the mapping mode is
 * {@link FileChannel.MapMode#READ_WRITE read-write}.
 * <p>
 * This is a best-effort operation: any failure is caught and only reported in debug mode.
 * </p>
 *
 * @param s the slice, may be {@code null}
 * @param syncBuffer {@code true} to perform the synchronization
 * @throws IOException declared for callers; failures of the force operation itself are caught
 */
synchronized void syncSlice(final ByteBuffer s, final boolean syncBuffer) throws IOException {
    if( !syncBuffer || null == s || FileChannel.MapMode.READ_WRITE != mmode ) {
        return; // nothing to synchronize
    }
    try {
        final MappedByteBuffer mapped = (MappedByteBuffer) s;
        mapped.force();
    } catch( final Throwable t ) {
        // On Windows .. this may happen, like:
        // java.io.IOException: The process cannot access the file because another process has locked a portion of the file
        // at java.nio.MappedByteBuffer.force0(Native Method)
        // at java.nio.MappedByteBuffer.force(MappedByteBuffer.java:203)
        if( DEBUG ) {
            System.err.println("Caught "+t.getMessage());
            t.printStackTrace();
        }
    }
}
/**
 * Flushes the slice at the given index according to the current cache mode.
 * <ul>
 *   <li>{@link CacheMode#FLUSH_NONE}: the slice stays mapped and is only synchronized if requested.</li>
 *   <li>{@link CacheMode#FLUSH_PRE_HARD}: the slice is dropped and hard cleaned;
 *       if cleaning did not succeed, a {@link WeakReference} to it is kept.</li>
 *   <li>{@link CacheMode#FLUSH_PRE_SOFT}: the slice is dropped and synchronized if requested,
 *       a {@link WeakReference} to it is kept.</li>
 * </ul>
 * Nothing is done if the slice is not mapped.
 *
 * @param i the slice index
 * @param syncBuffer {@code true} to synchronize the slice with the storage
 * @throws IOException if a buffer slice operation failed
 */
private synchronized void flushSlice(final int i, final boolean syncBuffer) throws IOException {
    final ByteBuffer s = slices[i];
    if( null == s ) {
        return; // not mapped
    }
    if( CacheMode.FLUSH_NONE == cmode ) {
        syncSlice(s, syncBuffer);
        return;
    }
    slices[i] = null; // trigger slice GC
    slicesEntries--;
    if( CacheMode.FLUSH_PRE_HARD == cmode ) {
        if( cleanBuffer(s, syncBuffer) ) {
            return; // hard cleaned, nothing left to track
        }
        // buffer already synced in cleanBuffer(..) if requested
    } else {
        syncSlice(s, syncBuffer);
    }
    // Keep the slice weakly reachable, allowing its resurrection until garbage collected.
    slices2GC[i] = new WeakReference<ByteBuffer>(s);
    slices2GCEntries++;
}
private synchronized void cleanAllSlices(final boolean syncBuffers) throws IOException {
if( null != slices ) {
for(int i=0; i ref = slices2GC[i];
slices2GC[i] = null;
if( null != ref ) {
slices2GCEntries--;
s2 = ref.get();
} else {
s2 = null;
}
}
if( null != s1 ) {
slices[i] = null;
slicesEntries--;
cleanBuffer(s1, syncBuffer);
if( null != s2 ) {
throw new InternalError("XXX");
}
} else if( null != s2 ) {
cleanBuffer(s2, syncBuffer);
}
}
/**
 * Attempts to hard release the given buffer using the reflectively looked up cleaner.
 * <p>
 * The cleaner lookup is triggered on first use. The buffer is synchronized first, if requested.
 * Non-direct buffers are never cleaned.
 * If cleaning a direct buffer is not possible or fails while the cache mode is
 * {@link CacheMode#FLUSH_PRE_HARD}, the cache mode is downgraded to {@link CacheMode#FLUSH_PRE_SOFT}.
 * </p>
 *
 * @param mbb the buffer to be cleaned
 * @param syncBuffer {@code true} to synchronize the buffer with the storage before cleaning
 * @return {@code true} if the cleaner has been invoked successfully, otherwise {@code false}
 * @throws IOException if a buffer slice operation failed
 */
private synchronized boolean cleanBuffer(final ByteBuffer mbb, final boolean syncBuffer) throws IOException {
    if( !cleanerInit ) {
        initCleaner(mbb);
    }
    syncSlice(mbb, syncBuffer);
    if( !mbb.isDirect() ) {
        return false;
    }
    if( hasCleaner ) {
        try {
            final Object cleaner = mbbCleaner.invoke(mbb);
            cClean.invoke(cleaner);
            return true;
        } catch(final Throwable t) {
            // Do not try the cleaner again after a failure.
            hasCleaner = false;
            if( DEBUG ) {
                System.err.println("Caught "+t.getMessage());
                t.printStackTrace();
            }
        }
    }
    // Hard flush is not available, fall back to soft flush.
    if( CacheMode.FLUSH_PRE_HARD == cmode ) {
        cmode = CacheMode.FLUSH_PRE_SOFT;
    }
    return false;
}
private synchronized void initCleaner(final ByteBuffer bb) {
final Method[] _mbbCleaner = { null };
final Method[] _cClean = { null };
AccessController.doPrivileged(new PrivilegedAction