Как разбить большой текст на более мелкие файлы на основе столбца id, используя pyspark [duplicate]

Сначала измените тип столбца:

df.cc = pd.Categorical(df.cc)

Теперь данные выглядят одинаково, но сохраняются категорически. Чтобы захватить коды категорий:

df['code'] = df.cc.cat.codes

Теперь у вас есть:

   cc  temp  code
0  US  37.0     2
1  CA  12.0     1
2  US  35.0     2
3  AU  20.0     0

Если вы не хотите изменять свой DataFrame, но просто получите коды:

df.cc.astype('category').cat.codes

Или используйте категориальный столбец как индекс:

df2 = pd.DataFrame(df.temp)
df2.index = pd.CategoricalIndex(df.cc)
57
задан samthebest 22 February 2018 в 11:56
поделиться

10 ответов

Я нуждался в том же самом в Java. Проводя перевод ответ Scala от Zhang Zhan пользователям Spark Java API:

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;


class RDDMultipleTextOutputFormat<A, B> extends MultipleTextOutputFormat<A, B> {

    @Override
    protected String generateFileNameForKeyValue(A key, B value, String name) {
        return key.toString();
    }
}

public class Main {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Split Job")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        String[] strings = {"Abcd", "Azlksd", "whhd", "wasc", "aDxa"};
        sc.parallelize(Arrays.asList(strings))
                // The first character of the string is the key
                .mapToPair(s -> new Tuple2<>(s.substring(0,1).toLowerCase(), s))
                .saveAsHadoopFile("output/", String.class, String.class,
                        RDDMultipleTextOutputFormat.class);
        sc.stop();
    }
}
3
ответ дан Community 15 August 2018 в 17:37
поделиться

Я бы сделал это так, как это масштабируется

import org.apache.hadoop.io.NullWritable

import org.apache.spark._
import org.apache.spark.SparkContext._

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = 
    NullWritable.get()

  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = 
    key.asInstanceOf[String]
}

object Split {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Split" + args(1))
    val sc = new SparkContext(conf)
    sc.textFile("input/path")
    .map(a => (k, v)) // Your own implementation
    .partitionBy(new HashPartitioner(num))
    .saveAsHadoopFile("output/path", classOf[String], classOf[String],
      classOf[RDDMultipleTextOutputFormat])
    spark.stop()
  }
}

Просто увидел аналогичный ответ выше, но на самом деле нам не нужны настроенные разделы. MultipleTextOutputFormat создаст файл для каждого ключа. Это нормально, что несколько записей с теми же ключами попадают в один раздел.

новый HashPartitioner (num), где num - номер раздела, который вы хотите. В случае, если у вас есть большое количество разных ключей, вы можете установить число в большое. В этом случае каждый раздел не будет открывать слишком много обработчиков файлов hdfs.

79
ответ дан CubeJockey 15 August 2018 в 17:37
поделиться
  • 1
    Не могли бы вы добавить все необходимые импортеры? Я не тестировал это, но принимаю ответ, поскольку он кажется тем, что я хочу. Какой смысл в partitionBy(new Hashpartitioner(num))? это не то же самое, что repartition(num) ?? – samthebest 27 September 2014 в 13:48
  • 2
    Это отличается. хэш-раздел гарантирует, что все записи с одним и тем же ключом перейдут в один раздел. Поскольку я помнил, что перераспределение не обладает этой функциональностью. – zhang zhan 28 September 2014 в 01:57
  • 3
    Большое спасибо за это очень хорошее решение. Я просто задавался вопросом: как мне изменить код, чтобы иметь выход для каждого файла, отсортированного по значениям v? – Yiannis Gkoufas 31 January 2015 в 21:31
  • 4
    Я искал записи нескольких паркетных выходов, и это решение по этим линиям выглядит многообещающим (только подклассификация MultipleOutputFormat напрямую, не используя MultipleTextOutputFormat). К сожалению, MutlipleOutputFormat существует только в старых API MR1 / mapred, тогда как AvroParquetOutputFormat и ParquetOutputFormat (поддерживающий паркет) написаны против нового API MR2 / mapreduce, поэтому кажется, что один и тот же путь не открыт ... – silasdavis 3 July 2015 в 15:45
  • 5
    Выглядит отлично! Есть ли эквивалент python? – NDavis 24 February 2016 в 01:03

хорошая новость для пользователя python в случае, если у вас много столбцов, и вы хотите сохранить все остальные столбцы, не разбитые на разделы в формате csv, которые не удастся, если вы используете «текстовый» метод как предложение Ника Чаммаса.

people_df.write.partitionBy("number").text("people") 

сообщение об ошибке «AnalysisException: источник данных u'Text поддерживает только один столбец, и у вас есть 2 столбца.;»

In spark 2.0.0 (моя тестовая среда - это искра hdp 2.0.0) пакет «com.databricks.spark.csv» теперь интегрирован и позволяет нам сохранять текстовый файл, разделенный только на один столбец, см. Пример blow:

people_rdd = sc.parallelize([(1,"2016-12-26", "alice"),
                             (1,"2016-12-25", "alice"),
                             (1,"2016-12-25", "tom"), 
                             (1, "2016-12-25","bob"), 
                             (2,"2016-12-26" ,"charlie")])
df = people_rdd.toDF(["number", "date","name"])

df.coalesce(1).write.partitionBy("number").mode("overwrite").format('com.databricks.spark.csv').options(header='false').save("people")

[root@namenode people]# tree
.
├── number=1
│?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
├── number=2
│?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
└── _SUCCESS

[root@namenode people]# cat number\=1/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
2016-12-26,alice
2016-12-25,alice
2016-12-25,tom
2016-12-25,bob
[root@namenode people]# cat number\=2/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
2016-12-26,charlie

В моей искровой версии 1.6 .1 enviroment, код не выдавал никакой ошибки, однако это только один сгенерированный файл. он не разбивается на две папки.

Надеюсь, это поможет.

1
ответ дан dalin qin 15 August 2018 в 17:37
поделиться

У меня есть аналогичная потребность и нашел способ. Но у него есть один недостаток (что не является проблемой для моего случая): вам нужно переразделить данные с одним разделом на выходной файл.

Чтобы разбивать таким образом, обычно требуется заранее знать, как много файлов, на которые будет выведено задание, и найти функцию, которая будет отображать каждую клавишу в каждый раздел.

Сначала давайте создадим наш класс на основе MultipleTextOutputFormat:

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T , V] {
  override def generateFileNameForKeyValue(key: T, value: V, leaf: String) = {
    key.toString
  }
  override protected def generateActualKey(key: T, value: V) = {
    null
  }
}

С этим классом Spark будет получить ключ из раздела (первый / последний, я думаю), и назвать файл с помощью этого ключа, поэтому неплохо смешивать несколько ключей в одном разделе.

Для вашего примера вам потребуется пользовательский разделитель. Это выполнит задание:

import org.apache.spark.Partitioner

class IdentityIntPartitioner(maxKey: Int) extends Partitioner {
  def numPartitions = maxKey

  def getPartition(key: Any): Int = key match {
    case i: Int if i < maxKey => i
  }
}

Теперь давайте поместим все вместе:

val rdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"), (7, "d"), (7, "e")))

// You need to know the max number of partitions (files) beforehand
// In this case we want one partition per key and we have 3 keys,
// with the biggest key being 7, so 10 will be large enough
val partitioner = new IdentityIntPartitioner(10)

val prefix = "hdfs://.../prefix"

val partitionedRDD = rdd.partitionBy(partitioner)

partitionedRDD.saveAsHadoopFile(prefix,
    classOf[Integer], classOf[String], classOf[KeyBasedOutput[Integer, String]])

Это сгенерирует 3 файла под префиксом (с именами 1, 2 и 7), обрабатывая все за один проход.

Как вы можете видеть, вам нужно знать свои ключи, чтобы иметь возможность использовать это решение.

Для меня это было проще, потому что мне нужен один выходной файл для каждый ключевой хеш и количество файлов находилось под моим контролем, поэтому я мог использовать хэш-файл HashPartitioner, чтобы сделать трюк.

3
ответ дан douglaz 15 August 2018 в 17:37
поделиться
  • 1
    Это, безусловно, самое приятное решение до сих пор, и, похоже, почти трюк. Меня немного беспокоит, что это приведет к одному файлу на ключ, что вызовет проблемы для больших наборов данных. Если вы можете изменить свой ответ, чтобы он мог настроить количество выходных файлов на ключ, я был бы очень благодарен. – samthebest 20 June 2014 в 10:09
  • 2
    @samthebest, я могу это сделать, но это будет очень конкретное решение. Не могли бы вы обновить вопрос, чтобы сказать, что вам нужно несколько выходных файлов на ключ? Кстати, действительно ли вы используете целые ключи на своей работе? – douglaz 20 June 2014 в 22:39
  • 3
    Ну, любой ключ, который имеет смысл разделить - так что это разумно, когда мы назовем его toString. Я не уверен, что мне нужно обновить свой ответ, поскольку хорошо известная плохая практика - создавать большие файлы на HDFS, поскольку она ограничивает типы сжатия, которые вы можете использовать. Если у нас очень большие файлы, и нам нужно выбрать разделительный алгоритм сжатия, что может оказаться не лучшим для работы. Кроме того, Spark в настоящее время не может читать bzip2 (мое fav splittable compression) из-за ошибки в Hadoop. Тем не менее я обновляю свой ответ, чтобы быть явным. Опять же, большое спасибо. – samthebest 21 June 2014 в 12:13
  • 4
    Это решение помещает все данные через один узел, если все они имеют один и тот же ключ, правильно? Похоже на ущерб общей его масштабируемости. – Daniel Darabos 21 June 2014 в 14:14
  • 5
    @DanielDarabos точка правильная. Разумеется, можно настроить IdentityIntPartitioner так, чтобы для каждого возможного ключа было несколько разделов, скажем M, где один выбирается случайным образом. Нам нужно использовать хеш-функцию и по модулю результата с помощью numPartitions, хотя есть проблема: разные ключи могут оказаться в одном разделе, который, как я предполагаю, сломает saveAsHadoopFile? Это нетривиальная проблема. – samthebest 21 June 2014 в 17:40

У меня был аналогичный вариант использования. Я разрешил его на Java, написав два пользовательских класса, реализующих MultipleTextOutputFormat и RecordWriter.

Мой вход был JavaPairRDD<String, List<String>>, и я хотел сохранить его в файле с именем по его ключу со всеми строками, содержащимися в его значении.

Вот код для моего MultipleTextOutputFormat реализация

class RDDMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> {

    @Override
    protected String generateFileNameForKeyValue(K key, V value, String name) {
        return key.toString(); //The return will be used as file name
    }

    /** The following 4 functions are only for visibility purposes                 
    (they are used in the class MyRecordWriter) **/
    protected String generateLeafFileName(String name) {
        return super.generateLeafFileName(name);
    }

    protected V generateActualValue(K key, V value) {
        return super.generateActualValue(key, value);
    }

    protected String getInputFileBasedOutputFileName(JobConf job,     String name) {
        return super.getInputFileBasedOutputFileName(job, name);
        }

    protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException {
        return super.getBaseRecordWriter(fs, job, name, arg3);
    }

    /** Use my custom RecordWriter **/
    @Override
    RecordWriter<K, V> getRecordWriter(final FileSystem fs, final JobConf job, String name, final Progressable arg3) throws IOException {
    final String myName = this.generateLeafFileName(name);
        return new MyRecordWriter<K, V>(this, fs, job, arg3, myName);
    }
} 

Вот код для моей реализации RecordWriter.

class MyRecordWriter<K, V> implements RecordWriter<K, V> {

    private RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat;
    private final FileSystem fs;
    private final JobConf job;
    private final Progressable arg3;
    private String myName;

    TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap();

    MyRecordWriter(RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat, FileSystem fs, JobConf job, Progressable arg3, String myName) {
        this.rddMultipleTextOutputFormat = rddMultipleTextOutputFormat;
        this.fs = fs;
        this.job = job;
        this.arg3 = arg3;
        this.myName = myName;
    }

    @Override
    void write(K key, V value) throws IOException {
        String keyBasedPath = rddMultipleTextOutputFormat.generateFileNameForKeyValue(key, value, myName);
        String finalPath = rddMultipleTextOutputFormat.getInputFileBasedOutputFileName(job, keyBasedPath);
        Object actualValue = rddMultipleTextOutputFormat.generateActualValue(key, value);
        RecordWriter rw = this.recordWriters.get(finalPath);
        if(rw == null) {
            rw = rddMultipleTextOutputFormat.getBaseRecordWriter(fs, job, finalPath, arg3);
            this.recordWriters.put(finalPath, rw);
        }
        List<String> lines = (List<String>) actualValue;
        for (String line : lines) {
            rw.write(null, line);
        }
    }

    @Override
    void close(Reporter reporter) throws IOException {
        Iterator keys = this.recordWriters.keySet().iterator();

        while(keys.hasNext()) {
            RecordWriter rw = (RecordWriter)this.recordWriters.get(keys.next());
            rw.close(reporter);
        }

        this.recordWriters.clear();
    }
}

Большая часть кода точно такая же, как в FileOutputFormat. Единственное отличие состоит в том, что несколько строк

List<String> lines = (List<String>) actualValue;
for (String line : lines) {
    rw.write(null, line);
}

Эти строки позволили мне написать каждую строку моего ввода List<String> в файле. Первый аргумент функции write установлен на null, чтобы избежать нажатия клавиши на каждой строке.

Чтобы закончить, мне нужно только сделать этот вызов, чтобы записать мои файлы

javaPairRDD.saveAsHadoopFile(path, String.class, List.class, RDDMultipleTextOutputFormat.class);
0
ответ дан jeanr 15 August 2018 в 17:37
поделиться

У меня был аналогичный случай использования, когда я разбил входной файл на Hadoop HDFS на несколько файлов на основе ключа (1 файл на ключ). Вот мой scala-код для искры

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

val hadoopconf = new Configuration();
val fs = FileSystem.get(hadoopconf);

@serializable object processGroup {
    def apply(groupName:String, records:Iterable[String]): Unit = {
        val outFileStream = fs.create(new Path("/output_dir/"+groupName))
        for( line <- records ) {
                outFileStream.writeUTF(line+"\n")
            }
        outFileStream.close()
    }
}
val infile = sc.textFile("input_file")
val dateGrouped = infile.groupBy( _.split(",")(0))
dateGrouped.foreach( (x) => processGroup(x._1, x._2))

Я сгруппировал записи на основе ключа. Значения для каждого ключа записываются в отдельный файл.

3
ответ дан Lightness Races in Orbit 15 August 2018 в 17:37
поделиться
  • 1
    это выглядит как отличное решение, особенно потому, что оно связано с результатом iterables, я получаю org.apache.spark.SparkException: задача не сериализуема, вы считаете, что экземпляр fs вызывает эту проблему? – perrohunter 17 December 2015 в 20:34

saveAsText () и saveAsHadoop (...) реализованы на основе данных RDD, в частности по методу: PairRDD.saveAsHadoopDataset , который берет данные из PairRdd, где он выполняется. Я вижу два возможных варианта. Если ваши данные относительно невелики по размеру, вы можете сэкономить некоторое время реализации, объединив RDD, создав новый RDD из каждой коллекции и используя этот RDD для записи данных. Что-то вроде этого:

val byKey = dataRDD.groupByKey().collect()
val rddByKey = byKey.map{case (k,v) => k->sc.makeRDD(v.toSeq)}
val rddByKey.foreach{ case (k,rdd) => rdd.saveAsText(prefix+k}

Обратите внимание, что он не будет работать для больших наборов данных b / c, материализация итератора в v.toSeq может не соответствовать памяти.

Другой вариант, который я вижу, и на самом деле тот, который я бы рекомендовал в этом случае, это: сворачивать свой собственный, путем прямого вызова hasoop / hdfs api.

Вот обсуждение, которое я начал при исследовании этого вопроса: Как создать RDD из другого RDD?

3
ответ дан maasg 15 August 2018 в 17:37
поделиться
  • 1
    Да, я бы хотел использовать hasoop / hdfs api - i.e использовать MultipleOutputFormat, но я хотел бы знать how . – samthebest 4 June 2014 в 11:30
  • 2
    Вы не можете сделать RDD внутри другого RDD (ваша вторая строка). См. Этот ppt slideshare.net/databricks/… – Adrian 25 March 2015 в 15:07
  • 3
    @ Адриан, ты прав. Мне там не хватало сбора. – maasg 25 March 2015 в 15:18

Если вы используете Spark 1.4+, это стало намного проще, благодаря API DataFrame . (DataFrames были введены в Spark 1.3, но partitionBy(), который нам нужен, был введен в 1.4 .)

Если вы начинаете с RDD, вы будете сначала нужно преобразовать его в DataFrame:

val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie")))
val people_df = people_rdd.toDF("number", "name")

В Python этот же код:

people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")])
people_df = people_rdd.toDF(["number", "name"])

Как только у вас есть DataFrame, запись на несколько выходов на основе конкретный ключ прост. Более того, и это красота API DataFrame - код почти одинаковый для Python, Scala, Java и R:

people_df.write.partitionBy("number").text("people")

И вы можете легко использовать другие форматы вывода, если вы хотите:

people_df.write.partitionBy("number").json("people-json")
people_df.write.partitionBy("number").parquet("people-parquet")

В каждом из этих примеров Spark создаст подкаталог для каждого из ключей, которые мы разделили DataFrame на:

people/
  _SUCCESS
  number=1/
    part-abcd
    part-efgh
  number=2/
    part-abcd
    part-efgh
92
ответ дан Nick Chammas 15 August 2018 в 17:37
поделиться
  • 1
    Можете ли вы добавить эквивалентный код Dataset s в Scala? и я соглашусь как лучший ответ. Да, некоторые люди не заботятся о типах и любят запускать все свое приложение каждые несколько минут, чтобы узнать, есть ли у них какие-либо ошибки, но некоторым из нас нравится ловить опечатки, как «нубмер», когда мы набрали его :) Серьезно, хотя, хорошо ответ. – samthebest 12 May 2016 в 17:28
  • 2
    @samthebest - Просто FYI, я откатил ваше редактирование, потому что у него было несколько проблем: это не соответствовало моему стилю письма; Я мало знаю о наборах данных, поэтому примечание о Dataset[SomeCaseClass] более подходит в качестве комментария; наконец, у Python нет метода makeRDD(). – Nick Chammas 14 May 2016 в 16:23
  • 3
    Обратите внимание, что если у вас есть Dataset[SomeCaseClass], вы можете просто вызвать .toDF(), а метки столбцов будут соответствовать полям SomeCaseClass es. Это дает немного больше безопасности типов. – samthebest 18 May 2016 в 14:10
  • 4
    Есть ли способ заставить этот метод писать только один файл / часть на раздел? – moustachio 21 May 2016 в 18:06
  • 5
    @moustachio - Хороший вопрос. Я думаю, вы можете заставить это, объединив DataFrame в один раздел перед partitionBy(). Например: people_df.coalesce(1).write.partitionBy("number").text("people") Это может ограничить параллельность Spark при записи данных, однако, в зависимости от ваших данных и конфигурации кластера. – Nick Chammas 23 May 2016 в 01:25

Это включает в себя кодек в соответствии с запросом, необходимые импорты и сутенером в соответствии с запросом.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

// TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless
implicit class PimpedRDD[T1, T2](rdd: RDD[(T1, T2)]) {
  def writeAsMultiple(prefix: String, codec: String,
                      keyName: String = "key")
                     (implicit sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._

    rdd.toDF(keyName, "_2").write.partitionBy(keyName)
    .format("text").option("codec", codec).save(prefix)
  }
}

val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")

Одно тонкое отличие от OP состоит в том, что оно будет префикс <keyName>= к именам каталогов. Например,

myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")

дал бы:

prefix/key=1/part-00000
prefix/key=2/part-00000

, где prefix/my_number=1/part-00000 будет содержать строки a и b, а prefix/my_number=2/part-00000 будет содержать строку c ].

И

myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo")

Дает:

prefix/foo=1/part-00000
prefix/foo=2/part-00000

Должно быть ясно, как отредактировать для parquet.

Наконец, ниже приведен пример для Dataset, что, возможно, лучше, чем использование Tuples.

implicit class PimpedDataset[T](dataset: Dataset[T]) {
  def writeAsMultiple(prefix: String, codec: String, field: String): Unit = {
    dataset.write.partitionBy(field)
    .format("text").option("codec", codec).save(prefix)
  }
}
15
ответ дан samthebest 15 August 2018 в 17:37
поделиться
  • 1
    Благодарю. Если мы сможем использовать HDFS вместо локальной файловой системы, так как мы по существу сами будем реализовывать часть тасования вручную? Кроме того, что происходит, когда несколько разделов содержат пары, имеющие один и тот же ключ? Обе задачи могут попытаться записать в один и тот же файл, и поэтому нам нужна какая-то синхронизированная система управления файлами, чтобы отслеживать создание part-XXXXX. Я боюсь, что это решение кажется очень грязным, учитывая, что я уверен, что решение с использованием MultipleOutputFormat существует. – samthebest 21 June 2014 в 17:02
  • 2
    Вы правы, что это своего рода осуществление перетасовки. Думаю, нет узкого места. Нет единого узла, который получает все записи с ключом. Нет проблем с тем же ключом, который поступает из нескольких разделов, и нет необходимости в синхронизации. Имя файла output/<key>/<partition>. Поэтому каждый раздел записывается в разные файлы. (В этом примере индекс раздела находится в suffix.) – Daniel Darabos 21 June 2014 в 18:33
  • 3
    MultipleOutputFormat отлично подходит для работы и будет работать по той же идее. Я просто никогда не использовал его. Я думаю, вы просто переписали бы мой MultiWriter, чтобы использовать MultipleOutputFormat вместо того, чтобы пересказывать собственное сопоставление файлов с ключами и gt. Но бит mapPartitionsWithIndex был бы в основном без изменений. – Daniel Darabos 21 June 2014 в 18:36
  • 4
    Извините, я неправильно понял ваше решение (tbh I skim read). Спасибо за разъяснения. Да, я думаю, что с некоторыми играми и заменой кода записи на HDFS это сработает (и не станет узким местом). Спасибо за Ваш ответ. – samthebest 22 June 2014 в 13:38
  • 5
    Я обеспокоен тем, что, когда мы используем mapPartitionsWithIndex и вручную записываем в HDFS, этот конкретный раздел не обязательно выводится в нужное место этого раздела. Поэтому дополнительная перетасовка не нужна и ее можно избежать. – samthebest 12 August 2014 в 11:42
  • 6
    не уверен, что у него нет +100 upvote, и на самом деле было нулевое значение. Очень полезно, спасибо! – Aliostad 20 December 2017 в 21:49
[113] Если у вас потенциально много значений для данного ключа, я думаю, что масштабируемое решение состоит в том, чтобы выписать один файл за ключ на раздел. К сожалению, в Spark нет встроенной поддержки, но мы можем что-то взбить. [113] [114] (Замените [112] на свой выбор работы с распределенной файловой системой.) [114] [115] Это делает один проход по RDD и не выполняет тасование. Он дает вам один каталог для каждого ключа с несколькими файлами внутри каждого. [115]
15
ответ дан samthebest 5 September 2018 в 16:42
поделиться
Другие вопросы по тегам:

Похожие вопросы: