Чтобы применить ваше регулярное выражение к каждому элементу в RDD, вы должны использовать функцию RDD map
, которая преобразует каждую строку в RDD с использованием некоторой функции (в данном случае частичной функции для извлечения в две части кортеж, который составляет каждую строку):
import org.apache.spark.{SparkContext, SparkConf}
object Example extends App {
val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("Example"))
val data = Seq(
("BMW 1er Model",278),
("MINI Cooper Model",248))
val dataRDD = sc.parallelize(data)
val processedRDD = dataRDD.map{
case (inString, inInt) =>
val brandRegEx = """^.*[Bb][Mm][Ww]+|.[Mm][Ii][Nn][Ii]+.*$""".r
val brand = brandRegEx.findFirstIn(inString)
//val seriesRegEx = ...
//val series = seriesRegEx.findFirstIn(inString)
val series = "foo"
(inString, inInt, brand, series)
}
processedRDD.collect().foreach(println)
sc.stop()
}
Обратите внимание: я думаю, что у вас есть некоторые проблемы в вашем регулярном выражении, и вам также нужно регулярное выражение для поиска серии. Этот код выводит:
(BMW 1er Model,278,BMW,foo)
(MINI Cooper Model,248,NOT FOUND,foo)
Но если вы исправляете свои регулярные выражения для своих нужд, вы можете применить их к каждой строке.
Простое, но неэффективное решение см. В ответе @pasabaporaqui на Список рекурсивно всех файлов на sftp .
С вашим неясным ограничением единственное решение, которое использует одно соединение, будет:
sftp
в Python ls
команды для него, по одной для каждого каталога ls
для каждого найденного подкаталога.