Идиоматическая Карта Scala upsert

У меня есть аналогичная потребность и нашел способ. Но у него есть один недостаток (что не является проблемой для моего случая): вам нужно переразделить данные с одним разделом на выходной файл.

Чтобы разбивать таким образом, обычно требуется заранее знать, как много файлов, на которые будет выведено задание, и найти функцию, которая будет отображать каждую клавишу в каждый раздел.

Сначала давайте создадим наш класс на основе MultipleTextOutputFormat:

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T , V] {
  override def generateFileNameForKeyValue(key: T, value: V, leaf: String) = {
    key.toString
  }
  override protected def generateActualKey(key: T, value: V) = {
    null
  }
}

С этим классом Spark будет получить ключ из раздела (первый / последний, я думаю), и назвать файл с помощью этого ключа, поэтому неплохо смешивать несколько ключей в одном разделе.

Для вашего примера вам потребуется пользовательский разделитель. Это выполнит задание:

import org.apache.spark.Partitioner

class IdentityIntPartitioner(maxKey: Int) extends Partitioner {
  def numPartitions = maxKey

  def getPartition(key: Any): Int = key match {
    case i: Int if i < maxKey => i
  }
}

Теперь давайте поместим все вместе:

val rdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"), (7, "d"), (7, "e")))

// You need to know the max number of partitions (files) beforehand
// In this case we want one partition per key and we have 3 keys,
// with the biggest key being 7, so 10 will be large enough
val partitioner = new IdentityIntPartitioner(10)

val prefix = "hdfs://.../prefix"

val partitionedRDD = rdd.partitionBy(partitioner)

partitionedRDD.saveAsHadoopFile(prefix,
    classOf[Integer], classOf[String], classOf[KeyBasedOutput[Integer, String]])

Это сгенерирует 3 файла под префиксом (с именами 1, 2 и 7), обрабатывая все за один проход.

Как вы можете видеть, вам нужно знать свои ключи, чтобы иметь возможность использовать это решение.

Для меня это было проще, потому что мне нужен один выходной файл для каждый ключевой хеш и количество файлов находилось под моим контролем, поэтому я мог использовать хэш-файл HashPartitioner, чтобы сделать трюк.

27
задан om-nom-nom 8 April 2012 в 21:31
поделиться