Как изменить массив объектов, которые возвращаются из запроса мангуста [duplicate]

Ну, давайте сделаем ваш набор данных немного интереснее:

val rdd = sc.parallelize(for {
    x <- 1 to 3
    y <- 1 to 2
} yield (x, None), 8)

У нас есть шесть элементов:

rdd.count
Long = 6

нет разделителя:

rdd.partitioner
Option[org.apache.spark.Partitioner] = None

и восемь разделов:

rdd.partitions.length
Int = 8

Теперь давайте определим небольшой помощник для подсчета количества элементов на раздел:

import org.apache.spark.rdd.RDD

def countByPartition(rdd: RDD[(Int, None.type)]) = {
    rdd.mapPartitions(iter => Iterator(iter.length))
}

Поскольку мы надеваем 't иметь разделитель, наш набор данных распределяется равномерно между разделами ( Схема разбиения по умолчанию в Spark ):

countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)

Теперь давайте переделаем наш набор данных:

import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))

Поскольку параметр, переданный в HashPartitioner, определяет число разделов, мы ожидаем один раздел:

rddOneP.partitions.length
Int = 1

Поскольку у нас есть только один раздел, в котором содержатся все элементы:

countByPartition(rddOneP).collect
Array[Int] = Array(6)

Обратите внимание, что порядок значений после тасования не является детерминированным .

Так же, если мы используем HashPartitioner(2)

val rddTwoP = rdd.partitionBy(new HashPartitioner(2))

, мы получим 2 раздела:

rddTwoP.partitions.length
Int = 2

Поскольку rdd разделяется по ключевым данным, не будет распределяться равномерно:

countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)

Поскольку у нас есть три ключа и только два разных значения hashCode mod numPartitions, здесь нет ничего неожиданного:

(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))

Просто для подтверждения выше:

rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))

Наконец, с HashPartitioner(7) мы получаем семь разделов, три непустых с двумя элементами:

val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddTenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)

Сводка и примечания

  • HashPartitioner принимает единственный аргумент, который определяет число разделов
  • значения присваиваются разделам с помощью hash ключей. Функция hash может отличаться в зависимости от языка (Scala RDD может использовать hashCode, DataSets использовать MurmurHash 3, PySpark, portable_hash ). В таком простом случае, когда ключ является маленьким целым числом, вы можете предположить, что hash является идентификатором (i = hash(i)). Scala API использует nonNegativeMod для определения раздела на основе вычисленного хэша,
  • , если распределение ключей не является однородным, вы можете оказаться в ситуациях, когда часть вашего кластера простаивает
  • ключи должны быть хешируемыми. Вы можете проверить мой ответ на Список в качестве ключа для PySpark's reduceByKey , чтобы прочитать о проблемах PySpark. Другая возможная проблема выделяется документацией HashPartitioner : массивы Java имеют hashCodes, которые основаны на идентификаторах массивов, а не на их содержимом, поэтому попытка разбиения RDD [Array [] или RDD [ (Массив [], _)] с использованием HashPartitioner приведет к неожиданному или некорректному результату.
  • В Python 3 вы должны убедиться, что хеширование согласовано. См. Что делает исключение: случайность хеша строки должна быть отключена через значение PYTHONHASHSEED в pyspark?
  • Разделитель хэшей не является ни инъективным, ни сюръективным.
  • Обратите внимание, что в настоящее время хэш-методы не работают в Scala в сочетании с определенными классами классов REPL ( Равномерность класса Case в Apache Spark ).
  • HashPartitioner (или любой другой Partitioner) перетасовывает данные. Если секция не используется повторно между несколькими операциями, она не уменьшает количество данных, которые нужно перетасовать.

71
задан Toli Zaslavskiy 24 January 2013 в 17:03
поделиться

2 ответа

Для таких случаев, когда вам нужен простой объект JS вместо экземпляра полной модели, вы можете вызвать lean() в цепочке запросов так:

Survey.findById(req.params.id).lean().exec(function(err, data){
    var len = data.survey_questions.length;
    var counter = 0;

    _.each(data.survey_questions, function(sq){
        Question.findById(sq.question, function(err, q){
            sq.question = q;

            if(++counter == len) {
                res.send(data);
            }
        });
    });
});

Этот способ data уже является простым JS-объектом, с которым вы можете манипулировать, как вам нужно.

131
ответ дан JohnnyHK 28 August 2018 в 01:53
поделиться

Я думаю, что документация Mongoose не делает этого достаточно ясным, но данные, возвращенные в запросе (хотя вы можете res.send () это) на самом деле объект Mongoose Document, а НЕ объект JSON. Но вы можете исправить это одной строкой ...

Survey.findById(req.params.id, function(err, data){
    var len = data.survey_questions.length;
    var counter = 0;

    var data = data.toJSON(); //turns it into JSON YAY!

    _.each(data.survey_questions, function(sq){
        Question.findById(sq.question, function(err, q){
            sq.question = q;

            if(++counter == len) {
                res.send(data);
            }
        });
    });
});
38
ответ дан Toli Zaslavskiy 28 August 2018 в 01:53
поделиться
Другие вопросы по тегам:

Похожие вопросы: