Задача Spark httpclient не параллельна [дублировать]

Если вы используете только функцию mail(), вам нужно заполнить конфигурационный файл.

Вам нужно открыть расширение почты и установить SMTP smtp_port и т. д., и самое главное, ваше имя пользователя и пароль. Без этого почта не может быть отправлена. Кроме того, вы можете использовать класс PHPMail для отправки.

7
задан zero323 9 March 2016 в 21:46
поделиться

4 ответа

Я не мог найти простой способ добиться этого. Но после нескольких повторений попыток это то, что я сделал, и его работа над огромным списком запросов. В основном мы использовали это для пакетной операции для огромного запроса в нескольких подзапросах.

// Break down your huge workload into smaller chunks, in this case huge query string is broken 
// down to a small set of subqueries
// Here if needed to optimize further down, you can provide an optimal partition when parallelizing
val queries = sqlContext.sparkContext.parallelize[String](subQueryList.toSeq)

// Then map each one those to a Spark Task, in this case its a Future that returns a string
val tasks: RDD[Future[String]] = queries.map(query => {
    val task = makeHttpCall(query) // Method returns http call response as a Future[String]
    task.recover { 
        case ex => logger.error("recover: " + ex.printStackTrace()) }
    task onFailure {
        case t => logger.error("execution failed: " + t.getMessage) }
    task
})

// Note:: Http call is still not invoked, you are including this as part of the lineage

// Then in each partition you combine all Futures (means there could be several tasks in each partition) and sequence it
// And Await for the result, in this way you making it to block untill all the future in that sequence is resolved

val contentRdd = tasks.mapPartitions[String] { f: Iterator[Future[String]] =>
   val searchFuture: Future[Iterator[String]] = Future sequence f
   Await.result(searchFuture, threadWaitTime.seconds)
}

// Note: At this point, you can do any transformations on this rdd and it will be appended to the lineage. 
// When you perform any action on that Rdd, then at that point, 
// those mapPartition process will be evaluated to find the tasks and the subqueries to perform a full parallel http requests and 
// collect those data in a single rdd. 

Если вы не хотите выполнять какие-либо преобразования на контенте, такие как разбор полезной нагрузки ответа и т. д. Затем вы можете использовать foreachPartition вместо mapPartitions для немедленного выполнения всех этих http-вызовов.

2
ответ дан raksja 21 August 2018 в 07:02
поделиться

Это не работает.

Вы не можете ожидать, что объекты запроса будут распределены, а ответы будут собраны по кластеру другими узлами. Если вы это сделаете, то искра призывает к будущему никогда не закончится.

Если ваша карта () выполняет запросы синхронизации (http), то, пожалуйста, собирайте ответы в одном и том же вызове действий / преобразовании, а затем укажите результаты (ответы) для дальнейшей карты / уменьшить / другие вызовы.

В вашем случае перепишите логику, чтобы собрать ответы для каждого вызова в синхронизации и удалить понятие фьючерсов, тогда все должно быть хорошо.

1
ответ дан user6044522 21 August 2018 в 07:02
поделиться
  • 1
    Проблема в том, что между requests и responses не должно перемещаться данные, поэтому оба преобразования должны выполняться на одном и том же этапе, отсюда одни и те же исполнители и контексты. – zero323 10 March 2016 в 14:06

Я, наконец, сделал это с помощью scalaj-http вместо Dispatch. Вызов синхронный, но это соответствует моему варианту использования.

Я думаю, что Spark Job никогда не заканчивает использование Dispatch, потому что соединение Http не было закрыто должным образом.

С наилучшими пожеланиями

1
ответ дан Vincent Spiewak 21 August 2018 в 07:02
поделиться

этот пример использования выглядит довольно часто

Не совсем, потому что он просто не работает, как вы (возможно) ожидаете. Поскольку каждая задача работает на стандартной Scala Iterators, эти операции будут раздавлены вместе. Это означает, что все операции будут блокироваться на практике. Предполагая, что у вас есть три URL-адреса ["x", "y", "z"], код будет выполнен в следующем порядке:

Await.result(httpCall("x", 10.seconds))
Await.result(httpCall("y", 10.seconds))
Await.result(httpCall("z", 10.seconds))

Вы можете легко воспроизвести одно и то же поведение локально. Если вы хотите выполнить свой асинхронный код, вы должны обработать это явно с помощью mapPartitions:

rdd.mapPartitions(iter => {
  ??? // Submit requests
  ??? // Wait until all requests completed and return Iterator of results
})

, но это относительно сложно. Нет гарантии, что все данные для данного раздела вписываются в память, поэтому вам, вероятно, понадобится также механизм пакетной обработки.

Все сказанное, что я не могу воспроизвести описанную вами проблему, может быть проблемой конфигурации или проблемой с самой httpCall.

На боковой ноте, позволяющей один тайм-аут убить целую задачу, не выглядит хорошей идеей.

7
ответ дан zero323 21 August 2018 в 07:02
поделиться
Другие вопросы по тегам:

Похожие вопросы: