001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one or more
003 *  contributor license agreements.  See the NOTICE file distributed with
004 *  this work for additional information regarding copyright ownership.
005 *  The ASF licenses this file to You under the Apache License, Version 2.0
006 *  (the "License"); you may not use this file except in compliance with
007 *  the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 *  Unless required by applicable law or agreed to in writing, software
012 *  distributed under the License is distributed on an "AS IS" BASIS,
013 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 *  See the License for the specific language governing permissions and
015 *  limitations under the License.
016 *
017 */
018package org.apache.commons.compress.archivers.zip;
019
020import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;
021
022import java.io.IOException;
023import java.io.UncheckedIOException;
024import java.util.Deque;
025import java.util.concurrent.Callable;
026import java.util.concurrent.ConcurrentLinkedDeque;
027import java.util.concurrent.ExecutionException;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030import java.util.concurrent.Future;
031import java.util.concurrent.TimeUnit;
032import java.util.zip.Deflater;
033
034import org.apache.commons.compress.parallel.InputStreamSupplier;
035import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
036import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;
037
038/**
039 * Creates a ZIP in parallel by using multiple threadlocal {@link ScatterZipOutputStream} instances.
040 * <p>
041 * Note that until 1.18, this class generally made no guarantees about the order of things written to the output file. Things that needed to come in a specific
042 * order (manifests, directories) had to be handled by the client of this class, usually by writing these things to the {@link ZipArchiveOutputStream}
043 * <em>before</em> calling {@link #writeTo writeTo} on this class.
044 * </p>
045 * <p>
046 * The client can supply an {@link java.util.concurrent.ExecutorService}, but for reasons of memory model consistency, this will be shut down by this class
047 * prior to completion.
048 * </p>
049 *
050 * @since 1.10
051 */
052public class ParallelScatterZipCreator {
053
054    private final Deque<ScatterZipOutputStream> streams = new ConcurrentLinkedDeque<>();
055    private final ExecutorService executorService;
056    private final ScatterGatherBackingStoreSupplier backingStoreSupplier;
057
058    private final Deque<Future<? extends ScatterZipOutputStream>> futures = new ConcurrentLinkedDeque<>();
059    private final long startedAt = System.currentTimeMillis();
060    private long compressionDoneAt;
061    private long scatterDoneAt;
062
063    private final int compressionLevel;
064
065    private final ThreadLocal<ScatterZipOutputStream> tlScatterStreams = new ThreadLocal<ScatterZipOutputStream>() {
066        @Override
067        protected ScatterZipOutputStream initialValue() {
068            try {
069                final ScatterZipOutputStream scatterStream = createDeferred(backingStoreSupplier);
070                streams.add(scatterStream);
071                return scatterStream;
072            } catch (final IOException e) {
073                throw new UncheckedIOException(e); //NOSONAR
074            }
075        }
076    };
077
078    /**
079     * Constructs a ParallelScatterZipCreator with default threads, which is set to the number of available
080     * processors, as defined by {@link java.lang.Runtime#availableProcessors}
081     */
082    public ParallelScatterZipCreator() {
083        this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
084    }
085
086    /**
087     * Constructs a ParallelScatterZipCreator
088     *
089     * @param executorService The executorService to use for parallel scheduling. For technical reasons,
090     *                        this will be shut down by this class.
091     */
092    public ParallelScatterZipCreator(final ExecutorService executorService) {
093        this(executorService, new DefaultBackingStoreSupplier(null));
094    }
095
096    /**
097     * Constructs a ParallelScatterZipCreator
098     *
099     * @param executorService The executorService to use. For technical reasons, this will be shut down
100     *                        by this class.
101     * @param backingStoreSupplier The supplier of backing store which shall be used
102     */
103    public ParallelScatterZipCreator(final ExecutorService executorService,
104                                     final ScatterGatherBackingStoreSupplier backingStoreSupplier) {
105        this(executorService, backingStoreSupplier, Deflater.DEFAULT_COMPRESSION);
106    }
107
108    /**
109     * Constructs a ParallelScatterZipCreator
110     *
111     * @param executorService      The executorService to use. For technical reasons, this will be shut down
112     *                             by this class.
113     * @param backingStoreSupplier The supplier of backing store which shall be used
114     * @param compressionLevel     The compression level used in compression, this value should be
115     *                             -1(default level) or between 0~9.
116     * @throws IllegalArgumentException if the compression level is illegal
117     * @since 1.21
118     */
119    public ParallelScatterZipCreator(final ExecutorService executorService,
120                                     final ScatterGatherBackingStoreSupplier backingStoreSupplier,
121                                     final int compressionLevel) throws IllegalArgumentException {
122        if ((compressionLevel < Deflater.NO_COMPRESSION || compressionLevel > Deflater.BEST_COMPRESSION)
123                && compressionLevel != Deflater.DEFAULT_COMPRESSION) {
124            throw new IllegalArgumentException("Compression level is expected between -1~9");
125        }
126
127        this.backingStoreSupplier = backingStoreSupplier;
128        this.executorService = executorService;
129        this.compressionLevel = compressionLevel;
130    }
131
132    /**
133     * Adds an archive entry to this archive.
134     * <p>
135     * This method is expected to be called from a single client thread
136     * </p>
137     *
138     * @param zipArchiveEntry The entry to add.
139     * @param source          The source input stream supplier
140     */
141
142    public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
143        submitStreamAwareCallable(createCallable(zipArchiveEntry, source));
144    }
145
146    /**
147     * Adds an archive entry to this archive.
148     * <p>
149     * This method is expected to be called from a single client thread
150     * </p>
151     *
152     * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
153     * @since 1.13
154     */
155    public void addArchiveEntry(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
156        submitStreamAwareCallable(createCallable(zipArchiveEntryRequestSupplier));
157    }
158
159    private void closeAll() {
160        for (final ScatterZipOutputStream scatterStream : streams) {
161            try {
162                scatterStream.close();
163            } catch (final IOException ex) { //NOSONAR
164                // no way to properly log this
165            }
166        }
167    }
168
169    /**
170     * Creates a callable that will compress the given archive entry.
171     *
172     * <p>This method is expected to be called from a single client thread.</p>
173     *
174     * Consider using {@link #addArchiveEntry addArchiveEntry}, which wraps this method and {@link #submitStreamAwareCallable submitStreamAwareCallable}.
175     * The most common use case for using {@link #createCallable createCallable} and {@link #submitStreamAwareCallable submitStreamAwareCallable} from a
176     * client is if you want to wrap the callable in something that can be prioritized by the supplied
177     * {@link ExecutorService}, for instance to process large or slow files first.
178     * Since the creation of the {@link ExecutorService} is handled by the client, all of this is up to the client.
179     *
180     * @param zipArchiveEntry The entry to add.
181     * @param source          The source input stream supplier
182     * @return A callable that should subsequently passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The
183     * value of this callable is not used, but any exceptions happening inside the compression
184     * will be propagated through the callable.
185     */
186
187    public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntry zipArchiveEntry,
188        final InputStreamSupplier source) {
189        final int method = zipArchiveEntry.getMethod();
190        if (method == ZipMethod.UNKNOWN_CODE) {
191            throw new IllegalArgumentException("Method must be set on zipArchiveEntry: " + zipArchiveEntry);
192        }
193        final ZipArchiveEntryRequest zipArchiveEntryRequest = createZipArchiveEntryRequest(zipArchiveEntry, source);
194        return () -> {
195            final ScatterZipOutputStream scatterStream = tlScatterStreams.get();
196            scatterStream.addArchiveEntry(zipArchiveEntryRequest);
197            return scatterStream;
198        };
199    }
200
201    /**
202     * Creates a callable that will compress archive entry supplied by {@link ZipArchiveEntryRequestSupplier}.
203     *
204     * <p>This method is expected to be called from a single client thread.</p>
205     *
206     * The same as {@link #createCallable(ZipArchiveEntry, InputStreamSupplier)}, but the archive entry
207     * to be added is supplied by a {@link ZipArchiveEntryRequestSupplier}.
208     *
209     * @see #createCallable(ZipArchiveEntry, InputStreamSupplier)
210     *
211     * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
212     * @return A callable that should subsequently passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The
213     * value of this callable is not used, but any exceptions happening inside the compression
214     * will be propagated through the callable.
215     * @since 1.13
216     */
217    public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
218        return () -> {
219            final ScatterZipOutputStream scatterStream = tlScatterStreams.get();
220            scatterStream.addArchiveEntry(zipArchiveEntryRequestSupplier.get());
221            return scatterStream;
222        };
223    }
224
225    private ScatterZipOutputStream createDeferred(final ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier)
226            throws IOException {
227        final ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get();
228        // lifecycle is bound to the ScatterZipOutputStream returned
229        final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs); //NOSONAR
230        return new ScatterZipOutputStream(bs, sc);
231    }
232
233    /**
234     * Gets a message describing the overall statistics of the compression run
235     *
236     * @return A string
237     */
238    public ScatterStatistics getStatisticsMessage() {
239        return new ScatterStatistics(compressionDoneAt - startedAt, scatterDoneAt - compressionDoneAt);
240    }
241
242    /**
243     * Submits a callable for compression.
244     *
245     * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
246     *
247     * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
248     */
249    public final void submit(final Callable<? extends Object> callable) {
250        submitStreamAwareCallable(() -> {
251            callable.call();
252            return tlScatterStreams.get();
253        });
254    }
255
256    /**
257     * Submits a callable for compression.
258     *
259     * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
260     *
261     * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
262     * @since 1.19
263     */
264    public final void submitStreamAwareCallable(final Callable<? extends ScatterZipOutputStream> callable) {
265        futures.add(executorService.submit(callable));
266    }
267
268    /**
269     * Writes the contents this to the target {@link ZipArchiveOutputStream}.
270     * <p>
271     * It may be beneficial to write things like directories and manifest files to the targetStream before calling this method.
272     * </p>
273     * <p>
274     * Calling this method will shut down the {@link ExecutorService} used by this class. If any of the {@link Callable}s {@link #submitStreamAwareCallable
275     * submit}ted to this instance throws an exception, the archive can not be created properly and this method will throw an exception.
276     * </p>
277     *
278     * @param targetStream The {@link ZipArchiveOutputStream} to receive the contents of the scatter streams
279     * @throws IOException          If writing fails
280     * @throws InterruptedException If we get interrupted
281     * @throws ExecutionException   If something happens in the parallel execution
282     */
283    public void writeTo(final ZipArchiveOutputStream targetStream)
284            throws IOException, InterruptedException, ExecutionException {
285
286        try {
287            // Make sure we catch any exceptions from parallel phase
288            try {
289                for (final Future<?> future : futures) {
290                    future.get();
291                }
292            } finally {
293                executorService.shutdown();
294            }
295
296            executorService.awaitTermination(1000 * 60L, TimeUnit.SECONDS);  // == Infinity. We really *must* wait for this to complete
297
298            // It is important that all threads terminate before we go on, ensure happens-before relationship
299            compressionDoneAt = System.currentTimeMillis();
300
301            for (final Future<? extends ScatterZipOutputStream> future : futures) {
302                final ScatterZipOutputStream scatterStream = future.get();
303                scatterStream.zipEntryWriter().writeNextZipEntry(targetStream);
304            }
305
306            for (final ScatterZipOutputStream scatterStream : streams) {
307                scatterStream.close();
308            }
309
310            scatterDoneAt = System.currentTimeMillis();
311        } finally {
312            closeAll();
313        }
314    }
315}
316