Как составить таблицу фактов PK

val filterF = new Function[Path, Boolean] {
    def apply(x: Path): Boolean = {
      println("looking if "+x+" to be consider or not")
      val flag = if(x.toString.split("/").last.split("_").last.toLong < System.currentTimeMillis){ println("considered "+x); list += x.toString; true}
       else{ false }
      return flag
    }
}

эта функция фильтра используется, чтобы определить, является ли каждый путь фактически тем, который вам выбран. поэтому функция внутри приложения должна быть настроена согласно вашему требованию.

val streamed_rdd = ssc.fileStream[LongWritable, Text, TextInputFormat]("/user/hdpprod/temp/spark_streaming_output",filterF,false).map{case (x, y) => (y.toString)}

теперь вам нужно установить третью переменную функции filestream в false, чтобы убедиться, что не только новые файлы, но и рассмотрите старые существующие файлы в потоковой директории.

0
задан FatCharlie 16 January 2019 в 10:05
поделиться