Взял пример отсюда, изменил только папку baseDir, файл в нее положил. Постоянно появляется ошибка: Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 1 times, most recent failure: Lost task 1.0 in stage 1.0 (TID 2, localhost): java.lang.NullPointerException. Ругается на последнюю строку, файл нужный не записывается.
public class StormReportsRecordReader {
public static void main(String[] args)throws Exception {
int numLinesToSkip = 0;
String delimiter = ",";
/**
* Specify the root directory
* If you are working from home replace baseDir
* with the location you downloaded the reports.csv
* file to.
*/
String baseDir = "/Users/storm/";
String fileName = "reports.csv";
String inputPath = baseDir + fileName;
String timeStamp = String.valueOf(new Date().getTime());
String outputPath = baseDir + "reports_processed_" + timeStamp;
/**
* Data file looks like this
* 161006-1655,UNK,2 SE BARTLETT,LABETTE,KS,37.03,-95.19,
* TRAINED SPOTTER REPORTS TORNADO ON THE GROUND. (ICT),TOR
* Fields are
* datetime,severity,location,county,state,lat,lon,comment,type
*/
Schema inputDataSchema = new Schema.Builder()
.addColumnsString("datetime","severity","location","county","state")
.addColumnsDouble("lat","lon")
.addColumnsString("comment")
.addColumnCategorical("type","TOR","WIND","HAIL")
.build();
/**
* Define a transform process to extract lat and lon
* and also transform type from one of three strings
* to either 0,1,2
*/
TransformProcess tp = new TransformProcess.Builder(inputDataSchema)
.removeColumns("datetime","severity","location","county","state","comment")
.categoricalToInteger("type")
.build();
/**
* Some code to step through and print the before
* and after Schema
*/
int numActions = tp.getActionList().size();
for (int i = 0; i<numActions; i++){
System.out.println("\n\n===============================");
System.out.println("--- Schema after step " + i +
" (" + tp.getActionList().get(i) + ")--" );
System.out.println(tp.getSchemaAfterStep(i));
}
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local[*]");
sparkConf.setAppName("Storm Reports Record Reader Transform");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
/**
* Get our data into a spark RDD
* and transform that spark RDD using our
* transform process
*/
// read the data file
JavaRDD<String> lines = sc.textFile(inputPath);
// convert to Writable
JavaRDD<List<Writable>> stormReports = lines.map(new StringToWritablesFunction(new CSVRecordReader()));
// run our transform process
JavaRDD<List<Writable>> processed = SparkTransformExecutor.execute(stormReports,tp);
// convert Writable back to string for export
JavaRDD<String> toSave= processed.map(new WritablesToStringFunction(","));
toSave.saveAsTextFile(outputPath);
}
}
Пропишите полный путь к директории с файлом ("./Users/storm/"), если же вы запускаете через студию, то укажите рабочую дирректорию проекта.
Кофе для программистов: как напиток влияет на продуктивность кодеров?
Рекламные вывески: как привлечь внимание и увеличить продажи
Стратегії та тренди в SMM - Технології, що формують майбутнє сьогодні
Выделенный сервер, что это, для чего нужен и какие характеристики важны?
Современные решения для бизнеса: как облачные и виртуальные технологии меняют рынок
В сервлете необходимо получить инстанс DAO который обернут в AtomicReference, но при приведении типов компилятор волнуется и пишет:
Есть ли такой макрос, который автоматически генерировал в коде все объекты из filefxml?