I took the example from here and changed only the baseDir folder, putting the file into it. I keep getting this error:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 1 times, most recent failure: Lost task 1.0 in stage 1.0 (TID 2, localhost): java.lang.NullPointerException
It fails on the last line, and the expected output file is not written.
import java.util.Date;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.datavec.api.records.reader.impl.csv.CSVRecordReader;
import org.datavec.api.transform.TransformProcess;
import org.datavec.api.transform.schema.Schema;
import org.datavec.api.writable.Writable;
import org.datavec.spark.transform.SparkTransformExecutor;
import org.datavec.spark.transform.misc.StringToWritablesFunction;
import org.datavec.spark.transform.misc.WritablesToStringFunction;

public class StormReportsRecordReader {
    public static void main(String[] args) throws Exception {
        int numLinesToSkip = 0;
        String delimiter = ",";

        /*
         * Specify the root directory.
         * If you are working from home, replace baseDir
         * with the location you downloaded the reports.csv
         * file to.
         */
        String baseDir = "/Users/storm/";
        String fileName = "reports.csv";
        String inputPath = baseDir + fileName;
        String timeStamp = String.valueOf(new Date().getTime());
        String outputPath = baseDir + "reports_processed_" + timeStamp;

        /*
         * Data file looks like this:
         * 161006-1655,UNK,2 SE BARTLETT,LABETTE,KS,37.03,-95.19,
         * Fields are:
         * datetime,severity,location,county,state,lat,lon,comment,type
         */
        Schema inputDataSchema = new Schema.Builder()
                // ... column definitions not shown in the question ...
                .build();

        /*
         * Define a transform process to extract lat and lon
         * and also transform type from one of three strings
         * to either 0, 1 or 2.
         */
        TransformProcess tp = new TransformProcess.Builder(inputDataSchema)
                // ... transform steps not shown in the question ...
                .build();

        /*
         * Some code to step through and print the before
         * and after Schema.
         */
        int numActions = tp.getActionList().size();
        for (int i = 0; i < numActions; i++) {
            System.out.println("--- Schema after step " + i +
                    " (" + tp.getActionList().get(i) + ")--");
        }

        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("Storm Reports Record Reader Transform");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        /*
         * Get our data into a Spark RDD
         * and transform that Spark RDD using our
         * transform process.
         */
        // read the data file
        JavaRDD<String> lines = sc.textFile(inputPath);
        // convert to Writable
        JavaRDD<List<Writable>> stormReports = lines.map(new StringToWritablesFunction(new CSVRecordReader()));
        // run our transform process
        JavaRDD<List<Writable>> processed = SparkTransformExecutor.execute(stormReports, tp);
        // convert Writable back to string for export
        JavaRDD<String> toSave = processed.map(new WritablesToStringFunction(","));
        // write the result to outputPath (presumably the failing "last line": the output is never written)
        toSave.saveAsTextFile(outputPath);
    }
}
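To make the transform comment concrete, here is a plain-Java sketch (no DataVec, no Spark) of what the pipeline is described as doing for each CSV line: keep lat and lon, and turn type into 0, 1 or 2. The column order comes from the comment in the code above. The three type strings are not given in the question, so the sketch takes them as a parameter, and "A", "B", "C" in main are placeholders. StormRecordSketch and transform are hypothetical names; this only illustrates the idea and is not how DataVec's TransformProcess is implemented.

```java
import java.util.Arrays;
import java.util.List;

public class StormRecordSketch {

    // Column positions, assuming the order from the comment:
    // datetime,severity,location,county,state,lat,lon,comment,type
    static final int LAT = 5, LON = 6, TYPE = 8, FIELD_COUNT = 9;

    /**
     * Keeps lat and lon and replaces the categorical "type" string by its
     * index in `categories`; returns a comma-joined line, similar in spirit
     * to what WritablesToStringFunction(",") produces.
     */
    static String transform(String line, List<String> categories) {
        // limit -1 keeps trailing empty fields (e.g. an empty comment)
        String[] f = line.split(",", -1);
        if (f.length != FIELD_COUNT) {
            throw new IllegalArgumentException(
                    "Expected " + FIELD_COUNT + " fields, got " + f.length + ": " + line);
        }
        int typeIndex = categories.indexOf(f[TYPE]);
        if (typeIndex < 0) {
            throw new IllegalArgumentException("Unknown type '" + f[TYPE] + "' in: " + line);
        }
        double lat = Double.parseDouble(f[LAT]);
        double lon = Double.parseDouble(f[LON]);
        return lat + "," + lon + "," + typeIndex;
    }

    public static void main(String[] args) {
        // Placeholder category names; the real three type strings are not in the question.
        List<String> categories = Arrays.asList("A", "B", "C");
        // Sample line with an empty comment and a placeholder type "B" appended.
        System.out.println(transform(
                "161006-1655,UNK,2 SE BARTLETT,LABETTE,KS,37.03,-95.19,,B", categories));
        // prints 37.03,-95.19,1
    }
}
```

The sample line in the code comment, as written, splits into only 8 fields (the trailing comma yields one empty field and no type), so this sketch's field-count check would reject it.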
Specify the full path to the directory containing the file ("./Users/storm/"). If you run the program from the IDE, set the project's working directory accordingly.
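Whether a path like "./Users/storm/" works depends on the JVM working directory, which the IDE's run configuration decides. A minimal sketch using only java.nio.file that prints where baseDir + fileName actually resolves and fails fast with a readable message if the file is not there. InputPathCheck, resolveInput and requireReadableFile are hypothetical helper names, not part of DataVec or Spark; the sketch only makes the path easy to check and is not, by itself, a fix for the exception.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class InputPathCheck {

    /** Resolves baseDir + fileName against the JVM working directory and normalizes it. */
    static Path resolveInput(String baseDir, String fileName) {
        return Paths.get(baseDir, fileName).toAbsolutePath().normalize();
    }

    /** Returns the absolute path as a string, or throws if the file is missing or unreadable. */
    static String requireReadableFile(String baseDir, String fileName) {
        Path p = resolveInput(baseDir, fileName);
        if (!Files.isRegularFile(p) || !Files.isReadable(p)) {
            throw new IllegalArgumentException("Input file not found or not readable: " + p
                    + " (working directory: " + System.getProperty("user.dir") + ")");
        }
        return p.toString();
    }

    public static void main(String[] args) {
        // Shows which directory a relative path is resolved against.
        System.out.println("Working directory: " + System.getProperty("user.dir"));
        System.out.println("Resolved input:    " + resolveInput("./Users/storm/", "reports.csv"));
    }
}
```

In the question's program this would be used as `String inputPath = InputPathCheck.requireReadableFile(baseDir, fileName);` before `sc.textFile(inputPath)`, so a wrong working directory shows up immediately instead of inside a Spark task.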
