Как выводить больше коррелированных столбцов, которые не входят в столбцы groupBy и aggregation [duplicate]

Вы также можете использовать метод compareTo() для сравнения двух строк. Если результат compareTo равен 0, то две строки равны, в противном случае сравниваемые строки не равны.

== сравнивает ссылки и не сравнивает фактические строки. Если вы создали каждую строку, используя new String(somestring).intern(), вы можете использовать оператор == для сравнения двух строк, в противном случае могут использоваться только методы equals () или compareTo.

83
задан Rami 26 December 2017 в 01:35
поделиться

7 ответов

Это то же самое в ответе zero323, но в способе sql query

Предполагая, что датафрейм создан и зарегистрирован как

df.createOrReplaceTempView("table")
//+----+--------+----------+
//|Hour|Category|TotalValue|
//+----+--------+----------+
//|0   |cat26   |30.9      |
//|0   |cat13   |22.1      |
//|0   |cat95   |19.6      |
//|0   |cat105  |1.3       |
//|1   |cat67   |28.5      |
//|1   |cat4    |26.8      |
//|1   |cat13   |12.6      |
//|1   |cat23   |5.3       |
//|2   |cat56   |39.6      |
//|2   |cat40   |29.7      |
//|2   |cat187  |27.9      |
//|2   |cat68   |9.8       |
//|3   |cat8    |35.6      |
//+----+--------+----------+

Функция окна:

sqlContext.sql("select Hour, Category, TotalValue from (select *, row_number() OVER (PARTITION BY Hour ORDER BY TotalValue DESC) as rn  FROM table) tmp where rn = 1").show(false)
//+----+--------+----------+
//|Hour|Category|TotalValue|
//+----+--------+----------+
//|1   |cat67   |28.5      |
//|3   |cat8    |35.6      |
//|2   |cat56   |39.6      |
//|0   |cat26   |30.9      |
//+----+--------+----------+

Простая агрегация SQL, за которой следует соединение:

sqlContext.sql("select Hour, first(Category) as Category, first(TotalValue) as TotalValue from " +
  "(select Hour, Category, TotalValue from table tmp1 " +
  "join " +
  "(select Hour as max_hour, max(TotalValue) as max_value from table group by Hour) tmp2 " +
  "on " +
  "tmp1.Hour = tmp2.max_hour and tmp1.TotalValue = tmp2.max_value) tmp3 " +
  "group by tmp3.Hour")
  .show(false)
//+----+--------+----------+
//|Hour|Category|TotalValue|
//+----+--------+----------+
//|1   |cat67   |28.5      |
//|3   |cat8    |35.6      |
//|2   |cat56   |39.6      |
//|0   |cat26   |30.9      |
//+----+--------+----------+

Использование упорядочения по структурам:

sqlContext.sql("select Hour, vs.Category, vs.TotalValue from (select Hour, max(struct(TotalValue, Category)) as vs from table group by Hour)").show(false)
//+----+--------+----------+
//|Hour|Category|TotalValue|
//+----+--------+----------+
//|1   |cat67   |28.5      |
//|3   |cat8    |35.6      |
//|2   |cat56   |39.6      |
//|0   |cat26   |30.9      |
//+----+--------+----------+

Способ DataSets и дон 't do s такие же, как в исходном ответе

134
ответ дан Ramesh Maharjan 15 August 2018 в 18:13
поделиться
  • 1
    Похоже, что с искры 1.6 это row_number () вместо rowNumber – Adam Szałucha 15 September 2017 в 13:53
  • 2
    О Не использовать df.orderBy (...). GropBy (...). При каких обстоятельствах мы можем полагаться на orderBy (...)? или если мы не можем быть уверены, что orderBy () собирается дать правильный результат, какие альтернативы у нас есть? – Ignacio Alorre 27 September 2017 в 12:35
  • 3
    Я мог бы что-то игнорировать, но в целом рекомендуется избегать groupByKey , вместо этого следует использовать сокращениеByKey. Кроме того, вы сохраните одну строку. – Thomas 19 February 2018 в 16:17
  • 4
    @Thomas, избегающий groupBy / groupByKey, - это просто когда вы работаете с RDD, вы заметите, что в наборе данных Api нет функции reduceByKey. – soote 17 May 2018 в 04:19
  • 5
137
ответ дан Ramesh Maharjan 5 September 2018 в 17:21
поделиться

Для Spark 2.0.2 с группировкой по нескольким столбцам:

import org.apache.spark.sql.functions.row_number
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"col1", $"col2", $"col3").orderBy($"timestamp".desc)

val refined_df = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")
6
ответ дан Antonín Hoskovec 15 August 2018 в 18:13
поделиться

Нижеприведенное решение делает только одну группуBy и извлекает строки вашего фрейма данных, которые содержат maxValue за один снимок. Нет необходимости в дополнительных Joins или Windows.

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.DataFrame

//df is the dataframe with Day, Category, TotalValue

implicit val dfEnc = RowEncoder(df.schema)

val res: DataFrame = df.groupByKey{(r) => r.getInt(0)}.mapGroups[Row]{(day: Int, rows: Iterator[Row]) => i.maxBy{(r) => r.getDouble(2)}}
0
ответ дан elghoto 15 August 2018 в 18:13
поделиться
  • 1
    Но сначала он перемешивает все. Это вряд ли улучшение (возможно, не хуже оконных функций, в зависимости от данных). – user8371915 6 June 2018 в 22:00
  • 2
    у вас есть первое место в группе, которое вызовет перетасовку. Это не хуже, чем функция окна, потому что в оконной функции она будет оценивать окно для каждой отдельной строки в dataframe. – elghoto 6 June 2018 в 22:09

Мы можем использовать функцию окна rank () (где вы бы выбрали ранг = 1) ранг просто добавляет число для каждой строки группы (в этом случае это будет час)

вот пример. (из https://github.com/jaceklaskowski/mastering-apache-spark-book/blob/master/spark-sql-functions.adoc#rank )

val dataset = spark.range(9).withColumn("bucket", 'id % 3)

import org.apache.spark.sql.expressions.Window
val byBucket = Window.partitionBy('bucket).orderBy('id)

scala> dataset.withColumn("rank", rank over byBucket).show
+---+------+----+
| id|bucket|rank|
+---+------+----+
|  0|     0|   1|
|  3|     0|   2|
|  6|     0|   3|
|  1|     1|   1|
|  4|     1|   2|
|  7|     1|   3|
|  2|     2|   1|
|  5|     2|   2|
|  8|     2|   3|
+---+------+----+
-3
ответ дан Javier Montón 15 August 2018 в 18:13
поделиться

Если фрейм данных необходимо сгруппировать по нескольким столбцам, это может помочь

val keys = List("Hour", "Category");
 val selectFirstValueOfNoneGroupedColumns = 
 df.columns
   .filterNot(keys.toSet)
   .map(_ -> "first").toMap
 val grouped = 
 df.groupBy(keys.head, keys.tail: _*)
   .agg(selectFirstValueOfNoneGroupedColumns)

Надеюсь, что это поможет кому-то с аналогичной проблемой

-1
ответ дан NehaM 15 August 2018 в 18:13
поделиться

Здесь вы можете сделать так:

   val data = df.groupBy("Hour").agg(first("Hour").as("_1"),first("Category").as("Category"),first("TotalValue").as("TotalValue")).drop("Hour")

data.withColumnRenamed("_1","Hour").show
-2
ответ дан Shubham Agrawal 15 August 2018 в 18:13
поделиться
Другие вопросы по тегам:

Похожие вопросы: