поэтому у меня есть следующий фрагмент кода: def processing (): RDD [String] = {@volatile var count = 0 val results = rdd.mapPartitions [String] ((iter) = & gt; {try {iter.grouped (c ....
В моем наборе данных есть скорость, записанная для нескольких автомобилей, как функция времени. У каждого автомобиля есть определенный идентификатор. Данные выглядят так: + ----------------- + ----------- + ------ + | отметка времени | ...
У меня проблема аналогичная этому, но я хочу проверить дубликаты в нескольких столбцах и сохранить запись с самой старой отметкой времени. Я попытался создать порядок столбцов timestamp с этим, а затем отбросить ...
У меня есть два фрейма данных в Spark, у которых есть много столбцов плюс столбец timestamp. Я хочу исключить дубликаты для всех столбцов, кроме столбца timestamp. Поэтому мой окончательный кадр данных должен быть ...
Имея кадр данных, как показано ниже (на самом деле он содержит гораздо больше строк): Time | EventType | OrderId | Размер | Цена | Направление | message_id | Вторая | | 34200.105 | 5 | 0 | 100 | 1103400 | 1 | ...
Используя pyspark, мне нужно найти что-то похожее на команду SQL KEEP (DENSE RANK LAST ORDER). Используя groupBy и agg, я хочу извлечь другие значения столбцов, которые связаны с минимумом групп. ...
Вот как выглядят мои данные csv: TagNumber, DatePaid, TotalPaid ABCD, 11/5/2017, $ 101 EFGH, 12/5/2017, $ 201 ABCD, 11/7/2017, $ 501 ABCD, 12/5/2017, $ 201 Я создаю dataframe, который будет группировать данные с помощью ...
У меня есть простой фреймворк, над которым я хочу работать: + --- + ---- + | идентификатор | имя | + --- + ---- + | 1 | | | 2 | б | | 3 | с | | 4 | d | | 5 | е | + --- + ---- + Я пытаюсь добавить еще один столбец на основе «id» ...
Я пытаюсь запустить приведенный ниже код в искровом режиме и получить исключение: исключение в потоке «main» org.apache.spark.SparkException: Иск прерывается из-за срыва этапа: Задача 0 на этапе 42.0 не удалась 1 раз, ...
Я запускаю скрипт python в запросе на публикацию в листинге. В моей просьбе я добавил список «pyFiles», и все работает отлично. Но я также добавил свои json-файлы с помощью «файлов». И я вижу ...
Представьте, что у нас есть некоторый val hiveTableDataFrame: DataFrame И я хочу передать строки этого фрейма данных в мою программу драйверов. Дело в том, что hiveTableDataFrame слишком велик, и я не могу использовать ...
написав простую операцию объединения на искру и пытаясь получить значения карты. почему я получаю синтаксическую проблему? что такое правильный синтаксис? Spark 2.x joinrdd = webrdd.join (titlerdd) \ .map (...
Я пытаюсь получить количество строк в таблице: bank_accounts. Условиями являются «source_system_name = SAP» & amp; period_year = "2017" Для этого я придумал следующий код: object PartitionRetrieval {...
Я уже тестировал Apache Spark и Hadoop MapReduce с помощью TestDFSIO. Целью является тестирование производительности узкого места ввода-вывода в сети. Это приводит к тому, что Apache Spark работает быстрее, чем Hadoop. ...
У моего приложения Scala есть dataframe, созданный при вызове таблицы hive. После того, как данные потянуты, я создаю набор со списком идентификаторов: val c_Set = inputDF.select ("c_id"). Collect (). Map (_ (0)). ToSet Затем я ...
Я занимаюсь обработкой Spark на нескольких файлах. Обработка проста: чтение csv, выбор / фильтрация, а затем наложение на паркет. Я заметил, что: чтение всех файлов в одном фрейме данных ...
Я думал о том, как это сделать, поскольку я новичок в искру и играю с ним какое-то время. Требование выглядит так просто, как это, у меня есть несколько файлов с разделителями-запятыми (100 + МБ файлов) ...
У меня есть файл, который содержит много hdfs-путей. каждый путь HDFS содержит некоторые файлы JSON. Я хочу обработать все эти json-файлы, которые обновляются за последние 24 часа. На данный момент я читаю файл, который ...
У меня есть файл, в котором есть некоторые отсутствующие данные. Поэтому я пытаюсь определить количество отсутствующих записей, используя переменную счетчика. Файл: data-error.csv ...
Я создал искровое приложение. Я нахожусь на этапе тестирования, поэтому мне нужно создать модульные тесты для всех моих функций python. Я создал локальную искровую сессию в настроенном классе def setUpClass (cls): ...
Для RDD для генерации пары, например: rdd1 = sc.parallelize (['d', '112', 'b', 'c', 'i', 'a', 'e']) output: [(' d ',' 112 '), (' d ',' b '), (' d ',' c '), (' d ',' i '), ..., (' a ',' e ' )] Благодаря
Вероятно, основной вопрос, я довольно новичок в Spark / Scala. Поэтому у меня есть переменная типа Map [String, RDD [Int]]. Я не могу перебирать эту переменную с помощью и делать что-либо с RDD внутри ...
как использовать foreach (println) для кода ниже для разделения в разных строках: sc.textFile ("/ user / edureka_366833 / spark / fmapvalue") .map (_. split ('\ t')) .map (a = & gt; ; (a (0), a (1))) .flatMapValues (a = & gt; (a ....
Я развертываю приложение искры через автономный кластер. У меня есть один мастер и 2 раба. Я тестирую свой кластер. У меня есть приложение .jar, скопированное везде в том же месте. Я заметил ...
Мой код: import org.apache.spark.SparkContext Он может работать в интерактивном режиме, но когда я использую scalac для его компиляции, я получил следующее сообщение об ошибке: объект apache не является членом пакета ...
У меня есть искровой скрипт, который читает каждое сообщение, кодирует его и сохраняет в виде текстового файла sparkContext.sequenceFile (inputDirectory, classOf [IntWritable], classOf [DataOutputValue]). Map {case (_, message) = & gt; ...