Условие закрытие пула потоков

359
23 декабря 2016, 12:47

Есть вот такой вот участок кода:

    BlockingQueue<String> queue = new ArrayBlockingQueue(links.size());
    for (String link : links) {
        queue.add(link);
    }
    ExecutorService executorService = Executors.newCachedThreadPool();
    int threadsNumber = Integer.valueOf(arguments.get("-n"));
    for (int i = 0; i < threadsNumber; i++) {
        executorService.submit(new Consumer(queue, arguments.get("-o"), Integer.valueOf(arguments.get("-l"))));
    }
    while (!queue.isEmpty()) {
        Thread.sleep(1000);
    }
    executorService.shutdownNow();
    System.out.println("Все файлы были успешно скачены");

Последние строчки означают, что я собираюсь закрыть пул с потоками когда все сообщения из очереди будут прочитаны. Все отлично, но возникает проблема в момент, когда один из потоков забирает последнее сообщение из очереди. Условие !queue.isEmpty() перестает выполняться и программа завершается до того как это сообщение обработалось.

Как мне сделать так, чтобы программа завершалась только после обработки всех сообщений в очереде ?

Метод run в Consumer

@Override
public void run() {
    try {
        while (true) {
            if (!queue.isEmpty()) {
                String stringURL = queue.take();
                String inputFileName = Paths.get(stringURL).getFileName().toString();
                String outputFileName = new File(new File(storageFilesDirectory), inputFileName).toString();
                Downloader.download(stringURL, outputFileName, speedLimit);
                System.out.println(String.format("Файл %s был успешно скачен", inputFileName));
            }
            Thread.sleep(500);
        }
    } catch (InterruptedException e) {
    } catch (FileCouldNotBeDownloaded e) {
        System.out.println(String.format("Произошла ошибка скачивания файла."));
    }
}

В пуле у меня всего два потока, а сообщений в очереди может быть неограничено.

Answer 1

Если нужно выполнить N заранее известных задач в K потоков, то проще использовать Executor с фиксированным числом потоков и скормить ему N Runnable выполняющих одну задачу:

final int NUMBER_OF_WORKERS = 2;
List<String> urls = IntStream.range( 0, 20 ).mapToObj( String::valueOf ).collect( Collectors.toList() );
ExecutorService executor = Executors.newFixedThreadPool( NUMBER_OF_WORKERS );
for ( String url : urls ) {
    executor.submit( () -> { // new SingleUrlDownloadTask( url ) etc...
        System.out.printf( "Worker [%s] downloading url: %s%n", Thread.currentThread().getName(), url );
        try {
            Thread.sleep( 300 + ThreadLocalRandom.current().nextInt(400) );
        } catch (Exception e) {}
        System.out.printf( "Worker [%s] completed url: %s%n", Thread.currentThread().getName(), url );
    });
}
System.out.println( "All tasks queued." );
executor.shutdown(); // пул перестает принимать новые задачи,
                     // уже поставленные в очередь задачи будут выполнены рано или поздно
executor.awaitTermination( Long.MAX_VALUE, TimeUnit.NANOSECONDS );
System.out.println( "All tasks done.");
Answer 2

Обычно пулы потоков завершаются только при завершении самого приложения: автор приложения знает, что у него есть некоторая фоновая обработка задач, пул слушателей базы данных, поэтому можно их один раз создать и не беспокоиться о них - в худшем случае потоки просто будут простаивать, занимая какое-то небольшое количество оперативки. Если вы оставите сам пул потоков живым до полного завершения приложения, на работе самого приложения это не скажется, а вот работать с ним будет легче, потому что не придется иметь в виду его жизненный цикл.

В вашем случае вы ищете не момент завершения пула потоков, а некоторый синхронизатор, который вам скажет о том, что все потребители отработали. Проще всего завести CountDownLatch, инициализировав его с помощью размера очереди, и снимая по единице при каждой обработке сущности из очереди - в этом случае вы можете с помощью него дождаться конца обработки. Можно обойтись и более низкоуровневыми примитивами (никогда с ними не работал, поэтому могу ошибиться):

public class Consumer implements Runnable {
    private final BlockingQueue<String> queue;
    private final Object synchronizer;
    public Consumer(BlockingQueue<String> queue, Object synchronizer) {
        this.queue = queue;
        this.synchronizer = synchronizer;
    }
    public void run() {
        String value;
        while ((value = queue.poll(0, TimeUnit.Milliseconds)) !== null) {
            // обработка
        }
        // в эту область код попадет только тогда, когда в очереди кончатся элементы
        synchronizer.notify();
    }
}
// код главного потока
for (Consumer consumer : consumers) {
    executor.submit(consumer);
}
synchronizer.wait();
Answer 3

Используйте java.util.concurrent.ThreadPoolExecutor, он гибче в использовании.

Пример для вашей задачи:

ThreadPoolExecutor poolExecutor = 
            new ThreadPoolExecutor(4, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
for (int i = 0; i < 1_000_000; i++) 
     poolExecutor.execute(()->{
        //задача выполняемая в другом потоке
     });
while (poolExecutor.getActiveCount() != 0) 
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
poolExecutor.shutdownNow();
READ ALSO
Проблема с java кодом

Проблема с java кодом

Проблема в вычислении суммыВ первом коде использовали W(i+1)/Wi=f(i) - разница между текущим элементом и последующим, и складываем по очереди текущий...

295
Применение нейронных сетей и ИИ в Мобильных Приложениях.

Применение нейронных сетей и ИИ в Мобильных Приложениях.

Здравствуйте, хотелось бы узнать , примеры приложений , да и в целом, где и как применяются нейронные сети и ИИ в приложенияхСам пишу на java под...

339
Проблема с EditText

Проблема с EditText

Добрый день

312