diff options
author | Sven Gothel <[email protected]> | 2014-09-25 23:51:04 +0200 |
---|---|---|
committer | Sven Gothel <[email protected]> | 2014-09-25 23:51:04 +0200 |
commit | ae17a5895088e321bc373318cc1e144a2f822f29 (patch) | |
tree | 77dafc05c93082d60e2849ed1bbba25d5ef084a3 | |
parent | 1350823035597f784f9cf871aa487f896f3d1840 (diff) |
Bug 1080 - Add read support for memory mapped big file I/O via specialized InputStream impl., incl. mark/reset
- ByteBufferInputStream simply impl. InputStream for an arbitrary 2MiB restricted ByteBuffer
- Users may only need a smaller implementation for 'smaller' file sizes
or for streaming a [native] ByteBuffer.
- MappedByteBufferInputStream impl. InputStream for any file size,
while slicing the total size to memory mapped buffers via the given FileChannel.
The latter are mapped lazily and diff. flush/cache methods are supported
to ease virtual memory usage.
- TestByteBufferInputStream: Basic unit test for basic functionality and perf. stats.
-rwxr-xr-x | make/scripts/runtest.sh | 6 | ||||
-rw-r--r-- | src/java/com/jogamp/common/nio/ByteBufferInputStream.java | 183 | ||||
-rw-r--r-- | src/java/com/jogamp/common/nio/MappedByteBufferInputStream.java | 487 | ||||
-rw-r--r-- | src/junit/com/jogamp/common/nio/TestByteBufferInputStream.java | 334 |
4 files changed, 1008 insertions, 2 deletions
diff --git a/make/scripts/runtest.sh b/make/scripts/runtest.sh index 5df5888..fa3d8ae 100755 --- a/make/scripts/runtest.sh +++ b/make/scripts/runtest.sh @@ -48,7 +48,7 @@ rm -f $LOG #D_ARGS="-Djogamp.debug.IOUtil -Djogamp.debug.JNILibLoader -Djogamp.debug.TempFileCache -Djogamp.debug.JarUtil -Djava.io.tmpdir=/run/tmp" #D_ARGS="-Djogamp.debug.IOUtil -Djogamp.debug.JNILibLoader -Djogamp.debug.TempFileCache -Djogamp.debug.JarUtil -Djogamp.debug.TempJarCache" #D_ARGS="-Djogamp.debug.IOUtil -Djogamp.debug.JarUtil -Djogamp.debug.TempJarCache -Djogamp.debug.Uri -Djogamp.debug.Uri.ShowFix" -D_ARGS="-Djogamp.debug.Uri -Djogamp.debug.Uri.ShowFix" +#D_ARGS="-Djogamp.debug.Uri -Djogamp.debug.Uri.ShowFix" #D_ARGS="-Djogamp.debug.JNILibLoader -Djogamp.gluegen.UseTempJarCache=false" #D_ARGS="-Djogamp.debug.JNILibLoader -Djogamp.debug.TempJarCache" #D_ARGS="-Djogamp.debug.JNILibLoader" @@ -56,6 +56,7 @@ D_ARGS="-Djogamp.debug.Uri -Djogamp.debug.Uri.ShowFix" #D_ARGS="-Djogamp.debug.Lock -Djogamp.debug.Lock.TraceLock" #D_ARGS="-Djogamp.debug.Lock.TraceLock" #D_ARGS="-Djogamp.debug.IOUtil" +#D_ARGS="-Djogamp.debug.ByteBufferInputStream" #D_ARGS="-Djogamp.debug.Bitstream" #D_ARGS="-Djogamp.debug=all" @@ -112,7 +113,7 @@ function onetest() { #onetest com.jogamp.common.util.TestBitstream04 2>&1 | tee -a $LOG #onetest com.jogamp.common.net.TestUrisWithAssetHandler 2>&1 | tee -a $LOG #onetest com.jogamp.common.net.TestUriQueryProps 2>&1 | tee -a $LOG -onetest com.jogamp.common.net.TestUri01 2>&1 | tee -a $LOG +#onetest com.jogamp.common.net.TestUri01 2>&1 | tee -a $LOG #onetest com.jogamp.common.net.TestUri02Composing 2>&1 | tee -a $LOG #onetest com.jogamp.common.net.TestUri03Resolving 2>&1 | tee -a $LOG #onetest com.jogamp.common.net.TestUri99LaunchOnReservedCharPathBug908 2>&1 | tee -a $LOG @@ -122,6 +123,7 @@ onetest com.jogamp.common.net.TestUri01 2>&1 | tee -a $LOG #onetest com.jogamp.common.nio.TestBuffersFloatDoubleConversion 2>&1 | tee -a $LOG #onetest com.jogamp.common.nio.TestPointerBufferEndian 2>&1 | tee -a $LOG #onetest com.jogamp.common.nio.TestStructAccessorEndian 2>&1 | tee -a $LOG +onetest com.jogamp.common.nio.TestByteBufferInputStream 2>&1 | tee -a $LOG #onetest com.jogamp.common.os.TestElfReader01 2>&1 | tee -a $LOG #onetest com.jogamp.gluegen.PCPPTest 2>&1 | tee -a $LOG #onetest com.jogamp.gluegen.test.junit.generation.Test1p1JavaEmitter 2>&1 | tee -a $LOG diff --git a/src/java/com/jogamp/common/nio/ByteBufferInputStream.java b/src/java/com/jogamp/common/nio/ByteBufferInputStream.java new file mode 100644 index 0000000..5b6f121 --- /dev/null +++ b/src/java/com/jogamp/common/nio/ByteBufferInputStream.java @@ -0,0 +1,183 @@ +/** + * Copyright 2014 JogAmp Community. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are + * permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this list of + * conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, this list + * of conditions and the following disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY JogAmp Community ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND + * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JogAmp Community OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are those of the + * authors and should not be interpreted as representing official policies, either expressed + * or implied, of JogAmp Community. + */ +package com.jogamp.common.nio; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; + +/** + * An {@link InputStream} implementation based on an underlying {@link ByteBuffer} + * supporting {@link #markSupported() mark}. + * <p> + * May be utilized as well with a {@link MappedByteBuffer memory-mapped} {@link FileChannel#map(MapMode, long, long) FileChannel} + * using a size ≤ {@link Integer#MAX_VALUE}.<br> + * This becomes efficient with files ≥ 10 MiB, depending on the platform + * and whether the traditional method uses a {@link BufferedInputStream} supporting {@code mark} incl. it's buffer size.<br> + * See test case {@code com.jogamp.common.nio.TestByteBufferInputStream}. + * </p> + * @since 2.3.0 + */ +public class ByteBufferInputStream extends InputStream { + private final ByteBuffer buf; + private int mark; + + /** + * Creates a new byte-buffer input stream. + * + * @param buf the underlying byte buffer. + */ + public ByteBufferInputStream(final ByteBuffer buf) { + this.buf = buf; + this.mark = -1; + } + + @Override + public final int available() { + return buf.remaining(); + } + + /** + * <i>This implementation supports {@code mark}.</i> + * <p> + * {@inheritDoc} + * </p> + */ + @Override + public final boolean markSupported() { + return true; + } + + /** + * <i>This implementation supports {@code mark}.</i> + * <p> + * {@inheritDoc} + * </p> + * @see #markSupported() + */ + @Override + public final synchronized void mark(final int unused) { + mark = buf.position(); + } + + /** + * <i>This implementation supports {@code mark}.</i> + * <p> + * {@inheritDoc} + * </p> + * @see #markSupported() + */ + @Override + public final synchronized void reset() throws IOException { + if ( mark == -1 ) { + throw new IOException(); + } + buf.position(mark); + } + + @Override + public final long skip(final long n) throws IOException { + if( 0 > n ) { + return 0; + } + final int s = (int) Math.min( buf.remaining(), n ); + buf.position(buf.position() + s); + return s; + } + + @Override + public final int read() { + if ( ! buf.hasRemaining() ) { + return -1; + } + return buf.get() & 0xFF; + } + + @Override + public final int read(final byte[] b, final int off, final int len) { + if (b == null) { + throw new NullPointerException(); + } else if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException("offset "+off+", length "+len+", b.length "+b.length); + } + if ( 0 == len ) { + return 0; + } + final int totalRem = buf.remaining(); + if ( 0 == totalRem ) { + return -1; + } + final int maxLen = Math.min(totalRem, len); + if( buf.hasArray() ) { + System.arraycopy(buf.array(), buf.arrayOffset() + buf.position(), b, off, maxLen); + buf.position( buf.position() + maxLen ); + } else { + buf.get(b, off, maxLen); + } + return maxLen; + } + + // @Override + public final int read(final ByteBuffer b, final int len) { + if (b == null) { + throw new NullPointerException(); + } else if (len < 0 || len > b.remaining()) { + throw new IndexOutOfBoundsException("length "+len+", b "+b); + } + if ( 0 == len ) { + return 0; + } + final int remaining = buf.remaining(); + if ( 0 == remaining ) { + return -1; + } + final int maxLen = Math.min(remaining, len); + if( buf.hasArray() && b.hasArray() ) { + System.arraycopy(buf.array(), buf.arrayOffset() + buf.position(), b.array(), b.arrayOffset() + b.position(), maxLen); + buf.position( buf.position() + maxLen ); + b.position( b.position() + maxLen ); + } else if( maxLen == remaining ) { + b.put(buf); + } else { + final int _limit = buf.limit(); + buf.limit(maxLen); + try { + b.put(buf); + } finally { + buf.limit(_limit); + } + } + return maxLen; + } + + public final ByteBuffer getBuffer() { return buf; } +} diff --git a/src/java/com/jogamp/common/nio/MappedByteBufferInputStream.java b/src/java/com/jogamp/common/nio/MappedByteBufferInputStream.java new file mode 100644 index 0000000..52f5b5a --- /dev/null +++ b/src/java/com/jogamp/common/nio/MappedByteBufferInputStream.java @@ -0,0 +1,487 @@ +/** + * Copyright 2014 JogAmp Community. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are + * permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this list of + * conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, this list + * of conditions and the following disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY JogAmp Community ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND + * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JogAmp Community OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are those of the + * authors and should not be interpreted as representing official policies, either expressed + * or implied, of JogAmp Community. + */ +package com.jogamp.common.nio; + +import java.io.IOException; +import java.io.InputStream; +import java.lang.ref.WeakReference; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.security.AccessController; +import java.security.PrivilegedAction; + +import jogamp.common.Debug; + +import com.jogamp.common.os.Platform; + +/** + * An {@link InputStream} implementation based on an underlying {@link MappedByteBuffer} + * supporting {@link #markSupported() mark}. + * <p> + * Intended to be utilized with a {@link MappedByteBuffer memory-mapped} {@link FileChannel#map(MapMode, long, long) FileChannel} + * beyond its size limitation of {@link Integer#MAX_VALUE}.<br> + * </p> + * @since 2.3.0 + */ +public class MappedByteBufferInputStream extends InputStream { + public static enum CacheMode { + /** + * Keep all previous lazily cached buffer slices alive, useful for hopping readers, + * i.e. random access via {@link MappedByteBufferInputStream#position(long) position(p)} + * or {@link MappedByteBufferInputStream#reset() reset()}. + */ + FLUSH_NONE, + /** + * Soft flush the previous lazily cached buffer slice when caching the next buffer slice, + * useful for sequential forward readers, as well as for hopping readers like {@link #FLUSH_NONE} + * in case of relatively short periods between hopping across slices. + * <p> + * Implementation clears the buffer slice reference + * while preserving a {@link WeakReference} to allow its resurrection if not yet + * {@link System#gc() garbage collected}. + * </p> + * <p> + * This is the default. + * </p> + */ + FLUSH_PRE_SOFT, + /** + * Hard flush the previous lazily cached buffer slice when caching the next buffer slice, + * useful for sequential forward readers. + * <p> + * Besides clearing the buffer slice reference, + * implementation attempts to hard flush the mapped buffer + * using a {@code sun.misc.Cleaner} by reflection. + * In case such method does not exist nor works, implementation falls back to {@link #FLUSH_PRE_SOFT}. + * </p> + */ + FLUSH_PRE_HARD + }; + + /** + * Default slice shift, i.e. 1L << shift, denoting slice size in MiB: + * <ul> + * <li>{@link Platform#is64Bit() 64bit machines} -> 30 = 1024 MiB</li> + * <li>{@link Platform#is32Bit() 32bit machines} -> 29 = 512 MiB</li> + * </ul> + * <p> + * In case the default is too much of-used up address-space, one may choose other values: + * <ul> + * <li>29 -> 512 MiB</li> + * <li>28 -> 256 MiB</li> + * <li>27 -> 128 MiB</li> + * <li>26 -> 64 MiB</li> + * </ul> + * </p> + */ + public static final int DEFAULT_SLICE_SHIFT; + + private static final boolean DEBUG; + + static { + Platform.initSingleton(); + if( Platform.is32Bit() ) { + DEFAULT_SLICE_SHIFT = 29; + } else { + DEFAULT_SLICE_SHIFT = 30; + } + + DEBUG = Debug.debug("ByteBufferInputStream"); + } + + private final int sliceShift; + private final FileChannel fc; + private final FileChannel.MapMode mmode; + private final MappedByteBuffer[] slices; + private final WeakReference<MappedByteBuffer>[] slices2GC; + private final int sliceCount; + private final long totalSize; + + private Method mbbCleaner; + private Method cClean; + private boolean cleanerInit; + private boolean hasCleaner; + private CacheMode cmode; + + private int currSlice; + private long mark; + + @SuppressWarnings("unchecked") + MappedByteBufferInputStream(final FileChannel fc, final FileChannel.MapMode mmode, final CacheMode cmode, + final int sliceShift, final MappedByteBuffer[] bufs, final long totalSize, + final int currSlice) throws IOException { + this.sliceShift = sliceShift; + this.fc = fc; + this.mmode = mmode; + this.slices = bufs; + this.sliceCount = bufs.length; + this.slices2GC = new WeakReference[sliceCount]; + this.totalSize = totalSize; + if( 0 >= totalSize || 0 >= sliceCount ) { + throw new IllegalArgumentException("Zero sized: total "+totalSize+", slices "+sliceCount); + } + + this.cleanerInit = false; + this.hasCleaner = false; + this.cmode = cmode; + + this.currSlice = currSlice; + this.mark = -1; + + slice(currSlice).position(0); + } + + /** + * Creates a new instance using the given {@link FileChannel}, + * {@link FileChannel.MapMode#READ_ONLY read-only} mapping mode, {@link CacheMode#FLUSH_PRE_SOFT} + * and the {@link #DEFAULT_SLICE_SHIFT}. + * <p> + * The {@link MappedByteBuffer} slices will be mapped {@link FileChannel.MapMode#READ_ONLY} lazily at first usage. + * </p> + * @param fileChannel the file channel to be used. + * @throws IOException + */ + public static MappedByteBufferInputStream create(final FileChannel fileChannel) throws IOException { + return create(fileChannel, FileChannel.MapMode.READ_ONLY, CacheMode.FLUSH_PRE_SOFT, DEFAULT_SLICE_SHIFT); + } + + /** + * Creates a new instance using the given {@link FileChannel}, + * {@link FileChannel.MapMode#READ_ONLY read-only} mapping mode and the {@link #DEFAULT_SLICE_SHIFT}. + * <p> + * The {@link MappedByteBuffer} slices will be mapped {@link FileChannel.MapMode#READ_ONLY} lazily at first usage. + * </p> + * @param fileChannel the file channel to be used. + * @param cmode the caching mode, default is {@link CacheMode#FLUSH_PRE_SOFT}. + * @throws IOException + */ + public static MappedByteBufferInputStream create(final FileChannel fileChannel, final CacheMode cmode) throws IOException { + return create(fileChannel, FileChannel.MapMode.READ_ONLY, cmode, DEFAULT_SLICE_SHIFT); + } + + /** + * Creates a new instance using the given {@link FileChannel}. + * <p> + * The {@link MappedByteBuffer} slices will be mapped lazily at first usage. + * </p> + * @param fileChannel the file channel to be mapped lazily. + * @param mmode the map mode, default is {@link FileChannel.MapMode#READ_ONLY}. + * @param cmode the caching mode, default is {@link CacheMode#FLUSH_PRE_SOFT}. + * @param sliceShift the pow2 slice size, default is {@link #DEFAULT_SLICE_SHIFT}. + * @throws IOException + */ + public static MappedByteBufferInputStream create(final FileChannel fileChannel, + final FileChannel.MapMode mmode, + final CacheMode cmode, + final int sliceShift) throws IOException { + final long sliceSize = 1L << sliceShift; + final long totalSize = fileChannel.size(); + final int sliceCount = (int)( ( totalSize + ( sliceSize - 1 ) ) / sliceSize ); + final MappedByteBuffer[] bufs = new MappedByteBuffer[ sliceCount ]; + return new MappedByteBufferInputStream(fileChannel, mmode, cmode, sliceShift, bufs, totalSize, 0); + } + + @Override + public final synchronized void close() throws IOException { + for(int i=0; i<sliceCount; i++) { + final MappedByteBuffer s = slices[i]; + if( null != s ) { + slices[i] = null; + cleanBuffer(s); + } + slices2GC[i] = null; + } + if( mmode != FileChannel.MapMode.READ_ONLY ) { + fc.force(true); + } + fc.close(); + mark = -1; + currSlice = -1; + super.close(); + } + + private synchronized MappedByteBuffer slice(final int i) throws IOException { + if ( null != slices[i] ) { + return slices[i]; + } else { + if( CacheMode.FLUSH_PRE_SOFT == cmode ) { + final WeakReference<MappedByteBuffer> ref = slices2GC[i]; + if( null != ref ) { + final MappedByteBuffer mbb = ref.get(); + slices2GC[i] = null; + if( null != mbb ) { + slices[i] = mbb; + return mbb; + } + } + } + final long pos = (long)i << sliceShift; + slices[i] = fc.map(mmode, pos, Math.min(1L << sliceShift, totalSize - pos)); + return slices[i]; + } + } + + private synchronized void flushSlice(final int i) throws IOException { + final MappedByteBuffer s = slices[i]; + if ( null != s ) { + slices[i] = null; // GC a slice is enough + if( CacheMode.FLUSH_PRE_HARD == cmode ) { + if( !cleanBuffer(s) ) { + cmode = CacheMode.FLUSH_PRE_SOFT; + slices2GC[i] = new WeakReference<MappedByteBuffer>(s); + } + } else { + slices2GC[i] = new WeakReference<MappedByteBuffer>(s); + } + } + } + private synchronized boolean cleanBuffer(final MappedByteBuffer mbb) { + if( !cleanerInit ) { + initCleaner(mbb); + } + if ( !hasCleaner || !mbb.isDirect() ) { + return false; + } + try { + cClean.invoke(mbbCleaner.invoke(mbb)); + return true; + } catch(final Throwable t) { + hasCleaner = false; + if( DEBUG ) { + System.err.println("Caught "+t.getMessage()); + t.printStackTrace(); + } + return false; + } + } + private synchronized void initCleaner(final ByteBuffer bb) { + final Method[] _mbbCleaner = { null }; + final Method[] _cClean = { null }; + AccessController.doPrivileged(new PrivilegedAction<Object>() { + @Override + public Object run() { + try { + _mbbCleaner[0] = bb.getClass().getMethod("cleaner"); + _mbbCleaner[0].setAccessible(true); + _cClean[0] = Class.forName("sun.misc.Cleaner").getMethod("clean"); + _cClean[0].setAccessible(true); + } catch(final Throwable t) { + if( DEBUG ) { + System.err.println("Caught "+t.getMessage()); + t.printStackTrace(); + } + } + return null; + } } ); + mbbCleaner = _mbbCleaner[0]; + cClean = _cClean[0]; + final boolean res = null != mbbCleaner && null != cClean; + if( DEBUG ) { + System.err.println("initCleaner: Has cleaner: "+res+", mbbCleaner "+mbbCleaner+", cClean "+cClean); + } + hasCleaner = res; + cleanerInit = true; + } + + /** + * Return the used {@link CacheMode}. + * <p> + * If a desired {@link CacheMode} is not available, it may fall back to an available one at runtime, + * see {@link CacheMode#FLUSH_PRE_HARD}.<br> + * This evaluation only happens if the {@link CacheMode} != {@link CacheMode#FLUSH_NONE} + * and while attempting to flush an unused buffer slice. + * </p> + */ + public final synchronized CacheMode getCacheMode() { return cmode; } + + /** + * Returns the total size in bytes of the {@link InputStream} + * <pre> + * <code>0 <= {@link #position()} <= {@link #length()}</code> + * </pre> + */ + // @Override + public final long length() { + return totalSize; + } + + /** + * Returns the number of remaining available bytes of the {@link InputStream}, + * i.e. <code>{@link #length()} - {@link #position()}</code>. + * <pre> + * <code>0 <= {@link #position()} <= {@link #length()}</code> + * </pre> + * <p> + * In contrast to {@link InputStream}'s {@link #available()} method, + * this method returns the proper return type {@code long}. + * </p> + * @throws IOException + */ + public final synchronized long remaining() throws IOException { + return totalSize - position(); + } + + /** + * <i>See {@link #remaining()} for an accurate variant.</i> + * <p> + * {@inheritDoc} + * </p> + */ + @Override + public final synchronized int available() throws IOException { + final long available = remaining(); + return available <= Integer.MAX_VALUE ? (int)available : Integer.MAX_VALUE; + } + + /** + * Returns the absolute position of the {@link InputStream}. + * <pre> + * <code>0 <= {@link #position()} <= {@link #length()}</code> + * </pre> + * @throws IOException + */ + // @Override + public final synchronized long position() throws IOException { + return ( (long)currSlice << sliceShift ) + slice( currSlice ).position(); + } + + /** + * Sets the absolute position of the {@link InputStream} to {@code newPosition}. + * <pre> + * <code>0 <= {@link #position()} <= {@link #length()}</code> + * </pre> + * @param newPosition The new position, which must be non-negative and ≤ {@link #length()}. + * @return this instance + * @throws IOException + */ + // @Override + public final synchronized MappedByteBufferInputStream position( final long newPosition ) throws IOException { + if ( totalSize < newPosition || 0 > newPosition ) { + throw new IllegalArgumentException("new position "+newPosition+" not within [0.."+totalSize+"]"); + } + final int preSlice = currSlice; + if ( totalSize == newPosition ) { + currSlice = sliceCount - 1; + final MappedByteBuffer s = slice( currSlice ); + s.position( s.capacity() ); + } else { + currSlice = (int)( newPosition >>> sliceShift ); + slice( currSlice ).position( (int)( newPosition - ( (long)currSlice << sliceShift ) ) ); + } + if( CacheMode.FLUSH_NONE != cmode && preSlice != currSlice) { + flushSlice(preSlice); + } + return this; + } + + @Override + public final boolean markSupported() { + return true; + } + + @Override + public final synchronized void mark( final int unused ) { + try { + mark = position(); + } catch (final IOException e) { + throw new RuntimeException(e); // FIXME: oops + } + } + + @Override + public final synchronized void reset() throws IOException { + if ( mark == -1 ) { + throw new IOException("mark not set"); + } + position( mark ); + } + + @Override + public final synchronized long skip( final long n ) throws IOException { + if( 0 > n ) { + return 0; + } + final long pos = position(); + final long rem = totalSize - pos; // remaining + final long s = Math.min( rem, n ); + position( pos + s ); + return s; + } + + @Override + public final synchronized int read() throws IOException { + if ( ! slice( currSlice ).hasRemaining() ) { + if ( currSlice < sliceCount - 1 ) { + final int preSlice = currSlice; + currSlice++; + slice( currSlice ).position( 0 ); + if( CacheMode.FLUSH_NONE != cmode ) { + flushSlice(preSlice); + } + } else { + return -1; + } + } + return slices[ currSlice ].get() & 0xFF; + } + + @Override + public final synchronized int read( final byte[] b, final int off, final int len ) throws IOException { + if (b == null) { + throw new NullPointerException(); + } else if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException("offset "+off+", length "+len+", b.length "+b.length); + } + if ( 0 == len ) { + return 0; + } + final long totalRem = remaining(); + if ( 0 == totalRem ) { + return -1; + } + final int maxLen = (int)Math.min( totalRem, len ); + int read = 0; + while( read < maxLen ) { + int currRem = slice( currSlice ).remaining(); + if ( 0 == currRem ) { + final int preSlice = currSlice; + currSlice++; + slice( currSlice ).position( 0 ); + currRem = slice( currSlice ).remaining(); + if( CacheMode.FLUSH_NONE != cmode ) { + flushSlice(preSlice); + } + } + slices[ currSlice ].get( b, off + read, Math.min( maxLen - read, currRem ) ); + read += Math.min( maxLen - read, currRem ); + } + return maxLen; + } +} diff --git a/src/junit/com/jogamp/common/nio/TestByteBufferInputStream.java b/src/junit/com/jogamp/common/nio/TestByteBufferInputStream.java new file mode 100644 index 0000000..f87fc43 --- /dev/null +++ b/src/junit/com/jogamp/common/nio/TestByteBufferInputStream.java @@ -0,0 +1,334 @@ +/** + * Copyright 2014 JogAmp Community. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are + * permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this list of + * conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, this list + * of conditions and the following disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY JogAmp Community ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND + * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JogAmp Community OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are those of the + * authors and should not be interpreted as representing official policies, either expressed + * or implied, of JogAmp Community. + */ +package com.jogamp.common.nio; + +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.jogamp.common.util.IOUtil; +import com.jogamp.junit.util.JunitTracer; + +import org.junit.FixMethodOrder; +import org.junit.runners.MethodSorters; + +/** + * Testing serial read of {@link ByteBufferInputStream} and {@link MappedByteBufferInputStream}, + * i.e. basic functionality only. + * <p> + * Focusing on comparison with {@link BufferedInputStream} regarding + * performance, used memory heap and used virtual memory. + * </p> + */ +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +public class TestByteBufferInputStream extends JunitTracer { + /** {@value} */ + static final int buffer__8KiB = 1 << 13; + + /** {@value} */ + static final int halfMiB = 1 << 19; + /** {@value} */ + static final int oneMiB = 1 << 20; + /** {@value} */ + static final int tenMiB = 1 << 24; + /** {@value} */ + static final int hunMiB = 1 << 27; + /** {@value} */ + static final int halfGiB = 1 << 29; + /** {@value} */ + static final int oneGiB = 1 << 30; + /** {@value} */ + static final int twoGiB = Integer.MAX_VALUE; + /** {@value} */ + static final long fourGiB = 4L << 30L; + + static final String fileHalfMiB = "./testHalfMiB.bin" ; + static final String fileOneMiB = "./testOneMiB.bin" ; + static final String fileTenMiB = "./testTenMiB.bin" ; + static final String fileHunMiB = "./testHunMiB.bin" ; + static final String fileHalfGiB = "./testHalfGiB.bin" ; + static final String fileOneGiB = "./testOneGiB.bin" ; + static final String fileTwoGiB = "./testTwoGiB.bin" ; + static final String fileFourGiB = "./testFourGiB.bin" ; + static final String fileOut = "./testOut.bin" ; + + static final String printPrecision = "%8.3f"; + static final double mib = 1024.0*1024.0; + + + @BeforeClass + public static void setup() throws IOException { + final Runtime runtime = Runtime.getRuntime(); + System.err.printf("Total Memory : "+printPrecision+" MiB%n", runtime.totalMemory() / mib); + System.err.printf("Max Memory : "+printPrecision+" MiB%n", runtime.maxMemory() / mib); + + setup(fileHalfMiB, halfMiB); + setup(fileOneMiB, oneMiB); + setup(fileTenMiB, tenMiB); + setup(fileHunMiB, hunMiB); + setup(fileHalfGiB, halfGiB); + setup(fileOneGiB, oneGiB); + setup(fileTwoGiB, twoGiB); + setup(fileFourGiB, fourGiB); + } + static void setup(final String fname, final long size) throws IOException { + final File file = new File(fname); + final RandomAccessFile out = new RandomAccessFile(file, "rws"); + out.setLength(size); + out.close(); + } + + @AfterClass + public static void cleanup() { + cleanup(fileHalfMiB); + cleanup(fileOneMiB); + cleanup(fileTenMiB); + cleanup(fileHunMiB); + cleanup(fileHalfGiB); + cleanup(fileOneGiB); + cleanup(fileTwoGiB); + cleanup(fileFourGiB); + cleanup(fileOut); + } + static void cleanup(final String fname) { + final File file = new File(fname); + file.delete(); + } + + @Test + public void test01MixedIntSize() throws IOException { + testCopyIntSize1Impl(fileHalfMiB, halfMiB); + + testCopyIntSize1Impl(fileOneMiB, oneMiB); + + testCopyIntSize1Impl(fileTenMiB, tenMiB); + + testCopyIntSize1Impl(fileHunMiB, hunMiB); + + testCopyIntSize1Impl(fileHalfGiB, halfGiB); + + testCopyIntSize1Impl(fileOneGiB, oneGiB); + + // testCopyIntSize1Impl(fileTwoGiB, twoGiB); + } + + static enum SrcType { COPY, MMAP1, MMAP2_NONE, MMAP2_SOFT, MMAP2_HARD }; + + @Test + public void test11MMapFlushNone() throws IOException { + testCopyIntSize1Impl2(0, SrcType.MMAP2_NONE, 0, fileOneGiB, oneGiB); + testCopyIntSize1Impl2(0, SrcType.MMAP2_NONE, 0, fileTwoGiB, twoGiB); + testCopyIntSize1Impl2(0, SrcType.MMAP2_NONE, 0, fileFourGiB, fourGiB); + } + + @Test + public void test12MMapFlushSoft() throws IOException { + testCopyIntSize1Impl2(0, SrcType.MMAP2_SOFT, 0, fileOneGiB, oneGiB); + testCopyIntSize1Impl2(0, SrcType.MMAP2_SOFT, 0, fileTwoGiB, twoGiB); + testCopyIntSize1Impl2(0, SrcType.MMAP2_SOFT, 0, fileFourGiB, fourGiB); + } + + @Test + public void test13MMapFlushHard() throws IOException { + testCopyIntSize1Impl2(0, SrcType.MMAP2_HARD, 0, fileOneGiB, oneGiB); + testCopyIntSize1Impl2(0, SrcType.MMAP2_HARD, 0, fileTwoGiB, twoGiB); + testCopyIntSize1Impl2(0, SrcType.MMAP2_HARD, 0, fileFourGiB, fourGiB); + } + + void testCopyIntSize1Impl(final String testFileName, final long expSize) throws IOException { + testCopyIntSize1Impl(SrcType.COPY, buffer__8KiB, testFileName, expSize); + testCopyIntSize1Impl(SrcType.COPY, hunMiB, testFileName, expSize); + testCopyIntSize1Impl(SrcType.MMAP1, 0, testFileName, expSize); + testCopyIntSize1Impl(SrcType.MMAP2_SOFT, 0, testFileName, expSize); + System.err.println(); + } + void testCopyIntSize1Impl(final SrcType srcType, final int reqBufferSize, final String testFileName, final long expSize) throws IOException { + if( testCopyIntSize1Impl2(0, srcType, reqBufferSize, testFileName, expSize) ) { + if( testCopyIntSize1Impl2(1, srcType, reqBufferSize, testFileName, expSize) ) { + // testCopyIntSize1Impl2(2, srcType, reqBufferSize, testFileName, expSize); + } + } + System.err.println(); + } + boolean testCopyIntSize1Impl2(final int iter, final SrcType srcType, final int reqBufferSize, final String testFileName, final long expSize) throws IOException { + final int expSizeI = (int) ( expSize <= Integer.MAX_VALUE ? expSize : Integer.MAX_VALUE ); + final int bufferSize = reqBufferSize < expSizeI ? reqBufferSize : expSizeI; + final File testFile = new File(testFileName); + final long hasSize1 = testFile.length(); + final long t0 = System.currentTimeMillis(); + Assert.assertEquals(expSize, hasSize1); + + final Runtime runtime = Runtime.getRuntime(); + final long[] usedMem0 = { 0 }; + final long[] freeMem0 = { 0 }; + final long[] usedMem1 = { 0 }; + final long[] freeMem1 = { 0 }; + + final String prefix = "test #"+iter+" "+String.format(printPrecision+" MiB", expSize/mib); + System.err.printf("%s: mode %-5s, bufferSize %9d: BEGIN%n", prefix, srcType.toString(), bufferSize); + dumpMem(prefix+" before", runtime, -1, -1, usedMem0, freeMem0 ); + + final IOException[] ioe = { null }; + OutOfMemoryError oome = null; + InputStream bis = null; + FileInputStream fis = null; + FileChannel fic = null; + boolean isMappedByteBufferInputStream = false; + try { + fis = new FileInputStream(testFile); + if( SrcType.COPY == srcType ) { + if( hasSize1 > Integer.MAX_VALUE ) { + fis.close(); + throw new IllegalArgumentException("file size > MAX_INT for "+srcType+": "+hasSize1+" of "+testFile); + } + bis = new BufferedInputStream(fis, bufferSize); + } else if( SrcType.MMAP1 == srcType ) { + if( hasSize1 > Integer.MAX_VALUE ) { + fis.close(); + throw new IllegalArgumentException("file size > MAX_INT for "+srcType+": "+hasSize1+" of "+testFile); + } + fic = fis.getChannel(); + final MappedByteBuffer fmap = fic.map(FileChannel.MapMode.READ_ONLY, 0, hasSize1); // survives channel/stream closing until GC'ed! + bis = new ByteBufferInputStream(fmap); + } else { + isMappedByteBufferInputStream = true; + MappedByteBufferInputStream.CacheMode cmode; + switch(srcType) { + case MMAP2_NONE: cmode = MappedByteBufferInputStream.CacheMode.FLUSH_NONE; + break; + case MMAP2_SOFT: cmode = MappedByteBufferInputStream.CacheMode.FLUSH_PRE_SOFT; + break; + case MMAP2_HARD: cmode = MappedByteBufferInputStream.CacheMode.FLUSH_PRE_HARD; + break; + default: fis.close(); + throw new InternalError("XX: "+srcType); + } + final MappedByteBufferInputStream mis = MappedByteBufferInputStream.create(fis.getChannel(), cmode); + Assert.assertEquals(expSize, mis.remaining()); + Assert.assertEquals(expSize, mis.length()); + Assert.assertEquals(0, mis.position()); + bis = mis; + } + } catch (final IOException e) { + ioe[0] = e; + } catch (final OutOfMemoryError m) { + oome = m; // oops :) + } + try { + if( null != ioe[0] || null != oome ) { + if( null != oome ) { + System.err.printf("%s: mode %-5s, bufferSize %9d: OutOfMemoryError %s%n", + prefix, srcType.toString(), bufferSize, oome.getMessage()); + return false; + } else { + Assert.assertNull(ioe[0]); + } + } + Assert.assertEquals(expSizeI, bis.available()); + + final long t1 = System.currentTimeMillis(); + + final File out = new File(fileOut); + IOUtil.copyStream2File(bis, out, -1); + final long t2 = System.currentTimeMillis(); + + final String suffix; + if( isMappedByteBufferInputStream ) { + suffix = ", cacheMode "+((MappedByteBufferInputStream)bis).getCacheMode(); + } else { + suffix = ""; + } + System.err.printf("%s: mode %-5s, bufferSize %9d: total %5d, setup %5d, copy %5d ms%s%n", + prefix, srcType.toString(), bufferSize, t2-t0, t1-t0, t2-t1, suffix); + + Assert.assertEquals(expSize, out.length()); + out.delete(); + + Assert.assertEquals(0, bis.available()); + if( isMappedByteBufferInputStream ) { + final MappedByteBufferInputStream mis = (MappedByteBufferInputStream)bis; + Assert.assertEquals(0, mis.remaining()); + Assert.assertEquals(expSize, mis.length()); + Assert.assertEquals(expSize, mis.position()); + } + dumpMem(prefix+" after ", runtime, usedMem0[0], freeMem0[0], usedMem1, freeMem1 ); + System.gc(); + try { + Thread.sleep(500); + } catch (final InterruptedException e) { } + dumpMem(prefix+" gc'ed ", runtime, usedMem0[0], freeMem0[0], usedMem1, freeMem1 ); + } finally { + if( null != fic ) { + fic.close(); + } + if( null != fis ) { + fis.close(); + } + bis.close(); + System.err.printf("%s: mode %-5s, bufferSize %9d: END%n", prefix, srcType.toString(), bufferSize); + System.err.println(); + } + return true; + } + + static void dumpMem(final String pre, + final Runtime runtime, final long usedMem0, + final long freeMem0, final long[] usedMemN, + final long[] freeMemN ) + { + usedMemN[0] = runtime.totalMemory() - runtime.freeMemory(); + freeMemN[0] = runtime.freeMemory(); + + System.err.printf("%s Used Memory : "+printPrecision, pre, usedMemN[0] / mib); + if( 0 < usedMem0 ) { + System.err.printf(", delta "+printPrecision, (usedMemN[0]-usedMem0) / mib); + } + System.err.println(" MiB"); + /** + System.err.printf("%s Free Memory : "+printPrecision, pre, freeMemN[0] / mib); + if( 0 < freeMem0 ) { + System.err.printf(", delta "+printPrecision, (freeMemN[0]-freeMem0) / mib); + } + System.err.println(" MiB"); */ + } + + public static void main(final String args[]) throws IOException { + final String tstname = TestByteBufferInputStream.class.getName(); + org.junit.runner.JUnitCore.main(tstname); + } +} |