Я борюсь со следующей проблемой в течение недели к настоящему времени и нуждаюсь в некотором совете.
def query(title: String): List[Search] // query("Terminator") => ["Terminator I", "Terminator II", "Terminator 1984", etc...]
def searchIMDB(s: Search): List[SearchResult]
def searchTMDB(s: Search): List[SearchResult]
def filterRedundantSearchResults(sr: SearchResult): Option[SearchResult]
def fetchIMDB(sr: SearchResult): List[MetaInfo]
def fetchTMDB(sr: SearchResult): List[MetaInfo]
def consolidate(infos: List[MetaInfo]): List[List[MetaInfo]]
Я хочу создать Конвейер как:
query("Terminator")
-> [askIMDB, askTMDB, ...]
-> filterRedundantSearchResults (already-searched-state per query)
-> [fetchIMDB, fetchTMDB, ...]
-> consolidate (collected-meta-infos-state per query)
=> List[ TerminatorI-List[MetaInfo], TerminatorII-List[MetaInfo], ...]
До сих пор я реализовал каждый Конвейерный Сегмент как Агента. Я должен создать выделенные экземпляры агента для каждого Запроса, как некоторые из тех агентов как filterXXX и консолидировать потребность поддержать состояние на запрос.
Функции как askIMDB приводят к нескольким результатам, которые я хочу обработать одновременно (каждый к отдельному агенту). Таким образом, я не нашел способа предварительно создать целый график агентов прежде, чем выполнить запрос () и ни один изящный способ изменить его во времени выполнения.
Моя первая попытка была цепочкой агентов и передающий sth как идентификаторы транзакции в сообщениях, таким образом, каждый Агент имел Карту [TransactionID-> состояние], но это чувствовало себя довольно ужасным. Вторая попытка состояла в том, чтобы создать своего рода абстракцию диграфа агентов в один поток, но я перестал работать до сих пор.
Это - мое первое сообщение, извините если я забыл что-то, или вопрос генералу/псевдокодируемому. Любой совет очень ценится.Спасибо!
Предлагаю вам взглянуть на ScalaQuery , который делает примерно то же самое. И он может это сделать, потому что это проблема монады. Фактически, некоторые решения Haskell, такие как Arrows, которые реализованы библиотекой Scalaz , кажутся довольно близкими.
Это было бы лучшим решением, поскольку правильная абстракция упростит внесение изменений в будущее.
В качестве хакера я прикидываю что-то вроде этого:
abstract class QueryModifiers
case object Consolidate extends QueryModifiers
// create others as appropriate
class Query(title: String) {
self =>
// Create actors
def createActor(qm: QueryModifiers): Actor = {
val actor = qm match {
case Consolidate => // create a consolidator actor
case //... as needed
}
actor.start
actor
}
// The pipeline
val pipe: List[List[QueryModifiers]] = Nil
// Build the pipeline
def ->(qms: List[QueryModifiers]) = new Query(title) {
override val pipe = qms :: self.pipe
}
def ->(qm: QueryModifiers) = new Query(title) {
override val pipe = List(qm) :: self.pipe
}
def ->(c: Consolidate.type) = {
// Define the full pipeline
// Because the way pipe is built, the last layer comes first, and the first comes last
val pipeline = Consolidate :: pipe
// Create an actor for every QueryModifier, using an unspecified createActor function
val actors = pipeline map (_ map (createActor(_))
// We have a list of lists of actors now, where the first element of the list
// was the last QueryModifiers we received; so, group the layers by two, and for each
// pair, make the second element send the result to the first.
// Since each layer can contain many actors, make each member of the second
// layer send the results to each member of the first layer.
// The actors should be expecting to receive message SendResultsTo at any time.
for {
List(nextLayer, previousLayer) <- actors.iterator sliding 2
nextActor <- nextLayer
previousActor <- previousLayer
} previousActor ! SendResultsTo(nextActor)
// Send the query to the first layer
for ( firstActor <- actors.last ) firstActor ! Query(title)
// Get the result from the last layer, which is the consolidator
val results = actors.head.head !? Results
// Return the results
results
}
}
РЕДАКТИРОВАТЬ
Вы также можете гарантировать порядок, используя небольшую хитрость. Я пытаюсь избежать здесь Scala 2.8, хотя он может значительно упростить эту задачу с помощью именованных параметров и параметров по умолчанию.
sealed abstract class QueryModifiers
case class QMSearcher(/*...*/) extends QueryModifiers
case class QMFilter(/*...*/) extends QueryModifiers
case class QMFetcher(/*...*/) extends QueryModifiers
case object Consolidate extends QueryModifiers
class Query[NextQM] private (title: String, searchers: List[QMSeacher], filters: List[QMFilter], fetchers: List[QMFetcher]) {
// Build the pipeline
def ->[T <: NextQM](qms: List[NextQM])(implicit m: Manifest[T]) = m.toString match {
case "QMSearch" => new Query[QMFilter](title, qms, Nil, Nil)
case "QMFilter" => new Query[QMFetcher](title, seachers, qms, Nil)
case "QMFetcher" => new Query[Consolidate.type](title, searches, filters, qms)
case _ /* "Consolidate$", actually */ => error("List of consolidate unexpected")
}
// Do similarly for qm: NextQM
// Consolidation
def ->(qm: Consolidate.type) = {
// Create Searchers actors
// Send them the Filters
// Send them Fetchers
// Create the Consolidator actor
// Send it to Searchers actors
// Send Searchers the query
// Ask Consolidator for answer
}
}
object Query {
def apply(title: String) = new Query[QMSearcher](title, Nil, Nil, Nil)
}
Теперь акторы Searcher хранят список фильтров, список сборщиков и ссылку на консолидатор. Они слушают сообщения, информирующие их об этих вещах, и задают вопрос. Для каждого результата они создают субъект фильтра для каждого фильтра в списке, отправляют каждому из них список средств выборки и консолидатор, а затем отправляют им результат.
Участники фильтра хранят список средств выборки и ссылку на консолидатор. Они слушают сообщения, информирующие их об этих вещах, а также результаты поиска. Они отправляют свой вывод, если таковой имеется, вновь созданным акторам сборщика, которые сначала информируются об объединителе.
Сборщики хранят ссылку на консолидаторы. Они слушают сообщение, информирующее их об этой ссылке, и о результате фильтра. Они, в свою очередь, отправляют свой результат консолидатору.
Консолидатор прослушивает два сообщения. Одно сообщение, исходящее от актеров сборщика, информирует их о результатах, которые они накапливают. В другом сообщении, исходящем от запроса, запрашивается результат, который он возвращает.
Осталось только разработать способ сообщить консолидатору, что все результаты обработаны. Один из способов: