Что касается Монго 3.2, ответы на этот вопрос больше не верны. Новый оператор $ lookup, добавленный в конвейер агрегации, по существу идентичен левому внешнему соединению:
https://docs.mongodb.org/master/reference/operator/aggregation/lookup/# pipe._S_lookup
Из документов:
{
$lookup:
{
from: ,
localField: ,
foreignField: ,
as:
Конечно, Mongo является не реляционной базой данных, а разработчики стараются рекомендовать конкретные варианты использования для $ lookup, но по крайней мере, начиная с 3.2, соединение теперь возможно с MongoDB.
Кажется, вы пытаетесь прочитать нормализованные данные в дереве объектов Scala. Конечно, вы можете сделать это с помощью Spark, но Spark, возможно, не является оптимальным инструментом для этого. Если данные достаточно малы, чтобы поместиться в памяти, что, как я полагаю, соответствует вашему вопросу, библиотеки объектно-реляционного отображения (ORM) могут лучше подходить для этой работы.
Если вы все еще хотите использовать Spark, вы находитесь на правильном пути с groupBy
и collect_list
. Чего вам не хватает, так это функции struct()
.
case class Customer(id: Int)
case class Invoice(id: Int, customer_id: Int)
val customers = spark.createDataset(Seq(Customer(1))).as("customers")
val invoices = spark.createDataset(Seq(Invoice(1, 1), Invoice(2, 1)))
case class CombinedCustomer(id: Int, invoices: Option[Seq[Invoice]])
customers
.join(
invoices
.groupBy('customer_id)
.agg(collect_list(struct('*)).as("invoices"))
.withColumnRenamed("customer_id", "id"),
Seq("id"), "left_outer")
.as[CombinedCustomer]
.show
struct('*)
строит столбец StructType
из всей строки. Вы также можете выбрать любые столбцы, например, struct('x.as("colA"), 'colB)
.
Это приводит к
+---+----------------+
| id| invoices|
+---+----------------+
| 1|[[1, 1], [2, 1]]|
+---+----------------+
Теперь, в случае, когда ожидается, что данные клиента не помещаются в памяти, т. Е. Использование простого collect
не является опцией, существует ряд различных стратегии вы можете принять.
Самый простой и тот, который вы должны рассмотреть вместо того, чтобы собирать его водителю, требует, чтобы независимая обработка данных каждого клиента была приемлемой. В этом случае попробуйте использовать map
и распространить логику обработки для каждого клиента.
Если независимая обработка клиентом неприемлема, общая стратегия выглядит следующим образом:
Агрегирование данных в структурированные строки по мере необходимости с использованием вышеуказанного подхода.
[ 1112]Разделите данные, чтобы убедиться, что все, что вам нужно для обработки, находится в одном разделе.
(опционально) sortWithinPartitions
, чтобы гарантировать, что данные в разделе упорядочены так, как вам нужно.
Использовать mapPartitions
.
Вы можете использовать Spark-SQL и иметь один набор данных для каждого клиента, счетов и товаров. Затем вы можете просто использовать соединения и агрегатные функции между этими наборами данных, чтобы получить желаемый результат.
Spark SQL имеет очень незначительную разницу в производительности между стилем SQL и программным способом.