Thread pool и ForkJoin pool

203
03 сентября 2021, 23:30

Я делаю свой домашний проект и столкнулся с проблемой производительности. В моей бд лежит около 10000 записей, в каждой записи лежит ссылка. Каждые полторы минуты, мне нужно достать все ссылки из базы, обратиться к vk api по каждой ссылке, спарсить json и записать данные в базу. В последовательном режиме это было бы очень долго, поэтому я написал такой код `

@Override
@Scheduled(cron = "${collecting_cron}")
public void collectingInformation() {
    try {
        val allUserInformation = userInformationService.findAll();
        val countDownLatch = new CountDownLatch(allUserInformation.size());
        log.info("Information collector executor launched");
        UtilService.createSequentialOrParallelStreamFromCollection(allUserInformation).forEach(userInformation -> {
            collectorThreadPoolExecutor.submit(() -> {
                collectorCommand.computeOnlineTime(userInformation);
                collectorCommand.checkLastOnlineTime(userInformation);
                collectorCommand.checkNewFriends(userInformation);
                collectorCommand.checkNewWallPosts(userInformation);
                collectorCommand.checkNewPhotos(userInformation);
                collectorCommand.checkNewSubscribes(userInformation);
                countDownLatch.countDown();
            });
        });
        countDownLatch.await();
        log.info("All information collected, executor stopped");
    } catch (InterruptedException ignored) {
    }
}` 

После этого я узнал про ForkJoinPool и сделал реализацию этой же задачи на нем: `

@Override
@Scheduled(cron = "${collecting_cron}")
public void collectingInformation() {
    val allUserInformation = userInformationService.findAll();
    val recursiveCollectorTask = new InformationCollectorRecursiveAction(allUserInformation);
    log.info("Information collector forkJoinPool launched");
    collectorForkJoinPool.execute(recursiveCollectorTask);
    recursiveCollectorTask.join();
    log.info("All information collected, executor stopped");
}
@AllArgsConstructor
private class InformationCollectorRecursiveAction extends RecursiveAction {
    private Set<UserInformation> allUserInformation;
    @Override
    protected void compute() {
        if (allUserInformation.size() >= THRESHOLD) {
            ForkJoinTask.invokeAll(this.splitTask());
        } else {
            allUserInformation.forEach(userInformation -> {
                collectorCommand.computeOnlineTime(userInformation);
                collectorCommand.checkLastOnlineTime(userInformation);
                collectorCommand.checkNewFriends(userInformation);
                collectorCommand.checkNewPhotos(userInformation);
                collectorCommand.checkNewSubscribes(userInformation);
                collectorCommand.checkNewWallPosts(userInformation);
            });
        }
    }
    private Collection<InformationCollectorRecursiveAction> splitTask() {
        val tasks = new ArrayList<InformationCollectorRecursiveAction>();
        val leftTask = allUserInformation.stream()
                .limit((long) Math.floor(allUserInformation.size() >> 1))
                .collect(Collectors.toSet());
        val rightTask = allUserInformation.stream()
                .skip(allUserInformation.size() >> 1)
                .collect(Collectors.toSet());
        tasks.add(new InformationCollectorRecursiveAction(leftTask));
        tasks.add(new InformationCollectorRecursiveAction(rightTask));
        return tasks;
    }
}`

В данный момент я еще не реализовал методы collectorCommand и там стоит Thread.sleep(5). Сейчас я измеряю скорость работы через аспект: `

    public Object printComputeMethodTime(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
    val start = System.currentTimeMillis();
    val obj = proceedingJoinPoint.proceed();
    val end = System.currentTimeMillis() - start;
    log.info(String.format("%s.%s() completed in %d ms"
            , proceedingJoinPoint.getSignature().getDeclaringType().getSimpleName()
            , proceedingJoinPoint.getSignature().getName(), end));
    return obj;`

Оба пула я объявляю как spring бины: `

@Bean
public ThreadPoolTaskExecutor collectorThreadPoolExecutor() {
    val collectorThreadPoolExecutor = new ThreadPoolTaskExecutor();
    collectorThreadPoolExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors() + 1);
    collectorThreadPoolExecutor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);
    collectorThreadPoolExecutor.setThreadPriority(Thread.MAX_PRIORITY);
    collectorThreadPoolExecutor.setThreadNamePrefix("collector-");
    return collectorThreadPoolExecutor;
}
@Bean
public ForkJoinPool collectorForkJoinPool() {
    return new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2);
}`

Так вот у меня возникло несколько вопросов:

1) Стоит ли таким образом измерять время работы метода? Если есть другие варианты, то буду очень признателен за подсказку.

2) После измерений через аспект, forkjoin pool с Runtime.getRuntime().availableProcessors() потоков, почему-то в 2 раза медленнее, чем тот же forkjoin c Runtime.getRuntime().availableProcessors() * 2 потоков, хотя в многих туториалах рекомендуют commonPool(). Из-за чего это возможно?

3) Как грамотно выбрать threshold для forkjoin пула?

Это мой первый проект, поэтому хотелось бы услышать советы по использованию пулов. Заранее спасибо.

Answer 1

1) Судя по названиям методов, задержки приложения будут скорее связаны не с Вашей машиной - а с ожиданиями ответов от стороннего сервиса. Так что тут можно как угодно измерять - точность не так важна.

2) commonPool рекомендуют потому, что если Вы создаете через new новый ForkJoinPool, то их в системе появляется два(!) (как минимум) - Ваш и системный commonPool. Планировать оптимально загрузку при наличии двух мешающих друг другу пулов очень трудно. Поэтому потери на организацию работы потоков резко возрастают. Поэтому создавать через new ForkJoinPool нужно реально только тогда, когда Вы точно знаете, что делаете, и сможете это обосновать.

3) threshold надо планировать таким образом: задача объемом N разбивается на примерное количество ядер машины, а потом еще делится на 10..100 - в результате получите количество задач, которое должен выполнить один поток пула.

READ ALSO
Java веб стэк технологий

Java веб стэк технологий

Вроде изучаю Spring, но тем не менее никак не могу понять(и найти), реальный пример с продакшн стэком технологий, которые используют в реальных...

157
Бесконечное меню

Бесконечное меню

Здраствуйте, может кто знает, как сделать так, чтобы меню бесконечно повторялось после выполнение задания (выход с меню в 4 пункте есть)То...

345
Почему два одинаковые алгоритмы дают разный результат?

Почему два одинаковые алгоритмы дают разный результат?

Уже много раз сталкиваюсь с одной проблемойЕсть два алгоритма на Java и Python

135
Подчеркивание нескольких подстрок в JTextPane

Подчеркивание нескольких подстрок в JTextPane

Я делаю в программе поле для ввода (JTextPane) с проверкой правописанияИ хочу сделать подчеркивание у каждого слова, к которому есть варианты...

209