используете ли вы конфигурацию SMTP для отправки своей электронной почты? попробуйте вместо этого использовать phpmailer. вы можете загрузить библиотеку из https://github.com/PHPMailer/PHPMailer . Я создал электронную почту, отправив этот путь:
function send_mail($email, $recipient_name, $message='')
{
require("phpmailer/class.phpmailer.php");
$mail = new PHPMailer();
$mail->CharSet="utf-8";
$mail->IsSMTP(); // set mailer to use SMTP
$mail->Host = "mail.example.com"; // specify main and backup server
$mail->SMTPAuth = true; // turn on SMTP authentication
$mail->Username = "myusername"; // SMTP username
$mail->Password = "p@ssw0rd"; // SMTP password
$mail->From = "me@walalang.com";
$mail->FromName = "System-Ad";
$mail->AddAddress($email, $recipient_name);
$mail->WordWrap = 50; // set word wrap to 50 characters
$mail->IsHTML(true); // set email format to HTML (true) or plain text (false)
$mail->Subject = "This is a Sampleenter code here Email";
$mail->Body = $message;
$mail->AltBody = "This is the body in plain text for non-HTML mail clients";
$mail->AddEmbeddedImage('images/logo.png', 'logo', 'logo.png');
$mail->addAttachment('files/file.xlsx');
if(!$mail->Send())
{
echo "Message could not be sent. ";
echo "Mailer Error: " . $mail->ErrorInfo;
exit;
}
echo "Message has been sent";
}
Я не мог найти простой способ добиться этого. Но после нескольких повторений попыток это то, что я сделал, и его работа над огромным списком запросов. В основном мы использовали это для пакетной операции для огромного запроса в нескольких подзапросах.
// Break down your huge workload into smaller chunks, in this case huge query string is broken
// down to a small set of subqueries
// Here if needed to optimize further down, you can provide an optimal partition when parallelizing
val queries = sqlContext.sparkContext.parallelize[String](subQueryList.toSeq)
// Then map each one those to a Spark Task, in this case its a Future that returns a string
val tasks: RDD[Future[String]] = queries.map(query => {
val task = makeHttpCall(query) // Method returns http call response as a Future[String]
task.recover {
case ex => logger.error("recover: " + ex.printStackTrace()) }
task onFailure {
case t => logger.error("execution failed: " + t.getMessage) }
task
})
// Note:: Http call is still not invoked, you are including this as part of the lineage
// Then in each partition you combine all Futures (means there could be several tasks in each partition) and sequence it
// And Await for the result, in this way you making it to block untill all the future in that sequence is resolved
val contentRdd = tasks.mapPartitions[String] { f: Iterator[Future[String]] =>
val searchFuture: Future[Iterator[String]] = Future sequence f
Await.result(searchFuture, threadWaitTime.seconds)
}
// Note: At this point, you can do any transformations on this rdd and it will be appended to the lineage.
// When you perform any action on that Rdd, then at that point,
// those mapPartition process will be evaluated to find the tasks and the subqueries to perform a full parallel http requests and
// collect those data in a single rdd.
Если вы не хотите выполнять какие-либо преобразования на контенте, такие как разбор полезной нагрузки ответа и т. д. Затем вы можете использовать foreachPartition
вместо mapPartitions
для немедленного выполнения всех этих http-вызовов.
Это не работает.
Вы не можете ожидать, что объекты запроса будут распределены, а ответы будут собраны по кластеру другими узлами. Если вы это сделаете, то искра призывает к будущему никогда не закончится.
Если ваша карта () выполняет запросы синхронизации (http), то, пожалуйста, собирайте ответы в одном и том же вызове действий / преобразовании, а затем укажите результаты (ответы) для дальнейшей карты / уменьшить / другие вызовы.
В вашем случае перепишите логику, чтобы собрать ответы для каждого вызова в синхронизации и удалить понятие фьючерсов, тогда все должно быть хорошо.
requests
и responses
не должно перемещаться данные, поэтому оба преобразования должны выполняться на одном и том же этапе, отсюда одни и те же исполнители и контексты.
– zero323
10 March 2016 в 14:06
Я, наконец, сделал это с помощью scalaj-http вместо Dispatch. Вызов синхронный, но это соответствует моему варианту использования.
Я думаю, что Spark Job никогда не заканчивает использование Dispatch, потому что соединение Http не было закрыто должным образом.
С наилучшими пожеланиями
этот пример использования выглядит довольно часто
blockquote>Не совсем, потому что он просто не работает, как вы (вероятно) ожидаете. Поскольку каждая задача работает на стандартной Scala
Iterators
, эти операции будут раздавлены вместе. Это означает, что все операции будут блокироваться на практике. Предполагая, что у вас есть три URL-адреса ["x", "y", "z"], код будет выполнен в следующем порядке:Await.result(httpCall("x", 10.seconds)) Await.result(httpCall("y", 10.seconds)) Await.result(httpCall("z", 10.seconds))
Вы можете легко воспроизвести одно и то же поведение локально. Если вы хотите выполнить свой асинхронный код, вы должны обработать это явно с помощью
mapPartitions
:rdd.mapPartitions(iter => { ??? // Submit requests ??? // Wait until all requests completed and return Iterator of results })
, но это относительно сложно. Нет гарантии, что все данные для данного раздела вписываются в память, поэтому вам, вероятно, понадобится также механизм пакетной обработки.
Все сказанное, что я не могу воспроизвести описанную вами проблему, может быть проблемой конфигурации или проблемой с самой
httpCall
.На боковой ноте, позволяющей один тайм-аут убить целую задачу, не выглядит хорошей идеей.