Почему Spark Core тормозит?

212
15 апреля 2022, 17:40

При работе только с spark sql запросы на базу обрабатываются очень быстро, но при подключении JavaPairJDD он начинает тормозить.

Я хотел бы узнать, есть ли возможность ускорить обработку данных или же как-то улучшить код для того чтобы он не зависал?

Вот запуск без JavaPairRDD

collectAsList at StackOverFlow.java:56, took 0.883860 s

А вот с JavaPairRDD

collectAsMap at StackOverFlow.java:61, took 128.340516 s

А вот сам Код

public List<Order> getAllWithoutPairRDD(Optional<String> search, Optional<Integer> size) {
    SparkSession session = SparkSession.builder().config(config).getOrCreate();
    Properties properties = new Properties();
    properties.setProperty("partitionColumn", "id");
    properties.setProperty("fetchsize", "1000");
    properties.setProperty("driver", "org.postgresql.Driver");
    properties.setProperty("user", "postgres");
    properties.setProperty("password", "password");
    Dataset<Row> jdbc = session.read().jdbc("jdbc:postgresql://localhost/orders"
            , "orders"
            , "id"
            , 1L
            , 60000000L
            , 100
            , properties
    );
    Dataset<Row> ordersData = jdbc.select(col("*")).where(col("city_id").equalTo(3L)).limit(size.orElse(1));
    JavaPairRDD<Timestamp, Integer> analyticPairRDD = ordersData.toJavaRDD().mapToPair((PairFunction<Row, Timestamp, Integer>) row -> new Tuple2<Timestamp, Integer>((Timestamp) row.get(0), 1));
    JavaPairRDD<Timestamp,Integer> result = analyticPairRDD.groupByKey().mapValues(Iterables::size);
    return ordersData.as(orderEncoder).collectAsList();
}
public Map<Timestamp, Integer> getAllWithPairRDD(Optional<String> search, Optional<Integer> size) {
    SparkSession session = SparkSession.builder().config(config).getOrCreate();
    Properties properties = new Properties();
    properties.setProperty("partitionColumn", "id");
    properties.setProperty("fetchsize", "1000");
    properties.setProperty("driver", "org.postgresql.Driver");
    properties.setProperty("user", "postgres");
    properties.setProperty("password", "password");
    Dataset<Row> jdbc = session.read().jdbc("jdbc:postgresql://localhost/orders"
            , "orders"
            , "id"
            , 1L
            , 60000000L
            , 100
            , properties
    );
    Dataset<Row> ordersData = jdbc.select(col("operation_date")).where(col("city_id").equalTo(3L)).limit(size.orElse(1));
    JavaPairRDD<Timestamp, Integer> analyticPairRDD = ordersData.toJavaRDD().mapToPair((PairFunction<Row, Timestamp, Integer>) row -> new Tuple2<Timestamp, Integer>((Timestamp) row.get(0), 1));
    JavaPairRDD<Timestamp,Integer> result = analyticPairRDD.groupByKey().mapValues(Iterables::size);
    return result.collectAsMap();
}
@Bean
public SparkConf sparkConf() {
    return new SparkConf()
            .setAppName(appName)
            .setMaster(masterUri)
            .set("spark.executor.memory", "2g")
            .set("spark.executor.cores", "10")
            .set("spark.executor.memoryOverhead", "1g")
            .set("spark.driver.cores", "10")
            .set("spark.driver.memory", "3g")
            .set("spark.yarn.am.memory", "2g")
            .set("spark.yarn.am.cores", "4")
            .set("spark.sql.shuffle.partitions", "1000")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
}
Answer 1

скорее всего он тормозит из-за того, что данные должны быть раскиданы по экзекуторам - pair RDD требует раскидывания данных по ключу, и это может быть медленным процессом, конечно это зависит от объема данных...

Но в данном случае я не вижу нужды использовать RDD - лучше использовать функции Spark SQL, или вообще работать в чистом SQL, выполняя все нужные аггрегации - тогда спарк сам оптимизует выполнение, в отличии от RDD где это не делается.

P.S. есть много других проблем в этом случае:

  1. запрос к базе не обязательно будет выполнятся в параллель, тогда данные будут вытаскиваться только одним экзекутором
  2. spark.sql.shuffle.partitions установлен в слишком большое число - 1000 - это означает что данные будут раскиданы по 1000 разделов (если будет shuffle), и для каждого будет создан таск, который должен быть выполнен, а при 10 ядрах на экзекуторе, это будет 100 итераций. Обычно этот параметр устанавливается в 1 или 2 умножить на полное количество ядер на экзекуторах
READ ALSO
Как отключить горячие клавиши в java FX

Как отключить горячие клавиши в java FX

У меня в java FX приложении при нажатии на ENTER должно выполняться определённое действие, но есть проблема: если я кликаю на какой-то Button (мышкой),...

179
JAVA регулярные выражения. Найти по шаблону слово и из этого слова извлечь &quot;подслово&quot; [дубликат]

JAVA регулярные выражения. Найти по шаблону слово и из этого слова извлечь "подслово" [дубликат]

ЗдраствуйтеНадо в JAVA регулярным выражением найти по шаблону слово и из этого слова извлечь "подслово"

224
Java. Как найти объект в ArrayList&#39;е?

Java. Как найти объект в ArrayList'е?

Задание создать ArrayList с продуктами, у которых есть название, id, и количествоА затем осуществить поиск по части названия и вывести подходящие...

250