В обход org.apache.hadoop.mapred.InvalidInputException: шаблон ввода s3n: // […] соответствует 0 файлам

Это вопрос, который я уже задавал в списке рассылки spark, и я надеюсь добиться большего успеха здесь.

Я не уверен, что это напрямую связано с искрой, хотя искра имеет какое-то отношение к тому факту, что я не могу легко решить эту проблему.

Я пытаюсь получить некоторые файлы из S3, используя различные шаблоны. Моя проблема в том, что некоторые из этих шаблонов могут ничего не возвращать, и когда они это делают, я получаю следующее исключение:

org.apache.hadoop.mapred.InvalidInputException: Input Pattern s3n://bucket/mypattern matches 0 files
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:197)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:140)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.rdd.UnionRDD$anonfun$1.apply(UnionRDD.scala:52)
    at org.apache.spark.rdd.UnionRDD$anonfun$1.apply(UnionRDD.scala:52)
    at scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:52)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:58)
    at org.apache.spark.api.java.JavaPairRDD.reduceByKey(JavaPairRDD.scala:335)
    ... 2 more

Я бы хотел, чтобы способ игнорировать отсутствующие файлы и просто ничего не делать в этом случае. Проблема здесь IMO заключается в том, что я не знаю, будет ли шаблон возвращать что-то, пока он на самом деле не будет выполнен, и спарк начинает обрабатывать данные только тогда, когда происходит действие (здесь, часть reduceByKey). Так что я не могу просто поймать ошибку где-нибудь и позволить вещам продолжаться.

Одним из решений было бы заставить форвард обрабатывать каждый путь в отдельности, но это, вероятно, будет стоить времени с точки зрения скорости и / или памяти, поэтому я ищу другой вариант, который был бы эффективен.

Я использую искру 0.9.1. Благодаря

10
задан Crystark 21 May 2014 в 13:00
поделиться