«Контейнер убит YARN за превышение пределов памяти. 10,4 ГБ из 10,4 ГБ используемой физической памяти »в кластере EMR с 75 ГБ памяти

Я использую кластер Spark с 5 узлами на AWS EMR каждого размера m3.xlarge (1 ведущий 4 подчиненных). Я успешно пробежал сжатый CSV-файл размером 146 Мб bzip2 и получил идеально агрегированный результат.

Сейчас я пытаюсь обработать файл CSV размером ~ 5 ГБ bzip2 в этом кластере, но получаю эту ошибку:

16/11/23 17:29:53 WARN TaskSetManager: Lost Задача 49.2 на этапе 6.0 (TID xxx, xxx.xxx.xxx.compute.internal): ExecutorLostFailure (executor 16 завершен из-за одной из запущенных задач) Причина: Контейнер уничтожен YARN за превышение пределов памяти. 10,4 ГБ из 10,4 ГБ физической памяти. Подумайте над улучшением spark.yarn.executor.memoryOverhead.

Я не понимаю, почему я получаю ограничение в ~ 10,5 ГБ для кластера ~ 75 ГБ (15 ГБ на 3 м.большой экземпляр) ...

Вот моя конфигурация EMR:

[
 {
  "classification":"spark-env",
  "properties":{

  },
  "configurations":[
     {
        "classification":"export",
        "properties":{
           "PYSPARK_PYTHON":"python34"
        },
        "configurations":[

        ]
     }
  ]
},
{
  "classification":"spark",
  "properties":{
     "maximizeResourceAllocation":"true"
  },
  "configurations":[

  ]
 }
]

Из того, что я прочитал, настройка свойства maximizeResourceAllocation должна указывать EMR, чтобы он настраивал Spark для полного использования всех ресурсов, доступных в кластере. Т.е. у меня должно быть ~ 75 ГБ доступной памяти ... Так почему я получаю ошибку ограничения ~ 10,5 ГБ? Вот код, который я запускаю:

def sessionize(raw_data, timeout):
# https://www.dataiku.com/learn/guide/code/reshaping_data/sessionization.html
    window = (pyspark.sql.Window.partitionBy("user_id", "site_id")
              .orderBy("timestamp"))
    diff = (pyspark.sql.functions.lag(raw_data.timestamp, 1)
            .over(window))
    time_diff = (raw_data.withColumn("time_diff", raw_data.timestamp - diff)
                 .withColumn("new_session", pyspark.sql.functions.when(pyspark.sql.functions.col("time_diff") >= timeout.seconds, 1).otherwise(0)))
    window = (pyspark.sql.Window.partitionBy("user_id", "site_id")
              .orderBy("timestamp")
              .rowsBetween(-1, 0))
    sessions = (time_diff.withColumn("session_id", pyspark.sql.functions.concat_ws("_", "user_id", "site_id", pyspark.sql.functions.sum("new_session").over(window))))
    return sessions
def aggregate_sessions(sessions):
    median = pyspark.sql.functions.udf(lambda x: statistics.median(x))
    aggregated = sessions.groupBy(pyspark.sql.functions.col("session_id")).agg(
        pyspark.sql.functions.first("site_id").alias("site_id"),
        pyspark.sql.functions.first("user_id").alias("user_id"),
        pyspark.sql.functions.count("id").alias("hits"),
        pyspark.sql.functions.min("timestamp").alias("start"),
        pyspark.sql.functions.max("timestamp").alias("finish"),
        median(pyspark.sql.functions.collect_list("foo")).alias("foo"),
    )
    return aggregated
 spark_context = pyspark.SparkContext(appName="process-raw-data")
spark_session = pyspark.sql.SparkSession(spark_context)
raw_data = spark_session.read.csv(sys.argv[1],
                                  header=True,
                                  inferSchema=True)
# Windowing doesn't seem to play nicely with TimestampTypes.
#
# Should be able to do this within the ``spark.read.csv`` call, I'd
# think. Need to look into it.
convert_to_unix = pyspark.sql.functions.udf(lambda s: arrow.get(s).timestamp)
raw_data = raw_data.withColumn("timestamp",
                               convert_to_unix(pyspark.sql.functions.col("timestamp")))
sessions = sessionize(raw_data, SESSION_TIMEOUT)
aggregated = aggregate_sessions(sessions)
aggregated.foreach(save_session)

По сути, не более, чем управление окнами и groupBy для агрегирования данных.

Это начинается с нескольких из этих ошибок, и к прекращению увеличения в размере той же самой ошибки.

Я пытался запустить spark-submit с помощью - conf spark.yarn.executor.memoryOverhead , но, похоже, это тоже не решает проблему.

41
задан Community 22 September 2017 в 17:48
поделиться