Как корректно остановить приложение с ThreadPoolExecutor при SIGTERM?

126
19 октября 2019, 18:10

Есть приложение с пулом потоков.

public static void main(String[] args) {
...
ThreadPoolExecutor executor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize,0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(threadPoolSize));
registerShutdownHook(executor);
        while (!executor.isShutdown()) {
                startExecutor(isOrderBy, isCheckOldChanges);
                if (countIterations == 0) isCheckOldChanges = false;
                countIterations++;
        }
...
System.out.println("Application was stop!");
}
private static void startExecutor(boolean isOrderBy, boolean isCheckOldChanges) {
        for (EntityModel model : allModels.values()) {
            if (executor.isShutdown()) {
                return;
            } else {
                Runnable worker = new ProducerThread(isOrderBy, isCheckOldChanges, model);
                if (executor.getQueue().remainingCapacity()==1){
                    while (executor.getQueue().remainingCapacity()==1){
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
                executor.execute(worker);
            }
        }
    }
private static void registerShutdownHook(ExecutorService executor) {
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                executor.shutdown();
                mainThread.join();
            } catch (InterruptedException e) {
                log.error(e.getStackTrace());
            }
        }));
    }

При остановке приложения выбрасывает:

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.kafkaproducer.ProducerThread@57175e74 rejected from java.util.concurrent.ThreadPoolExecutor@7bb58ca3[Shutting down, pool size = 10, active threads = 10, queued tasks = 1, completed tasks = 35] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) at com.ApplicationRun.startExecutor(ApplicationRun.java:101) at com.ApplicationRun.main(ApplicationRun.java:77)

Как сделать корректную остановку приложения?

Answer 1

Решил проблему с помощью

new ThreadPoolExecutor.CallerRunsPolicy().

А именно:

ExecutorService executor = new ThreadPoolExecutor(
                threadPoolSize,
                threadPoolSize,
                0L,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(threadPoolSize),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
        registerShutdownHook(executor); 

Не уверен на 100%, что это именно то решение, но помогло.

READ ALSO
Фабричный метод и анонимные классы

Фабричный метод и анонимные классы

Разбираюсь с абстрактными классами в java и в качестве примера их использования приводиться фабричный метод, и говориться что в данном случае...

92
Spring Data JPA закрывается соеденение с БД

Spring Data JPA закрывается соеденение с БД

Имеется Spring boot приложение, которое перегоняет большое количество данных получаемых по API в удаленную БД (MySQL)В качестве пула соединений использую...

126
Сигнатура конструктора в интерфейсе

Сигнатура конструктора в интерфейсе

Почему нельзя в интерфейсе задать сигнатуру конструктора без реализации?

105
При использовании Spring Data JpaRepository настройки через JavaConfig, вылетает NoSuchBeanDefinitionException

При использовании Spring Data JpaRepository настройки через JavaConfig, вылетает NoSuchBeanDefinitionException

Уже неделю голову ломаю, вроде всё так и всё на месте, но что-то не вижу, подскажите, пожалуйста, что не учёл

102