Оптимизация записи разделенных данных в S3 в spark sql

.*[^0-9]{1,}.*

Отлично работает для нас.

Мы хотим использовать использованный ответ, но он не работает в модели YANG.

И тот, который я здесь предоставил, понять и ясно: начало и конец могут быть любыми символами, но, но должен быть хотя бы один НЕМЕРИАЛЬНЫЙ характер, который является самым большим.

2
задан GothamGirl 16 January 2019 в 14:03
поделиться

1 ответ

Насколько я помню, такая ситуация возникает, когда вы пишете в режиме добавления, и у вас есть много разделов в конечном местоположении. Spark извлекает существующие разделы и, возможно, схемы. Я бы предложил два возможных решения.

1) Если у вас не много разделов для записи за исполнение, вы можете попробовать следующее:

// Prepare data and cache it
// There are a lot of data, so a part of it most probably will be written to disk
val toBePublishedSignals = hiveCtx.sql("some query").persist(StorageLevel.MEMORY_AND_DISK_SER_2)

// Get all unique combinations of partitions columns
val partitions = toBePublishedSignals.selectExpr("A", "B", "C").distinct().collect()

// Write each combination as a separate partition
partitions.foreach { p =>
    val a = p.getAs[String]("A"))
    val b = p.getAs[String]("B"))
    val c = p.getAs[String]("C"))
    val path = new Path(new Path(new Path(getS3DataPath(), s"A=$a"), s"B=$b"), s"C=$c")
    toBePublishedSignals.filter(col("A") === a && col("B") === b && col("C") === c)
                       .write.format(JSON_DATA_FORMAT).mode(SaveMode.Append).save(path.toUri.toString)
}

И то же самое для метаданных.

// Prepare data and cache it
val metadataFiles = hiveCtx.sql("some query").distinct().persist(StorageLevel.MEMORY_AND_DISK_SER_2)

// Get all unique combinations of partitions columns
val partitions = metadataFiles.selectExpr("A", "C").distinct().collect()

// Write each combination as a separate partition
partitions.foreach { p =>
    val a = p.getAs[String]("A"))
    val c = p.getAs[String]("C"))
    val path = new Path(new Path(getS3MetadataPath(), s"A=$a"), s"C=$c")
    metadataFiles.filter(col("A") === a && col("C") === c)
                 .write.format(JSON_DATA_FORMAT).mode(SaveMode.Append).save(path.toUri.toString)
}

Я не знаю о типах данных столбцов разделов, поэтому в моем примере это строки. Код выше является только примером. Его можно переписать более общим способом, используя операцию свертывания и извлекая типы данных из схемы DataFrame.

2) Как вариант, можно читать записи из разделов, к которым вы собираетесь прикоснуться, в существующих данных и объединять их с входящими записями. Давайте представим, что A/B/C - это year/month/day соответственно. У нас есть некоторые новые данные, и df DataFrame является результатом обработки данных. После обработки мы следующие данные

2018|10|11|f1|f2|f3
2018|11|14|f1|f2|f3
2018|11|15|f1|f2|f3

Это означает, что нам нужно прочитать разделы из местоположения, которое содержит окончательные данные (местоположение, которое возвращается getS3DataPath())

year=2018/month=10/day=11
year=2018/month=11/day=14
year=2018/month=11/day=15

To Для этого нам нужно создать функцию фильтра, которая является комбинацией нескольких других функций. Мы используем Reduce для их объединения с использованием следующей логики:

year=2018 && month=10 && day=11
or
year=2018 && month=11 && day=14
or
year=2018 && month=11 && day=15
// Do processing
val toBePublishedSignalsNew = hiveCtx.sql("some query")

// Create a filter function for querying existing data
val partitions = toBePublishedSignalsNew.selectExpr("A", "B", "C").distinct().collect()
val filterFunction = partitions.map { partitionValues =>
    partitionColumns.map { columnName =>
        (input: Row) => input.getAs[String](columnName) == partitionValues.getAs[String](columnName)
    }.reduceOption((f1, f2) => (row: Row) => f1(row) && f2(row)).getOrElse((_: Row) => false)
}.reduceOption((f1, f2) => (row: Row) => f1(row) || f2(row)).getOrElse((_: Row) => false)

// Read existing partitions that match incoming data
val toBePublishedSignalsExisting = sparkSession.read.json(getS3DataPath()).filter(filterFunction)

// Combine new and existing data and write the result to a temporary location
toBePublishedSignalsExisting
    .union(toBePublishedSignalsNew)
    .write
    .partitionBy("A", "B", "C")
    .format(JSON_DATA_FORMAT)
    .mode(SaveMode.Overwrite)
    .save(temporaryLocationS3)

После этого вам нужно будет заменить разделы в местоположении, которое возвращает getS3DataPath(), на разделы, расположенные в temporaryLocationS3. Приведенный выше пример будет работать, только если столбцы разделов содержат строки. Если они имеют другие типы данных, вам, вероятно, придется добавить некоторое сопоставление для функций фильтра. Например, для IntegerType это будет выглядеть как

(input: Row) => input.getAs[Int](columnName) == partitionValues.getAs[Int](columnName)
0
ответ дан Dmitry Y. 16 January 2019 в 14:03
поделиться
Другие вопросы по тегам:

Похожие вопросы: