Это вопрос, который я уже задавал в списке рассылки spark, и я надеюсь добиться большего успеха здесь.
Я не уверен, что это напрямую связано с искрой, хотя искра имеет какое-то отношение к тому факту, что я не могу легко решить эту проблему.
Я пытаюсь получить некоторые файлы из S3, используя различные шаблоны. Моя проблема в том, что некоторые из этих шаблонов могут ничего не возвращать, и когда они это делают, я получаю следующее исключение:
org.apache.hadoop.mapred.InvalidInputException: Input Pattern s3n://bucket/mypattern matches 0 files
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:197)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:140)
at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:207)
at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:207)
at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:207)
at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at org.apache.spark.rdd.UnionRDD$anonfun$1.apply(UnionRDD.scala:52)
at org.apache.spark.rdd.UnionRDD$anonfun$1.apply(UnionRDD.scala:52)
at scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:52)
at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:207)
at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:58)
at org.apache.spark.api.java.JavaPairRDD.reduceByKey(JavaPairRDD.scala:335)
... 2 more
Я бы хотел, чтобы способ игнорировать отсутствующие файлы и просто ничего не делать в этом случае. Проблема здесь IMO заключается в том, что я не знаю, будет ли шаблон возвращать что-то, пока он на самом деле не будет выполнен, и спарк начинает обрабатывать данные только тогда, когда происходит действие (здесь, часть reduceByKey
). Так что я не могу просто поймать ошибку где-нибудь и позволить вещам продолжаться.
Одним из решений было бы заставить форвард обрабатывать каждый путь в отдельности, но это, вероятно, будет стоить времени с точки зрения скорости и / или памяти, поэтому я ищу другой вариант, который был бы эффективен.
Я использую искру 0.9.1. Благодаря