Сначала измените тип столбца:
df.cc = pd.Categorical(df.cc)
Теперь данные выглядят одинаково, но сохраняются категорически. Чтобы захватить коды категорий:
df['code'] = df.cc.cat.codes
Теперь у вас есть:
cc temp code
0 US 37.0 2
1 CA 12.0 1
2 US 35.0 2
3 AU 20.0 0
Если вы не хотите изменять свой DataFrame, но просто получите коды:
df.cc.astype('category').cat.codes
Или используйте категориальный столбец как индекс:
df2 = pd.DataFrame(df.temp)
df2.index = pd.CategoricalIndex(df.cc)
Я нуждался в том же самом в Java. Проводя перевод ответ Scala от Zhang Zhan пользователям Spark Java API:
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
class RDDMultipleTextOutputFormat<A, B> extends MultipleTextOutputFormat<A, B> {
@Override
protected String generateFileNameForKeyValue(A key, B value, String name) {
return key.toString();
}
}
public class Main {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("Split Job")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
String[] strings = {"Abcd", "Azlksd", "whhd", "wasc", "aDxa"};
sc.parallelize(Arrays.asList(strings))
// The first character of the string is the key
.mapToPair(s -> new Tuple2<>(s.substring(0,1).toLowerCase(), s))
.saveAsHadoopFile("output/", String.class, String.class,
RDDMultipleTextOutputFormat.class);
sc.stop();
}
}
Я бы сделал это так, как это масштабируется
import org.apache.hadoop.io.NullWritable
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
override def generateActualKey(key: Any, value: Any): Any =
NullWritable.get()
override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
key.asInstanceOf[String]
}
object Split {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Split" + args(1))
val sc = new SparkContext(conf)
sc.textFile("input/path")
.map(a => (k, v)) // Your own implementation
.partitionBy(new HashPartitioner(num))
.saveAsHadoopFile("output/path", classOf[String], classOf[String],
classOf[RDDMultipleTextOutputFormat])
spark.stop()
}
}
Просто увидел аналогичный ответ выше, но на самом деле нам не нужны настроенные разделы. MultipleTextOutputFormat создаст файл для каждого ключа. Это нормально, что несколько записей с теми же ключами попадают в один раздел.
новый HashPartitioner (num), где num - номер раздела, который вы хотите. В случае, если у вас есть большое количество разных ключей, вы можете установить число в большое. В этом случае каждый раздел не будет открывать слишком много обработчиков файлов hdfs.
partitionBy(new Hashpartitioner(num))
? это не то же самое, что repartition(num)
??
– samthebest
27 September 2014 в 13:48
хорошая новость для пользователя python в случае, если у вас много столбцов, и вы хотите сохранить все остальные столбцы, не разбитые на разделы в формате csv, которые не удастся, если вы используете «текстовый» метод как предложение Ника Чаммаса.
people_df.write.partitionBy("number").text("people")
сообщение об ошибке «AnalysisException: источник данных u'Text поддерживает только один столбец, и у вас есть 2 столбца.;»
In spark 2.0.0 (моя тестовая среда - это искра hdp 2.0.0) пакет «com.databricks.spark.csv» теперь интегрирован и позволяет нам сохранять текстовый файл, разделенный только на один столбец, см. Пример blow:
people_rdd = sc.parallelize([(1,"2016-12-26", "alice"),
(1,"2016-12-25", "alice"),
(1,"2016-12-25", "tom"),
(1, "2016-12-25","bob"),
(2,"2016-12-26" ,"charlie")])
df = people_rdd.toDF(["number", "date","name"])
df.coalesce(1).write.partitionBy("number").mode("overwrite").format('com.databricks.spark.csv').options(header='false').save("people")
[root@namenode people]# tree
.
├── number=1
│?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
├── number=2
│?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
└── _SUCCESS
[root@namenode people]# cat number\=1/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
2016-12-26,alice
2016-12-25,alice
2016-12-25,tom
2016-12-25,bob
[root@namenode people]# cat number\=2/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
2016-12-26,charlie
В моей искровой версии 1.6 .1 enviroment, код не выдавал никакой ошибки, однако это только один сгенерированный файл. он не разбивается на две папки.
Надеюсь, это поможет.
У меня есть аналогичная потребность и нашел способ. Но у него есть один недостаток (что не является проблемой для моего случая): вам нужно переразделить данные с одним разделом на выходной файл.
Чтобы разбивать таким образом, обычно требуется заранее знать, как много файлов, на которые будет выведено задание, и найти функцию, которая будет отображать каждую клавишу в каждый раздел.
Сначала давайте создадим наш класс на основе MultipleTextOutputFormat:
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T , V] {
override def generateFileNameForKeyValue(key: T, value: V, leaf: String) = {
key.toString
}
override protected def generateActualKey(key: T, value: V) = {
null
}
}
С этим классом Spark будет получить ключ из раздела (первый / последний, я думаю), и назвать файл с помощью этого ключа, поэтому неплохо смешивать несколько ключей в одном разделе.
Для вашего примера вам потребуется пользовательский разделитель. Это выполнит задание:
import org.apache.spark.Partitioner
class IdentityIntPartitioner(maxKey: Int) extends Partitioner {
def numPartitions = maxKey
def getPartition(key: Any): Int = key match {
case i: Int if i < maxKey => i
}
}
Теперь давайте поместим все вместе:
val rdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"), (7, "d"), (7, "e")))
// You need to know the max number of partitions (files) beforehand
// In this case we want one partition per key and we have 3 keys,
// with the biggest key being 7, so 10 will be large enough
val partitioner = new IdentityIntPartitioner(10)
val prefix = "hdfs://.../prefix"
val partitionedRDD = rdd.partitionBy(partitioner)
partitionedRDD.saveAsHadoopFile(prefix,
classOf[Integer], classOf[String], classOf[KeyBasedOutput[Integer, String]])
Это сгенерирует 3 файла под префиксом (с именами 1, 2 и 7), обрабатывая все за один проход.
Как вы можете видеть, вам нужно знать свои ключи, чтобы иметь возможность использовать это решение.
Для меня это было проще, потому что мне нужен один выходной файл для каждый ключевой хеш и количество файлов находилось под моим контролем, поэтому я мог использовать хэш-файл HashPartitioner, чтобы сделать трюк.
toString
. Я не уверен, что мне нужно обновить свой ответ, поскольку хорошо известная плохая практика - создавать большие файлы на HDFS, поскольку она ограничивает типы сжатия, которые вы можете использовать. Если у нас очень большие файлы, и нам нужно выбрать разделительный алгоритм сжатия, что может оказаться не лучшим для работы. Кроме того, Spark в настоящее время не может читать bzip2 (мое fav splittable compression) из-за ошибки в Hadoop. Тем не менее я обновляю свой ответ, чтобы быть явным. Опять же, большое спасибо.
– samthebest
21 June 2014 в 12:13
IdentityIntPartitioner
так, чтобы для каждого возможного ключа было несколько разделов, скажем M, где один выбирается случайным образом. Нам нужно использовать хеш-функцию и по модулю результата с помощью numPartitions
, хотя есть проблема: разные ключи могут оказаться в одном разделе, который, как я предполагаю, сломает saveAsHadoopFile
? Это нетривиальная проблема.
– samthebest
21 June 2014 в 17:40
У меня был аналогичный вариант использования. Я разрешил его на Java, написав два пользовательских класса, реализующих MultipleTextOutputFormat
и RecordWriter
.
Мой вход был JavaPairRDD<String, List<String>>
, и я хотел сохранить его в файле с именем по его ключу со всеми строками, содержащимися в его значении.
Вот код для моего MultipleTextOutputFormat
реализация
class RDDMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> {
@Override
protected String generateFileNameForKeyValue(K key, V value, String name) {
return key.toString(); //The return will be used as file name
}
/** The following 4 functions are only for visibility purposes
(they are used in the class MyRecordWriter) **/
protected String generateLeafFileName(String name) {
return super.generateLeafFileName(name);
}
protected V generateActualValue(K key, V value) {
return super.generateActualValue(key, value);
}
protected String getInputFileBasedOutputFileName(JobConf job, String name) {
return super.getInputFileBasedOutputFileName(job, name);
}
protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException {
return super.getBaseRecordWriter(fs, job, name, arg3);
}
/** Use my custom RecordWriter **/
@Override
RecordWriter<K, V> getRecordWriter(final FileSystem fs, final JobConf job, String name, final Progressable arg3) throws IOException {
final String myName = this.generateLeafFileName(name);
return new MyRecordWriter<K, V>(this, fs, job, arg3, myName);
}
}
Вот код для моей реализации RecordWriter
.
class MyRecordWriter<K, V> implements RecordWriter<K, V> {
private RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat;
private final FileSystem fs;
private final JobConf job;
private final Progressable arg3;
private String myName;
TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap();
MyRecordWriter(RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat, FileSystem fs, JobConf job, Progressable arg3, String myName) {
this.rddMultipleTextOutputFormat = rddMultipleTextOutputFormat;
this.fs = fs;
this.job = job;
this.arg3 = arg3;
this.myName = myName;
}
@Override
void write(K key, V value) throws IOException {
String keyBasedPath = rddMultipleTextOutputFormat.generateFileNameForKeyValue(key, value, myName);
String finalPath = rddMultipleTextOutputFormat.getInputFileBasedOutputFileName(job, keyBasedPath);
Object actualValue = rddMultipleTextOutputFormat.generateActualValue(key, value);
RecordWriter rw = this.recordWriters.get(finalPath);
if(rw == null) {
rw = rddMultipleTextOutputFormat.getBaseRecordWriter(fs, job, finalPath, arg3);
this.recordWriters.put(finalPath, rw);
}
List<String> lines = (List<String>) actualValue;
for (String line : lines) {
rw.write(null, line);
}
}
@Override
void close(Reporter reporter) throws IOException {
Iterator keys = this.recordWriters.keySet().iterator();
while(keys.hasNext()) {
RecordWriter rw = (RecordWriter)this.recordWriters.get(keys.next());
rw.close(reporter);
}
this.recordWriters.clear();
}
}
Большая часть кода точно такая же, как в FileOutputFormat
. Единственное отличие состоит в том, что несколько строк
List<String> lines = (List<String>) actualValue;
for (String line : lines) {
rw.write(null, line);
}
Эти строки позволили мне написать каждую строку моего ввода List<String>
в файле. Первый аргумент функции write
установлен на null
, чтобы избежать нажатия клавиши на каждой строке.
Чтобы закончить, мне нужно только сделать этот вызов, чтобы записать мои файлы
javaPairRDD.saveAsHadoopFile(path, String.class, List.class, RDDMultipleTextOutputFormat.class);
У меня был аналогичный случай использования, когда я разбил входной файл на Hadoop HDFS на несколько файлов на основе ключа (1 файл на ключ). Вот мой scala-код для искры
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
val hadoopconf = new Configuration();
val fs = FileSystem.get(hadoopconf);
@serializable object processGroup {
def apply(groupName:String, records:Iterable[String]): Unit = {
val outFileStream = fs.create(new Path("/output_dir/"+groupName))
for( line <- records ) {
outFileStream.writeUTF(line+"\n")
}
outFileStream.close()
}
}
val infile = sc.textFile("input_file")
val dateGrouped = infile.groupBy( _.split(",")(0))
dateGrouped.foreach( (x) => processGroup(x._1, x._2))
Я сгруппировал записи на основе ключа. Значения для каждого ключа записываются в отдельный файл.
saveAsText () и saveAsHadoop (...) реализованы на основе данных RDD, в частности по методу: PairRDD.saveAsHadoopDataset , который берет данные из PairRdd, где он выполняется. Я вижу два возможных варианта. Если ваши данные относительно невелики по размеру, вы можете сэкономить некоторое время реализации, объединив RDD, создав новый RDD из каждой коллекции и используя этот RDD для записи данных. Что-то вроде этого:
val byKey = dataRDD.groupByKey().collect()
val rddByKey = byKey.map{case (k,v) => k->sc.makeRDD(v.toSeq)}
val rddByKey.foreach{ case (k,rdd) => rdd.saveAsText(prefix+k}
Обратите внимание, что он не будет работать для больших наборов данных b / c, материализация итератора в v.toSeq
может не соответствовать памяти.
Другой вариант, который я вижу, и на самом деле тот, который я бы рекомендовал в этом случае, это: сворачивать свой собственный, путем прямого вызова hasoop / hdfs api.
Вот обсуждение, которое я начал при исследовании этого вопроса: Как создать RDD из другого RDD?
MultipleOutputFormat
, но я хотел бы знать how i>.
– samthebest
4 June 2014 в 11:30
Если вы используете Spark 1.4+, это стало намного проще, благодаря API DataFrame . (DataFrames были введены в Spark 1.3, но partitionBy()
, который нам нужен, был введен в 1.4 .)
Если вы начинаете с RDD, вы будете сначала нужно преобразовать его в DataFrame:
val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie")))
val people_df = people_rdd.toDF("number", "name")
В Python этот же код:
people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")])
people_df = people_rdd.toDF(["number", "name"])
Как только у вас есть DataFrame, запись на несколько выходов на основе конкретный ключ прост. Более того, и это красота API DataFrame - код почти одинаковый для Python, Scala, Java и R:
people_df.write.partitionBy("number").text("people")
И вы можете легко использовать другие форматы вывода, если вы хотите:
people_df.write.partitionBy("number").json("people-json")
people_df.write.partitionBy("number").parquet("people-parquet")
В каждом из этих примеров Spark создаст подкаталог для каждого из ключей, которые мы разделили DataFrame на:
people/
_SUCCESS
number=1/
part-abcd
part-efgh
number=2/
part-abcd
part-efgh
Dataset
s в Scala? и я соглашусь как лучший ответ. Да, некоторые люди не заботятся о типах и любят запускать все свое приложение каждые несколько минут, чтобы узнать, есть ли у них какие-либо ошибки, но некоторым из нас нравится ловить опечатки, как «нубмер», когда мы набрали его :) Серьезно, хотя, хорошо ответ.
– samthebest
12 May 2016 в 17:28
Dataset[SomeCaseClass]
более подходит в качестве комментария; наконец, у Python нет метода makeRDD()
.
– Nick Chammas
14 May 2016 в 16:23
Dataset[SomeCaseClass]
, вы можете просто вызвать .toDF()
, а метки столбцов будут соответствовать полям SomeCaseClass
es. Это дает немного больше безопасности типов.
– samthebest
18 May 2016 в 14:10
partitionBy()
. Например: people_df.coalesce(1).write.partitionBy("number").text("people")
Это может ограничить параллельность Spark при записи данных, однако, в зависимости от ваших данных и конфигурации кластера.
– Nick Chammas
23 May 2016 в 01:25
Это включает в себя кодек в соответствии с запросом, необходимые импорты и сутенером в соответствии с запросом.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
// TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless
implicit class PimpedRDD[T1, T2](rdd: RDD[(T1, T2)]) {
def writeAsMultiple(prefix: String, codec: String,
keyName: String = "key")
(implicit sqlContext: SQLContext): Unit = {
import sqlContext.implicits._
rdd.toDF(keyName, "_2").write.partitionBy(keyName)
.format("text").option("codec", codec).save(prefix)
}
}
val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")
Одно тонкое отличие от OP состоит в том, что оно будет префикс <keyName>=
к именам каталогов. Например,
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")
дал бы:
prefix/key=1/part-00000
prefix/key=2/part-00000
, где prefix/my_number=1/part-00000
будет содержать строки a
и b
, а prefix/my_number=2/part-00000
будет содержать строку c
].
И
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo")
Дает:
prefix/foo=1/part-00000
prefix/foo=2/part-00000
Должно быть ясно, как отредактировать для parquet
.
Наконец, ниже приведен пример для Dataset
, что, возможно, лучше, чем использование Tuples.
implicit class PimpedDataset[T](dataset: Dataset[T]) {
def writeAsMultiple(prefix: String, codec: String, field: String): Unit = {
dataset.write.partitionBy(field)
.format("text").option("codec", codec).save(prefix)
}
}
MultipleOutputFormat
существует.
– samthebest
21 June 2014 в 17:02
output/<key>/<partition>
. Поэтому каждый раздел записывается в разные файлы. (В этом примере индекс раздела находится в suffix
.)
– Daniel Darabos
21 June 2014 в 18:33
MultipleOutputFormat
отлично подходит для работы и будет работать по той же идее. Я просто никогда не использовал его. Я думаю, вы просто переписали бы мой MultiWriter
, чтобы использовать MultipleOutputFormat
вместо того, чтобы пересказывать собственное сопоставление файлов с ключами и gt. Но бит mapPartitionsWithIndex
был бы в основном без изменений.
– Daniel Darabos
21 June 2014 в 18:36
mapPartitionsWithIndex
и вручную записываем в HDFS, этот конкретный раздел не обязательно выводится в нужное место этого раздела. Поэтому дополнительная перетасовка не нужна и ее можно избежать.
– samthebest
12 August 2014 в 11:42