Конвертировать HTTP-вызов синхронизации в асинхронный вызов в искровом [дубликат]

используете ли вы конфигурацию SMTP для отправки своей электронной почты? попробуйте вместо этого использовать phpmailer. вы можете загрузить библиотеку из https://github.com/PHPMailer/PHPMailer . Я создал электронную почту, отправив этот путь:

function send_mail($email, $recipient_name, $message='')
{
    require("phpmailer/class.phpmailer.php");

    $mail = new PHPMailer();

    $mail->CharSet="utf-8";
    $mail->IsSMTP();                                      // set mailer to use SMTP
    $mail->Host = "mail.example.com";  // specify main and backup server
    $mail->SMTPAuth = true;     // turn on SMTP authentication
    $mail->Username = "myusername";  // SMTP username
    $mail->Password = "p@ssw0rd"; // SMTP password

    $mail->From = "me@walalang.com";
    $mail->FromName = "System-Ad";
    $mail->AddAddress($email, $recipient_name);

    $mail->WordWrap = 50;                                 // set word wrap to 50 characters
    $mail->IsHTML(true);                                  // set email format to HTML (true) or plain text (false)

    $mail->Subject = "This is a Sampleenter code here Email";
    $mail->Body    = $message;
    $mail->AltBody = "This is the body in plain text for non-HTML mail clients";    
    $mail->AddEmbeddedImage('images/logo.png', 'logo', 'logo.png');
    $mail->addAttachment('files/file.xlsx');

    if(!$mail->Send())
    {
       echo "Message could not be sent. 

"; echo "Mailer Error: " . $mail->ErrorInfo; exit; } echo "Message has been sent"; }

7
задан zero323 9 March 2016 в 21:46
поделиться

4 ответа

Я не мог найти простой способ добиться этого. Но после нескольких повторений попыток это то, что я сделал, и его работа над огромным списком запросов. В основном мы использовали это для пакетной операции для огромного запроса в нескольких подзапросах.

// Break down your huge workload into smaller chunks, in this case huge query string is broken 
// down to a small set of subqueries
// Here if needed to optimize further down, you can provide an optimal partition when parallelizing
val queries = sqlContext.sparkContext.parallelize[String](subQueryList.toSeq)

// Then map each one those to a Spark Task, in this case its a Future that returns a string
val tasks: RDD[Future[String]] = queries.map(query => {
    val task = makeHttpCall(query) // Method returns http call response as a Future[String]
    task.recover { 
        case ex => logger.error("recover: " + ex.printStackTrace()) }
    task onFailure {
        case t => logger.error("execution failed: " + t.getMessage) }
    task
})

// Note:: Http call is still not invoked, you are including this as part of the lineage

// Then in each partition you combine all Futures (means there could be several tasks in each partition) and sequence it
// And Await for the result, in this way you making it to block untill all the future in that sequence is resolved

val contentRdd = tasks.mapPartitions[String] { f: Iterator[Future[String]] =>
   val searchFuture: Future[Iterator[String]] = Future sequence f
   Await.result(searchFuture, threadWaitTime.seconds)
}

// Note: At this point, you can do any transformations on this rdd and it will be appended to the lineage. 
// When you perform any action on that Rdd, then at that point, 
// those mapPartition process will be evaluated to find the tasks and the subqueries to perform a full parallel http requests and 
// collect those data in a single rdd. 

Если вы не хотите выполнять какие-либо преобразования на контенте, такие как разбор полезной нагрузки ответа и т. д. Затем вы можете использовать foreachPartition вместо mapPartitions для немедленного выполнения всех этих http-вызовов.

2
ответ дан raksja 20 August 2018 в 22:59
поделиться

Это не работает.

Вы не можете ожидать, что объекты запроса будут распределены, а ответы будут собраны по кластеру другими узлами. Если вы это сделаете, то искра призывает к будущему никогда не закончится.

Если ваша карта () выполняет запросы синхронизации (http), то, пожалуйста, собирайте ответы в одном и том же вызове действий / преобразовании, а затем укажите результаты (ответы) для дальнейшей карты / уменьшить / другие вызовы.

В вашем случае перепишите логику, чтобы собрать ответы для каждого вызова в синхронизации и удалить понятие фьючерсов, тогда все должно быть хорошо.

1
ответ дан user6044522 20 August 2018 в 22:59
поделиться
  • 1
    Проблема в том, что между requests и responses не должно перемещаться данные, поэтому оба преобразования должны выполняться на одном и том же этапе, отсюда одни и те же исполнители и контексты. – zero323 10 March 2016 в 14:06

Я, наконец, сделал это с помощью scalaj-http вместо Dispatch. Вызов синхронный, но это соответствует моему варианту использования.

Я думаю, что Spark Job никогда не заканчивает использование Dispatch, потому что соединение Http не было закрыто должным образом.

С наилучшими пожеланиями

1
ответ дан Vincent Spiewak 20 August 2018 в 22:59
поделиться

этот пример использования выглядит довольно часто

Не совсем, потому что он просто не работает, как вы (вероятно) ожидаете. Поскольку каждая задача работает на стандартной Scala Iterators, эти операции будут раздавлены вместе. Это означает, что все операции будут блокироваться на практике. Предполагая, что у вас есть три URL-адреса ["x", "y", "z"], код будет выполнен в следующем порядке:

Await.result(httpCall("x", 10.seconds))
Await.result(httpCall("y", 10.seconds))
Await.result(httpCall("z", 10.seconds))

Вы можете легко воспроизвести одно и то же поведение локально. Если вы хотите выполнить свой асинхронный код, вы должны обработать это явно с помощью mapPartitions:

rdd.mapPartitions(iter => {
  ??? // Submit requests
  ??? // Wait until all requests completed and return Iterator of results
})

, но это относительно сложно. Нет гарантии, что все данные для данного раздела вписываются в память, поэтому вам, вероятно, понадобится также механизм пакетной обработки.

Все сказанное, что я не могу воспроизвести описанную вами проблему, может быть проблемой конфигурации или проблемой с самой httpCall.

На боковой ноте, позволяющей один тайм-аут убить целую задачу, не выглядит хорошей идеей.

7
ответ дан zero323 20 August 2018 в 22:59
поделиться
Другие вопросы по тегам:

Похожие вопросы: