aboutsummaryrefslogtreecommitdiffstats
path: root/src/java/com/jogamp/common/nio/MappedByteBufferInputStream.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/java/com/jogamp/common/nio/MappedByteBufferInputStream.java')
-rw-r--r--src/java/com/jogamp/common/nio/MappedByteBufferInputStream.java281
1 files changed, 221 insertions, 60 deletions
diff --git a/src/java/com/jogamp/common/nio/MappedByteBufferInputStream.java b/src/java/com/jogamp/common/nio/MappedByteBufferInputStream.java
index 5f91f64..1d4d78a 100644
--- a/src/java/com/jogamp/common/nio/MappedByteBufferInputStream.java
+++ b/src/java/com/jogamp/common/nio/MappedByteBufferInputStream.java
@@ -29,11 +29,13 @@ package com.jogamp.common.nio;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.io.PrintStream;
import java.io.RandomAccessFile;
import java.lang.ref.WeakReference;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.security.AccessController;
@@ -156,6 +158,8 @@ public class MappedByteBufferInputStream extends InputStream {
private ByteBuffer[] slices;
private WeakReference<ByteBuffer>[] slices2GC;
private long totalSize;
+ private int slicesEntries, slices2GCEntries;
+ private boolean synchronous;
private int refCount;
@@ -169,6 +173,23 @@ public class MappedByteBufferInputStream extends InputStream {
private long mark;
final void dbgDump(final String prefix, final PrintStream out) {
+ int _slicesEntries = 0;
+ for(int i=0; i<sliceCount; i++) {
+ if( null != slices[i] ) {
+ _slicesEntries++;
+ }
+ }
+ int _slices2GCEntries = 0;
+ int _slices2GCAliveEntries = 0;
+ for(int i=0; i<sliceCount; i++) {
+ final WeakReference<ByteBuffer> ref = slices2GC[i];
+ if( null != ref ) {
+ _slices2GCEntries++;
+ if( null != ref.get() ) {
+ _slices2GCAliveEntries++;
+ }
+ }
+ }
long fcSz = 0, pos = 0, rem = 0;
try {
fcSz = fc.size();
@@ -187,7 +208,9 @@ public class MappedByteBufferInputStream extends InputStream {
out.println(prefix+" refCount "+refCount+", fcSize "+fcSz+", totalSize "+totalSize);
out.println(prefix+" position "+pos+", remaining "+rem);
out.println(prefix+" mmode "+mmode+", cmode "+cmode+", fileResizeOp "+fileResizeOp);
- out.println(prefix+" slice "+sliceIdx+" / "+sliceCount+" ("+sliceCount2+")");
+ out.println(prefix+" slice "+sliceIdx+" / "+sliceCount+" ("+sliceCount2+"), synchronous "+synchronous);
+ out.println(prefix+" mapped "+slicesEntries+" / "+_slicesEntries);
+ out.println(prefix+" GC-queue "+slices2GCEntries+" / "+_slices2GCEntries+" (alive "+_slices2GCAliveEntries+")");
out.println(prefix+" sliceShift "+sliceShift+" -> "+(1L << sliceShift));
}
@@ -203,7 +226,7 @@ public class MappedByteBufferInputStream extends InputStream {
// trigger notifyLengthChange
this.totalSize = -1;
this.sliceCount = 0;
- notifyLengthChange(totalSize);
+ notifyLengthChange( totalSize );
this.refCount = 1;
this.cleanerInit = false;
@@ -263,6 +286,28 @@ public class MappedByteBufferInputStream extends InputStream {
this(fileChannel, FileChannel.MapMode.READ_ONLY, CacheMode.FLUSH_PRE_SOFT, DEFAULT_SLICE_SHIFT);
}
+ /**
+ * Enable or disable synchronous mode.
+ * <p>
+ * If synchronous mode is enabled, mapped buffers will be {@link #flush(boolean) flushed}
+ * if {@link #notifyLengthChange(long) resized}, <i>written to</i> or {@link #close() closing} in {@link FileChannel.MapMode#READ_WRITE read-write} mapping mode.
+ * </p>
+ * <p>
+ * If synchronous mode is enabled, {@link FileChannel#force(boolean)} is issued
+ * if {@link #setLength(long) resizing} or {@link #close() closing} and not in {@link FileChannel.MapMode#READ_ONLY read-only} mapping mode.
+ * </p>
+ * @param s {@code true} to enable synchronous mode
+ */
+ public final synchronized void setSynchronous(final boolean s) {
+ synchronous = s;
+ }
+ /**
+ * Return {@link #setSynchronous(boolean) synchronous mode}.
+ */
+ public final synchronized boolean getSynchronous() {
+ return synchronous ;
+ }
+
final synchronized void checkOpen() throws IOException {
if( 0 == refCount ) {
throw new IOException("stream closed");
@@ -274,16 +319,15 @@ public class MappedByteBufferInputStream extends InputStream {
if( 0 < refCount ) {
refCount--;
if( 0 == refCount ) {
- for(int i=0; i<sliceCount; i++) {
- cleanSlice(i);
- }
- if( mmode != FileChannel.MapMode.READ_ONLY ) {
- fc.force(true);
+ try {
+ cleanAllSlices( true /* syncBuffer */ );
+ } finally {
+ flushImpl(true /* metaData */, false /* syncBuffer */);
+ fc.close();
+ mark = -1;
+ sliceIdx = -1;
+ super.close();
}
- fc.close();
- mark = -1;
- sliceIdx = -1;
- super.close();
}
}
}
@@ -307,15 +351,35 @@ public class MappedByteBufferInputStream extends InputStream {
* <p>
* User must have a {@link FileResizeOp} {@link #setFileResizeOp(FileResizeOp) registered} before.
* </p>
+ * <p>
+ * Implementation calls {@link #notifyLengthChange(long)} after {@link FileResizeOp#setLength(long)}.
+ * </p>
* @param newTotalSize the new total size
* @throws IOException if no {@link FileResizeOp} has been {@link #setFileResizeOp(FileResizeOp) registered}
* or if a buffer slice operation failed
*/
public final synchronized void setLength(final long newTotalSize) throws IOException {
+ final long currentPosition;
+ if( 0 != newTotalSize && totalSize != newTotalSize ) {
+ currentPosition = position();
+ } else {
+ currentPosition = -1L;
+ }
if( fc.size() != newTotalSize ) {
+ if( Platform.OSType.WINDOWS == Platform.getOSType() ) {
+ // On Windows, we have to close all mapped slices.
+ // Otherwise we will receive:
+ // java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open
+ // at java.io.RandomAccessFile.setLength(Native Method)
+ cleanAllSlices( synchronous );
+ }
fileResizeOp.setLength(newTotalSize);
+ if( synchronous ) {
+ // buffers will be synchronized in notifyLengthChangeImpl(..)
+ flushImpl( true /* metaData */, false /* syncBuffer */);
+ }
}
- notifyLengthChange(newTotalSize);
+ notifyLengthChangeImpl(newTotalSize, currentPosition);
}
/**
@@ -328,6 +392,9 @@ public class MappedByteBufferInputStream extends InputStream {
* @throws IOException if a buffer slice operation failed
*/
public final synchronized void notifyLengthChange(final long newTotalSize) throws IOException {
+ notifyLengthChangeImpl(newTotalSize, -1L);
+ }
+ private final synchronized void notifyLengthChangeImpl(final long newTotalSize, final long currentPosition) throws IOException {
/* if( DEBUG ) {
System.err.println("notifyLengthChange.0: "+totalSize+" -> "+newTotalSize);
dbgDump("notifyLengthChange.0:", System.err);
@@ -337,11 +404,7 @@ public class MappedByteBufferInputStream extends InputStream {
return;
} else if( 0 == newTotalSize ) {
// ZERO - ensure one entry avoiding NULL checks
- if( null != slices ) {
- for(int i=0; i<sliceCount; i++) {
- cleanSlice(i);
- }
- }
+ cleanAllSlices( synchronous );
@SuppressWarnings("unchecked")
final WeakReference<ByteBuffer>[] newSlices2GC = new WeakReference[ 1 ];
slices2GC = newSlices2GC;
@@ -352,7 +415,7 @@ public class MappedByteBufferInputStream extends InputStream {
mark = -1;
sliceIdx = 0;
} else {
- final long prePosition = position();
+ final long prePosition = 0 <= currentPosition ? currentPosition : position();
final long sliceSize = 1L << sliceShift;
final int newSliceCount = (int)( ( newTotalSize + ( sliceSize - 1 ) ) / sliceSize );
@@ -360,11 +423,13 @@ public class MappedByteBufferInputStream extends InputStream {
final WeakReference<ByteBuffer>[] newSlices2GC = new WeakReference[ newSliceCount ];
final ByteBuffer[] newSlices = new ByteBuffer[ newSliceCount ];
final int copySliceCount = Math.min(newSliceCount, sliceCount-1); // drop last (resize)
- if( 0 < copySliceCount ) {
- System.arraycopy(slices2GC, 0, newSlices2GC, 0, copySliceCount);
- System.arraycopy(slices, 0, newSlices, 0, copySliceCount);
+ if( 0 <= copySliceCount ) {
+ if( 0 < copySliceCount ) {
+ System.arraycopy(slices2GC, 0, newSlices2GC, 0, copySliceCount);
+ System.arraycopy(slices, 0, newSlices, 0, copySliceCount);
+ }
for(int i=copySliceCount; i<sliceCount; i++) { // clip shrunken slices + 1 (last), incl. slices2GC!
- cleanSlice(i);
+ cleanSlice(i, synchronous);
}
}
slices2GC = newSlices2GC;
@@ -374,7 +439,7 @@ public class MappedByteBufferInputStream extends InputStream {
if( newTotalSize < mark ) {
mark = -1;
}
- positionImpl( Math.min(prePosition, newTotalSize) ); // -> clipped position (set currSlice and re-map/-pos buffer)
+ position2( Math.min(prePosition, newTotalSize) ); // -> clipped position (set currSlice and re-map/-pos buffer)
}
/* if( DEBUG ) {
System.err.println("notifyLengthChange.X: "+slices[currSlice]);
@@ -383,16 +448,34 @@ public class MappedByteBufferInputStream extends InputStream {
}
/**
- *
+ * Similar to {@link OutputStream#flush()}, synchronizes all mapped buffers
+ * from local storage via {@link MappedByteBuffer#force()}
+ * as well as the {@link FileChannel#force(boolean)} w/o {@code metaData}.
+ * @param metaData TODO
* @throws IOException if this stream has been {@link #close() closed}.
*/
- public final synchronized void flush() throws IOException {
+ public final synchronized void flush(final boolean metaData) throws IOException {
checkOpen();
- if( mmode != FileChannel.MapMode.READ_ONLY ) {
- fc.force(true);
+ flushImpl(metaData, true);
+ }
+ private final synchronized void flushImpl(final boolean metaData, final boolean syncBuffer) throws IOException {
+ if( FileChannel.MapMode.READ_ONLY != mmode ) {
+ if( syncBuffer && FileChannel.MapMode.READ_WRITE == mmode ) {
+ for(int i=0; i<sliceCount; i++) {
+ syncSlice(slices[i], true);
+ }
+ for(int i=0; i<sliceCount; i++) {
+ final WeakReference<ByteBuffer> ref = slices2GC[i];
+ if( null != ref ) {
+ syncSlice(ref.get(), true);
+ }
+ }
+ }
+ fc.force(metaData);
}
}
+
/**
* Returns a new MappedByteBufferOutputStream instance sharing
* all resources of this input stream, including all buffer slices.
@@ -422,23 +505,28 @@ public class MappedByteBufferInputStream extends InputStream {
* @throws IOException if a buffer slice operation failed.
*/
public final synchronized ByteBuffer currentSlice() throws IOException {
- if ( null != slices[sliceIdx] ) {
- return slices[sliceIdx];
+ final ByteBuffer s0 = slices[sliceIdx];
+ if ( null != s0 ) {
+ return s0;
} else {
if( CacheMode.FLUSH_PRE_SOFT == cmode ) {
final WeakReference<ByteBuffer> ref = slices2GC[sliceIdx];
if( null != ref ) {
final ByteBuffer mbb = ref.get();
slices2GC[sliceIdx] = null;
+ slices2GCEntries--;
if( null != mbb ) {
slices[sliceIdx] = mbb;
+ slicesEntries++;
return mbb;
}
}
}
final long pos = (long)sliceIdx << sliceShift;
- slices[sliceIdx] = fc.map(mmode, pos, Math.min(1L << sliceShift, totalSize - pos));
- return slices[sliceIdx];
+ final MappedByteBuffer s1 = fc.map(mmode, pos, Math.min(1L << sliceShift, totalSize - pos));
+ slices[sliceIdx] = s1;
+ slicesEntries++;
+ return s1;
}
}
@@ -453,9 +541,7 @@ public class MappedByteBufferInputStream extends InputStream {
*/
public final synchronized ByteBuffer nextSlice() throws IOException {
if ( sliceIdx < sliceCount - 1 ) {
- if( CacheMode.FLUSH_NONE != cmode ) {
- flushSlice(sliceIdx);
- }
+ flushSlice(sliceIdx, synchronous);
sliceIdx++;
final ByteBuffer slice = currentSlice();
slice.position( 0 );
@@ -465,46 +551,109 @@ public class MappedByteBufferInputStream extends InputStream {
}
}
- private synchronized void flushSlice(final int i) throws IOException {
+ synchronized void syncSlice(final ByteBuffer s) throws IOException {
+ syncSlice(s, synchronous);
+ }
+ synchronized void syncSlice(final ByteBuffer s, final boolean syncBuffer) throws IOException {
+ if( syncBuffer && null != s && FileChannel.MapMode.READ_WRITE == mmode ) {
+ try {
+ ((MappedByteBuffer)s).force();
+ } catch( final Throwable t ) {
+ // On Windows .. this may happen, like:
+ // java.io.IOException: The process cannot access the file because another process has locked a portion of the file
+ // at java.nio.MappedByteBuffer.force0(Native Method)
+ // at java.nio.MappedByteBuffer.force(MappedByteBuffer.java:203)
+ if( DEBUG ) {
+ System.err.println("Caught "+t.getMessage());
+ t.printStackTrace();
+ }
+ }
+ }
+ }
+ private synchronized void flushSlice(final int i, final boolean syncBuffer) throws IOException {
final ByteBuffer s = slices[i];
if ( null != s ) {
- slices[i] = null; // GC a slice is enough
- if( CacheMode.FLUSH_PRE_HARD == cmode ) {
- if( !cleanBuffer(s) ) {
- cmode = CacheMode.FLUSH_PRE_SOFT;
+ if( CacheMode.FLUSH_NONE != cmode ) {
+ slices[i] = null; // trigger slice GC
+ slicesEntries--;
+ if( CacheMode.FLUSH_PRE_HARD == cmode ) {
+ if( !cleanBuffer(s, syncBuffer) ) {
+ // buffer already synced in cleanBuffer(..) if requested
+ slices2GC[i] = new WeakReference<ByteBuffer>(s);
+ slices2GCEntries++;
+ }
+ } else {
+ syncSlice(s, syncBuffer);
slices2GC[i] = new WeakReference<ByteBuffer>(s);
+ slices2GCEntries++;
}
} else {
- slices2GC[i] = new WeakReference<ByteBuffer>(s);
+ syncSlice(s, syncBuffer);
}
}
}
- private synchronized void cleanSlice(final int i) {
- final ByteBuffer s = slices[i];
- if( null != s ) {
+ private synchronized void cleanAllSlices(final boolean syncBuffers) throws IOException {
+ if( null != slices ) {
+ for(int i=0; i<sliceCount; i++) {
+ cleanSlice(i, syncBuffers);
+ }
+ if( 0 != slicesEntries || 0 != slices2GCEntries ) { // FIXME
+ final String err = "mappedSliceCount "+slicesEntries+", slices2GCEntries "+slices2GCEntries;
+ dbgDump(err+": ", System.err);
+ throw new InternalError(err);
+ }
+ }
+ }
+
+ private synchronized void cleanSlice(final int i, final boolean syncBuffer) throws IOException {
+ final ByteBuffer s1 = slices[i];
+ final ByteBuffer s2;
+ {
+ final WeakReference<ByteBuffer> ref = slices2GC[i];
+ slices2GC[i] = null;
+ if( null != ref ) {
+ slices2GCEntries--;
+ s2 = ref.get();
+ } else {
+ s2 = null;
+ }
+ }
+ if( null != s1 ) {
slices[i] = null;
- cleanBuffer(s);
+ slicesEntries--;
+ cleanBuffer(s1, syncBuffer);
+ if( null != s2 ) {
+ throw new InternalError("XXX");
+ }
+ } else if( null != s2 ) {
+ cleanBuffer(s2, syncBuffer);
}
- slices2GC[i] = null;
}
- private synchronized boolean cleanBuffer(final ByteBuffer mbb) {
+ private synchronized boolean cleanBuffer(final ByteBuffer mbb, final boolean syncBuffer) throws IOException {
if( !cleanerInit ) {
initCleaner(mbb);
}
- if ( !hasCleaner || !mbb.isDirect() ) {
+ syncSlice(mbb, syncBuffer);
+ if( !mbb.isDirect() ) {
return false;
}
- try {
- cClean.invoke(mbbCleaner.invoke(mbb));
- return true;
- } catch(final Throwable t) {
- hasCleaner = false;
- if( DEBUG ) {
- System.err.println("Caught "+t.getMessage());
- t.printStackTrace();
+ boolean res = false;
+ if ( hasCleaner ) {
+ try {
+ cClean.invoke(mbbCleaner.invoke(mbb));
+ res = true;
+ } catch(final Throwable t) {
+ hasCleaner = false;
+ if( DEBUG ) {
+ System.err.println("Caught "+t.getMessage());
+ t.printStackTrace();
+ }
}
- return false;
}
+ if( !res && CacheMode.FLUSH_PRE_HARD == cmode ) {
+ cmode = CacheMode.FLUSH_PRE_SOFT;
+ }
+ return res;
}
private synchronized void initCleaner(final ByteBuffer bb) {
final Method[] _mbbCleaner = { null };
@@ -618,13 +767,25 @@ public class MappedByteBufferInputStream extends InputStream {
throw new IllegalArgumentException("new position "+newPosition+" not within [0.."+totalSize+"]");
}
final int preSlice = sliceIdx;
- positionImpl( newPosition );
- if( CacheMode.FLUSH_NONE != cmode && preSlice != sliceIdx) {
- flushSlice(preSlice);
+
+ if ( totalSize == newPosition ) {
+ // EOF, pos == maxPos + 1
+ sliceIdx = Math.max(0, sliceCount - 1); // handle zero size
+ if( preSlice != sliceIdx ) {
+ flushSlice(preSlice, synchronous);
+ }
+ final ByteBuffer s = currentSlice();
+ s.position( s.capacity() );
+ } else {
+ sliceIdx = (int)( newPosition >>> sliceShift );
+ if( preSlice != sliceIdx ) {
+ flushSlice(preSlice, synchronous);
+ }
+ currentSlice().position( (int)( newPosition - ( (long)sliceIdx << sliceShift ) ) );
}
return this;
}
- private final synchronized void positionImpl( final long newPosition ) throws IOException {
+ private final synchronized void position2( final long newPosition ) throws IOException {
if ( totalSize == newPosition ) {
// EOF, pos == maxPos + 1
sliceIdx = Math.max(0, sliceCount - 1); // handle zero size