искровый случай класс udf выход как dataframe [дубликат]

Добавьте этот скрипт в .profile в Mac OS X:

# Usage:
#   `git-pull-all` to pull all your local branches from origin
#   `git-pull-all remote` to pull all your local branches from a named remote

function git-pull-all() {
    START=$(git symbolic-ref --short -q HEAD);
    for branch in $(git branch | sed 's/^.//'); do
        git checkout $branch;
        git pull ${1:-origin} $branch || break;
    done;
    git checkout $START;
};

function git-push-all() {
    git push --all ${1:-origin};
};

44
задан zero323 26 October 2015 в 13:04
поделиться

5 ответов

[Д0] Это может быть легко достигнуто с помощью функции поворота

df4.groupBy("year").pivot("course").sum("earnings").collect() 
-1
ответ дан David Arenburg 19 August 2018 в 10:04
поделиться
  • 1
    Панг, спасибо за форматирование – Abhishek Kgsk 20 January 2017 в 08:08
  • 2
    Я не вижу «год», «курс», или "заработок" в любом из ответов или o.p ... какой кадр данных вы говорите в этом очень кратком ответе (не)? – Kai 26 May 2017 в 15:27

Предположим, что после вашей функции будет последовательность элементов, приведенная ниже:

val df = sc.parallelize(List(("Mike,1986,Toronto", 30), ("Andre,1980,Ottawa", 36), ("jill,1989,London", 27))).toDF("infoComb", "age")
df.show
+------------------+---+
|          infoComb|age|
+------------------+---+
|Mike,1986,Toronto| 30|
| Andre,1980,Ottawa| 36|
|  jill,1989,London| 27|
+------------------+---+

теперь то, что вы можете сделать с этой инфо-зоной, состоит в том, что вы можете начать разделение строки и получить больше столбцов с:

df.select(expr("(split(infoComb, ','))[0]").cast("string").as("name"), expr("(split(infoComb, ','))[1]").cast("integer").as("yearOfBorn"), expr("(split(infoComb, ','))[2]").cast("string").as("city"), $"age").show
+-----+----------+-------+---+
| name|yearOfBorn|   city|age|
+-----+----------+-------+---+
|Mike|      1986|Toronto| 30|
|Andre|      1980| Ottawa| 36|
| jill|      1989| London| 27|
+-----+----------+-------+---+

Надеюсь, это поможет.

16
ответ дан EdwinGuo 19 August 2018 в 10:04
поделиться

Если ваши результирующие столбцы будут иметь ту же длину, что и исходная, вы можете создать новые столбцы с помощью функции withColumn и путем применения udf. После этого вы можете удалить исходный столбец, например:

 val newDf = myDf.withColumn("newCol1", myFun(myDf("originalColumn")))
.withColumn("newCol2", myFun2(myDf("originalColumn"))
.drop(myDf("originalColumn"))

, где myFun - это udf, определяемый следующим образом:

   def myFun= udf(
    (originalColumnContent : String) =>  {
      // do something with your original column content and return a new one
    }
  )
5
ответ дан Niemand 19 August 2018 в 10:04
поделиться
  • 1
    Привет Niemand, я ценю ваш ответ ... но это не решает вопрос ... в вашем коде вы вызываете функцию & quot; myDF & quot; несколько раз, тогда как я хотел бы, чтобы эта функция вызывалась один раз, генерирует класс, имеющий несколько полей, и каждую переменную поля возвращают как новые столбцы – sshroff 25 August 2015 в 20:09
  • 2
    Ну, я боюсь, что я представил единственный возможный способ узнать, я не думаю, что существует какой-либо другой способ, но, надеюсь, я ошибаюсь;). Также не то, что я не вызывал myFun несколько раз - вы можете вызвать другие функции, такие как myFun2, myFun3 и т. Д., Чтобы создать нужные столбцы. – Niemand 25 August 2015 в 20:15

Я решил создать функцию сглаживания одного столбца, а затем просто вызовет ее одновременно с udf.

Сначала определите это:

implicit class DfOperations(df: DataFrame) {

  def flattenColumn(col: String) = {
    def addColumns(df: DataFrame, cols: Array[String]): DataFrame = {
      if (cols.isEmpty) df
      else addColumns(
        df.withColumn(col + "_" + cols.head, df(col + "." + cols.head)),
        cols.tail
      )
    }

    val field = df.select(col).schema.fields(0)
    val newCols = field.dataType.asInstanceOf[StructType].fields.map(x => x.name)

    addColumns(df, newCols).drop(col)
  }

  def withColumnMany(colName: String, col: Column) = {
    df.withColumn(colName, col).flattenColumn(colName)
  }

}

Тогда использование очень просто:

case class MyClass(a: Int, b: Int)

val df = sc.parallelize(Seq(
  (0),
  (1)
)).toDF("x")

val f = udf((x: Int) => MyClass(x*2,x*3))

df.withColumnMany("test", f($"x")).show()

//  +---+------+------+
//  |  x|test_a|test_b|
//  +---+------+------+
//  |  0|     0|     0|
//  |  1|     2|     3|
//  +---+------+------+
2
ответ дан Pekka 19 August 2018 в 10:04
поделиться
  • 1
    Вам не нужно делать все с помощью ColumnMany. Просто используйте select (& quot; select. * & Quot;), чтобы сгладить его. – Assaf Mendelson 28 February 2017 в 09:17

Вообще говоря, то, что вы хотите, напрямую не возможно. В то время UDF может возвращать только один столбец. Существует два разных способа преодоления этого ограничения:

  1. Возвращает столбец сложного типа. Наиболее общим решением является StructType, но вы можете также рассмотреть ArrayType или MapType.
    import org.apache.spark.sql.functions.udf
    
    val df = Seq(
      (1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c")
    ).toDF("x", "y", "z")
    
    case class Foobar(foo: Double, bar: Double)
    
    val foobarUdf = udf((x: Long, y: Double, z: String) => 
      Foobar(x * y, z.head.toInt * y))
    
    val df1 = df.withColumn("foobar", foobarUdf($"x", $"y", $"z"))
    df1.show
    // +---+----+---+------------+
    // |  x|   y|  z|      foobar|
    // +---+----+---+------------+
    // |  1| 3.0|  a| [3.0,291.0]|
    // |  2|-1.0|  b|[-2.0,-98.0]|
    // |  3| 0.0|  c|   [0.0,0.0]|
    // +---+----+---+------------+
    
    df1.printSchema
    // root
    //  |-- x: long (nullable = false)
    //  |-- y: double (nullable = false)
    //  |-- z: string (nullable = true)
    //  |-- foobar: struct (nullable = true)
    //  |    |-- foo: double (nullable = false)
    //  |    |-- bar: double (nullable = false)
    
    Это может быть легко сглажено позже, но обычно этого не нужно.
  2. Переключиться на RDD, изменить и перестроить DF:
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    
    def foobarFunc(x: Long, y: Double, z: String): Seq[Any] = 
      Seq(x * y, z.head.toInt * y)
    
    val schema = StructType(df.schema.fields ++
      Array(StructField("foo", DoubleType), StructField("bar", DoubleType)))
    
    val rows = df.rdd.map(r => Row.fromSeq(
      r.toSeq ++
      foobarFunc(r.getAs[Long]("x"), r.getAs[Double]("y"), r.getAs[String]("z"))))
    
    val df2 = sqlContext.createDataFrame(rows, schema)
    
    df2.show
    // +---+----+---+----+-----+
    // |  x|   y|  z| foo|  bar|
    // +---+----+---+----+-----+
    // |  1| 3.0|  a| 3.0|291.0|
    // |  2|-1.0|  b|-2.0|-98.0|
    // |  3| 0.0|  c| 0.0|  0.0|
    // +---+----+---+----+-----+
    
59
ответ дан zero323 19 August 2018 в 10:04
поделиться
  • 1
    Когда вы говорите «обычно нет» для [выравнивания столбца] », почему? Или искра позволяет большинство вещей, которые вы делаете с колонками верхнего уровня, также выполняться с помощью иерархических данных (например, df1.foobar.foo)? – max 20 June 2016 в 17:03
  • 2
    @max Поскольку простой structs может использоваться практически в любом контексте, когда обычно используется плоская структура (с простым синтаксисом точки fooobar.foo). Однако это не относится к типам коллекций. Вы также можете проверить stackoverflow.com/a/33850490/1560062 – zero323 20 June 2016 в 17:33
Другие вопросы по тегам:

Похожие вопросы: