==
проверяет ссылки на объекты, .equals()
проверяет строковые значения.
Иногда кажется, что ==
сравнивает значения, потому что Java делает некоторые закулисные вещи, чтобы убедиться, что одинаковые строки в строке являются одним и тем же объектом.
Для Например:
String fooString1 = new String("foo");
String fooString2 = new String("foo");
// Evaluates to false
fooString1 == fooString2;
// Evaluates to true
fooString1.equals(fooString2);
// Evaluates to true, because Java uses the same object
"bar" == "bar";
Но будьте осторожны с нулями!
==
обрабатывает строки null
в порядке, но вызов .equals()
из пустой строки приведет к исключению:
String nullString1 = null;
String nullString2 = null;
// Evaluates to true
System.out.print(nullString1 == nullString2);
// Throws a NullPointerException
System.out.print(nullString1.equals(nullString2));
Итак, если вы знаете, что fooString1
может но не менее очевидно, что он проверяет значение null (из Java 7):
System.out.print(Objects.equals(fooString1, "bar"));
Ну, давайте сделаем ваш набор данных немного интереснее:
val rdd = sc.parallelize(for {
x <- 1 to 3
y <- 1 to 2
} yield (x, None), 8)
У нас есть шесть элементов:
rdd.count
Long = 6
нет разделителя:
rdd.partitioner
Option[org.apache.spark.Partitioner] = None
и восемь разделов:
rdd.partitions.length
Int = 8
Теперь давайте определим небольшой помощник для подсчета количества элементов на раздел:
import org.apache.spark.rdd.RDD
def countByPartition(rdd: RDD[(Int, None.type)]) = {
rdd.mapPartitions(iter => Iterator(iter.length))
}
Поскольку мы надеваем 't иметь разделитель, наш набор данных распределяется равномерно между разделами ( Схема разбиения по умолчанию в Spark ):
countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)
Теперь давайте переделаем наш набор данных:
import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))
Поскольку параметр, переданный в HashPartitioner
, определяет число разделов, мы ожидаем один раздел:
rddOneP.partitions.length
Int = 1
Поскольку у нас есть только один раздел, в котором содержатся все элементы:
countByPartition(rddOneP).collect
Array[Int] = Array(6)
Обратите внимание, что порядок значений после тасования не является детерминированным .
Так же, если мы используем HashPartitioner(2)
val rddTwoP = rdd.partitionBy(new HashPartitioner(2))
, мы получим 2 раздела:
rddTwoP.partitions.length
Int = 2
Поскольку rdd
разделяется по ключевым данным, не будет распределяться равномерно:
countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)
Поскольку у нас есть три ключа и только два разных значения hashCode
mod numPartitions
, здесь нет ничего неожиданного:
(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))
Просто для подтверждения выше:
rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))
Наконец, с HashPartitioner(7)
мы получаем семь разделов, три непустых с двумя элементами:
val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddTenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)
HashPartitioner
принимает единственный аргумент, который определяет число разделов hash
ключей. Функция hash
может отличаться в зависимости от языка (Scala RDD может использовать hashCode
, DataSets
использовать MurmurHash 3, PySpark, portable_hash
). В таком простом случае, когда ключ является маленьким целым числом, вы можете предположить, что hash
является идентификатором (i = hash(i)
). Scala API использует nonNegativeMod
для определения раздела на основе вычисленного хэша, HashPartitioner
(или любой другой Partitioner
) перетасовывает данные. Если секция не используется повторно между несколькими операциями, она не уменьшает количество данных, которые нужно перетасовать. RDD распределен, это означает, что он разделен на некоторое количество частей. Каждый из этих разделов потенциально находится на разных машинах. Разделитель хэшей с помощью arument numPartitions
выбирает, на каком разделе помещать пар (key, value)
следующим образом:
numPartitions
разделы. (key, value)
в раздел с номером Hash(key) % numPartitions
Метод HashPartitioner.getPartition
принимает в качестве аргумента ключ и возвращает индекс раздела, к которому принадлежит ключ. Разделитель должен знать, что такое действительные индексы, поэтому он возвращает числа в правом диапазоне. Количество разделов задается с помощью аргумента конструктора numPartitions
.
Реализация возвращается примерно key.hashCode() % numPartitions
. Подробнее см. Partitioner.scala .
(1, None)
сhash(2) % P
, где P - это раздел. Разве это не должно бытьhash(1) % P
? – javamonkey79 31 July 2018 в 17:05