001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 */ 018package org.apache.commons.compress.archivers.zip; 019 020import org.apache.commons.compress.parallel.FileBasedScatterGatherBackingStore; 021import org.apache.commons.compress.parallel.InputStreamSupplier; 022import org.apache.commons.compress.parallel.ScatterGatherBackingStore; 023import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier; 024 025import java.io.File; 026import java.io.IOException; 027import java.util.Deque; 028import java.util.concurrent.Callable; 029import java.util.concurrent.ConcurrentLinkedDeque; 030import java.util.concurrent.ExecutionException; 031import java.util.concurrent.ExecutorService; 032import java.util.concurrent.Executors; 033import java.util.concurrent.Future; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.atomic.AtomicInteger; 036import java.util.zip.Deflater; 037 038import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest; 039 040/** 041 * Creates a zip in parallel by using multiple threadlocal {@link ScatterZipOutputStream} instances. 042 * <p> 043 * Note that until 1.18, this class generally made no guarantees about the order of things written to 044 * the output file. Things that needed to come in a specific order (manifests, directories) 045 * had to be handled by the client of this class, usually by writing these things to the 046 * {@link ZipArchiveOutputStream} <em>before</em> calling {@link #writeTo writeTo} on this class.</p> 047 * <p> 048 * The client can supply an {@link java.util.concurrent.ExecutorService}, but for reasons of 049 * memory model consistency, this will be shut down by this class prior to completion. 050 * </p> 051 * @since 1.10 052 */ 053public class ParallelScatterZipCreator { 054 private final Deque<ScatterZipOutputStream> streams = new ConcurrentLinkedDeque<>(); 055 private final ExecutorService es; 056 private final ScatterGatherBackingStoreSupplier backingStoreSupplier; 057 private final Deque<Future<? extends ScatterZipOutputStream>> futures = new ConcurrentLinkedDeque<>(); 058 059 private final long startedAt = System.currentTimeMillis(); 060 private long compressionDoneAt; 061 private long scatterDoneAt; 062 private final int compressionLevel; 063 064 private static class DefaultBackingStoreSupplier implements ScatterGatherBackingStoreSupplier { 065 final AtomicInteger storeNum = new AtomicInteger(0); 066 067 @Override 068 public ScatterGatherBackingStore get() throws IOException { 069 final File tempFile = File.createTempFile("parallelscatter", "n" + storeNum.incrementAndGet()); 070 return new FileBasedScatterGatherBackingStore(tempFile); 071 } 072 } 073 074 private ScatterZipOutputStream createDeferred(final ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier) 075 throws IOException { 076 final ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get(); 077 // lifecycle is bound to the ScatterZipOutputStream returned 078 final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs); //NOSONAR 079 return new ScatterZipOutputStream(bs, sc); 080 } 081 082 private final ThreadLocal<ScatterZipOutputStream> tlScatterStreams = new ThreadLocal<ScatterZipOutputStream>() { 083 @Override 084 protected ScatterZipOutputStream initialValue() { 085 try { 086 final ScatterZipOutputStream scatterStream = createDeferred(backingStoreSupplier); 087 streams.add(scatterStream); 088 return scatterStream; 089 } catch (final IOException e) { 090 throw new RuntimeException(e); //NOSONAR 091 } 092 } 093 }; 094 095 /** 096 * Create a ParallelScatterZipCreator with default threads, which is set to the number of available 097 * processors, as defined by {@link java.lang.Runtime#availableProcessors} 098 */ 099 public ParallelScatterZipCreator() { 100 this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); 101 } 102 103 /** 104 * Create a ParallelScatterZipCreator 105 * 106 * @param executorService The executorService to use for parallel scheduling. For technical reasons, 107 * this will be shut down by this class. 108 */ 109 public ParallelScatterZipCreator(final ExecutorService executorService) { 110 this(executorService, new DefaultBackingStoreSupplier()); 111 } 112 113 /** 114 * Create a ParallelScatterZipCreator 115 * 116 * @param executorService The executorService to use. For technical reasons, this will be shut down 117 * by this class. 118 * @param backingStoreSupplier The supplier of backing store which shall be used 119 */ 120 public ParallelScatterZipCreator(final ExecutorService executorService, 121 final ScatterGatherBackingStoreSupplier backingStoreSupplier) { 122 this(executorService, backingStoreSupplier, Deflater.DEFAULT_COMPRESSION); 123 } 124 125 /** 126 * Create a ParallelScatterZipCreator 127 * 128 * @param executorService The executorService to use. For technical reasons, this will be shut down 129 * by this class. 130 * @param backingStoreSupplier The supplier of backing store which shall be used 131 * @param compressionLevel The compression level used in compression, this value should be 132 * -1(default level) or between 0~9. 133 * @throws IllegalArgumentException if the compression level is illegal 134 * @since 1.21 135 */ 136 public ParallelScatterZipCreator(final ExecutorService executorService, 137 final ScatterGatherBackingStoreSupplier backingStoreSupplier, 138 final int compressionLevel) throws IllegalArgumentException { 139 if ((compressionLevel < Deflater.NO_COMPRESSION || compressionLevel > Deflater.BEST_COMPRESSION) 140 && compressionLevel != Deflater.DEFAULT_COMPRESSION) { 141 throw new IllegalArgumentException("Compression level is expected between -1~9"); 142 } 143 144 this.backingStoreSupplier = backingStoreSupplier; 145 es = executorService; 146 this.compressionLevel = compressionLevel; 147 } 148 149 /** 150 * Adds an archive entry to this archive. 151 * <p> 152 * This method is expected to be called from a single client thread 153 * </p> 154 * 155 * @param zipArchiveEntry The entry to add. 156 * @param source The source input stream supplier 157 */ 158 159 public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) { 160 submitStreamAwareCallable(createCallable(zipArchiveEntry, source)); 161 } 162 163 /** 164 * Adds an archive entry to this archive. 165 * <p> 166 * This method is expected to be called from a single client thread 167 * </p> 168 * 169 * @param zipArchiveEntryRequestSupplier Should supply the entry to be added. 170 * @since 1.13 171 */ 172 public void addArchiveEntry(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) { 173 submitStreamAwareCallable(createCallable(zipArchiveEntryRequestSupplier)); 174 } 175 176 /** 177 * Submit a callable for compression. 178 * 179 * @see ParallelScatterZipCreator#createCallable for details of if/when to use this. 180 * 181 * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller. 182 */ 183 public final void submit(final Callable<? extends Object> callable) { 184 submitStreamAwareCallable(() -> { 185 callable.call(); 186 return tlScatterStreams.get(); 187 }); 188 } 189 190 /** 191 * Submit a callable for compression. 192 * 193 * @see ParallelScatterZipCreator#createCallable for details of if/when to use this. 194 * 195 * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller. 196 * @since 1.19 197 */ 198 public final void submitStreamAwareCallable(final Callable<? extends ScatterZipOutputStream> callable) { 199 futures.add(es.submit(callable)); 200 } 201 202 /** 203 * Create a callable that will compress the given archive entry. 204 * 205 * <p>This method is expected to be called from a single client thread.</p> 206 * 207 * Consider using {@link #addArchiveEntry addArchiveEntry}, which wraps this method and {@link #submitStreamAwareCallable submitStreamAwareCallable}. 208 * The most common use case for using {@link #createCallable createCallable} and {@link #submitStreamAwareCallable submitStreamAwareCallable} from a 209 * client is if you want to wrap the callable in something that can be prioritized by the supplied 210 * {@link ExecutorService}, for instance to process large or slow files first. 211 * Since the creation of the {@link ExecutorService} is handled by the client, all of this is up to the client. 212 * 213 * @param zipArchiveEntry The entry to add. 214 * @param source The source input stream supplier 215 * @return A callable that should subsequently passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The 216 * value of this callable is not used, but any exceptions happening inside the compression 217 * will be propagated through the callable. 218 */ 219 220 public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntry zipArchiveEntry, 221 final InputStreamSupplier source) { 222 final int method = zipArchiveEntry.getMethod(); 223 if (method == ZipMethod.UNKNOWN_CODE) { 224 throw new IllegalArgumentException("Method must be set on zipArchiveEntry: " + zipArchiveEntry); 225 } 226 final ZipArchiveEntryRequest zipArchiveEntryRequest = createZipArchiveEntryRequest(zipArchiveEntry, source); 227 return () -> { 228 final ScatterZipOutputStream scatterStream = tlScatterStreams.get(); 229 scatterStream.addArchiveEntry(zipArchiveEntryRequest); 230 return scatterStream; 231 }; 232 } 233 234 /** 235 * Create a callable that will compress archive entry supplied by {@link ZipArchiveEntryRequestSupplier}. 236 * 237 * <p>This method is expected to be called from a single client thread.</p> 238 * 239 * The same as {@link #createCallable(ZipArchiveEntry, InputStreamSupplier)}, but the archive entry 240 * to be added is supplied by a {@link ZipArchiveEntryRequestSupplier}. 241 * 242 * @see #createCallable(ZipArchiveEntry, InputStreamSupplier) 243 * 244 * @param zipArchiveEntryRequestSupplier Should supply the entry to be added. 245 * @return A callable that should subsequently passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The 246 * value of this callable is not used, but any exceptions happening inside the compression 247 * will be propagated through the callable. 248 * @since 1.13 249 */ 250 public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) { 251 return () -> { 252 final ScatterZipOutputStream scatterStream = tlScatterStreams.get(); 253 scatterStream.addArchiveEntry(zipArchiveEntryRequestSupplier.get()); 254 return scatterStream; 255 }; 256 } 257 258 /** 259 * Write the contents this to the target {@link ZipArchiveOutputStream}. 260 * <p> 261 * It may be beneficial to write things like directories and manifest files to the targetStream 262 * before calling this method. 263 * </p> 264 * 265 * <p>Calling this method will shut down the {@link ExecutorService} used by this class. If any of the {@link 266 * Callable}s {@link #submitStreamAwareCallable submit}ted to this instance throws an exception, the archive can not be created properly and 267 * this method will throw an exception.</p> 268 * 269 * @param targetStream The {@link ZipArchiveOutputStream} to receive the contents of the scatter streams 270 * @throws IOException If writing fails 271 * @throws InterruptedException If we get interrupted 272 * @throws ExecutionException If something happens in the parallel execution 273 */ 274 public void writeTo(final ZipArchiveOutputStream targetStream) 275 throws IOException, InterruptedException, ExecutionException { 276 277 try { 278 // Make sure we catch any exceptions from parallel phase 279 try { 280 for (final Future<?> future : futures) { 281 future.get(); 282 } 283 } finally { 284 es.shutdown(); 285 } 286 287 es.awaitTermination(1000 * 60L, TimeUnit.SECONDS); // == Infinity. We really *must* wait for this to complete 288 289 // It is important that all threads terminate before we go on, ensure happens-before relationship 290 compressionDoneAt = System.currentTimeMillis(); 291 292 for (final Future<? extends ScatterZipOutputStream> future : futures) { 293 final ScatterZipOutputStream scatterStream = future.get(); 294 scatterStream.zipEntryWriter().writeNextZipEntry(targetStream); 295 } 296 297 for (final ScatterZipOutputStream scatterStream : streams) { 298 scatterStream.close(); 299 } 300 301 scatterDoneAt = System.currentTimeMillis(); 302 } finally { 303 closeAll(); 304 } 305 } 306 307 /** 308 * Returns a message describing the overall statistics of the compression run 309 * 310 * @return A string 311 */ 312 public ScatterStatistics getStatisticsMessage() { 313 return new ScatterStatistics(compressionDoneAt - startedAt, scatterDoneAt - compressionDoneAt); 314 } 315 316 private void closeAll() { 317 for (final ScatterZipOutputStream scatterStream : streams) { 318 try { 319 scatterStream.close(); 320 } catch (final IOException ex) { //NOSONAR 321 // no way to properly log this 322 } 323 } 324 } 325} 326