Функция оценки времени выполнения не работает с наборами данных / RDD Spark

Вы не можете установить объект на null, только переменную, которая может содержать указатель / ссылку на этот объект. Это не влияет на сам объект. Но если теперь нет пути от любого живого потока (т. Е. Локальной переменной любого запущенного метода) к вашему объекту, это будет собирать мусор, если и когда требуется память. Это относится к любым объектам, которые относятся к вашему исходному древовидному объекту.

Обратите внимание, что для локальных переменных обычно не нужно устанавливать их в null, если метод (или блок) будет В любом случае, закончить скоро.

1
задан Ankit Khettry 21 January 2019 в 08:38
поделиться

1 ответ

Существует открытая проблема в Spark JIRA, чтобы решить эту проблему - SPARK-20525 Причиной этой проблемы было несовпадение загрузчика искровых классов при загрузке Spark UDF.

Разрешение этого состоит в том, чтобы загрузить ваш сеанс искры после вашего переводчика. Пожалуйста, найдите пример кода. Также вы можете сослаться на мой github например SparkCustomTransformations

trait CustomTransformations extends Serializable {
  def execute(spark: SparkSession, df: DataFrame, udfFunctions: AnyRef*): DataFrame
}

// IMPORTANT spark session should be lazy evaluated
lazy val spark = getSparkSession

def getInterpretor: scala.tools.nsc.interpreter.IMain = {

  import scala.tools.nsc.GenericRunnerSettings
  import scala.tools.nsc.interpreter.IMain

  val cl = ClassLoader.getSystemClassLoader
  val conf = new SparkConf()
  val settings = new GenericRunnerSettings(println _)
  settings.usejavacp.value = true

  val intp = new scala.tools.nsc.interpreter.IMain(settings, new java.io.PrintWriter(System.out))
  intp.setContextClassLoader
  intp.initializeSynchronous

  intp
}

val intp = getInterpretor

val udf_str =
  """
    (str:String)=>{
      str.toLowerCase
    }
    """
val customTransStr =
  """
    |import org.apache.spark.SparkConf
    |import org.apache.spark.sql.{DataFrame, SparkSession}
    |import org.apache.spark.sql.functions._
    |
    |new CustomTransformations {
    |    override def execute(spark: SparkSession, df: DataFrame, func: AnyRef*): DataFrame = {
    |
    |      //reading your UDF
    |      val str_lower_udf = spark.udf.register("str_lower", func(0).asInstanceOf[Function1[String,String]])
    |
    |      df.createOrReplaceTempView("df")
    |      val df_with_UDF_cols = spark.sql("select a.*, str_lower(a.fakeEventTag) as customUDFCol1 from df a").withColumn("customUDFCol2", str_lower_udf(col("fakeEventTag")))
    |
    |      df_with_UDF_cols.show()
    |      df_with_UDF_cols
    |    }
    |}
  """.stripMargin

intp.interpret(udf_str)
var udf_obj = intp.eval(udf_str)

val eval = new com.twitter.util.Eval
val customTransform: CustomTransformations = eval[CustomTransformations](customTransStr)


val sampleSparkDF = getSampleSparkDF
val outputDF = customTransform.execute(spark, sampleSparkDF, udf_obj)

outputDF.printSchema()
outputDF.show()
0
ответ дан pavan 21 January 2019 в 08:38
поделиться
Другие вопросы по тегам:

Похожие вопросы: