Есть вот такой вот участок кода:
BlockingQueue<String> queue = new ArrayBlockingQueue(links.size());
for (String link : links) {
queue.add(link);
}
ExecutorService executorService = Executors.newCachedThreadPool();
int threadsNumber = Integer.valueOf(arguments.get("-n"));
for (int i = 0; i < threadsNumber; i++) {
executorService.submit(new Consumer(queue, arguments.get("-o"), Integer.valueOf(arguments.get("-l"))));
}
while (!queue.isEmpty()) {
Thread.sleep(1000);
}
executorService.shutdownNow();
System.out.println("Все файлы были успешно скачены");
Последние строчки означают, что я собираюсь закрыть пул с потоками когда все сообщения из очереди будут прочитаны. Все отлично, но возникает проблема в момент, когда один из потоков забирает последнее сообщение из очереди. Условие !queue.isEmpty() перестает выполняться и программа завершается до того как это сообщение обработалось.
Как мне сделать так, чтобы программа завершалась только после обработки всех сообщений в очереде ?
Метод run в Consumer
@Override
public void run() {
try {
while (true) {
if (!queue.isEmpty()) {
String stringURL = queue.take();
String inputFileName = Paths.get(stringURL).getFileName().toString();
String outputFileName = new File(new File(storageFilesDirectory), inputFileName).toString();
Downloader.download(stringURL, outputFileName, speedLimit);
System.out.println(String.format("Файл %s был успешно скачен", inputFileName));
}
Thread.sleep(500);
}
} catch (InterruptedException e) {
} catch (FileCouldNotBeDownloaded e) {
System.out.println(String.format("Произошла ошибка скачивания файла."));
}
}
В пуле у меня всего два потока, а сообщений в очереди может быть неограничено.
Если нужно выполнить N заранее известных задач в K потоков, то проще использовать Executor с фиксированным числом потоков и скормить ему N Runnable выполняющих одну задачу:
final int NUMBER_OF_WORKERS = 2;
List<String> urls = IntStream.range( 0, 20 ).mapToObj( String::valueOf ).collect( Collectors.toList() );
ExecutorService executor = Executors.newFixedThreadPool( NUMBER_OF_WORKERS );
for ( String url : urls ) {
executor.submit( () -> { // new SingleUrlDownloadTask( url ) etc...
System.out.printf( "Worker [%s] downloading url: %s%n", Thread.currentThread().getName(), url );
try {
Thread.sleep( 300 + ThreadLocalRandom.current().nextInt(400) );
} catch (Exception e) {}
System.out.printf( "Worker [%s] completed url: %s%n", Thread.currentThread().getName(), url );
});
}
System.out.println( "All tasks queued." );
executor.shutdown(); // пул перестает принимать новые задачи,
// уже поставленные в очередь задачи будут выполнены рано или поздно
executor.awaitTermination( Long.MAX_VALUE, TimeUnit.NANOSECONDS );
System.out.println( "All tasks done.");
Обычно пулы потоков завершаются только при завершении самого приложения: автор приложения знает, что у него есть некоторая фоновая обработка задач, пул слушателей базы данных, поэтому можно их один раз создать и не беспокоиться о них - в худшем случае потоки просто будут простаивать, занимая какое-то небольшое количество оперативки. Если вы оставите сам пул потоков живым до полного завершения приложения, на работе самого приложения это не скажется, а вот работать с ним будет легче, потому что не придется иметь в виду его жизненный цикл.
В вашем случае вы ищете не момент завершения пула потоков, а некоторый синхронизатор, который вам скажет о том, что все потребители отработали. Проще всего завести CountDownLatch, инициализировав его с помощью размера очереди, и снимая по единице при каждой обработке сущности из очереди - в этом случае вы можете с помощью него дождаться конца обработки. Можно обойтись и более низкоуровневыми примитивами (никогда с ними не работал, поэтому могу ошибиться):
public class Consumer implements Runnable {
private final BlockingQueue<String> queue;
private final Object synchronizer;
public Consumer(BlockingQueue<String> queue, Object synchronizer) {
this.queue = queue;
this.synchronizer = synchronizer;
}
public void run() {
String value;
while ((value = queue.poll(0, TimeUnit.Milliseconds)) !== null) {
// обработка
}
// в эту область код попадет только тогда, когда в очереди кончатся элементы
synchronizer.notify();
}
}
// код главного потока
for (Consumer consumer : consumers) {
executor.submit(consumer);
}
synchronizer.wait();
Используйте java.util.concurrent.ThreadPoolExecutor, он гибче в использовании.
Пример для вашей задачи:
ThreadPoolExecutor poolExecutor =
new ThreadPoolExecutor(4, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
for (int i = 0; i < 1_000_000; i++)
poolExecutor.execute(()->{
//задача выполняемая в другом потоке
});
while (poolExecutor.getActiveCount() != 0)
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
poolExecutor.shutdownNow();
Продвижение своими сайтами как стратегия роста и независимости