Лучший метод для заглядывания в Почтовый ящик Агента Scala

Используя Scala 2.8 RC1 или более новый, что является лучшим (самый легкий и/или самый прямой) метод, чтобы "посмотреть" на сообщения ожидания в почтовом ящике агента (из действия того же агента () метод) для исследования то, что находится в очереди, не имея необходимость реагировать/получать сообщения и/или нарушать текущее содержание почтового ящика всегда.

Цель этого состоит в том так, чтобы агент мог определить, безопасно ли обработать запрос для выхода первым определением, если любое из остающихся сообщений почтового ящика является, которые должны быть обработаны, вместо просто отброшенного путем остановки агента сразу.

6
задан scaling_out 27 April 2010 в 12:59
поделиться

2 ответа

Вам не нужно заглядывать вперед. Просто отслеживайте тот факт, что был запрошен выход, и используйте reactWithin(0), чтобы определить, когда очередь пуста после запроса выхода.

import scala.actors._

sealed case class Message
case object Exit extends Message
case class Unimportant(n:Int) extends Message
case class Important(n:Int) extends Message

class SafeExitingActor extends Actor {
  def act : Nothing = react {
      case Exit => {
           println("exit requested, clearing the queue")
           exitRequested
      }
      case message => {
           processMessage(message, false)
           act
      }
  }

  // reactWithin(0) gives a TIMEOUT as soon as the mailbox is empty
  def exitRequested : Nothing = reactWithin(0) {
     case Exit => {
         println("extra exit requested, ignoring")
         exitRequested // already know about the exit, keep processing
     }
     case TIMEOUT => {
         println("timeout, queue is empty, shutting down")
         exit // TIMEOUT so nothing more to process, we can shut down
     }
     case message => {
         processMessage(message, true)
         exitRequested
     }
  }

  // process is a separate method to avoid duplicating in act and exitRequested
  def processMessage(message : Any, importantOnly : Boolean) = {
     message match {
       case Unimportant(n) if !importantOnly => println("Unimportant " + n)
       case Unimportant(n) => () // do nothing
       case Important(n) => println("Important! " + n)
     }
     Thread sleep 100 // sleep a little to ensure mailbox backlog
  }
}

object TestProcessing {
  def main(args : Array[String]) {
    val actor = new SafeExitingActor()
    actor.start()
    for (i <- 1 to 10) {
        actor ! Unimportant(i)
        actor ! Important(i)
    }
    actor ! Exit
    for (i <- 11 to 20) {
        actor ! Unimportant(i)
        actor ! Important(i)
    }
    actor ! Exit
    actor ! Important(100)
  }
}

Это должно вывести

Unimportant 1
Important! 1
Unimportant 2
Important! 2
Unimportant 3
Important! 3
Unimportant 4
Important! 4
Unimportant 5
Important! 5
Unimportant 6
Important! 6
Unimportant 7
Important! 7
Unimportant 8
Important! 8
Unimportant 9
Important! 9
Unimportant 10
Important! 10
exit requested, clearing the queue
Important! 11
Important! 12
Important! 13
Important! 14
Important! 15
Important! 16
Important! 17
Important! 18
Important! 19
Important! 20
extra exit requested, ignoring
Important! 100
timeout, queue is empty, shutting down
10
ответ дан 8 December 2019 в 18:34
поделиться

В целом это звучит как опасная операция, поскольку при наличии критических сообщений субъект, обрабатывающий их, может проверить и не найти ни одного, но тогда перед выходом может быть дан другой из какого-то другого потока.

Если вы точно знаете, что этого не может произойти, и вам не нужно много невероятно быстрых переключателей сообщений, я бы, вероятно, написал актера-охранника, который считает и отслеживает критические сообщения, но в противном случае просто передает их на другого актера для обработки.

Если вы не хотите этого делать, имейте в виду, что детали внутреннего устройства должны измениться, и вам, возможно, придется просмотреть исходный код для Actor, Reactor, MessageQueue и т. Д., Чтобы получить то, что вы хотеть. На данный момент должно сработать что-то вроде этого (предупреждение, непроверено):

package scala.actors
package myveryownstuff
trait CriticalActor extends Actor {
  def criticalAwaits(p: Any => Boolean) = {
    synchronized {
      drainSendBuffer(mailbox)
      mailbox.get(0)(p).isDefined
    }
  }
}

Обратите внимание, что мы должны поместить расширенную характеристику в пакет scala.actors, потому что все внутренние компоненты почтового ящика объявлены закрытыми для пакета scala.actors. (Это хорошее предупреждение, что вы должны быть осторожны, прежде чем возиться с внутренними компонентами.) Затем мы добавляем новый метод, который принимает функцию, которая может проверять критическое сообщение и искать его с помощью встроенного mailbox.get ( n) метод , который возвращает n -е сообщение, выполняющее некоторый предикат.

3
ответ дан 8 December 2019 в 18:34
поделиться
Другие вопросы по тегам:

Похожие вопросы: