Автоматически и элегантно сглаживает DataFrame в Spark SQL

На самом деле есть супер простой способ сделать это без беспорядочных перехватов QPaintEvent и без жестких требований QGraphicsProxyWidget, которые не работают на продвинутых дочерних виджетах. Техника ниже будет работать даже с продвинутыми виджетами и их дочерними виджетами.

Fade In Your Widget

// w is your widget
QGraphicsOpacityEffect *eff = new QGraphicsOpacityEffect(this);
w->setGraphicsEffect(eff);
QPropertyAnimation *a = new QPropertyAnimation(eff,"opacity");
a->setDuration(350);
a->setStartValue(0);
a->setEndValue(1);
a->setEasingCurve(QEasingCurve::InBack);
a->start(QPropertyAnimation::DeleteWhenStopped);

Fade Out Your Widget

// w is your widget
QGraphicsOpacityEffect *eff = new QGraphicsOpacityEffect(this);
w->setGraphicsEffect(eff);
QPropertyAnimation *a = new QPropertyAnimation(eff,"opacity");
a->setDuration(350);
a->setStartValue(1);
a->setEndValue(0);
a->setEasingCurve(QEasingCurve::OutBack);
a->start(QPropertyAnimation::DeleteWhenStopped);
connect(a,SIGNAL(finished()),this,SLOT(hideThisWidget()));
// now implement a slot called hideThisWidget() to do
// things like hide any background dimmer, etc.
29
задан echen 26 May 2016 в 21:30
поделиться

10 ответов

Краткий ответ: нет «принятого» способа сделать это, но вы можете сделать это очень элегантно с помощью рекурсивной функции, которая генерирует ваше утверждение select(...), пройдя по DataFrame.schema.

Рекурсивная функция должна возвращать Array[Column]. Каждый раз, когда функция попадает в StructType, она вызывает себя и добавляет возвращенный Array[Column] к своему собственному Array[Column].

Что-то вроде:

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)

    f.dataType match {
      case st: StructType => flattenSchema(st, colName)
      case _ => Array(col(colName))
    }
  })
}

Затем вы бы использовали это так:

df.select(flattenSchema(df.schema):_*)
55
ответ дан David Griffin 26 May 2016 в 21:30
поделиться

Я улучшаю свой предыдущий ответ и предлагаю решение своей проблемы, указанной в комментариях к принятому ответу.

Это принятое решение создает массив объектов Column и использует его для выбора этих столбцов. В Spark, если у вас есть вложенный DataFrame, вы можете выбрать дочерний столбец следующим образом: df.select("Parent.Child"), и это возвращает DataFrame со значениями дочернего столбца и называется Child . Но если у вас есть идентичные имена для атрибутов различных родительских структур, вы теряете информацию о родительском элементе и можете получить идентичные имена столбцов и больше не сможете получить к ним доступ по имени, поскольку они однозначны.

Это была моя проблема.

Я нашел решение своей проблемы, может быть, оно может помочь кому-то еще. Я вызвал flattenSchema отдельно:

val flattenedSchema = flattenSchema(df.schema)

, и это вернуло объекты Array of Column. Вместо использования этого в select(), который возвращал бы DataFrame со столбцами, названными потомком последнего уровня, я сопоставил исходные имена столбцов с самими собой как строки, затем после выбора столбца Parent.Child он переименовал его в Parent.Child вместо Child (для удобства я также заменил точки подчеркиванием):

val renamedCols = flattenedSchema.map(name => col(name.toString()).as(name.toString().replace(".","_")))

А затем вы можете использовать функцию выбора, как показано в исходном ответе:

var newDf = df.select(renamedCols:_*)
18
ответ дан V. Samma 26 May 2016 в 21:30
поделиться

Просто хотел поделиться своим решением для Pyspark - это более или менее перевод решения @David Griffin, поэтому он поддерживает любой уровень вложенных объектов.

from pyspark.sql.types import StructType, ArrayType  

def flatten(schema, prefix=None):
    fields = []
    for field in schema.fields:
        name = prefix + '.' + field.name if prefix else field.name
        dtype = field.dataType
        if isinstance(dtype, ArrayType):
            dtype = dtype.elementType

        if isinstance(dtype, StructType):
            fields += flatten(dtype, prefix=name)
        else:
            fields.append(name)

    return fields


df.select(flatten(df.schema)).show()
12
ответ дан Steve Ng 26 May 2016 в 21:30
поделиться

Чтобы объединить ответы Дэвида Гриффена и В. Саммы, вы можете просто сделать это, чтобы сгладить, избегая дублирования имен столбцов:

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Column
import org.apache.spark.sql.DataFrame

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)
    f.dataType match {
      case st: StructType => flattenSchema(st, colName)
      case _ => Array(col(colName).as(colName.replace(".","_")))
    }
  })
}

def flattenDataFrame(df:DataFrame): DataFrame = {
    df.select(flattenSchema(df.schema):_*)
}

var my_flattened_json_table = flattenDataFrame(my_json_table)
1
ответ дан swdev 26 May 2016 в 21:30
поделиться

Я добавил метод DataFrame#flattenSchema в открытый проект spark-daria .

Вот как вы можете использовать функцию с вашим кодом.

import com.github.mrpowers.spark.daria.sql.DataFrameExt._
df.flattenSchema().show()

+-------+-------+---------+----+---+
|foo.bar|foo.baz|        x|   y|  z|
+-------+-------+---------+----+---+
|   this|     is|something|cool| ;)|
+-------+-------+---------+----+---+

Вы также можете указать различные разделители имен столбцов с помощью метода flattenSchema().

df.flattenSchema(delimiter = "_").show()
+-------+-------+---------+----+---+
|foo_bar|foo_baz|        x|   y|  z|
+-------+-------+---------+----+---+
|   this|     is|something|cool| ;)|
+-------+-------+---------+----+---+

Этот параметр разделителя удивительно важен. Если вы выравниваете свою схему для загрузки таблицы в Redshift, вы не сможете использовать точки в качестве разделителя.

Вот полный фрагмент кода для генерации этого вывода.

val data = Seq(
  Row(Row("this", "is"), "something", "cool", ";)")
)

val schema = StructType(
  Seq(
    StructField(
      "foo",
      StructType(
        Seq(
          StructField("bar", StringType, true),
          StructField("baz", StringType, true)
        )
      ),
      true
    ),
    StructField("x", StringType, true),
    StructField("y", StringType, true),
    StructField("z", StringType, true)
  )
)

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(data),
  StructType(schema)
)

df.flattenSchema().show()

Базовый код аналогичен коду Дэвида Гриффина (в случае, если вы не хотите добавлять зависимость spark-daria в ваш проект).

object StructTypeHelpers {

  def flattenSchema(schema: StructType, delimiter: String = ".", prefix: String = null): Array[Column] = {
    schema.fields.flatMap(structField => {
      val codeColName = if (prefix == null) structField.name else prefix + "." + structField.name
      val colName = if (prefix == null) structField.name else prefix + delimiter + structField.name

      structField.dataType match {
        case st: StructType => flattenSchema(schema = st, delimiter = delimiter, prefix = colName)
        case _ => Array(col(codeColName).alias(colName))
      }
    })
  }

}

object DataFrameExt {

  implicit class DataFrameMethods(df: DataFrame) {

    def flattenSchema(delimiter: String = ".", prefix: String = null): DataFrame = {
      df.select(
        StructTypeHelpers.flattenSchema(df.schema, delimiter, prefix): _*
      )
    }

  }

}
1
ответ дан Powers 26 May 2016 в 21:30
поделиться

Вот функция, которая делает то, что вы хотите, и которая может иметь дело с несколькими вложенными столбцами, содержащими столбцы с одинаковым именем, с префиксом:

from pyspark.sql import functions as F

def flatten_df(nested_df):
    flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
    nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']

    flat_df = nested_df.select(flat_cols +
                               [F.col(nc+'.'+c).alias(nc+'_'+c)
                                for nc in nested_cols
                                for c in nested_df.select(nc+'.*').columns])
    return flat_df

До:

root
 |-- x: string (nullable = true)
 |-- y: string (nullable = true)
 |-- foo: struct (nullable = true)
 |    |-- a: float (nullable = true)
 |    |-- b: float (nullable = true)
 |    |-- c: integer (nullable = true)
 |-- bar: struct (nullable = true)
 |    |-- a: float (nullable = true)
 |    |-- b: float (nullable = true)
 |    |-- c: integer (nullable = true)

После:

root
 |-- x: string (nullable = true)
 |-- y: string (nullable = true)
 |-- foo_a: float (nullable = true)
 |-- foo_b: float (nullable = true)
 |-- foo_c: integer (nullable = true)
 |-- bar_a: float (nullable = true)
 |-- bar_b: float (nullable = true)
 |-- bar_c: integer (nullable = true)
0
ответ дан steco 26 May 2016 в 21:30
поделиться

Я использовал один вкладыш, который приводит к сплющенной схеме с 5 столбцами bar, baz, x, y, z:

df.select("foo.*", "x", "y", "z")

Что касается explode: я обычно резервирую explode для выравнивания списка. Например, если у вас есть столбец idList, представляющий собой список строк, вы можете сделать:

df.withColumn("flattenedId", functions.explode(col("idList")))
  .drop("idList")

Это приведет к созданию нового информационного кадра со столбцом с именем flattenedId (больше не будет списком)

0
ответ дан Kei-ven 26 May 2016 в 21:30
поделиться

Это модификация решения, но она использует обозначение tailrec


  @tailrec
  def flattenSchema(
      splitter: String,
      fields: List[(StructField, String)],
      acc: Seq[Column]): Seq[Column] = {
    fields match {
      case (field, prefix) :: tail if field.dataType.isInstanceOf[StructType] =>
        val newPrefix = s"$prefix${field.name}."
        val newFields = field.dataType.asInstanceOf[StructType].fields.map((_, newPrefix)).toList
        flattenSchema(splitter, tail ++ newFields, acc)

      case (field, prefix) :: tail =>
        val colName = s"$prefix${field.name}"
        val newCol  = col(colName).as(colName.replace(".", splitter))
        flattenSchema(splitter, tail, acc :+ newCol)

      case _ => acc
    }
  }
  def flattenDataFrame(df: DataFrame): DataFrame = {
    val fields = df.schema.fields.map((_, ""))
    df.select(flattenSchema("__", fields.toList, Seq.empty): _*)
  }
.
0
ответ дан fhuertas 26 May 2016 в 21:30
поделиться

Вы также можете использовать SQL, чтобы выбрать столбцы как плоские.

  1. Получить исходную схему фрейма данных
  2. Создать строку SQL путем просмотра схемы
  3. Запросить исходный фрейм данных

Я выполнил реализацию в Java: https://gist.github.com/ebuildy/3de0e2855498e5358e4eed1a4f72ea48

(также используйте рекурсивный метод, я предпочитаю способ SQL, так что вы можете легко проверить его через Spark-shell ).

1
ответ дан Thomas Decaux 26 May 2016 в 21:30
поделиться
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.StructType
import scala.collection.mutable.ListBuffer 
val columns=new ListBuffer[String]()

def flattenSchema(schema:StructType,prefix:String=null){
for(i<-schema.fields){
  if(i.dataType.isInstanceOf[StructType]) {
    val columnPrefix = i.name + "."
    flattenSchema(i.dataType.asInstanceOf[StructType], columnPrefix)
  }
  else {
    if(prefix == null)
      columns.+=(i.name)
    else
      columns.+=(prefix+i.name)
  }
  }
}
0
ответ дан 27 November 2019 в 04:14
поделиться
Другие вопросы по тегам:

Похожие вопросы: