Используя Агентов Scala для создания sth как Конвейер

Я борюсь со следующей проблемой в течение недели к настоящему времени и нуждаюсь в некотором совете.

def query(title: String): List[Search]   // query("Terminator") => ["Terminator I", "Terminator II", "Terminator 1984", etc...]

def searchIMDB(s: Search): List[SearchResult]
def searchTMDB(s: Search): List[SearchResult]

def filterRedundantSearchResults(sr: SearchResult): Option[SearchResult]

def fetchIMDB(sr: SearchResult): List[MetaInfo]
def fetchTMDB(sr: SearchResult): List[MetaInfo]

def consolidate(infos: List[MetaInfo]): List[List[MetaInfo]]

Я хочу создать Конвейер как:

query("Terminator")
-> [askIMDB, askTMDB, ...]
-> filterRedundantSearchResults (already-searched-state per query)
-> [fetchIMDB, fetchTMDB, ...]
-> consolidate                  (collected-meta-infos-state per query)
   => List[  TerminatorI-List[MetaInfo],  TerminatorII-List[MetaInfo],  ...]

До сих пор я реализовал каждый Конвейерный Сегмент как Агента. Я должен создать выделенные экземпляры агента для каждого Запроса, как некоторые из тех агентов как filterXXX и консолидировать потребность поддержать состояние на запрос.

Функции как askIMDB приводят к нескольким результатам, которые я хочу обработать одновременно (каждый к отдельному агенту). Таким образом, я не нашел способа предварительно создать целый график агентов прежде, чем выполнить запрос () и ни один изящный способ изменить его во времени выполнения.

Моя первая попытка была цепочкой агентов и передающий sth как идентификаторы транзакции в сообщениях, таким образом, каждый Агент имел Карту [TransactionID-> состояние], но это чувствовало себя довольно ужасным. Вторая попытка состояла в том, чтобы создать своего рода абстракцию диграфа агентов в один поток, но я перестал работать до сих пор.

Это - мое первое сообщение, извините если я забыл что-то, или вопрос генералу/псевдокодируемому. Любой совет очень ценится.Спасибо!

6
задан hotzen 17 December 2009 в 19:45
поделиться

1 ответ

Предлагаю вам взглянуть на ScalaQuery , который делает примерно то же самое. И он может это сделать, потому что это проблема монады. Фактически, некоторые решения Haskell, такие как Arrows, которые реализованы библиотекой Scalaz , кажутся довольно близкими.

Это было бы лучшим решением, поскольку правильная абстракция упростит внесение изменений в будущее.

В качестве хакера я прикидываю что-то вроде этого:

abstract class QueryModifiers
case object Consolidate extends QueryModifiers
// create others as appropriate

class Query(title: String) {
  self =>

  // Create actors
  def createActor(qm: QueryModifiers): Actor = {
    val actor = qm match {
      case Consolidate => // create a consolidator actor
      case //... as needed
    }
    actor.start
    actor
  }

  // The pipeline
  val pipe: List[List[QueryModifiers]] = Nil

  // Build the pipeline
  def ->(qms: List[QueryModifiers]) = new Query(title) {
    override val pipe = qms :: self.pipe
  }
  def ->(qm: QueryModifiers) = new Query(title) {
    override val pipe = List(qm) :: self.pipe
  }
  def ->(c: Consolidate.type) = {
    // Define the full pipeline
    // Because the way pipe is built, the last layer comes first, and the first comes last
    val pipeline = Consolidate :: pipe

    // Create an actor for every QueryModifier, using an unspecified createActor function
    val actors = pipeline map (_ map (createActor(_))

    // We have a list of lists of actors now, where the first element of the list
    // was the last QueryModifiers we received; so, group the layers by two, and for each
    // pair, make the second element send the result to the first.
    // Since each layer can contain many actors, make each member of the second
    // layer send the results to each member of the first layer.
    // The actors should be expecting to receive message SendResultsTo at any time.
    for {
      List(nextLayer, previousLayer) <- actors.iterator sliding 2
      nextActor <- nextLayer
      previousActor <- previousLayer
    } previousActor ! SendResultsTo(nextActor)

    // Send the query to the first layer
    for ( firstActor <- actors.last ) firstActor ! Query(title)

    // Get the result from the last layer, which is the consolidator
    val results = actors.head.head !? Results

    // Return the results
    results
  }
}

РЕДАКТИРОВАТЬ

Вы также можете гарантировать порядок, используя небольшую хитрость. Я пытаюсь избежать здесь Scala 2.8, хотя он может значительно упростить эту задачу с помощью именованных параметров и параметров по умолчанию.

sealed abstract class QueryModifiers
case class QMSearcher(/*...*/) extends QueryModifiers
case class QMFilter(/*...*/) extends QueryModifiers
case class QMFetcher(/*...*/) extends QueryModifiers
case object Consolidate extends QueryModifiers

class Query[NextQM] private (title: String, searchers: List[QMSeacher], filters: List[QMFilter], fetchers: List[QMFetcher]) {

// Build the pipeline
  def ->[T <: NextQM](qms: List[NextQM])(implicit m: Manifest[T]) = m.toString match {
    case "QMSearch" => new Query[QMFilter](title, qms, Nil, Nil)
    case "QMFilter" => new Query[QMFetcher](title, seachers, qms, Nil)
    case "QMFetcher" => new Query[Consolidate.type](title, searches, filters, qms)
    case _ /* "Consolidate$", actually */ => error("List of consolidate unexpected")
  }
  // Do similarly for qm: NextQM

  // Consolidation
  def ->(qm: Consolidate.type) = {
     // Create Searchers actors
     // Send them the Filters
     // Send them Fetchers
     // Create the Consolidator actor
     // Send it to Searchers actors
     // Send Searchers the query
     // Ask Consolidator for answer
  }
}

object Query {
  def apply(title: String) = new Query[QMSearcher](title, Nil, Nil, Nil)
}

Теперь акторы Searcher хранят список фильтров, список сборщиков и ссылку на консолидатор. Они слушают сообщения, информирующие их об этих вещах, и задают вопрос. Для каждого результата они создают субъект фильтра для каждого фильтра в списке, отправляют каждому из них список средств выборки и консолидатор, а затем отправляют им результат.

Участники фильтра хранят список средств выборки и ссылку на консолидатор. Они слушают сообщения, информирующие их об этих вещах, а также результаты поиска. Они отправляют свой вывод, если таковой имеется, вновь созданным акторам сборщика, которые сначала информируются об объединителе.

Сборщики хранят ссылку на консолидаторы. Они слушают сообщение, информирующее их об этой ссылке, и о результате фильтра. Они, в свою очередь, отправляют свой результат консолидатору.

Консолидатор прослушивает два сообщения. Одно сообщение, исходящее от актеров сборщика, информирует их о результатах, которые они накапливают. В другом сообщении, исходящем от запроса, запрашивается результат, который он возвращает.

Осталось только разработать способ сообщить консолидатору, что все результаты обработаны. Один из способов:

  1. В запросе сообщить субъекту Consolidator обо всех созданных объектах поиска. Консолидатор хранит их список с флагом, показывающим, завершены они или нет.
  2. Каждый поисковик хранит список созданных им фильтров и ждет от них сообщения «готово». Когда поисковику не осталось обработки и он получил «выполнено» от всех фильтров, он отправляет сообщение консолидатору, информируя его о завершении.
  3. Каждый фильтр, в свою очередь, сохраняет список сборщиков он создал и, аналогично, ожидает от них сообщений «готово». Когда обработка завершится и будет получено "Готово" от всех сборщиков, он информирует поисковика о том, что он сделал.
  4. Он отправляет сообщение "готово" фильтру, который его создал, когда его работа завершена и отправлена ​​в консолидатор.
  5. Консолидатор только слушает сообщение, запрашивающее результат после того, как он получил "готово" от всех поисковиков.
4
ответ дан 17 December 2019 в 07:05
поделиться
Другие вопросы по тегам:

Похожие вопросы: