Вам нужно использовать flatMap
, а не map
, поскольку вы хотите сделать несколько выходных строк из каждой строки ввода.
from pyspark.sql import Row
def dualExplode(r):
rowDict = r.asDict()
bList = rowDict.pop('b')
cList = rowDict.pop('c')
for b,c in zip(bList, cList):
newDict = dict(rowDict)
newDict['b'] = b
newDict['c'] = c
yield Row(**newDict)
df_split = sqlContext.createDataFrame(df.rdd.flatMap(dualExplode))
Начиная с MongoDB 3.4, мы можем использовать оператор $switch
для выполнения оператора с несколькими коммутаторами на этапе $project
.
Оператор конвейера $group
группирует документы по «диапазону» и возвращает «счет» для каждой группы с помощью аккумулятора аккумулятора $sum
.
db.collection.aggregate(
[
{ "$project": {
"range": {
"$switch": {
"branches": [
{
"case": { "$lte": [ "$timespent", 250 ] },
"then": "0-250"
},
{
"case": {
"$and": [
{ "$gt": [ "$timespent", 250 ] },
{ "$lte": [ "$timespent", 450 ] }
]
},
"then": "251-450"
},
{
"case": {
"$and": [
{ "$gt": [ "$timespent", 450 ] },
{ "$lte": [ "$timespent", 650 ] }
]
},
"then": "451-650"
}
],
"default": "650+"
}
}
}},
{ "$group": {
"_id": "$range",
"count": { "$sum": 1 }
}}
]
)
С помощью следующих документов в нашей коллекции
{ "_id" : ObjectId("514919fb23700b41723f94dc"), "name" : "A", "timespent" : 100 },
{ "_id" : ObjectId("514919fb23700b41723f94dd"), "name" : "B", "timespent" : 200 },
{ "_id" : ObjectId("514919fb23700b41723f94de"), "name" : "C", "timespent" : 300 },
{ "_id" : ObjectId("514919fb23700b41723f94df"), "name" : "D", "timespent" : 400 },
{ "_id" : ObjectId("514919fb23700b41723f94e0"), "name" : "E", "timespent" : 500 }
наш запрос дает:
{ "_id" : "451-650", "count" : 1 }
{ "_id" : "251-450", "count" : 2 }
{ "_id" : "0-250", "count" : 2 }
Мы можем захотеть добавьте этап $sort
к конвейеру, сортируя наш документ по диапазону, но это будет сортировать документы только в лексикографическом порядке из-за типа «диапазон».