Прочитайте файл паркета на несколько разделов [duplicate]

NullPointerException s - исключения, возникающие при попытке использовать ссылку, которая указывает на отсутствие местоположения в памяти (null), как если бы она ссылалась на объект. Вызов метода по нулевой ссылке или попытка получить доступ к полю нулевой ссылки вызовет функцию NullPointerException. Они наиболее распространены, но другие способы перечислены на странице NullPointerException javadoc.

Вероятно, самый быстрый пример кода, который я мог бы придумать для иллюстрации NullPointerException, be:

public class Example {

    public static void main(String[] args) {
        Object obj = null;
        obj.hashCode();
    }

}

В первой строке внутри main я явно устанавливаю ссылку Object obj равной null. Это означает, что у меня есть ссылка, но она не указывает на какой-либо объект. После этого я пытаюсь обработать ссылку так, как если бы она указывала на объект, вызывая метод на нем. Это приводит к NullPointerException, потому что нет кода для выполнения в местоположении, на которое указывает ссылка.

(Это техничность, но я думаю, что она упоминает: ссылка, которая указывает на null, равна 't то же, что и указатель C, указывающий на недопустимую ячейку памяти. Нулевой указатель буквально не указывает на в любом месте , который отличается от указаний на местоположение, которое оказывается недопустимым.)

12
задан samthebest 3 December 2014 в 18:02
поделиться

5 ответов

Вы должны написать свои паркетные файлы с меньшим размером блока. Значение по умолчанию - 128 Мб на блок, но его можно настроить, установив конфигурацию parquet.block.size в записи.

Источник ParquetOuputFormat здесь , если вы хотите вникнуть в детали.

Размер блока - это минимальный объем данных, которые вы можете прочитать из файл паркета, который является логически читаемым (поскольку паркет столбчатый, вы не можете просто разделить по строке или что-то тривиальное, как это), поэтому вы не можете иметь больше потоков чтения, чем входные блоки.

6
ответ дан samthebest 17 August 2018 в 23:45
поделиться
  • 1
    Спасибо, этот ответ кажется наиболее точным до сих пор. Пожалуйста, не могли бы вы A) подтвердить ответ @Prokod невозможно (он говорит, что можно разделить блок паркета) и B) написать строку примера кода искры с помощью parquet.block.sizeDataset s), который пишет паркет для копий людей и вставить нужды :) – samthebest 10 August 2016 в 07:41
  • 2
    A) @Prokod вводит в заблуждение размер блока паркета и размер блока hdfs. См. Например, groups.google.com/forum / #! Topic / parquet-dev / t1iu0G-wLpE «Существует еще одно ограничение, которое является наименьшим разделом, который вы можете получить, имеет размер 1 группы строк . & Quot; B) Через 2 года вы можете использовать любую паркетную версию, но она должна быть близка к sparkContext.hadoopConfiguration.set ("parquet.block.size", newSize), а затем использовать свой контекст для записи вашего набора данных, как обычно. – C4stor 10 August 2016 в 09:59

Новый способ сделать это (Spark 2.x) устанавливает spark.sql.files.maxPartitionBytes

Источник: https://issues.apache.org/jira/browse/SPARK-17998 (официальная документация еще не верна, пропускает .sql)

По моему опыту настройки Hadoop больше не действуют.

0
ответ дан F Pereira 17 August 2018 в 23:45
поделиться
  • 1
    Согласно моему комментарию об этой JIRA, попытка этого не сработала для меня :( – samthebest 9 January 2018 в 17:15

Возможно, ваш файл паркета занимает только один блок HDFS. Создайте большой файл паркета с большим количеством блоков HDFS и загрузите его.

val k = sc.parquetFile("the-big-table.parquet")
k.partitions.length

Вы увидите то же количество разделов, что и блоки HDFS. Это работало отлично для меня (spark-1.1.0)

1
ответ дан gonbe 17 August 2018 в 23:45
поделиться
  • 1
    Предположим, что требуется гораздо больший уровень параллелизма, чем то, что будет подразумеваться размером блока HDFS, есть ли способ обеспечить, чтобы при написании паркета он мог поддерживать разделение более мелким, чем размер блока HDFS? – samthebest 7 December 2014 в 11:36
  • 2
    Нет. Я думал, что вы не хотите переместить блоки HDFS из исходного местоположения. Если вы в порядке с перетасовкой, переделайте блоки. – gonbe 7 December 2014 в 15:48
  • 3
    Да, я хочу избежать перетасовки, в идеале я хочу 2 - 4 раздела на процессор без тасования. Предположим, что у меня есть 1 ГБ паркета, а 1000 процессоров, очевидно, с обычными размерами блоков, это будет означать, что большинство моих процессоров не будут использоваться. Мой вопрос при написании паркета есть ли способ контролировать расщепление? Я думаю, что большинство людей не заботятся и просто все равно перетасовывают, так как перетасовка данных вокруг размера блока HDFS должна быть довольно быстрой. (О, BTW Я буду принимать ваш ответ, как только проверю его, у меня нет доступа к моему кластеру в течение нескольких дней) – samthebest 7 December 2014 в 16:56

Вы упомянули, что хотите контролировать распределение во время записи в паркет. При создании паркета из паркета RDD сохраняются разделы RDD. Итак, если вы создадите RDD и укажите 100 разделов и из фреймворка с паркетным форматом, тогда он будет писать 100 отдельных паркетных файлов в fs. Для чтения вы можете указать параметр spark.sql.shuffle.partitions.

1
ответ дан Kirby 17 August 2018 в 23:45
поделиться

Чтобы достичь этого, вы должны использовать SparkContext для установки свойства конфигурации Hadoop (sc.hadoopConfiguration) mapreduce.input.fileinputformat.split.maxsize.

Установив это свойство на меньшее значение, чем hdfs.blockSize, вы получите столько же разделов, сколько количество разделов.

Например: Когда hdfs.blockSize = 134217728 (128 МБ) и один файл считывается, который содержит ровно один полный блок, а mapreduce.input.fileinputformat.split.maxsize = 67108864 (64 МБ)

Тогда будут два раздела, в которые будут считываться данные расщепления.

0
ответ дан Prokod 17 August 2018 в 23:45
поделиться
  • 1
    Это звучит многообещающе, но аргументом @ C4stor не будет работать. – samthebest 10 August 2016 в 07:39
Другие вопросы по тегам:

Похожие вопросы: