Ну, давайте сделаем ваш набор данных немного интереснее:
val rdd = sc.parallelize(for {
x <- 1 to 3
y <- 1 to 2
} yield (x, None), 8)
У нас есть шесть элементов:
rdd.count
Long = 6
нет разделителя:
rdd.partitioner
Option[org.apache.spark.Partitioner] = None
и восемь разделов:
rdd.partitions.length
Int = 8
Теперь давайте определим небольшой помощник для подсчета количества элементов на раздел:
import org.apache.spark.rdd.RDD
def countByPartition(rdd: RDD[(Int, None.type)]) = {
rdd.mapPartitions(iter => Iterator(iter.length))
}
Поскольку мы надеваем 't иметь разделитель, наш набор данных распределяется равномерно между разделами ( Схема разбиения по умолчанию в Spark ):
countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)
Теперь давайте переделаем наш набор данных:
import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))
Поскольку параметр, переданный в HashPartitioner
, определяет число разделов, мы ожидаем один раздел:
rddOneP.partitions.length
Int = 1
Поскольку у нас есть только один раздел, в котором содержатся все элементы:
countByPartition(rddOneP).collect
Array[Int] = Array(6)
Обратите внимание, что порядок значений после тасования не является детерминированным .
Так же, если мы используем HashPartitioner(2)
val rddTwoP = rdd.partitionBy(new HashPartitioner(2))
, мы получим 2 раздела:
rddTwoP.partitions.length
Int = 2
Поскольку rdd
разделяется по ключевым данным, не будет распределяться равномерно:
countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)
Поскольку у нас есть три ключа и только два разных значения hashCode
mod numPartitions
, здесь нет ничего неожиданного:
(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))
Просто для подтверждения выше:
rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))
Наконец, с HashPartitioner(7)
мы получаем семь разделов, три непустых с двумя элементами:
val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddTenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)
HashPartitioner
принимает единственный аргумент, который определяет число разделов hash
ключей. Функция hash
может отличаться в зависимости от языка (Scala RDD может использовать hashCode
, DataSets
использовать MurmurHash 3, PySpark, portable_hash
). В таком простом случае, когда ключ является маленьким целым числом, вы можете предположить, что hash
является идентификатором (i = hash(i)
). Scala API использует nonNegativeMod
для определения раздела на основе вычисленного хэша, HashPartitioner
(или любой другой Partitioner
) перетасовывает данные. Если секция не используется повторно между несколькими операциями, она не уменьшает количество данных, которые нужно перетасовать. Для таких случаев, когда вам нужен простой объект JS вместо экземпляра полной модели, вы можете вызвать lean()
в цепочке запросов так:
Survey.findById(req.params.id).lean().exec(function(err, data){
var len = data.survey_questions.length;
var counter = 0;
_.each(data.survey_questions, function(sq){
Question.findById(sq.question, function(err, q){
sq.question = q;
if(++counter == len) {
res.send(data);
}
});
});
});
Этот способ data
уже является простым JS-объектом, с которым вы можете манипулировать, как вам нужно.
Я думаю, что документация Mongoose не делает этого достаточно ясным, но данные, возвращенные в запросе (хотя вы можете res.send () это) на самом деле объект Mongoose Document, а НЕ объект JSON. Но вы можете исправить это одной строкой ...
Survey.findById(req.params.id, function(err, data){
var len = data.survey_questions.length;
var counter = 0;
var data = data.toJSON(); //turns it into JSON YAY!
_.each(data.survey_questions, function(sq){
Question.findById(sq.question, function(err, q){
sq.question = q;
if(++counter == len) {
res.send(data);
}
});
});
});