Поиск во всех полях от каждой таблицы базы данных MySQL

Насколько я помню, такая ситуация возникает, когда вы пишете в режиме добавления, и у вас есть много разделов в конечном местоположении. Spark извлекает существующие разделы и, возможно, схемы. Я бы предложил два возможных решения.

1) Если у вас не много разделов для записи за исполнение, вы можете попробовать следующее:

// Prepare data and cache it
// There are a lot of data, so a part of it most probably will be written to disk
val toBePublishedSignals = hiveCtx.sql("some query").persist(StorageLevel.MEMORY_AND_DISK_SER_2)

// Get all unique combinations of partitions columns
val partitions = toBePublishedSignals.selectExpr("A", "B", "C").distinct().collect()

// Write each combination as a separate partition
partitions.foreach { p =>
    val a = p.getAs[String]("A"))
    val b = p.getAs[String]("B"))
    val c = p.getAs[String]("C"))
    val path = new Path(new Path(new Path(getS3DataPath(), s"A=$a"), s"B=$b"), s"C=$c")
    toBePublishedSignals.filter(col("A") === a && col("B") === b && col("C") === c)
                       .write.format(JSON_DATA_FORMAT).mode(SaveMode.Append).save(path.toUri.toString)
}

И то же самое для метаданных.

// Prepare data and cache it
val metadataFiles = hiveCtx.sql("some query").distinct().persist(StorageLevel.MEMORY_AND_DISK_SER_2)

// Get all unique combinations of partitions columns
val partitions = metadataFiles.selectExpr("A", "C").distinct().collect()

// Write each combination as a separate partition
partitions.foreach { p =>
    val a = p.getAs[String]("A"))
    val c = p.getAs[String]("C"))
    val path = new Path(new Path(getS3MetadataPath(), s"A=$a"), s"C=$c")
    metadataFiles.filter(col("A") === a && col("C") === c)
                 .write.format(JSON_DATA_FORMAT).mode(SaveMode.Append).save(path.toUri.toString)
}

Я не знаю о типах данных столбцов разделов, поэтому в моем примере это строки. Код выше является только примером. Его можно переписать более общим способом, используя операцию свертывания и извлекая типы данных из схемы DataFrame.

2) Как вариант, можно читать записи из разделов, к которым вы собираетесь прикоснуться, в существующих данных и объединять их с входящими записями. Давайте представим, что A/B/C - это year/month/day соответственно. У нас есть некоторые новые данные, и df DataFrame является результатом обработки данных. После обработки мы следующие данные

2018|10|11|f1|f2|f3
2018|11|14|f1|f2|f3
2018|11|15|f1|f2|f3

Это означает, что нам нужно прочитать разделы из местоположения, которое содержит окончательные данные (местоположение, которое возвращается getS3DataPath())

year=2018/month=10/day=11
year=2018/month=11/day=14
year=2018/month=11/day=15

To Для этого нам нужно создать функцию фильтра, которая является комбинацией нескольких других функций. Мы используем Reduce для их объединения с использованием следующей логики:

year=2018 && month=10 && day=11
or
year=2018 && month=11 && day=14
or
year=2018 && month=11 && day=15
// Do processing
val toBePublishedSignalsNew = hiveCtx.sql("some query")

// Create a filter function for querying existing data
val partitions = toBePublishedSignalsNew.selectExpr("A", "B", "C").distinct().collect()
val filterFunction = partitions.map { partitionValues =>
    partitionColumns.map { columnName =>
        (input: Row) => input.getAs[String](columnName) == partitionValues.getAs[String](columnName)
    }.reduceOption((f1, f2) => (row: Row) => f1(row) && f2(row)).getOrElse((_: Row) => false)
}.reduceOption((f1, f2) => (row: Row) => f1(row) || f2(row)).getOrElse((_: Row) => false)

// Read existing partitions that match incoming data
val toBePublishedSignalsExisting = sparkSession.read.json(getS3DataPath()).filter(filterFunction)

// Combine new and existing data and write the result to a temporary location
toBePublishedSignalsExisting
    .union(toBePublishedSignalsNew)
    .write
    .partitionBy("A", "B", "C")
    .format(JSON_DATA_FORMAT)
    .mode(SaveMode.Overwrite)
    .save(temporaryLocationS3)

После этого вам нужно будет заменить разделы в местоположении, которое возвращает getS3DataPath(), на разделы, расположенные в temporaryLocationS3. Приведенный выше пример будет работать, только если столбцы разделов содержат строки. Если они имеют другие типы данных, вам, вероятно, придется добавить некоторое сопоставление для функций фильтра. Например, для IntegerType это будет выглядеть как

(input: Row) => input.getAs[Int](columnName) == partitionValues.getAs[Int](columnName)
289
задан random 28 May 2015 в 18:11
поделиться

3 ответа

Можно посмотреть в information_schema схема. Это имеет список всех таблиц и всех полей, которые находятся в таблице. Можно тогда выполнить запросы с помощью информации, которую Вы получили от этой таблицы.

включенными таблицами являются СХЕМЫ, ТАБЛИЦЫ и СТОЛБЦЫ. Существуют внешние ключи, таким образом, что можно расти точно, как таблицы составлены в схеме.

78
ответ дан Milhous 23 November 2019 в 01:45
поделиться

Вы могли использовать

SHOW TABLES;

, Тогда получают столбцы в тех таблицах (в цикле) с

SHOW COLUMNS FROM table;

, и затем с той информацией создают много много запросов, которые Вы можете также ОБЪЕДИНЕНИЕ, если Вам нужно.

, Но это чрезвычайно тяжело на базе данных. Особенно, если Вы делаете ПОДОБНЫЙ поиск.

3
ответ дан Ólafur Waage 23 November 2019 в 01:45
поделиться

Вы можете использовать этот проект: http://code.google.com/p/anywhereindb

Будет произведен поиск всех данных во всей таблице.

41
ответ дан 23 November 2019 в 01:45
поделиться
Другие вопросы по тегам:

Похожие вопросы: