diff options
author | Sven Gothel <[email protected]> | 2014-10-03 03:12:42 +0200 |
---|---|---|
committer | Sven Gothel <[email protected]> | 2014-10-03 03:12:42 +0200 |
commit | a7a3d5ab98ee0ad33fdef50bf081afeb8295ebe4 (patch) | |
tree | 8316bb60b02dfa3d1a61aed6640afeaaeab99c13 /src/java/com/jogamp/common/nio | |
parent | 00a9ee70054872712017b5a14b19aa92068c8420 (diff) |
MappedByteBuffer*Stream:
- Validate active and GC'ed mapped-buffer count
in cleanAllSlices() via close() ..
- Fix missing unmapping last buffer in notifyLengthChangeImpl(),
branch criteria was off by one.
- cleanSlice(..) now also issues cleanBuffer(..) on the GC'ed entry,
hence if WeakReference is still alive, enforce it's release.
- cleanBuffer(..) reverts FLUSH_PRE_HARD -> FLUSH_PRE_SOFT
in case of an error.
- flush() -> flush(boolean metaData) to expose FileChannel.force(metaData).
- Add synchronous mode, flushing/syncing the mapped buffers when
in READ_WRITE mapping mode and issue FileChannel.force() if not READ_ONLY.
Above is implemented via flush()/flushImpl(..) for buffers and FileChannel,
as well as in syncSlice(..) for buffers only.
flush*()/syncSlice() is covered by:
- setLength()
- notifyLengthChange*(..)
- nextSlice()
Always issue flushImpl() in close().
- Windows: Clean all buffers in setLength(),
otherwise Windows will report:
- Windows: Catch MappedByteBuffer.force() IOException
- Optimization of position(..)
position(..) is now standalone to allow issuing flushSlice(..)
before gathering the new mapped buffer.
This shall avoid one extra cache miss.
Hence rename positionImpl(..) -> position2(..).
- All MappedByteBufferOutputStream.write(..) methods
issue syncSlice(..) on the last written current slice
to ensure new 'synchronous' mode is honored.
+++
Unit tests:
- Ensure test files are being deleted
- TestByteBufferCopyStream: Reduced test file size to more sensible values.
-
Diffstat (limited to 'src/java/com/jogamp/common/nio')
-rw-r--r-- | src/java/com/jogamp/common/nio/MappedByteBufferInputStream.java | 281 | ||||
-rw-r--r-- | src/java/com/jogamp/common/nio/MappedByteBufferOutputStream.java | 49 |
2 files changed, 266 insertions, 64 deletions
diff --git a/src/java/com/jogamp/common/nio/MappedByteBufferInputStream.java b/src/java/com/jogamp/common/nio/MappedByteBufferInputStream.java index 5f91f64..1d4d78a 100644 --- a/src/java/com/jogamp/common/nio/MappedByteBufferInputStream.java +++ b/src/java/com/jogamp/common/nio/MappedByteBufferInputStream.java @@ -29,11 +29,13 @@ package com.jogamp.common.nio; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.io.PrintStream; import java.io.RandomAccessFile; import java.lang.ref.WeakReference; import java.lang.reflect.Method; import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.FileChannel.MapMode; import java.security.AccessController; @@ -156,6 +158,8 @@ public class MappedByteBufferInputStream extends InputStream { private ByteBuffer[] slices; private WeakReference<ByteBuffer>[] slices2GC; private long totalSize; + private int slicesEntries, slices2GCEntries; + private boolean synchronous; private int refCount; @@ -169,6 +173,23 @@ public class MappedByteBufferInputStream extends InputStream { private long mark; final void dbgDump(final String prefix, final PrintStream out) { + int _slicesEntries = 0; + for(int i=0; i<sliceCount; i++) { + if( null != slices[i] ) { + _slicesEntries++; + } + } + int _slices2GCEntries = 0; + int _slices2GCAliveEntries = 0; + for(int i=0; i<sliceCount; i++) { + final WeakReference<ByteBuffer> ref = slices2GC[i]; + if( null != ref ) { + _slices2GCEntries++; + if( null != ref.get() ) { + _slices2GCAliveEntries++; + } + } + } long fcSz = 0, pos = 0, rem = 0; try { fcSz = fc.size(); @@ -187,7 +208,9 @@ public class MappedByteBufferInputStream extends InputStream { out.println(prefix+" refCount "+refCount+", fcSize "+fcSz+", totalSize "+totalSize); out.println(prefix+" position "+pos+", remaining "+rem); out.println(prefix+" mmode "+mmode+", cmode "+cmode+", fileResizeOp "+fileResizeOp); - out.println(prefix+" slice "+sliceIdx+" / "+sliceCount+" ("+sliceCount2+")"); + out.println(prefix+" slice "+sliceIdx+" / "+sliceCount+" ("+sliceCount2+"), synchronous "+synchronous); + out.println(prefix+" mapped "+slicesEntries+" / "+_slicesEntries); + out.println(prefix+" GC-queue "+slices2GCEntries+" / "+_slices2GCEntries+" (alive "+_slices2GCAliveEntries+")"); out.println(prefix+" sliceShift "+sliceShift+" -> "+(1L << sliceShift)); } @@ -203,7 +226,7 @@ public class MappedByteBufferInputStream extends InputStream { // trigger notifyLengthChange this.totalSize = -1; this.sliceCount = 0; - notifyLengthChange(totalSize); + notifyLengthChange( totalSize ); this.refCount = 1; this.cleanerInit = false; @@ -263,6 +286,28 @@ public class MappedByteBufferInputStream extends InputStream { this(fileChannel, FileChannel.MapMode.READ_ONLY, CacheMode.FLUSH_PRE_SOFT, DEFAULT_SLICE_SHIFT); } + /** + * Enable or disable synchronous mode. + * <p> + * If synchronous mode is enabled, mapped buffers will be {@link #flush(boolean) flushed} + * if {@link #notifyLengthChange(long) resized}, <i>written to</i> or {@link #close() closing} in {@link FileChannel.MapMode#READ_WRITE read-write} mapping mode. + * </p> + * <p> + * If synchronous mode is enabled, {@link FileChannel#force(boolean)} is issued + * if {@link #setLength(long) resizing} or {@link #close() closing} and not in {@link FileChannel.MapMode#READ_ONLY read-only} mapping mode. + * </p> + * @param s {@code true} to enable synchronous mode + */ + public final synchronized void setSynchronous(final boolean s) { + synchronous = s; + } + /** + * Return {@link #setSynchronous(boolean) synchronous mode}. + */ + public final synchronized boolean getSynchronous() { + return synchronous ; + } + final synchronized void checkOpen() throws IOException { if( 0 == refCount ) { throw new IOException("stream closed"); @@ -274,16 +319,15 @@ public class MappedByteBufferInputStream extends InputStream { if( 0 < refCount ) { refCount--; if( 0 == refCount ) { - for(int i=0; i<sliceCount; i++) { - cleanSlice(i); - } - if( mmode != FileChannel.MapMode.READ_ONLY ) { - fc.force(true); + try { + cleanAllSlices( true /* syncBuffer */ ); + } finally { + flushImpl(true /* metaData */, false /* syncBuffer */); + fc.close(); + mark = -1; + sliceIdx = -1; + super.close(); } - fc.close(); - mark = -1; - sliceIdx = -1; - super.close(); } } } @@ -307,15 +351,35 @@ public class MappedByteBufferInputStream extends InputStream { * <p> * User must have a {@link FileResizeOp} {@link #setFileResizeOp(FileResizeOp) registered} before. * </p> + * <p> + * Implementation calls {@link #notifyLengthChange(long)} after {@link FileResizeOp#setLength(long)}. + * </p> * @param newTotalSize the new total size * @throws IOException if no {@link FileResizeOp} has been {@link #setFileResizeOp(FileResizeOp) registered} * or if a buffer slice operation failed */ public final synchronized void setLength(final long newTotalSize) throws IOException { + final long currentPosition; + if( 0 != newTotalSize && totalSize != newTotalSize ) { + currentPosition = position(); + } else { + currentPosition = -1L; + } if( fc.size() != newTotalSize ) { + if( Platform.OSType.WINDOWS == Platform.getOSType() ) { + // On Windows, we have to close all mapped slices. + // Otherwise we will receive: + // java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open + // at java.io.RandomAccessFile.setLength(Native Method) + cleanAllSlices( synchronous ); + } fileResizeOp.setLength(newTotalSize); + if( synchronous ) { + // buffers will be synchronized in notifyLengthChangeImpl(..) + flushImpl( true /* metaData */, false /* syncBuffer */); + } } - notifyLengthChange(newTotalSize); + notifyLengthChangeImpl(newTotalSize, currentPosition); } /** @@ -328,6 +392,9 @@ public class MappedByteBufferInputStream extends InputStream { * @throws IOException if a buffer slice operation failed */ public final synchronized void notifyLengthChange(final long newTotalSize) throws IOException { + notifyLengthChangeImpl(newTotalSize, -1L); + } + private final synchronized void notifyLengthChangeImpl(final long newTotalSize, final long currentPosition) throws IOException { /* if( DEBUG ) { System.err.println("notifyLengthChange.0: "+totalSize+" -> "+newTotalSize); dbgDump("notifyLengthChange.0:", System.err); @@ -337,11 +404,7 @@ public class MappedByteBufferInputStream extends InputStream { return; } else if( 0 == newTotalSize ) { // ZERO - ensure one entry avoiding NULL checks - if( null != slices ) { - for(int i=0; i<sliceCount; i++) { - cleanSlice(i); - } - } + cleanAllSlices( synchronous ); @SuppressWarnings("unchecked") final WeakReference<ByteBuffer>[] newSlices2GC = new WeakReference[ 1 ]; slices2GC = newSlices2GC; @@ -352,7 +415,7 @@ public class MappedByteBufferInputStream extends InputStream { mark = -1; sliceIdx = 0; } else { - final long prePosition = position(); + final long prePosition = 0 <= currentPosition ? currentPosition : position(); final long sliceSize = 1L << sliceShift; final int newSliceCount = (int)( ( newTotalSize + ( sliceSize - 1 ) ) / sliceSize ); @@ -360,11 +423,13 @@ public class MappedByteBufferInputStream extends InputStream { final WeakReference<ByteBuffer>[] newSlices2GC = new WeakReference[ newSliceCount ]; final ByteBuffer[] newSlices = new ByteBuffer[ newSliceCount ]; final int copySliceCount = Math.min(newSliceCount, sliceCount-1); // drop last (resize) - if( 0 < copySliceCount ) { - System.arraycopy(slices2GC, 0, newSlices2GC, 0, copySliceCount); - System.arraycopy(slices, 0, newSlices, 0, copySliceCount); + if( 0 <= copySliceCount ) { + if( 0 < copySliceCount ) { + System.arraycopy(slices2GC, 0, newSlices2GC, 0, copySliceCount); + System.arraycopy(slices, 0, newSlices, 0, copySliceCount); + } for(int i=copySliceCount; i<sliceCount; i++) { // clip shrunken slices + 1 (last), incl. slices2GC! - cleanSlice(i); + cleanSlice(i, synchronous); } } slices2GC = newSlices2GC; @@ -374,7 +439,7 @@ public class MappedByteBufferInputStream extends InputStream { if( newTotalSize < mark ) { mark = -1; } - positionImpl( Math.min(prePosition, newTotalSize) ); // -> clipped position (set currSlice and re-map/-pos buffer) + position2( Math.min(prePosition, newTotalSize) ); // -> clipped position (set currSlice and re-map/-pos buffer) } /* if( DEBUG ) { System.err.println("notifyLengthChange.X: "+slices[currSlice]); @@ -383,16 +448,34 @@ public class MappedByteBufferInputStream extends InputStream { } /** - * + * Similar to {@link OutputStream#flush()}, synchronizes all mapped buffers + * from local storage via {@link MappedByteBuffer#force()} + * as well as the {@link FileChannel#force(boolean)} w/o {@code metaData}. + * @param metaData TODO * @throws IOException if this stream has been {@link #close() closed}. */ - public final synchronized void flush() throws IOException { + public final synchronized void flush(final boolean metaData) throws IOException { checkOpen(); - if( mmode != FileChannel.MapMode.READ_ONLY ) { - fc.force(true); + flushImpl(metaData, true); + } + private final synchronized void flushImpl(final boolean metaData, final boolean syncBuffer) throws IOException { + if( FileChannel.MapMode.READ_ONLY != mmode ) { + if( syncBuffer && FileChannel.MapMode.READ_WRITE == mmode ) { + for(int i=0; i<sliceCount; i++) { + syncSlice(slices[i], true); + } + for(int i=0; i<sliceCount; i++) { + final WeakReference<ByteBuffer> ref = slices2GC[i]; + if( null != ref ) { + syncSlice(ref.get(), true); + } + } + } + fc.force(metaData); } } + /** * Returns a new MappedByteBufferOutputStream instance sharing * all resources of this input stream, including all buffer slices. @@ -422,23 +505,28 @@ public class MappedByteBufferInputStream extends InputStream { * @throws IOException if a buffer slice operation failed. */ public final synchronized ByteBuffer currentSlice() throws IOException { - if ( null != slices[sliceIdx] ) { - return slices[sliceIdx]; + final ByteBuffer s0 = slices[sliceIdx]; + if ( null != s0 ) { + return s0; } else { if( CacheMode.FLUSH_PRE_SOFT == cmode ) { final WeakReference<ByteBuffer> ref = slices2GC[sliceIdx]; if( null != ref ) { final ByteBuffer mbb = ref.get(); slices2GC[sliceIdx] = null; + slices2GCEntries--; if( null != mbb ) { slices[sliceIdx] = mbb; + slicesEntries++; return mbb; } } } final long pos = (long)sliceIdx << sliceShift; - slices[sliceIdx] = fc.map(mmode, pos, Math.min(1L << sliceShift, totalSize - pos)); - return slices[sliceIdx]; + final MappedByteBuffer s1 = fc.map(mmode, pos, Math.min(1L << sliceShift, totalSize - pos)); + slices[sliceIdx] = s1; + slicesEntries++; + return s1; } } @@ -453,9 +541,7 @@ public class MappedByteBufferInputStream extends InputStream { */ public final synchronized ByteBuffer nextSlice() throws IOException { if ( sliceIdx < sliceCount - 1 ) { - if( CacheMode.FLUSH_NONE != cmode ) { - flushSlice(sliceIdx); - } + flushSlice(sliceIdx, synchronous); sliceIdx++; final ByteBuffer slice = currentSlice(); slice.position( 0 ); @@ -465,46 +551,109 @@ public class MappedByteBufferInputStream extends InputStream { } } - private synchronized void flushSlice(final int i) throws IOException { + synchronized void syncSlice(final ByteBuffer s) throws IOException { + syncSlice(s, synchronous); + } + synchronized void syncSlice(final ByteBuffer s, final boolean syncBuffer) throws IOException { + if( syncBuffer && null != s && FileChannel.MapMode.READ_WRITE == mmode ) { + try { + ((MappedByteBuffer)s).force(); + } catch( final Throwable t ) { + // On Windows .. this may happen, like: + // java.io.IOException: The process cannot access the file because another process has locked a portion of the file + // at java.nio.MappedByteBuffer.force0(Native Method) + // at java.nio.MappedByteBuffer.force(MappedByteBuffer.java:203) + if( DEBUG ) { + System.err.println("Caught "+t.getMessage()); + t.printStackTrace(); + } + } + } + } + private synchronized void flushSlice(final int i, final boolean syncBuffer) throws IOException { final ByteBuffer s = slices[i]; if ( null != s ) { - slices[i] = null; // GC a slice is enough - if( CacheMode.FLUSH_PRE_HARD == cmode ) { - if( !cleanBuffer(s) ) { - cmode = CacheMode.FLUSH_PRE_SOFT; + if( CacheMode.FLUSH_NONE != cmode ) { + slices[i] = null; // trigger slice GC + slicesEntries--; + if( CacheMode.FLUSH_PRE_HARD == cmode ) { + if( !cleanBuffer(s, syncBuffer) ) { + // buffer already synced in cleanBuffer(..) if requested + slices2GC[i] = new WeakReference<ByteBuffer>(s); + slices2GCEntries++; + } + } else { + syncSlice(s, syncBuffer); slices2GC[i] = new WeakReference<ByteBuffer>(s); + slices2GCEntries++; } } else { - slices2GC[i] = new WeakReference<ByteBuffer>(s); + syncSlice(s, syncBuffer); } } } - private synchronized void cleanSlice(final int i) { - final ByteBuffer s = slices[i]; - if( null != s ) { + private synchronized void cleanAllSlices(final boolean syncBuffers) throws IOException { + if( null != slices ) { + for(int i=0; i<sliceCount; i++) { + cleanSlice(i, syncBuffers); + } + if( 0 != slicesEntries || 0 != slices2GCEntries ) { // FIXME + final String err = "mappedSliceCount "+slicesEntries+", slices2GCEntries "+slices2GCEntries; + dbgDump(err+": ", System.err); + throw new InternalError(err); + } + } + } + + private synchronized void cleanSlice(final int i, final boolean syncBuffer) throws IOException { + final ByteBuffer s1 = slices[i]; + final ByteBuffer s2; + { + final WeakReference<ByteBuffer> ref = slices2GC[i]; + slices2GC[i] = null; + if( null != ref ) { + slices2GCEntries--; + s2 = ref.get(); + } else { + s2 = null; + } + } + if( null != s1 ) { slices[i] = null; - cleanBuffer(s); + slicesEntries--; + cleanBuffer(s1, syncBuffer); + if( null != s2 ) { + throw new InternalError("XXX"); + } + } else if( null != s2 ) { + cleanBuffer(s2, syncBuffer); } - slices2GC[i] = null; } - private synchronized boolean cleanBuffer(final ByteBuffer mbb) { + private synchronized boolean cleanBuffer(final ByteBuffer mbb, final boolean syncBuffer) throws IOException { if( !cleanerInit ) { initCleaner(mbb); } - if ( !hasCleaner || !mbb.isDirect() ) { + syncSlice(mbb, syncBuffer); + if( !mbb.isDirect() ) { return false; } - try { - cClean.invoke(mbbCleaner.invoke(mbb)); - return true; - } catch(final Throwable t) { - hasCleaner = false; - if( DEBUG ) { - System.err.println("Caught "+t.getMessage()); - t.printStackTrace(); + boolean res = false; + if ( hasCleaner ) { + try { + cClean.invoke(mbbCleaner.invoke(mbb)); + res = true; + } catch(final Throwable t) { + hasCleaner = false; + if( DEBUG ) { + System.err.println("Caught "+t.getMessage()); + t.printStackTrace(); + } } - return false; } + if( !res && CacheMode.FLUSH_PRE_HARD == cmode ) { + cmode = CacheMode.FLUSH_PRE_SOFT; + } + return res; } private synchronized void initCleaner(final ByteBuffer bb) { final Method[] _mbbCleaner = { null }; @@ -618,13 +767,25 @@ public class MappedByteBufferInputStream extends InputStream { throw new IllegalArgumentException("new position "+newPosition+" not within [0.."+totalSize+"]"); } final int preSlice = sliceIdx; - positionImpl( newPosition ); - if( CacheMode.FLUSH_NONE != cmode && preSlice != sliceIdx) { - flushSlice(preSlice); + + if ( totalSize == newPosition ) { + // EOF, pos == maxPos + 1 + sliceIdx = Math.max(0, sliceCount - 1); // handle zero size + if( preSlice != sliceIdx ) { + flushSlice(preSlice, synchronous); + } + final ByteBuffer s = currentSlice(); + s.position( s.capacity() ); + } else { + sliceIdx = (int)( newPosition >>> sliceShift ); + if( preSlice != sliceIdx ) { + flushSlice(preSlice, synchronous); + } + currentSlice().position( (int)( newPosition - ( (long)sliceIdx << sliceShift ) ) ); } return this; } - private final synchronized void positionImpl( final long newPosition ) throws IOException { + private final synchronized void position2( final long newPosition ) throws IOException { if ( totalSize == newPosition ) { // EOF, pos == maxPos + 1 sliceIdx = Math.max(0, sliceCount - 1); // handle zero size diff --git a/src/java/com/jogamp/common/nio/MappedByteBufferOutputStream.java b/src/java/com/jogamp/common/nio/MappedByteBufferOutputStream.java index f84e6c2..8adf0e4 100644 --- a/src/java/com/jogamp/common/nio/MappedByteBufferOutputStream.java +++ b/src/java/com/jogamp/common/nio/MappedByteBufferOutputStream.java @@ -79,6 +79,19 @@ public class MappedByteBufferOutputStream extends OutputStream { } /** + * See {@link MappedByteBufferInputStream#setSynchronous(boolean)}. + */ + public final synchronized void setSynchronous(final boolean s) { + parent.setSynchronous(s); + } + /** + * See {@link MappedByteBufferInputStream#getSynchronous()}. + */ + public final synchronized boolean getSynchronous() { + return parent.getSynchronous(); + } + + /** * See {@link MappedByteBufferInputStream#setLength(long)}. */ public final synchronized void setLength(final long newTotalSize) throws IOException { @@ -129,7 +142,15 @@ public class MappedByteBufferOutputStream extends OutputStream { @Override public final synchronized void flush() throws IOException { - parent.flush(); + parent.flush( true /* metaData */); + } + + /** + * See {@link MappedByteBufferInputStream#flush(boolean)}. + */ + // @Override + public final synchronized void flush(final boolean metaData) throws IOException { + parent.flush(metaData); } @Override @@ -156,6 +177,11 @@ public class MappedByteBufferOutputStream extends OutputStream { } } slice.put( (byte)(b & 0xFF) ); + + // sync last buffer (happens only in synchronous mode) + if( null != slice ) { + parent.syncSlice(slice); + } } @Override @@ -178,8 +204,9 @@ public class MappedByteBufferOutputStream extends OutputStream { parent.setLength( parent.length() + len - totalRem ); } int written = 0; + ByteBuffer slice = null; while( written < len ) { - ByteBuffer slice = parent.currentSlice(); + slice = parent.currentSlice(); int currRem = slice.remaining(); if ( 0 == currRem ) { if ( null == ( slice = parent.nextSlice() ) ) { @@ -197,6 +224,10 @@ public class MappedByteBufferOutputStream extends OutputStream { slice.put( b, off + written, currLen ); written += currLen; } + // sync last buffer (happens only in synchronous mode) + if( null != slice ) { + parent.syncSlice(slice); + } } /** @@ -221,8 +252,9 @@ public class MappedByteBufferOutputStream extends OutputStream { parent.setLength( parent.length() + len - totalRem ); } int written = 0; + ByteBuffer slice = null; while( written < len ) { - ByteBuffer slice = parent.currentSlice(); + slice = parent.currentSlice(); int currRem = slice.remaining(); if ( 0 == currRem ) { if ( null == ( slice = parent.nextSlice() ) ) { @@ -257,6 +289,10 @@ public class MappedByteBufferOutputStream extends OutputStream { } written += currLen; } + // sync last buffer (happens only in synchronous mode) + if( null != slice ) { + parent.syncSlice(slice); + } } /** @@ -285,8 +321,9 @@ public class MappedByteBufferOutputStream extends OutputStream { parent.setLength( parent.length() + len - totalRem ); } long written = 0; + ByteBuffer slice = null; while( written < len ) { - ByteBuffer slice = parent.currentSlice(); + slice = parent.currentSlice(); int currRem = slice.remaining(); if ( 0 == currRem ) { if ( null == ( slice = parent.nextSlice() ) ) { @@ -306,5 +343,9 @@ public class MappedByteBufferOutputStream extends OutputStream { } written += currLen; } + // sync last buffer (happens only in synchronous mode) + if( null != slice ) { + parent.syncSlice(slice); + } } } |