001/*
002 * Licensed under the Apache License, Version 2.0 (the "License");
003 * you may not use this file except in compliance with the License.
004 * You may obtain a copy of the License at
005 *
006 *     http://www.apache.org/licenses/LICENSE-2.0
007 *
008 * Unless required by applicable law or agreed to in writing, software
009 * distributed under the License is distributed on an "AS IS" BASIS,
010 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011 * See the License for the specific language governing permissions and
012 * limitations under the License.
013 */
014package org.apache.commons.io.input;
015
016import static org.apache.commons.io.IOUtils.EOF;
017
018import java.io.File;
019import java.io.IOException;
020import java.io.InputStream;
021import java.nio.ByteBuffer;
022import java.nio.channels.FileChannel;
023import java.nio.file.Path;
024import java.nio.file.StandardOpenOption;
025import java.util.Objects;
026
027import org.apache.commons.io.IOUtils;
028import org.apache.commons.io.build.AbstractOrigin;
029import org.apache.commons.io.build.AbstractStreamBuilder;
030
031/**
032 * {@link InputStream} implementation which uses direct buffer to read a file to avoid extra copy of data between Java and native memory which happens when
033 * using {@link java.io.BufferedInputStream}. Unfortunately, this is not something already available in JDK, {@code sun.nio.ch.ChannelInputStream} supports
034 * reading a file using NIO, but does not support buffering.
035 * <p>
036 * To build an instance, see {@link Builder}.
037 * </p>
038 * <p>
039 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19 where it was called {@code NioBufferedFileInputStream}.
040 * </p>
041 *
042 * @since 2.9.0
043 */
044public final class BufferedFileChannelInputStream extends InputStream {
045
046    /**
047     * Builds a new {@link BufferedFileChannelInputStream} instance.
048     * <p>
049     * Using File IO:
050     * </p>
051     *
052     * <pre>{@code
053     * BufferedFileChannelInputStream s = BufferedFileChannelInputStream.builder()
054     *   .setFile(file)
055     *   .setBufferSize(4096)
056     *   .get();}
057     * </pre>
058     * <p>
059     * Using NIO Path:
060     * </p>
061     *
062     * <pre>{@code
063     * BufferedFileChannelInputStream s = BufferedFileChannelInputStream.builder()
064     *   .setPath(path)
065     *   .setBufferSize(4096)
066     *   .get();}
067     * </pre>
068     *
069     * @since 2.12.0
070     */
071    public static class Builder extends AbstractStreamBuilder<BufferedFileChannelInputStream, Builder> {
072
073        /**
074         * Constructs a new instance.
075         * <p>
076         * This builder use the aspects Path and buffer size.
077         * </p>
078         * <p>
079         * You must provide an origin that can be converted to a Path by this builder, otherwise, this call will throw an
080         * {@link UnsupportedOperationException}.
081         * </p>
082         *
083         * @return a new instance.
084         * @throws UnsupportedOperationException if the origin cannot provide a Path.
085         * @see AbstractOrigin#getPath()
086         */
087        @Override
088        public BufferedFileChannelInputStream get() throws IOException {
089            return new BufferedFileChannelInputStream(getPath(), getBufferSize());
090        }
091
092    }
093
094    /**
095     * Constructs a new {@link Builder}.
096     *
097     * @return a new {@link Builder}.
098     * @since 2.12.0
099     */
100    public static Builder builder() {
101        return new Builder();
102    }
103
104    private final ByteBuffer byteBuffer;
105
106    private final FileChannel fileChannel;
107
108    /**
109     * Constructs a new instance for the given File.
110     *
111     * @param file The file to stream.
112     * @throws IOException If an I/O error occurs
113     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
114     */
115    @Deprecated
116    public BufferedFileChannelInputStream(final File file) throws IOException {
117        this(file, IOUtils.DEFAULT_BUFFER_SIZE);
118    }
119
120    /**
121     * Constructs a new instance for the given File and buffer size.
122     *
123     * @param file       The file to stream.
124     * @param bufferSize buffer size.
125     * @throws IOException If an I/O error occurs
126     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
127     */
128    @Deprecated
129    public BufferedFileChannelInputStream(final File file, final int bufferSize) throws IOException {
130        this(file.toPath(), bufferSize);
131    }
132
133    /**
134     * Constructs a new instance for the given Path.
135     *
136     * @param path The path to stream.
137     * @throws IOException If an I/O error occurs
138     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
139     */
140    @Deprecated
141    public BufferedFileChannelInputStream(final Path path) throws IOException {
142        this(path, IOUtils.DEFAULT_BUFFER_SIZE);
143    }
144
145    /**
146     * Constructs a new instance for the given Path and buffer size.
147     *
148     * @param path       The path to stream.
149     * @param bufferSize buffer size.
150     * @throws IOException If an I/O error occurs
151     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
152     */
153    @Deprecated
154    public BufferedFileChannelInputStream(final Path path, final int bufferSize) throws IOException {
155        Objects.requireNonNull(path, "path");
156        fileChannel = FileChannel.open(path, StandardOpenOption.READ);
157        byteBuffer = ByteBuffer.allocateDirect(bufferSize);
158        byteBuffer.flip();
159    }
160
161    @Override
162    public synchronized int available() throws IOException {
163        return byteBuffer.remaining();
164    }
165
166    /**
167     * Attempts to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun API that will cause errors if one attempts to read from the
168     * disposed buffer. However, neither the bytes allocated to direct buffers nor file descriptors opened for memory-mapped buffers put pressure on the garbage
169     * collector. Waiting for garbage collection may lead to the depletion of off-heap memory or huge numbers of open files. There's unfortunately no standard
170     * API to manually dispose of these kinds of buffers.
171     *
172     * @param buffer the buffer to clean.
173     */
174    private void clean(final ByteBuffer buffer) {
175        if (buffer.isDirect()) {
176            cleanDirectBuffer(buffer);
177        }
178    }
179
180    /**
181     * In Java 8, the type of {@code sun.nio.ch.DirectBuffer.cleaner()} was {@code sun.misc.Cleaner}, and it was possible to access the method
182     * {@code sun.misc.Cleaner.clean()} to invoke it. The type changed to {@code jdk.internal.ref.Cleaner} in later JDKs, and the {@code clean()} method is not
183     * accessible even with reflection. However {@code sun.misc.Unsafe} added an {@code invokeCleaner()} method in JDK 9+ and this is still accessible with
184     * reflection.
185     *
186     * @param buffer the buffer to clean. must be a DirectBuffer.
187     */
188    private void cleanDirectBuffer(final ByteBuffer buffer) {
189        if (ByteBufferCleaner.isSupported()) {
190            ByteBufferCleaner.clean(buffer);
191        }
192    }
193
194    @Override
195    public synchronized void close() throws IOException {
196        try {
197            fileChannel.close();
198        } finally {
199            clean(byteBuffer);
200        }
201    }
202
203    @Override
204    public synchronized int read() throws IOException {
205        if (!refill()) {
206            return EOF;
207        }
208        return byteBuffer.get() & 0xFF;
209    }
210
211    @Override
212    public synchronized int read(final byte[] b, final int offset, int len) throws IOException {
213        if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) {
214            throw new IndexOutOfBoundsException();
215        }
216        if (!refill()) {
217            return EOF;
218        }
219        len = Math.min(len, byteBuffer.remaining());
220        byteBuffer.get(b, offset, len);
221        return len;
222    }
223
224    /**
225     * Checks whether data is left to be read from the input stream.
226     *
227     * @return true if data is left, false otherwise
228     * @throws IOException if an I/O error occurs.
229     */
230    private boolean refill() throws IOException {
231        if (!byteBuffer.hasRemaining()) {
232            byteBuffer.clear();
233            int nRead = 0;
234            while (nRead == 0) {
235                nRead = fileChannel.read(byteBuffer);
236            }
237            byteBuffer.flip();
238            return nRead >= 0;
239        }
240        return true;
241    }
242
243    @Override
244    public synchronized long skip(final long n) throws IOException {
245        if (n <= 0L) {
246            return 0L;
247        }
248        if (byteBuffer.remaining() >= n) {
249            // The buffered content is enough to skip
250            byteBuffer.position(byteBuffer.position() + (int) n);
251            return n;
252        }
253        final long skippedFromBuffer = byteBuffer.remaining();
254        final long toSkipFromFileChannel = n - skippedFromBuffer;
255        // Discard everything we have read in the buffer.
256        byteBuffer.position(0);
257        byteBuffer.flip();
258        return skippedFromBuffer + skipFromFileChannel(toSkipFromFileChannel);
259    }
260
261    private long skipFromFileChannel(final long n) throws IOException {
262        final long currentFilePosition = fileChannel.position();
263        final long size = fileChannel.size();
264        if (n > size - currentFilePosition) {
265            fileChannel.position(size);
266            return size - currentFilePosition;
267        }
268        fileChannel.position(currentFilePosition + n);
269        return n;
270    }
271
272}