ssc.filestream не может прочитать уже существующие файлы в каталоге [duplicate]

Многие объяснения уже присутствуют, чтобы объяснить, как это происходит и как это исправить, но вы также должны следовать рекомендациям, чтобы избежать NullPointerException вообще.

См. также: A хороший список лучших практик

Я бы добавил, очень важно, хорошо использовать модификатор final. Использование "окончательной" модификатор, когда это применимо в Java

Сводка:

  1. Используйте модификатор final для обеспечения хорошей инициализации.
  2. Избегайте возврата null в методы, например, при возврате пустых коллекций.
  3. Использовать аннотации @NotNull и @Nullable
  4. Быстрое завершение работы и использование утверждений, чтобы избежать распространения нулевых объектов через все приложение, когда они не должен быть пустым.
  5. Сначала используйте значения с известным объектом: if("knownObject".equals(unknownObject)
  6. Предпочитают valueOf() поверх toString ().
  7. Используйте null safe StringUtils StringUtils.isEmpty(null).

3
задан sophie 14 March 2015 в 11:04
поделиться

3 ответа

Ожидаете ли вы, что Spark будет читать файлы уже в каталоге? Если это так, это распространенное заблуждение, которое застало меня врасплох. textFileStream просматривает каталог для новых файлов, затем он их считывает. Он игнорирует файлы уже в каталоге при запуске или файлы, которые он уже прочитал.

Обоснование заключается в том, что у вас будет процесс записи файлов в HDFS, тогда вы захотите, чтобы Spark их прочитал. Обратите внимание, что эти файлы много появляются атомарно, например, они медленно записываются где-то в другом месте, затем перемещает в наблюдаемый каталог. Это связано с тем, что HDFS неправильно обрабатывает чтение и запись файла одновременно.

3
ответ дан Dean Wampler 19 August 2018 в 18:29
поделиться

Как сказал Дин, textFileStream использует по умолчанию только использование новых файлов.

  def textFileStream(directory: String): DStream[String] = {
    fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
  }

Итак, все, что он делает, вызывает этот вариант fileStream

def fileStream[
    K: ClassTag,
    V: ClassTag,
    F <: NewInputFormat[K, V]: ClassTag
  ] (directory: String): InputDStream[(K, V)] = {
    new FileInputDStream[K, V, F](this, directory)
  }

И, посмотрев на класс FileInputDStream, мы увидим, что он действительно может искать существующие файлы, но по умолчанию только новый:

newFilesOnly: Boolean = true,

Итак, возвращаясь в код StreamingContext, мы может видеть, что есть и перегрузка, которую мы можем использовать, напрямую вызывая метод fileStream:

def fileStream[
 K: ClassTag,
 V: ClassTag,
 F <: NewInputFormat[K, V]: ClassTag] 
(directory: String, filter: Path => Boolean, newFilesOnly: Boolean):InputDStream[(K, V)] = {
  new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
}

Итак, TL; DR; является

ssc.fileStream[LongWritable, Text, TextInputFormat]
    (directory, FileInputDStream.defaultFilter, false).map(_._2.toString)
7
ответ дан Justin Pihony 19 August 2018 в 18:29
поделиться
  • 1
    привет Джастину, спасибо за это. Но я могу читать только старые файлы, которые я разместил там в течение последних 5 секунд, прежде чем запустить работу Spark. Есть ли настройка, которую мне нужно обновить? Это часть журналов, я не уверен, что это имеет какое-то отношение к ней: INFO dstream.FileInputDStream: время слайда = 10000 мс, и это мой новый код: val hdfsDStream = ssc.fileStream [LongWritable, Text, TextInputFormat ] («hdfs: //sandbox.hortonworks.com/user/root/logs" ;, (t: Path) = & gt; true, newFilesOnly = false) .map (_._ 2.toString) – sophie 14 March 2015 в 16:00
  • 2
    @sophie Я абсолютно уверен, что это ошибка. Однако, врываясь в это ... похоже, что наивное исправление ломается. Суть в том, что они делают max, где они должны делать min .... но я думаю, что файлы могут закончиться обработкой снова и снова, если вы исправите только эту часть .... Я отправлю ошибку когда-нибудь в эти выходные и опубликуйте его. – Justin Pihony 14 March 2015 в 20:59
  • 3
    привет Джастину, знаете ли вы, если это уже исправлено? Я до сих пор не могу заставить его работать. – sophie 4 April 2015 в 15:38
  • 4
    Я все еще скептически отношусь к этой проблеме, но issues.apache.org/jira/browse/SPARK-6061 – Justin Pihony 4 April 2015 в 19:19
val filterF = new Function[Path, Boolean] {
    def apply(x: Path): Boolean = {
      println("looking if "+x+" to be consider or not")
      val flag = if(x.toString.split("/").last.split("_").last.toLong < System.currentTimeMillis){ println("considered "+x); list += x.toString; true}
       else{ false }
      return flag
    }
}

эта функция фильтра используется, чтобы определить, является ли каждый путь фактически тем, который вам выбран. поэтому функция внутри приложения должна быть настроена согласно вашему требованию.

val streamed_rdd = ssc.fileStream[LongWritable, Text, TextInputFormat]("/user/hdpprod/temp/spark_streaming_output",filterF,false).map{case (x, y) => (y.toString)}

теперь вам нужно установить третью переменную функции filestream в false, чтобы убедиться, что не только новые файлы, но и рассмотрите старые существующие файлы в потоковой директории.

0
ответ дан Smittey 19 August 2018 в 18:29
поделиться
Другие вопросы по тегам:

Похожие вопросы: