Добавьте этот скрипт в .profile
в Mac OS X:
# Usage:
# `git-pull-all` to pull all your local branches from origin
# `git-pull-all remote` to pull all your local branches from a named remote
function git-pull-all() {
START=$(git symbolic-ref --short -q HEAD);
for branch in $(git branch | sed 's/^.//'); do
git checkout $branch;
git pull ${1:-origin} $branch || break;
done;
git checkout $START;
};
function git-push-all() {
git push --all ${1:-origin};
};
df4.groupBy("year").pivot("course").sum("earnings").collect()
Предположим, что после вашей функции будет последовательность элементов, приведенная ниже:
val df = sc.parallelize(List(("Mike,1986,Toronto", 30), ("Andre,1980,Ottawa", 36), ("jill,1989,London", 27))).toDF("infoComb", "age")
df.show
+------------------+---+
| infoComb|age|
+------------------+---+
|Mike,1986,Toronto| 30|
| Andre,1980,Ottawa| 36|
| jill,1989,London| 27|
+------------------+---+
теперь то, что вы можете сделать с этой инфо-зоной, состоит в том, что вы можете начать разделение строки и получить больше столбцов с:
df.select(expr("(split(infoComb, ','))[0]").cast("string").as("name"), expr("(split(infoComb, ','))[1]").cast("integer").as("yearOfBorn"), expr("(split(infoComb, ','))[2]").cast("string").as("city"), $"age").show
+-----+----------+-------+---+
| name|yearOfBorn| city|age|
+-----+----------+-------+---+
|Mike| 1986|Toronto| 30|
|Andre| 1980| Ottawa| 36|
| jill| 1989| London| 27|
+-----+----------+-------+---+
Надеюсь, это поможет.
Если ваши результирующие столбцы будут иметь ту же длину, что и исходная, вы можете создать новые столбцы с помощью функции withColumn и путем применения udf. После этого вы можете удалить исходный столбец, например:
val newDf = myDf.withColumn("newCol1", myFun(myDf("originalColumn")))
.withColumn("newCol2", myFun2(myDf("originalColumn"))
.drop(myDf("originalColumn"))
, где myFun - это udf, определяемый следующим образом:
def myFun= udf(
(originalColumnContent : String) => {
// do something with your original column content and return a new one
}
)
Я решил создать функцию сглаживания одного столбца, а затем просто вызовет ее одновременно с udf.
Сначала определите это:
implicit class DfOperations(df: DataFrame) {
def flattenColumn(col: String) = {
def addColumns(df: DataFrame, cols: Array[String]): DataFrame = {
if (cols.isEmpty) df
else addColumns(
df.withColumn(col + "_" + cols.head, df(col + "." + cols.head)),
cols.tail
)
}
val field = df.select(col).schema.fields(0)
val newCols = field.dataType.asInstanceOf[StructType].fields.map(x => x.name)
addColumns(df, newCols).drop(col)
}
def withColumnMany(colName: String, col: Column) = {
df.withColumn(colName, col).flattenColumn(colName)
}
}
Тогда использование очень просто:
case class MyClass(a: Int, b: Int)
val df = sc.parallelize(Seq(
(0),
(1)
)).toDF("x")
val f = udf((x: Int) => MyClass(x*2,x*3))
df.withColumnMany("test", f($"x")).show()
// +---+------+------+
// | x|test_a|test_b|
// +---+------+------+
// | 0| 0| 0|
// | 1| 2| 3|
// +---+------+------+
Вообще говоря, то, что вы хотите, напрямую не возможно. В то время UDF может возвращать только один столбец. Существует два разных способа преодоления этого ограничения:
StructType
, но вы можете также рассмотреть ArrayType
или MapType
. import org.apache.spark.sql.functions.udf
val df = Seq(
(1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c")
).toDF("x", "y", "z")
case class Foobar(foo: Double, bar: Double)
val foobarUdf = udf((x: Long, y: Double, z: String) =>
Foobar(x * y, z.head.toInt * y))
val df1 = df.withColumn("foobar", foobarUdf($"x", $"y", $"z"))
df1.show
// +---+----+---+------------+
// | x| y| z| foobar|
// +---+----+---+------------+
// | 1| 3.0| a| [3.0,291.0]|
// | 2|-1.0| b|[-2.0,-98.0]|
// | 3| 0.0| c| [0.0,0.0]|
// +---+----+---+------------+
df1.printSchema
// root
// |-- x: long (nullable = false)
// |-- y: double (nullable = false)
// |-- z: string (nullable = true)
// |-- foobar: struct (nullable = true)
// | |-- foo: double (nullable = false)
// | |-- bar: double (nullable = false)
Это может быть легко сглажено позже, но обычно этого не нужно. import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
def foobarFunc(x: Long, y: Double, z: String): Seq[Any] =
Seq(x * y, z.head.toInt * y)
val schema = StructType(df.schema.fields ++
Array(StructField("foo", DoubleType), StructField("bar", DoubleType)))
val rows = df.rdd.map(r => Row.fromSeq(
r.toSeq ++
foobarFunc(r.getAs[Long]("x"), r.getAs[Double]("y"), r.getAs[String]("z"))))
val df2 = sqlContext.createDataFrame(rows, schema)
df2.show
// +---+----+---+----+-----+
// | x| y| z| foo| bar|
// +---+----+---+----+-----+
// | 1| 3.0| a| 3.0|291.0|
// | 2|-1.0| b|-2.0|-98.0|
// | 3| 0.0| c| 0.0| 0.0|
// +---+----+---+----+-----+
df1.foobar.foo
)?
– max
20 June 2016 в 17:03
structs
может использоваться практически в любом контексте, когда обычно используется плоская структура (с простым синтаксисом точки fooobar.foo
). Однако это не относится к типам коллекций. Вы также можете проверить stackoverflow.com/a/33850490/1560062
– zero323
20 June 2016 в 17:33