Используя Scala 2.8 RC1 или более новый, что является лучшим (самый легкий и/или самый прямой) метод, чтобы "посмотреть" на сообщения ожидания в почтовом ящике агента (из действия того же агента () метод) для исследования то, что находится в очереди, не имея необходимость реагировать/получать сообщения и/или нарушать текущее содержание почтового ящика всегда.
Цель этого состоит в том так, чтобы агент мог определить, безопасно ли обработать запрос для выхода первым определением, если любое из остающихся сообщений почтового ящика является, которые должны быть обработаны, вместо просто отброшенного путем остановки агента сразу.
Вам не нужно заглядывать вперед. Просто отслеживайте тот факт, что был запрошен выход, и используйте reactWithin(0), чтобы определить, когда очередь пуста после запроса выхода.
import scala.actors._
sealed case class Message
case object Exit extends Message
case class Unimportant(n:Int) extends Message
case class Important(n:Int) extends Message
class SafeExitingActor extends Actor {
def act : Nothing = react {
case Exit => {
println("exit requested, clearing the queue")
exitRequested
}
case message => {
processMessage(message, false)
act
}
}
// reactWithin(0) gives a TIMEOUT as soon as the mailbox is empty
def exitRequested : Nothing = reactWithin(0) {
case Exit => {
println("extra exit requested, ignoring")
exitRequested // already know about the exit, keep processing
}
case TIMEOUT => {
println("timeout, queue is empty, shutting down")
exit // TIMEOUT so nothing more to process, we can shut down
}
case message => {
processMessage(message, true)
exitRequested
}
}
// process is a separate method to avoid duplicating in act and exitRequested
def processMessage(message : Any, importantOnly : Boolean) = {
message match {
case Unimportant(n) if !importantOnly => println("Unimportant " + n)
case Unimportant(n) => () // do nothing
case Important(n) => println("Important! " + n)
}
Thread sleep 100 // sleep a little to ensure mailbox backlog
}
}
object TestProcessing {
def main(args : Array[String]) {
val actor = new SafeExitingActor()
actor.start()
for (i <- 1 to 10) {
actor ! Unimportant(i)
actor ! Important(i)
}
actor ! Exit
for (i <- 11 to 20) {
actor ! Unimportant(i)
actor ! Important(i)
}
actor ! Exit
actor ! Important(100)
}
}
Это должно вывести
Unimportant 1
Important! 1
Unimportant 2
Important! 2
Unimportant 3
Important! 3
Unimportant 4
Important! 4
Unimportant 5
Important! 5
Unimportant 6
Important! 6
Unimportant 7
Important! 7
Unimportant 8
Important! 8
Unimportant 9
Important! 9
Unimportant 10
Important! 10
exit requested, clearing the queue
Important! 11
Important! 12
Important! 13
Important! 14
Important! 15
Important! 16
Important! 17
Important! 18
Important! 19
Important! 20
extra exit requested, ignoring
Important! 100
timeout, queue is empty, shutting down
В целом это звучит как опасная операция, поскольку при наличии критических сообщений субъект, обрабатывающий их, может проверить и не найти ни одного, но тогда перед выходом может быть дан другой из какого-то другого потока.
Если вы точно знаете, что этого не может произойти, и вам не нужно много невероятно быстрых переключателей сообщений, я бы, вероятно, написал актера-охранника, который считает и отслеживает критические сообщения, но в противном случае просто передает их на другого актера для обработки.
Если вы не хотите этого делать, имейте в виду, что детали внутреннего устройства должны измениться, и вам, возможно, придется просмотреть исходный код для Actor, Reactor, MessageQueue и т. Д., Чтобы получить то, что вы хотеть. На данный момент должно сработать что-то вроде этого (предупреждение, непроверено):
package scala.actors
package myveryownstuff
trait CriticalActor extends Actor {
def criticalAwaits(p: Any => Boolean) = {
synchronized {
drainSendBuffer(mailbox)
mailbox.get(0)(p).isDefined
}
}
}
Обратите внимание, что мы должны поместить расширенную характеристику в пакет scala.actors, потому что все внутренние компоненты почтового ящика объявлены закрытыми для пакета scala.actors. (Это хорошее предупреждение, что вы должны быть осторожны, прежде чем возиться с внутренними компонентами.) Затем мы добавляем новый метод, который принимает функцию, которая может проверять критическое сообщение и искать его с помощью встроенного mailbox.get ( n) метод
, который возвращает n
-е сообщение, выполняющее некоторый предикат.