pyspark json взрывается для массива с нулевым или большим количеством элементов [duplicate]

Вы можете использовать system для запуска команды операционной системы из R.

Чтобы запустить java-банку в хорошо сконфигурированной системе, это должно работать:

 system("java -jar /path/to/my.jar")

Вы можете добавить другие параметры, например, возможно, код Java принимает входные данные из файла, который вы пишете с помощью R, и передайте имя файла. Затем вывод из кода Java можно записать в файлы, а затем прочитать из R. Не зная, что делает код Java, мы не можем быть более конкретными.

Можно напрямую взаимодействовать с Java-кодом, но это требует полного знания внутренних компонентов банки, чтобы вы знали, какие функции вызывать с помощью каких параметров. Это обычно называют «API» для этой Java.

В противном случае напишите файл, вызовите system, прочитайте файл, иногда это самый простой способ запускать код на других языках.

16
задан alexgbelov 28 September 2016 в 05:57
поделиться

2 ответа

Spark 2.2 +

Вы можете использовать функцию explode_outer:

import org.apache.spark.sql.functions.explode_outer

df.withColumn("likes", explode_outer($"likes")).show

// +---+----+--------+
// | id|name|   likes|
// +---+----+--------+
// |  1|Luke|baseball|
// |  1|Luke|  soccer|
// |  2|Lucy|    null|
// +---+----+--------+

Spark & ​​lt; = 2.1

В Scala, но эквивалент Java должен быть почти идентичны (для импорта отдельных функций используется import static).

import org.apache.spark.sql.functions.{array, col, explode, lit, when}

val df = Seq(
  (1, "Luke", Some(Array("baseball", "soccer"))),
  (2, "Lucy", None)
).toDF("id", "name", "likes")

df.withColumn("likes", explode(
  when(col("likes").isNotNull, col("likes"))
    // If null explode an array<string> with a single null
    .otherwise(array(lit(null).cast("string")))))

Идея здесь в основном заключается в том, чтобы заменить NULL на array(NULL) желаемого типа. Для сложного типа (aka structs) вам необходимо предоставить полную схему:

val dfStruct = Seq((1L, Some(Array((1, "a")))), (2L, None)).toDF("x", "y")

val st =  StructType(Seq(
  StructField("_1", IntegerType, false), StructField("_2", StringType, true)
))

dfStruct.withColumn("y", explode(
  when(col("y").isNotNull, col("y"))
    .otherwise(array(lit(null).cast(st)))))

или

dfStruct.withColumn("y", explode(
  when(col("y").isNotNull, col("y"))
    .otherwise(array(lit(null).cast("struct<_1:int,_2:string>")))))

Примечание:

Если массив Column ] был создан с containsNull установлен на false, вы должны сначала изменить это (проверено с помощью Spark 2.1):

df.withColumn("array_column", $"array_column".cast(ArrayType(SomeType, true)))
33
ответ дан zero323 19 August 2018 в 01:57
поделиться
  • 1
    Это выглядит великолепно, спасибо! У меня есть следующий вопрос: что, если мой тип столбца - StructType? Я попытался использовать cast (new StructType ()), но я получил data type mismatch: THEN and ELSE expressions should all be same type or coercible to a common type;. Я пытаюсь сделать мой метод как можно более общим, поэтому он подходит для всех типов столбцов. – alexgbelov 28 September 2016 в 07:13
  • 2
    Кроме того, чтобы получить тип столбца, я использую DataFrame.dtypes (). Есть ли лучший способ получить типы столбцов? – alexgbelov 28 September 2016 в 07:20
  • 3
    a) Вы должны предоставить полную схему со всеми полями. b) dtypes или schema. – zero323 28 September 2016 в 07:33
  • 4
    объединиться вместо случая - когда он более краток и должен работать как шарм – morsik 14 October 2016 в 11:30
  • 5
    @hamed Это будет практически идентичное исправление для небольшой разницы синтаксиса (например, isNotNull() вместо isNotNull). – zero323 12 April 2017 в 13:19

Следуя принятому ответу, когда элементы массива являются сложным типом, его трудно определить вручную (например, с большими структурами).

Чтобы сделать это автоматически, я написал следующий помощник метод:

  def explodeOuter(df: Dataset[Row], columnsToExplode: List[String]) = {
      val arrayFields = df.schema.fields
          .map(field => field.name -> field.dataType)
          .collect { case (name: String, type: ArrayType) => (name, type.asInstanceOf[ArrayType])}
          .toMap

      columnsToExplode.foldLeft(df) { (dataFrame, arrayCol) =>
      dataFrame.withColumn(arrayCol, explode(when(size(col(arrayCol)) =!= 0, col(arrayCol))
        .otherwise(array(lit(null).cast(arrayFields(arrayCol).elementType)))))    
 }
0
ответ дан nsanglar 19 August 2018 в 01:57
поделиться
Другие вопросы по тегам:

Похожие вопросы: