Многие объяснения уже присутствуют, чтобы объяснить, как это происходит и как это исправить, но вы также должны следовать рекомендациям, чтобы избежать NullPointerException
вообще.
См. также: A хороший список лучших практик
Я бы добавил, очень важно, хорошо использовать модификатор final
. Использование "окончательной" модификатор, когда это применимо в Java
Сводка:
final
для обеспечения хорошей инициализации. @NotNull
и @Nullable
if("knownObject".equals(unknownObject)
valueOf()
поверх toString (). StringUtils
StringUtils.isEmpty(null)
. Ожидаете ли вы, что Spark будет читать файлы уже в каталоге? Если это так, это распространенное заблуждение, которое застало меня врасплох. textFileStream
просматривает каталог для новых файлов, затем он их считывает. Он игнорирует файлы уже в каталоге при запуске или файлы, которые он уже прочитал.
Обоснование заключается в том, что у вас будет процесс записи файлов в HDFS, тогда вы захотите, чтобы Spark их прочитал. Обратите внимание, что эти файлы много появляются атомарно, например, они медленно записываются где-то в другом месте, затем перемещает в наблюдаемый каталог. Это связано с тем, что HDFS неправильно обрабатывает чтение и запись файла одновременно.
Как сказал Дин, textFileStream использует по умолчанию только использование новых файлов.
def textFileStream(directory: String): DStream[String] = {
fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}
Итак, все, что он делает, вызывает этот вариант fileStream
def fileStream[
K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
] (directory: String): InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory)
}
И, посмотрев на класс FileInputDStream
, мы увидим, что он действительно может искать существующие файлы, но по умолчанию только новый:
newFilesOnly: Boolean = true,
Итак, возвращаясь в код StreamingContext
, мы может видеть, что есть и перегрузка, которую мы можем использовать, напрямую вызывая метод fileStream
:
def fileStream[
K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag]
(directory: String, filter: Path => Boolean, newFilesOnly: Boolean):InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
}
ssc.fileStream[LongWritable, Text, TextInputFormat]
(directory, FileInputDStream.defaultFilter, false).map(_._2.toString)
max
, где они должны делать min
.... но я думаю, что файлы могут закончиться обработкой снова и снова, если вы исправите только эту часть .... Я отправлю ошибку когда-нибудь в эти выходные и опубликуйте его.
– Justin Pihony
14 March 2015 в 20:59
val filterF = new Function[Path, Boolean] {
def apply(x: Path): Boolean = {
println("looking if "+x+" to be consider or not")
val flag = if(x.toString.split("/").last.split("_").last.toLong < System.currentTimeMillis){ println("considered "+x); list += x.toString; true}
else{ false }
return flag
}
}
эта функция фильтра используется, чтобы определить, является ли каждый путь фактически тем, который вам выбран. поэтому функция внутри приложения должна быть настроена согласно вашему требованию.
val streamed_rdd = ssc.fileStream[LongWritable, Text, TextInputFormat]("/user/hdpprod/temp/spark_streaming_output",filterF,false).map{case (x, y) => (y.toString)}
теперь вам нужно установить третью переменную функции filestream в false, чтобы убедиться, что не только новые файлы, но и рассмотрите старые существующие файлы в потоковой директории.