Обработка одновременно в Scala

Рекомендовал бы ознакомиться с сопоставлением с образцом и рекурсивными ADT в целом, чтобы лучше понять, почему Play Json рассматривает JSON как «гражданина первого класса».

При этом многие API-интерфейсы на основе Java (например, библиотеки Google Java) ожидают десериализации JSON как Map[String, Object]. Хотя вы можете очень просто создать свою собственную функцию, которая рекурсивно генерирует этот объект с сопоставлением с шаблоном, простейшим решением, вероятно, будет использование следующего существующего шаблона:

import com.google.gson.Gson
import java.util.{Map => JMap, LinkedHashMap}

val gson = new Gson()

def decode(encoded: String): JMap[String, Object] =   
   gson.fromJson(encoded, (new LinkedHashMap[String, Object]()).getClass)

LinkedHashMap используется, если вы хотите сохранить порядок ключей во время десериализации (HashMap может использоваться, если порядок не имеет значения). Полный пример здесь .

8
задан Community 23 May 2017 в 09:57
поделиться

4 ответа

Это похоже на дубликат другого вопроса. Так что я продублирую свой ответ

Актеры обрабатывают одно сообщение за раз. Классический шаблон для обработки нескольких сообщений - наличие одного актора-координатора для пула субъектов-потребителей. Если вы используете реакцию, тогда пул потребителей может быть большим, но по-прежнему будет использовать только небольшое количество потоков JVM. Вот пример, в котором я создаю пул из 10 потребителей и одного координатора для их поддержки.

import scala.actors.Actor
import scala.actors.Actor._

case class Request(sender : Actor, payload : String)
case class Ready(sender : Actor)
case class Result(result : String)
case object Stop

def consumer(n : Int) = actor {
  loop {
    react {
      case Ready(sender) => 
        sender ! Ready(self)
      case Request(sender, payload) =>
        println("request to consumer " + n + " with " + payload)
        // some silly computation so the process takes awhile
        val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
        sender ! Result(result)
        println("consumer " + n + " is done processing " + result )
      case Stop => exit
    }
  }
}

// a pool of 10 consumers
val consumers = for (n <- 0 to 10) yield consumer(n)

val coordinator = actor {
  loop {
     react {
        case msg @ Request(sender, payload) =>
           consumers foreach {_ ! Ready(self)}
           react {
              // send the request to the first available consumer
              case Ready(consumer) => consumer ! msg
           }
         case Stop => 
           consumers foreach {_ ! Stop} 
           exit
     }
  }
}

// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
for (i <- 0 to 1000) coordinator ! Request(self, i.toString)

Этот код проверяет, какой потребитель доступен, и отправляет запрос этому потребителю. Альтернативы - просто случайным образом назначить потребителей или использовать циклический планировщик.

В зависимости от того, что вы делаете, вам лучше подойдет Scala's Futures. Например, если вы не Если действительно нужны актеры, то весь вышеупомянутый механизм можно записать как

import scala.actors.Futures._

def transform(payload : String) = {      
  val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
  println("transformed " + payload + " to " + result )
  result
}

val results = for (i <- 0 to 1000) yield future(transform(i.toString))
8
ответ дан 5 December 2019 в 12:12
поделиться

Назначение актора (ну, одного из них) - гарантировать, что к состоянию внутри актера можно получить доступ только для одного потока за раз. Если обработка сообщения не зависит от какого-либо изменяемого состояния внутри актора, то, вероятно, было бы более подходящим просто отправить задачу в планировщик или пул потоков для обработки. Дополнительная абстракция, которую предоставляет актор, на самом деле мешает вам.

Для этого есть удобные методы в scala.actors.Scheduler, или вы можете использовать Executor из java.util.concurrent.

1
ответ дан 5 December 2019 в 12:12
поделиться

Если все события могут обрабатываться независимо, почему они находятся в очереди? Ничего не зная о своем дизайне, это кажется ненужным шагом. Если бы вы могли составить функцию process из того, что запускает эти события, вы потенциально могли бы избежать очереди.

Актер по сути является параллельным эффектом, снабженным очередью. Если вы хотите обрабатывать несколько сообщений одновременно, на самом деле вам не нужен актер. Вы просто хотите, чтобы функция (Any => ()) была запланирована для выполнения в удобное время.

Сказав это, ваш подход является разумным , если вы хотите оставаться в пределах библиотеки акторов и если очередь событий находится вне вашего контроля.

Скалаз делает различие между Актерами и параллельными эффектами. В то время как его Actor очень легкий, scalaz.concurrent.Effect еще легче. Вот ваш код, примерно переведенный в библиотеку Scalaz:

val eventProcessor = effect (x => process x)

Это последний заголовок ствола, еще не выпущенный.

3
ответ дан 5 December 2019 в 12:12
поделиться

Звучит как простая проблема потребителя / производителя. Я бы использовал очередь с пулом потребителей. Вероятно, вы могли бы написать это с помощью нескольких строк кода, используя java.util.concurrent.

1
ответ дан 5 December 2019 в 12:12
поделиться
Другие вопросы по тегам:

Похожие вопросы: