Мы можем использовать новый DataFrameRDD для чтения и записи CSV-данных. Существует несколько преимуществ DataFrameRDD над NormalRDD:
Вам понадобится библиотека: Добавьте ее в build.sbt
libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.2.0"
Код искры Scala для него:
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val csvInPath = "/path/to/csv/abc.csv"
val df = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load(csvInPath)
//format is for specifying the type of file you are reading
//header = true indicates that the first line is header in it
Чтобы преобразовать в обычный RDD, взяв некоторые из столбцов из него и
val rddData = df.map(x=>Row(x.getAs("colA")))
//Do other RDD operation on it
Сохранение формат RDD в формате CSV:
val aDf = sqlContext.createDataFrame(rddData,StructType(Array(StructField("colANew",StringType,true))))
aDF.write.format("com.databricks.spark.csv").option("header","true").save("/csvOutPath/aCSVOp")
Поскольку заголовок имеет значение true, мы получим имя заголовка во всех выходных файлах.