val filterF = new Function[Path, Boolean] {
def apply(x: Path): Boolean = {
println("looking if "+x+" to be consider or not")
val flag = if(x.toString.split("/").last.split("_").last.toLong < System.currentTimeMillis){ println("considered "+x); list += x.toString; true}
else{ false }
return flag
}
}
эта функция фильтра используется, чтобы определить, является ли каждый путь фактически тем, который вам выбран. поэтому функция внутри приложения должна быть настроена согласно вашему требованию.
val streamed_rdd = ssc.fileStream[LongWritable, Text, TextInputFormat]("/user/hdpprod/temp/spark_streaming_output",filterF,false).map{case (x, y) => (y.toString)}
теперь вам нужно установить третью переменную функции filestream в false, чтобы убедиться, что не только новые файлы, но и рассмотрите старые существующие файлы в потоковой директории.