Если вы используете только функцию mail()
, вам нужно заполнить конфигурационный файл.
Вам нужно открыть расширение почты и установить SMTP smtp_port
и т. д., и самое главное, ваше имя пользователя и пароль. Без этого почта не может быть отправлена. Кроме того, вы можете использовать класс PHPMail для отправки.
Я не мог найти простой способ добиться этого. Но после нескольких повторений попыток это то, что я сделал, и его работа над огромным списком запросов. В основном мы использовали это для пакетной операции для огромного запроса в нескольких подзапросах.
// Break down your huge workload into smaller chunks, in this case huge query string is broken
// down to a small set of subqueries
// Here if needed to optimize further down, you can provide an optimal partition when parallelizing
val queries = sqlContext.sparkContext.parallelize[String](subQueryList.toSeq)
// Then map each one those to a Spark Task, in this case its a Future that returns a string
val tasks: RDD[Future[String]] = queries.map(query => {
val task = makeHttpCall(query) // Method returns http call response as a Future[String]
task.recover {
case ex => logger.error("recover: " + ex.printStackTrace()) }
task onFailure {
case t => logger.error("execution failed: " + t.getMessage) }
task
})
// Note:: Http call is still not invoked, you are including this as part of the lineage
// Then in each partition you combine all Futures (means there could be several tasks in each partition) and sequence it
// And Await for the result, in this way you making it to block untill all the future in that sequence is resolved
val contentRdd = tasks.mapPartitions[String] { f: Iterator[Future[String]] =>
val searchFuture: Future[Iterator[String]] = Future sequence f
Await.result(searchFuture, threadWaitTime.seconds)
}
// Note: At this point, you can do any transformations on this rdd and it will be appended to the lineage.
// When you perform any action on that Rdd, then at that point,
// those mapPartition process will be evaluated to find the tasks and the subqueries to perform a full parallel http requests and
// collect those data in a single rdd.
Если вы не хотите выполнять какие-либо преобразования на контенте, такие как разбор полезной нагрузки ответа и т. д. Затем вы можете использовать foreachPartition
вместо mapPartitions
для немедленного выполнения всех этих http-вызовов.
Это не работает.
Вы не можете ожидать, что объекты запроса будут распределены, а ответы будут собраны по кластеру другими узлами. Если вы это сделаете, то искра призывает к будущему никогда не закончится.
Если ваша карта () выполняет запросы синхронизации (http), то, пожалуйста, собирайте ответы в одном и том же вызове действий / преобразовании, а затем укажите результаты (ответы) для дальнейшей карты / уменьшить / другие вызовы.
В вашем случае перепишите логику, чтобы собрать ответы для каждого вызова в синхронизации и удалить понятие фьючерсов, тогда все должно быть хорошо.
requests
и responses
не должно перемещаться данные, поэтому оба преобразования должны выполняться на одном и том же этапе, отсюда одни и те же исполнители и контексты.
– zero323
10 March 2016 в 14:06
Я, наконец, сделал это с помощью scalaj-http вместо Dispatch. Вызов синхронный, но это соответствует моему варианту использования.
Я думаю, что Spark Job никогда не заканчивает использование Dispatch, потому что соединение Http не было закрыто должным образом.
С наилучшими пожеланиями
этот пример использования выглядит довольно часто
blockquote>Не совсем, потому что он просто не работает, как вы (возможно) ожидаете. Поскольку каждая задача работает на стандартной Scala
Iterators
, эти операции будут раздавлены вместе. Это означает, что все операции будут блокироваться на практике. Предполагая, что у вас есть три URL-адреса ["x", "y", "z"], код будет выполнен в следующем порядке:Await.result(httpCall("x", 10.seconds)) Await.result(httpCall("y", 10.seconds)) Await.result(httpCall("z", 10.seconds))
Вы можете легко воспроизвести одно и то же поведение локально. Если вы хотите выполнить свой асинхронный код, вы должны обработать это явно с помощью
mapPartitions
:rdd.mapPartitions(iter => { ??? // Submit requests ??? // Wait until all requests completed and return Iterator of results })
, но это относительно сложно. Нет гарантии, что все данные для данного раздела вписываются в память, поэтому вам, вероятно, понадобится также механизм пакетной обработки.
Все сказанное, что я не могу воспроизвести описанную вами проблему, может быть проблемой конфигурации или проблемой с самой
httpCall
.На боковой ноте, позволяющей один тайм-аут убить целую задачу, не выглядит хорошей идеей.