Как сохранить данные в формате текстового файла GZ в pyspark? ((Но не в формате csv) [дублировать]

yes: :has()

поддержка браузера: none

10
задан Jacek Laskowski 28 November 2016 в 21:06
поделиться

4 ответа

В искро-csv github: https://github.com/databricks/spark-csv

Можно прочитать:

codec: кодек сжатия для использования при сохранении в файл. Должно быть полное имя класса, реализующего org.apache.hadoop.io.compress.CompressionCodec или один из нечувствительных к регистру сокращений имен (bzip2, gzip, lz4 и snappy).

В вашем случае это должно работать: df.write.format("com.databricks.spark.csv").codec("gzip")\ .save('my_directory/my_file.gzip')

5
ответ дан Alex-Antoine Fortin 19 August 2018 в 15:45
поделиться

Этот код работает для Spark 2.1, где .codec недоступен.

df.write
  .format("com.databricks.spark.csv")
  .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
  .save(my_directory)

Для Spark 2.2 вы можете использовать опцию df.write.csv(...,codec="gzip"), описанную здесь: https://spark.apache.org/docs/latest/api/python/pyspark.sql. HTML? изюминка = кодек

15
ответ дан Andy Reagan 19 August 2018 в 15:45
поделиться
  • 1
    Хотя этот код может ответить на вопрос, предоставляя дополнительный контекст относительно того, почему и / или как этот код отвечает на вопрос, улучшает его долгосрочную ценность. – manniL 23 March 2017 в 23:16
  • 2
    В случае использования "json" формат, сжатие не получает – Disha 8 November 2017 в 03:32
  • 3
    Похоже, что аргумент ключевого слова был изменен на compression. [Д0] spark.apache.org/docs/latest/api/python/… – volker238 2 March 2018 в 17:53

Чтобы записать CSV-файл с заголовками и переименовать файл part-000 в .csv.gzip

DF.coalesce(1).write.format("com.databricks.spark.csv").mode("overwrite")
.option("header","true")
.option("codec",org.apache.hadoop.io.compress.GzipCodec").save(tempLocationFileName)

copyRename(tempLocationFileName, finalLocationFileName)

def copyRename(srcPath: String, dstPath: String): Unit =  {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null)
  // the "true" setting deletes the source files once they are merged into the new output
}

Если вам не нужен заголовок, установите его в false, и вы не будете необходимо также выполнить объединение. Это будет быстрее написать тоже.

1
ответ дан morfious902002 19 August 2018 в 15:45
поделиться

С Spark 2.0+ это стало немного проще:

df.write.csv("path", compression="gzip")

Вам больше не нужен внешний CSV-пакет Databricks.

Автор csv() поддерживает несколько удобных опций. Например:

  • sep: для установки символа разделителя.
  • quote: и как указывать значения.
  • header: Включить ли строку заголовка.

В дополнение к gzip также можно использовать еще несколько кодеков сжатия:

  • bzip2
  • lz4
  • snappy
  • deflate

Полные документы Spark для csv() здесь: Python / Scala

7
ответ дан Nick Chammas 19 August 2018 в 15:45
поделиться
  • 1
    Спасибо, что ссылались на документы csv writer, и не давали ответные данные только ответ! – Laurens Koppenol 14 December 2017 в 09:36
  • 2
    @LaurensKoppenol - Ну, честно говоря, поддержка CSV, добавленная в исходную игру Spark, первоначально начиналась как внешний CSV-пакет Databricks , связанный с в принятом ответе. :) Этот пакет доступен любому пользователю Spark, но, начиная с Spark 2.0, он больше не нужен. – Nick Chammas 15 December 2017 в 17:44
Другие вопросы по тегам:

Похожие вопросы: