как изменить схему структуры внутри фрейма данных? [Дубликат]

Не использовать stream.stop(), он устарел

Оповещения MediaStream

Используйте stream.getTracks().forEach(track => track.stop())

2
задан user6910411 24 March 2017 в 20:54
поделиться

1 ответ

Python

Невозможно изменить одно вложенное поле. Вы должны воссоздать целую структуру. В этом конкретном случае самым простым решением является использование cast.

Сначала куча импорта:

from collections import namedtuple
from pyspark.sql.functions import col
from pyspark.sql.types import (
    ArrayType, LongType, StringType, StructField, StructType)

и данные примера:

Record = namedtuple("Record", ["a", "b", "c"])

df = sc.parallelize([([Record("foo", 1, 3)], )]).toDF(["array_field"])

Давайте подтвердим, что схема такая же, как в вашем случае:

df.printSchema()
root
 |-- array_field: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- c: long (nullable = true)

Вы можете определить новую схему, например, как строку:

str_schema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"

df.select(col("array_field").cast(str_schema)).printSchema()
root
 |-- array_field: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a_renamed: string (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- c: long (nullable = true)

или DataType:

struct_schema = ArrayType(StructType([
    StructField("a_renamed", StringType()),
    StructField("b", LongType()),
    StructField("c", LongType())
]))

 df.select(col("array_field").cast(struct_schema)).printSchema()
root
 |-- array_field: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a_renamed: string (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- c: long (nullable = true)

Scala

Те же методы могут использоваться в Scala:

case class Record(a: String, b: Long, c: Long)

val df = Seq(Tuple1(Seq(Record("foo", 1, 3)))).toDF("array_field")

val strSchema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"

df.select($"array_field".cast(strSchema))

или

import org.apache.spark.sql.types._

val structSchema = ArrayType(StructType(Seq(
    StructField("a_renamed", StringType),
    StructField("b", LongType),
    StructField("c", LongType)
)))

df.select($"array_field".cast(structSchema))

Возможные улучшения:

Если вы используете экспрессивную обработку данных или библиотеку обработки JSON, было бы проще сбросить типы данных в строку dict или JSON и взять ее оттуда например (Python / toolz ):

from toolz.curried import pipe, assoc_in, update_in, map
from operator import attrgetter

# Update name to "a_updated" if name is "a"
rename_field = update_in(
    keys=["name"], func=lambda x: "a_updated" if x == "a" else x)

updated_schema = pipe(
   #  Get schema of the field as a dict
   df.schema["array_field"].jsonValue(),
   # Update fields with rename
   update_in(
       keys=["type", "elementType", "fields"],
       func=lambda x: pipe(x, map(rename_field), list)),
   # Load schema from dict
   StructField.fromJson,
   # Get data type
   attrgetter("dataType"))

df.select(col("array_field").cast(updated_schema)).printSchema()
5
ответ дан user6910411 16 August 2018 в 11:47
поделиться
  • 1
    Здесь str_schema = "array<struct<a_renamed:string,b:bigint,c:bigint>>", когда вы знаете схему, это работает, но когда я не знаю схему. Как мы можем заменить все имена определенной строкой, например a_renamed, b_renamed, c_renamed, как это показано на – User12345 17 July 2018 в 21:16
  • 2
    @ User12345 Spark always & quot; знает & quot; схема во время выполнения. Если вы не хотите / не можете жестко кодировать строку DataType / DDL, вам необходимо динамически преобразовывать существующую схему. Пример такого подхода показан в последнем абзаце, но вы можете найти другие подходы, которые дадут тот же результат. – user6910411 19 July 2018 в 12:38
Другие вопросы по тегам:

Похожие вопросы: