Spark: как данные перераспределения groupBy [дубликаты]

== проверяет ссылки на объекты, .equals() проверяет строковые значения.

Иногда кажется, что == сравнивает значения, потому что Java делает некоторые закулисные вещи, чтобы убедиться, что одинаковые строки в строке являются одним и тем же объектом.

Для Например:

String fooString1 = new String("foo");
String fooString2 = new String("foo");

// Evaluates to false
fooString1 == fooString2;

// Evaluates to true
fooString1.equals(fooString2);

// Evaluates to true, because Java uses the same object
"bar" == "bar";

Но будьте осторожны с нулями!

== обрабатывает строки null в порядке, но вызов .equals() из пустой строки приведет к исключению:

String nullString1 = null;
String nullString2 = null;

// Evaluates to true
System.out.print(nullString1 == nullString2);

// Throws a NullPointerException
System.out.print(nullString1.equals(nullString2));

Итак, если вы знаете, что fooString1 может но не менее очевидно, что он проверяет значение null (из Java 7):

System.out.print(Objects.equals(fooString1, "bar"));
56
задан Community 4 December 2017 в 02:49
поделиться

3 ответа

Ну, давайте сделаем ваш набор данных немного интереснее:

val rdd = sc.parallelize(for {
    x <- 1 to 3
    y <- 1 to 2
} yield (x, None), 8)

У нас есть шесть элементов:

rdd.count
Long = 6

нет разделителя:

rdd.partitioner
Option[org.apache.spark.Partitioner] = None

и восемь разделов:

rdd.partitions.length
Int = 8

Теперь давайте определим небольшой помощник для подсчета количества элементов на раздел:

import org.apache.spark.rdd.RDD

def countByPartition(rdd: RDD[(Int, None.type)]) = {
    rdd.mapPartitions(iter => Iterator(iter.length))
}

Поскольку мы надеваем 't иметь разделитель, наш набор данных распределяется равномерно между разделами ( Схема разбиения по умолчанию в Spark ):

countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)

Теперь давайте переделаем наш набор данных:

import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))

Поскольку параметр, переданный в HashPartitioner, определяет число разделов, мы ожидаем один раздел:

rddOneP.partitions.length
Int = 1

Поскольку у нас есть только один раздел, в котором содержатся все элементы:

countByPartition(rddOneP).collect
Array[Int] = Array(6)

Обратите внимание, что порядок значений после тасования не является детерминированным .

Так же, если мы используем HashPartitioner(2)

val rddTwoP = rdd.partitionBy(new HashPartitioner(2))

, мы получим 2 раздела:

rddTwoP.partitions.length
Int = 2

Поскольку rdd разделяется по ключевым данным, не будет распределяться равномерно:

countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)

Поскольку у нас есть три ключа и только два разных значения hashCode mod numPartitions, здесь нет ничего неожиданного:

(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))

Просто для подтверждения выше:

rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))

Наконец, с HashPartitioner(7) мы получаем семь разделов, три непустых с двумя элементами:

val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddTenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)

Сводка и примечания

  • HashPartitioner принимает единственный аргумент, который определяет число разделов
  • значения присваиваются разделам с помощью hash ключей. Функция hash может отличаться в зависимости от языка (Scala RDD может использовать hashCode, DataSets использовать MurmurHash 3, PySpark, portable_hash ). В таком простом случае, когда ключ является маленьким целым числом, вы можете предположить, что hash является идентификатором (i = hash(i)). Scala API использует nonNegativeMod для определения раздела на основе вычисленного хэша,
  • , если распределение ключей не является однородным, вы можете оказаться в ситуациях, когда часть вашего кластера простаивает
  • ключи должны быть хешируемыми. Вы можете проверить мой ответ на Список в качестве ключа для PySpark's reduceByKey , чтобы прочитать о проблемах PySpark. Другая возможная проблема выделяется документацией HashPartitioner : массивы Java имеют hashCodes, которые основаны на идентификаторах массивов, а не на их содержимом, поэтому попытка разбиения RDD [Array [] или RDD [ (Массив [], _)] с использованием HashPartitioner приведет к неожиданному или некорректному результату.
  • В Python 3 вы должны убедиться, что хеширование согласовано. См. Что делает исключение: случайность хеша строки должна быть отключена через значение PYTHONHASHSEED в pyspark?
  • Разделитель хэшей не является ни инъективным, ни сюръективным.
  • Обратите внимание, что в настоящее время хэш-методы не работают в Scala в сочетании с определенными классами классов REPL ( Равномерность класса Case в Apache Spark ).
  • HashPartitioner (или любой другой Partitioner) перетасовывает данные. Если секция не используется повторно между несколькими операциями, она не уменьшает количество данных, которые нужно перетасовать.
107
ответ дан user6910411 17 August 2018 в 09:29
поделиться
  • 1
    Отлично напишите, спасибо. Тем не менее, я заметил на изображениях, что у вас есть (1, None) с hash(2) % P, где P - это раздел. Разве это не должно быть hash(1) % P? – javamonkey79 31 July 2018 в 17:05

RDD распределен, это означает, что он разделен на некоторое количество частей. Каждый из этих разделов потенциально находится на разных машинах. Разделитель хэшей с помощью arument numPartitions выбирает, на каком разделе помещать пар (key, value) следующим образом:

  1. Создает точно numPartitions разделы.
  2. Места (key, value) в раздел с номером Hash(key) % numPartitions
3
ответ дан abalcerek 17 August 2018 в 09:29
поделиться

Метод HashPartitioner.getPartition принимает в качестве аргумента ключ и возвращает индекс раздела, к которому принадлежит ключ. Разделитель должен знать, что такое действительные индексы, поэтому он возвращает числа в правом диапазоне. Количество разделов задается с помощью аргумента конструктора numPartitions.

Реализация возвращается примерно key.hashCode() % numPartitions. Подробнее см. Partitioner.scala .

2
ответ дан Daniel Darabos 17 August 2018 в 09:29
поделиться
Другие вопросы по тегам:

Похожие вопросы: