001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 */ 018package org.apache.commons.compress.archivers.zip; 019 020import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest; 021 022import java.io.IOException; 023import java.io.UncheckedIOException; 024import java.util.Deque; 025import java.util.concurrent.Callable; 026import java.util.concurrent.ConcurrentLinkedDeque; 027import java.util.concurrent.ExecutionException; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.Executors; 030import java.util.concurrent.Future; 031import java.util.concurrent.TimeUnit; 032import java.util.zip.Deflater; 033 034import org.apache.commons.compress.parallel.InputStreamSupplier; 035import org.apache.commons.compress.parallel.ScatterGatherBackingStore; 036import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier; 037 038/** 039 * Creates a ZIP in parallel by using multiple threadlocal {@link ScatterZipOutputStream} instances. 040 * <p> 041 * Note that until 1.18, this class generally made no guarantees about the order of things written to the output file. Things that needed to come in a specific 042 * order (manifests, directories) had to be handled by the client of this class, usually by writing these things to the {@link ZipArchiveOutputStream} 043 * <em>before</em> calling {@link #writeTo writeTo} on this class. 044 * </p> 045 * <p> 046 * The client can supply an {@link java.util.concurrent.ExecutorService}, but for reasons of memory model consistency, this will be shut down by this class 047 * prior to completion. 048 * </p> 049 * 050 * @since 1.10 051 */ 052public class ParallelScatterZipCreator { 053 054 private final Deque<ScatterZipOutputStream> streams = new ConcurrentLinkedDeque<>(); 055 private final ExecutorService executorService; 056 private final ScatterGatherBackingStoreSupplier backingStoreSupplier; 057 058 private final Deque<Future<? extends ScatterZipOutputStream>> futures = new ConcurrentLinkedDeque<>(); 059 private final long startedAt = System.currentTimeMillis(); 060 private long compressionDoneAt; 061 private long scatterDoneAt; 062 063 private final int compressionLevel; 064 065 private final ThreadLocal<ScatterZipOutputStream> tlScatterStreams = new ThreadLocal<ScatterZipOutputStream>() { 066 @Override 067 protected ScatterZipOutputStream initialValue() { 068 try { 069 final ScatterZipOutputStream scatterStream = createDeferred(backingStoreSupplier); 070 streams.add(scatterStream); 071 return scatterStream; 072 } catch (final IOException e) { 073 throw new UncheckedIOException(e); //NOSONAR 074 } 075 } 076 }; 077 078 /** 079 * Constructs a ParallelScatterZipCreator with default threads, which is set to the number of available 080 * processors, as defined by {@link java.lang.Runtime#availableProcessors} 081 */ 082 public ParallelScatterZipCreator() { 083 this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); 084 } 085 086 /** 087 * Constructs a ParallelScatterZipCreator 088 * 089 * @param executorService The executorService to use for parallel scheduling. For technical reasons, 090 * this will be shut down by this class. 091 */ 092 public ParallelScatterZipCreator(final ExecutorService executorService) { 093 this(executorService, new DefaultBackingStoreSupplier(null)); 094 } 095 096 /** 097 * Constructs a ParallelScatterZipCreator 098 * 099 * @param executorService The executorService to use. For technical reasons, this will be shut down 100 * by this class. 101 * @param backingStoreSupplier The supplier of backing store which shall be used 102 */ 103 public ParallelScatterZipCreator(final ExecutorService executorService, 104 final ScatterGatherBackingStoreSupplier backingStoreSupplier) { 105 this(executorService, backingStoreSupplier, Deflater.DEFAULT_COMPRESSION); 106 } 107 108 /** 109 * Constructs a ParallelScatterZipCreator 110 * 111 * @param executorService The executorService to use. For technical reasons, this will be shut down 112 * by this class. 113 * @param backingStoreSupplier The supplier of backing store which shall be used 114 * @param compressionLevel The compression level used in compression, this value should be 115 * -1(default level) or between 0~9. 116 * @throws IllegalArgumentException if the compression level is illegal 117 * @since 1.21 118 */ 119 public ParallelScatterZipCreator(final ExecutorService executorService, 120 final ScatterGatherBackingStoreSupplier backingStoreSupplier, 121 final int compressionLevel) throws IllegalArgumentException { 122 if ((compressionLevel < Deflater.NO_COMPRESSION || compressionLevel > Deflater.BEST_COMPRESSION) 123 && compressionLevel != Deflater.DEFAULT_COMPRESSION) { 124 throw new IllegalArgumentException("Compression level is expected between -1~9"); 125 } 126 127 this.backingStoreSupplier = backingStoreSupplier; 128 this.executorService = executorService; 129 this.compressionLevel = compressionLevel; 130 } 131 132 /** 133 * Adds an archive entry to this archive. 134 * <p> 135 * This method is expected to be called from a single client thread 136 * </p> 137 * 138 * @param zipArchiveEntry The entry to add. 139 * @param source The source input stream supplier 140 */ 141 142 public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) { 143 submitStreamAwareCallable(createCallable(zipArchiveEntry, source)); 144 } 145 146 /** 147 * Adds an archive entry to this archive. 148 * <p> 149 * This method is expected to be called from a single client thread 150 * </p> 151 * 152 * @param zipArchiveEntryRequestSupplier Should supply the entry to be added. 153 * @since 1.13 154 */ 155 public void addArchiveEntry(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) { 156 submitStreamAwareCallable(createCallable(zipArchiveEntryRequestSupplier)); 157 } 158 159 private void closeAll() { 160 for (final ScatterZipOutputStream scatterStream : streams) { 161 try { 162 scatterStream.close(); 163 } catch (final IOException ex) { //NOSONAR 164 // no way to properly log this 165 } 166 } 167 } 168 169 /** 170 * Creates a callable that will compress the given archive entry. 171 * 172 * <p>This method is expected to be called from a single client thread.</p> 173 * 174 * Consider using {@link #addArchiveEntry addArchiveEntry}, which wraps this method and {@link #submitStreamAwareCallable submitStreamAwareCallable}. 175 * The most common use case for using {@link #createCallable createCallable} and {@link #submitStreamAwareCallable submitStreamAwareCallable} from a 176 * client is if you want to wrap the callable in something that can be prioritized by the supplied 177 * {@link ExecutorService}, for instance to process large or slow files first. 178 * Since the creation of the {@link ExecutorService} is handled by the client, all of this is up to the client. 179 * 180 * @param zipArchiveEntry The entry to add. 181 * @param source The source input stream supplier 182 * @return A callable that should subsequently passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The 183 * value of this callable is not used, but any exceptions happening inside the compression 184 * will be propagated through the callable. 185 */ 186 187 public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntry zipArchiveEntry, 188 final InputStreamSupplier source) { 189 final int method = zipArchiveEntry.getMethod(); 190 if (method == ZipMethod.UNKNOWN_CODE) { 191 throw new IllegalArgumentException("Method must be set on zipArchiveEntry: " + zipArchiveEntry); 192 } 193 final ZipArchiveEntryRequest zipArchiveEntryRequest = createZipArchiveEntryRequest(zipArchiveEntry, source); 194 return () -> { 195 final ScatterZipOutputStream scatterStream = tlScatterStreams.get(); 196 scatterStream.addArchiveEntry(zipArchiveEntryRequest); 197 return scatterStream; 198 }; 199 } 200 201 /** 202 * Creates a callable that will compress archive entry supplied by {@link ZipArchiveEntryRequestSupplier}. 203 * 204 * <p>This method is expected to be called from a single client thread.</p> 205 * 206 * The same as {@link #createCallable(ZipArchiveEntry, InputStreamSupplier)}, but the archive entry 207 * to be added is supplied by a {@link ZipArchiveEntryRequestSupplier}. 208 * 209 * @see #createCallable(ZipArchiveEntry, InputStreamSupplier) 210 * 211 * @param zipArchiveEntryRequestSupplier Should supply the entry to be added. 212 * @return A callable that should subsequently passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The 213 * value of this callable is not used, but any exceptions happening inside the compression 214 * will be propagated through the callable. 215 * @since 1.13 216 */ 217 public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) { 218 return () -> { 219 final ScatterZipOutputStream scatterStream = tlScatterStreams.get(); 220 scatterStream.addArchiveEntry(zipArchiveEntryRequestSupplier.get()); 221 return scatterStream; 222 }; 223 } 224 225 private ScatterZipOutputStream createDeferred(final ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier) 226 throws IOException { 227 final ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get(); 228 // lifecycle is bound to the ScatterZipOutputStream returned 229 final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs); //NOSONAR 230 return new ScatterZipOutputStream(bs, sc); 231 } 232 233 /** 234 * Gets a message describing the overall statistics of the compression run 235 * 236 * @return A string 237 */ 238 public ScatterStatistics getStatisticsMessage() { 239 return new ScatterStatistics(compressionDoneAt - startedAt, scatterDoneAt - compressionDoneAt); 240 } 241 242 /** 243 * Submits a callable for compression. 244 * 245 * @see ParallelScatterZipCreator#createCallable for details of if/when to use this. 246 * 247 * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller. 248 */ 249 public final void submit(final Callable<? extends Object> callable) { 250 submitStreamAwareCallable(() -> { 251 callable.call(); 252 return tlScatterStreams.get(); 253 }); 254 } 255 256 /** 257 * Submits a callable for compression. 258 * 259 * @see ParallelScatterZipCreator#createCallable for details of if/when to use this. 260 * 261 * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller. 262 * @since 1.19 263 */ 264 public final void submitStreamAwareCallable(final Callable<? extends ScatterZipOutputStream> callable) { 265 futures.add(executorService.submit(callable)); 266 } 267 268 /** 269 * Writes the contents this to the target {@link ZipArchiveOutputStream}. 270 * <p> 271 * It may be beneficial to write things like directories and manifest files to the targetStream before calling this method. 272 * </p> 273 * <p> 274 * Calling this method will shut down the {@link ExecutorService} used by this class. If any of the {@link Callable}s {@link #submitStreamAwareCallable 275 * submit}ted to this instance throws an exception, the archive can not be created properly and this method will throw an exception. 276 * </p> 277 * 278 * @param targetStream The {@link ZipArchiveOutputStream} to receive the contents of the scatter streams 279 * @throws IOException If writing fails 280 * @throws InterruptedException If we get interrupted 281 * @throws ExecutionException If something happens in the parallel execution 282 */ 283 public void writeTo(final ZipArchiveOutputStream targetStream) 284 throws IOException, InterruptedException, ExecutionException { 285 286 try { 287 // Make sure we catch any exceptions from parallel phase 288 try { 289 for (final Future<?> future : futures) { 290 future.get(); 291 } 292 } finally { 293 executorService.shutdown(); 294 } 295 296 executorService.awaitTermination(1000 * 60L, TimeUnit.SECONDS); // == Infinity. We really *must* wait for this to complete 297 298 // It is important that all threads terminate before we go on, ensure happens-before relationship 299 compressionDoneAt = System.currentTimeMillis(); 300 301 for (final Future<? extends ScatterZipOutputStream> future : futures) { 302 final ScatterZipOutputStream scatterStream = future.get(); 303 scatterStream.zipEntryWriter().writeNextZipEntry(targetStream); 304 } 305 306 for (final ScatterZipOutputStream scatterStream : streams) { 307 scatterStream.close(); 308 } 309 310 scatterDoneAt = System.currentTimeMillis(); 311 } finally { 312 closeAll(); 313 } 314 } 315} 316