Древовидные / вложенные структуры в Spark из реляционной модели данных

Что касается Монго 3.2, ответы на этот вопрос больше не верны. Новый оператор $ lookup, добавленный в конвейер агрегации, по существу идентичен левому внешнему соединению:

https://docs.mongodb.org/master/reference/operator/aggregation/lookup/# pipe._S_lookup

Из документов:

{
   $lookup:
     {
       from: ,
       localField: ,
       foreignField: ,
       as: 
     }
}

Конечно, Mongo является не реляционной базой данных, а разработчики стараются рекомендовать конкретные варианты использования для $ lookup, но по крайней мере, начиная с 3.2, соединение теперь возможно с MongoDB.

2
задан Sim 17 March 2019 в 20:03
поделиться

2 ответа

Кажется, вы пытаетесь прочитать нормализованные данные в дереве объектов Scala. Конечно, вы можете сделать это с помощью Spark, но Spark, возможно, не является оптимальным инструментом для этого. Если данные достаточно малы, чтобы поместиться в памяти, что, как я полагаю, соответствует вашему вопросу, библиотеки объектно-реляционного отображения (ORM) могут лучше подходить для этой работы.

Если вы все еще хотите использовать Spark, вы находитесь на правильном пути с groupBy и collect_list. Чего вам не хватает, так это функции struct().

case class Customer(id: Int)
case class Invoice(id: Int, customer_id: Int)

val customers = spark.createDataset(Seq(Customer(1))).as("customers")
val invoices = spark.createDataset(Seq(Invoice(1, 1), Invoice(2, 1)))

case class CombinedCustomer(id: Int, invoices: Option[Seq[Invoice]])

customers
  .join(
    invoices
      .groupBy('customer_id)
      .agg(collect_list(struct('*)).as("invoices"))
      .withColumnRenamed("customer_id", "id"), 
    Seq("id"), "left_outer")
  .as[CombinedCustomer]
  .show

struct('*) строит столбец StructType из всей строки. Вы также можете выбрать любые столбцы, например, struct('x.as("colA"), 'colB).

Это приводит к

+---+----------------+
| id|        invoices|
+---+----------------+
|  1|[[1, 1], [2, 1]]|
+---+----------------+

Теперь, в случае, когда ожидается, что данные клиента не помещаются в памяти, т. Е. Использование простого collect не является опцией, существует ряд различных стратегии вы можете принять.

Самый простой и тот, который вы должны рассмотреть вместо того, чтобы собирать его водителю, требует, чтобы независимая обработка данных каждого клиента была приемлемой. В этом случае попробуйте использовать map и распространить логику обработки для каждого клиента.

Если независимая обработка клиентом неприемлема, общая стратегия выглядит следующим образом:

  1. Агрегирование данных в структурированные строки по мере необходимости с использованием вышеуказанного подхода.

    [ 1112]
  2. Разделите данные, чтобы убедиться, что все, что вам нужно для обработки, находится в одном разделе.

  3. (опционально) sortWithinPartitions, чтобы гарантировать, что данные в разделе упорядочены так, как вам нужно.

  4. Использовать mapPartitions.

0
ответ дан Sim 17 March 2019 в 20:03
поделиться

Вы можете использовать Spark-SQL и иметь один набор данных для каждого клиента, счетов и товаров. Затем вы можете просто использовать соединения и агрегатные функции между этими наборами данных, чтобы получить желаемый результат.

Spark SQL имеет очень незначительную разницу в производительности между стилем SQL и программным способом.

0
ответ дан sandesh dahake 17 March 2019 в 20:03
поделиться
Другие вопросы по тегам:

Похожие вопросы: