Это больше идея, чем полное решение, и я еще не тестировал ее.
Вы можете начать с извлечения конвейера обработки данных в функцию.
def pipeline(f: String, n: Int) = {
sqlContext
.read
.format("com.databricks.spark.csv")
.option("header", "true")
.load(f)
.repartition(n)
.groupBy(...)
.agg(...)
.cache // Cache so we can force computation later
}
Если ваши файлы малы, вы можете отрегулировать параметр n
, чтобы использовать как можно меньшее количество разделов для соответствия данным из одного файла и избегать перетасовки. Это означает, что вы ограничиваете параллелизм, но мы вернемся к этой проблеме позже.
val n: Int = ???
Затем вам нужно получить список входных файлов. Этот шаг зависит от источника данных, но большую часть времени он более или менее прост:
val files: Array[String] = ???
Затем вы можете сопоставить вышеуказанный список с помощью функции pipeline
:
val rdds = files.map(f => pipeline(f, n))
Поскольку мы ограничиваем параллелизм на уровне одного файла, который мы хотим компенсировать, отправив несколько заданий. Давайте добавим простой помощник, который заставляет оценивать и обертывает его с помощью Future
import scala.concurrent._
import ExecutionContext.Implicits.global
def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = future {
df.rdd.foreach(_ => ()) // Force computation
df
}
. Наконец, мы можем использовать вспомогательный помощник на rdds
:
val result = Future.sequence(
rdds.map(rdd => pipelineToFuture(rdd)).toList
)
В зависимости от вашего требования вы можете добавить onComplete
обратные вызовы или использовать реактивные потоки для сбора результатов.
Нет способа сделать это с ванильными запросами.
Существует не совсем идеальный способ сделать это с мыслью о ремне.
from requests_toolbelt.multipart import encoder
mpe = encoder.MultipartEncoder(fields={'RequestData': (None, RequestData), 'TtsParameter': (None, TtsParameter_TEXT_TO_READ)})
for part in mpe.parts:
if 'name="TtsParameter"' in part.headers:
part.headers = part.headers.replace('name="TtsParameter"',
'name="TtsParameter"; paramName="TEXT_TO_READ"')
headers.update({'Content-Type': mpe.content_type})
requests.post(url, headers=headers, data=mpe)