001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one or more
003 *  contributor license agreements.  See the NOTICE file distributed with
004 *  this work for additional information regarding copyright ownership.
005 *  The ASF licenses this file to You under the Apache License, Version 2.0
006 *  (the "License"); you may not use this file except in compliance with
007 *  the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 *  Unless required by applicable law or agreed to in writing, software
012 *  distributed under the License is distributed on an "AS IS" BASIS,
013 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 *  See the License for the specific language governing permissions and
015 *  limitations under the License.
016 *
017 */
018package org.apache.commons.compress.archivers.zip;
019
020import java.io.Closeable;
021import java.io.DataOutput;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.OutputStream;
025import java.nio.ByteBuffer;
026import java.nio.channels.SeekableByteChannel;
027import java.util.zip.CRC32;
028import java.util.zip.Deflater;
029import java.util.zip.ZipEntry;
030
031import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
032
033/**
034 * Encapsulates a {@link Deflater} and crc calculator, handling multiple types of output streams.
035 * Currently {@link java.util.zip.ZipEntry#DEFLATED} and {@link java.util.zip.ZipEntry#STORED} are the only
036 * supported compression methods.
037 *
038 * @since 1.10
039 */
040public abstract class StreamCompressor implements Closeable {
041
042    private static final class DataOutputCompressor extends StreamCompressor {
043        private final DataOutput raf;
044
045        public DataOutputCompressor(final Deflater deflater, final DataOutput raf) {
046            super(deflater);
047            this.raf = raf;
048        }
049
050        @Override
051        protected void writeOut(final byte[] data, final int offset, final int length)
052                throws IOException {
053            raf.write(data, offset, length);
054        }
055    }
056
057    private static final class OutputStreamCompressor extends StreamCompressor {
058        private final OutputStream os;
059
060        public OutputStreamCompressor(final Deflater deflater, final OutputStream os) {
061            super(deflater);
062            this.os = os;
063        }
064
065        @Override
066        protected void writeOut(final byte[] data, final int offset, final int length)
067                throws IOException {
068            os.write(data, offset, length);
069        }
070    }
071
072    private static final class ScatterGatherBackingStoreCompressor extends StreamCompressor {
073        private final ScatterGatherBackingStore bs;
074
075        public ScatterGatherBackingStoreCompressor(final Deflater deflater, final ScatterGatherBackingStore bs) {
076            super(deflater);
077            this.bs = bs;
078        }
079
080        @Override
081        protected void writeOut(final byte[] data, final int offset, final int length)
082                throws IOException {
083            bs.writeOut(data, offset, length);
084        }
085    }
086
087    private static final class SeekableByteChannelCompressor extends StreamCompressor {
088        private final SeekableByteChannel channel;
089
090        public SeekableByteChannelCompressor(final Deflater deflater,
091                                             final SeekableByteChannel channel) {
092            super(deflater);
093            this.channel = channel;
094        }
095
096        @Override
097        protected void writeOut(final byte[] data, final int offset, final int length)
098                throws IOException {
099            channel.write(ByteBuffer.wrap(data, offset, length));
100        }
101    }
102    /*
103     * Apparently Deflater.setInput gets slowed down a lot on Sun JVMs
104     * when it gets handed a really big buffer.  See
105     * https://issues.apache.org/bugzilla/show_bug.cgi?id=45396
106     *
107     * Using a buffer size of 8 kB proved to be a good compromise
108     */
109    private static final int DEFLATER_BLOCK_SIZE = 8192;
110    private static final int BUFFER_SIZE = 4096;
111
112    /**
113     * Create a stream compressor with the given compression level.
114     *
115     * @param os       The DataOutput to receive output
116     * @param deflater The deflater to use for the compressor
117     * @return A stream compressor
118     */
119    static StreamCompressor create(final DataOutput os, final Deflater deflater) {
120        return new DataOutputCompressor(deflater, os);
121    }
122    /**
123     * Create a stream compressor with the given compression level.
124     *
125     * @param compressionLevel The {@link Deflater}  compression level
126     * @param bs               The ScatterGatherBackingStore to receive output
127     * @return A stream compressor
128     */
129    public static StreamCompressor create(final int compressionLevel, final ScatterGatherBackingStore bs) {
130        final Deflater deflater = new Deflater(compressionLevel, true);
131        return new ScatterGatherBackingStoreCompressor(deflater, bs);
132    }
133    /**
134     * Create a stream compressor with the default compression level.
135     *
136     * @param os The stream to receive output
137     * @return A stream compressor
138     */
139    static StreamCompressor create(final OutputStream os) {
140        return create(os, new Deflater(Deflater.DEFAULT_COMPRESSION, true));
141    }
142
143    /**
144     * Create a stream compressor with the given compression level.
145     *
146     * @param os       The stream to receive output
147     * @param deflater The deflater to use
148     * @return A stream compressor
149     */
150    static StreamCompressor create(final OutputStream os, final Deflater deflater) {
151        return new OutputStreamCompressor(deflater, os);
152    }
153
154    /**
155     * Create a stream compressor with the default compression level.
156     *
157     * @param bs The ScatterGatherBackingStore to receive output
158     * @return A stream compressor
159     */
160    public static StreamCompressor create(final ScatterGatherBackingStore bs) {
161        return create(Deflater.DEFAULT_COMPRESSION, bs);
162    }
163
164    /**
165     * Create a stream compressor with the given compression level.
166     *
167     * @param os       The SeekableByteChannel to receive output
168     * @param deflater The deflater to use for the compressor
169     * @return A stream compressor
170     * @since 1.13
171     */
172    static StreamCompressor create(final SeekableByteChannel os, final Deflater deflater) {
173        return new SeekableByteChannelCompressor(deflater, os);
174    }
175
176    private final Deflater def;
177
178    private final CRC32 crc = new CRC32();
179
180    private long writtenToOutputStreamForLastEntry;
181
182    private long sourcePayloadLength;
183
184    private long totalWrittenToOutputStream;
185
186    private final byte[] outputBuffer = new byte[BUFFER_SIZE];
187
188    private final byte[] readerBuf = new byte[BUFFER_SIZE];
189
190    StreamCompressor(final Deflater deflater) {
191        this.def = deflater;
192    }
193
194
195    @Override
196    public void close() throws IOException {
197        def.end();
198    }
199
200    void deflate() throws IOException {
201        final int len = def.deflate(outputBuffer, 0, outputBuffer.length);
202        if (len > 0) {
203            writeCounted(outputBuffer, 0, len);
204        }
205    }
206
207
208    /**
209     * Deflate the given source using the supplied compression method
210     *
211     * @param source The source to compress
212     * @param method The #ZipArchiveEntry compression method
213     * @throws IOException When failures happen
214     */
215
216    public void deflate(final InputStream source, final int method) throws IOException {
217        reset();
218        int length;
219
220        while ((length = source.read(readerBuf, 0, readerBuf.length)) >= 0) {
221            write(readerBuf, 0, length, method);
222        }
223        if (method == ZipEntry.DEFLATED) {
224            flushDeflater();
225        }
226    }
227
228    private void deflateUntilInputIsNeeded() throws IOException {
229        while (!def.needsInput()) {
230            deflate();
231        }
232    }
233
234    void flushDeflater() throws IOException {
235        def.finish();
236        while (!def.finished()) {
237            deflate();
238        }
239    }
240
241    /**
242     * Return the number of bytes read from the source stream
243     *
244     * @return The number of bytes read, never negative
245     */
246    public long getBytesRead() {
247        return sourcePayloadLength;
248    }
249
250    /**
251     * The number of bytes written to the output for the last entry
252     *
253     * @return The number of bytes, never negative
254     */
255    public long getBytesWrittenForLastEntry() {
256        return writtenToOutputStreamForLastEntry;
257    }
258
259    /**
260     * The crc32 of the last deflated file
261     *
262     * @return the crc32
263     */
264
265    public long getCrc32() {
266        return crc.getValue();
267    }
268
269    /**
270     * The total number of bytes written to the output for all files
271     *
272     * @return The number of bytes, never negative
273     */
274    public long getTotalBytesWritten() {
275        return totalWrittenToOutputStream;
276    }
277
278    void reset() {
279        crc.reset();
280        def.reset();
281        sourcePayloadLength = 0;
282        writtenToOutputStreamForLastEntry = 0;
283    }
284
285    /**
286     * Writes bytes to ZIP entry.
287     *
288     * @param b      the byte array to write
289     * @param offset the start position to write from
290     * @param length the number of bytes to write
291     * @param method the comrpession method to use
292     * @return the number of bytes written to the stream this time
293     * @throws IOException on error
294     */
295    long write(final byte[] b, final int offset, final int length, final int method) throws IOException {
296        final long current = writtenToOutputStreamForLastEntry;
297        crc.update(b, offset, length);
298        if (method == ZipEntry.DEFLATED) {
299            writeDeflated(b, offset, length);
300        } else {
301            writeCounted(b, offset, length);
302        }
303        sourcePayloadLength += length;
304        return writtenToOutputStreamForLastEntry - current;
305    }
306
307    public void writeCounted(final byte[] data) throws IOException {
308        writeCounted(data, 0, data.length);
309    }
310
311    public void writeCounted(final byte[] data, final int offset, final int length) throws IOException {
312        writeOut(data, offset, length);
313        writtenToOutputStreamForLastEntry += length;
314        totalWrittenToOutputStream += length;
315    }
316
317    private void writeDeflated(final byte[] b, final int offset, final int length)
318            throws IOException {
319        if (length > 0 && !def.finished()) {
320            if (length <= DEFLATER_BLOCK_SIZE) {
321                def.setInput(b, offset, length);
322                deflateUntilInputIsNeeded();
323            } else {
324                final int fullblocks = length / DEFLATER_BLOCK_SIZE;
325                for (int i = 0; i < fullblocks; i++) {
326                    def.setInput(b, offset + i * DEFLATER_BLOCK_SIZE,
327                            DEFLATER_BLOCK_SIZE);
328                    deflateUntilInputIsNeeded();
329                }
330                final int done = fullblocks * DEFLATER_BLOCK_SIZE;
331                if (done < length) {
332                    def.setInput(b, offset + done, length - done);
333                    deflateUntilInputIsNeeded();
334                }
335            }
336        }
337    }
338
339    protected abstract void writeOut(byte[] data, int offset, int length) throws IOException;
340}