При работе только с spark sql запросы на базу обрабатываются очень быстро, но при подключении JavaPairJDD он начинает тормозить.
Я хотел бы узнать, есть ли возможность ускорить обработку данных или же как-то улучшить код для того чтобы он не зависал?
Вот запуск без JavaPairRDD
collectAsList at StackOverFlow.java:56, took 0.883860 s
А вот с JavaPairRDD
collectAsMap at StackOverFlow.java:61, took 128.340516 s
А вот сам Код
public List<Order> getAllWithoutPairRDD(Optional<String> search, Optional<Integer> size) {
SparkSession session = SparkSession.builder().config(config).getOrCreate();
Properties properties = new Properties();
properties.setProperty("partitionColumn", "id");
properties.setProperty("fetchsize", "1000");
properties.setProperty("driver", "org.postgresql.Driver");
properties.setProperty("user", "postgres");
properties.setProperty("password", "password");
Dataset<Row> jdbc = session.read().jdbc("jdbc:postgresql://localhost/orders"
, "orders"
, "id"
, 1L
, 60000000L
, 100
, properties
);
Dataset<Row> ordersData = jdbc.select(col("*")).where(col("city_id").equalTo(3L)).limit(size.orElse(1));
JavaPairRDD<Timestamp, Integer> analyticPairRDD = ordersData.toJavaRDD().mapToPair((PairFunction<Row, Timestamp, Integer>) row -> new Tuple2<Timestamp, Integer>((Timestamp) row.get(0), 1));
JavaPairRDD<Timestamp,Integer> result = analyticPairRDD.groupByKey().mapValues(Iterables::size);
return ordersData.as(orderEncoder).collectAsList();
}
public Map<Timestamp, Integer> getAllWithPairRDD(Optional<String> search, Optional<Integer> size) {
SparkSession session = SparkSession.builder().config(config).getOrCreate();
Properties properties = new Properties();
properties.setProperty("partitionColumn", "id");
properties.setProperty("fetchsize", "1000");
properties.setProperty("driver", "org.postgresql.Driver");
properties.setProperty("user", "postgres");
properties.setProperty("password", "password");
Dataset<Row> jdbc = session.read().jdbc("jdbc:postgresql://localhost/orders"
, "orders"
, "id"
, 1L
, 60000000L
, 100
, properties
);
Dataset<Row> ordersData = jdbc.select(col("operation_date")).where(col("city_id").equalTo(3L)).limit(size.orElse(1));
JavaPairRDD<Timestamp, Integer> analyticPairRDD = ordersData.toJavaRDD().mapToPair((PairFunction<Row, Timestamp, Integer>) row -> new Tuple2<Timestamp, Integer>((Timestamp) row.get(0), 1));
JavaPairRDD<Timestamp,Integer> result = analyticPairRDD.groupByKey().mapValues(Iterables::size);
return result.collectAsMap();
}
@Bean
public SparkConf sparkConf() {
return new SparkConf()
.setAppName(appName)
.setMaster(masterUri)
.set("spark.executor.memory", "2g")
.set("spark.executor.cores", "10")
.set("spark.executor.memoryOverhead", "1g")
.set("spark.driver.cores", "10")
.set("spark.driver.memory", "3g")
.set("spark.yarn.am.memory", "2g")
.set("spark.yarn.am.cores", "4")
.set("spark.sql.shuffle.partitions", "1000")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
}
скорее всего он тормозит из-за того, что данные должны быть раскиданы по экзекуторам - pair RDD требует раскидывания данных по ключу, и это может быть медленным процессом, конечно это зависит от объема данных...
Но в данном случае я не вижу нужды использовать RDD - лучше использовать функции Spark SQL, или вообще работать в чистом SQL, выполняя все нужные аггрегации - тогда спарк сам оптимизует выполнение, в отличии от RDD где это не делается.
P.S. есть много других проблем в этом случае:
spark.sql.shuffle.partitions
установлен в слишком большое число - 1000
- это означает что данные будут раскиданы по 1000 разделов (если будет shuffle), и для каждого будет создан таск, который должен быть выполнен, а при 10 ядрах на экзекуторе, это будет 100 итераций. Обычно этот параметр устанавливается в 1 или 2 умножить на полное количество ядер на экзекуторахВиртуальный выделенный сервер (VDS) становится отличным выбором
У меня в java FX приложении при нажатии на ENTER должно выполняться определённое действие, но есть проблема: если я кликаю на какой-то Button (мышкой),...
ЗдраствуйтеНадо в JAVA регулярным выражением найти по шаблону слово и из этого слова извлечь "подслово"
Задание создать ArrayList с продуктами, у которых есть название, id, и количествоА затем осуществить поиск по части названия и вывести подходящие...