Как мне настроить отказоустойчивость akka Actor?

Я пытаюсь добиться отказоустойчивого поведения в Акках Акка. Я работаю над некоторыми код, зависящий от доступности Актеров в системе для длительной обработки. Я обнаружил, что моя обработка прекращается через пару часов (это должно занять около 10 часов) и мало что происходит. Я считаю, что мои Актеры не восстанавливаются после исключений.

Что мне нужно сделать, чтобы Актеры были перезапущены на постоянной основе один за одним? Я ожидаю, что это можно сделать из этой документации http://akka.io/docs/akka/1.1.3/scala/fault-tolerance

Я работаю с akka 1.1.3 и scala 2.9

import akka.actor.Actor
import akka.actor.Actor._
import akka.actor.ActorRef
import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached
import akka.dispatch.Dispatchers
import akka.routing.CyclicIterator
import akka.routing.LoadBalancer
import akka.config.Supervision._


object TestActor {
  val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pool")
                   .setCorePoolSize(100)
                   .setMaxPoolSize(100)
                   .build
}

class TestActor(val name: Integer) extends Actor {
    self.lifeCycle = Permanent
    self.dispatcher = TestActor.dispatcher
    def receive = {
       case num: Integer => {  
         if( num % 2 == 0 )
           throw new Exception("This is a simulated failure")
         println("Actor: " + name + " Received: " + num)
         //Thread.sleep(100)
       }
    }

  override def postStop(){
    println("TestActor post Stop ")
  }

  //callback method for restart handling 
  override def preRestart(reason: Throwable){
    println("TestActor "+ name + " restaring after shutdown because of " + reason)
  }

  //callback method for restart handling 
  override def postRestart(reason: Throwable){
    println("Restaring TestActor "+name+"after shutdown because of " + reason)
  }  
}

trait CyclicLoadBalancing extends LoadBalancer { this: Actor =>
    val testActors: List[ActorRef]
    val seq = new CyclicIterator[ActorRef](testActors)
}

trait TestActorManager extends Actor {
     self.lifeCycle = Permanent
     self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 1000, 5000)
     val testActors: List[ActorRef]
     override def preStart = testActors foreach { self.startLink(_) }
     override def postStop = { System.out.println("postStop") }
}


  object FaultTest {
    def main(args : Array[String]) : Unit = {
      println("starting FaultTest.main()")
      val numOfActors = 5
      val supervisor = actorOf(
        new TestActorManager with CyclicLoadBalancing {
             val testActors = (0 until numOfActors toList) map (i => actorOf(new TestActor(i)));
        }
      )

      supervisor.start();

      println("Number of Actors: " +  Actor.registry.actorsFor(classOf[TestActor]).length)

      val testActor = Actor.registry.actorsFor(classOf[TestActor]).head

      (1 until 200 toList) foreach { testActor ! _ }

    }
  }

Этот код устанавливает 5 участников позади LoadBalancer, которые просто распечатывают целые числа, которые им отправляются, за исключением того, что они выдают исключения для четных чисел для имитации ошибок. Этим Актерам отправляются целые числа от 0 до 200. Я ожидаю, что будут выведены нечетные числа, но все, кажется, отключается после пары ошибок на четных числах. Выполнение этого кода с помощью sbt приводит к следующему выводу:

[info] Running FaultTest 
starting FaultTest.main()
Loading config [akka.conf] from the application classpath.
Number of Actors: 5
Actor: 2 Received: 1
Actor: 2 Received: 9
Actor: 1 Received: 3
Actor: 3 Received: 7
[info] == run ==
[success] Successful.
[info] 
[info] Total time: 13 s, completed Aug 16, 2011 11:00:23 AM

Я думаю, что здесь происходит то, что запускаются 5 участников, и первые 5 четных чисел выводят их из строя, и они не перезапускаются.

Как может этот код изменить так, чтобы Актеры оправлялись от исключений?

Я ожидаю, что это на самом деле распечатает все нечетные числа от 1 до 200. Я думаю, что каждый субъект будет терпеть неудачу на четных числах, но будет перезапущен с сохраненным почтовым ящиком при исключениях. Я ожидаю увидеть распечатку от preRestart и postRestart. Что необходимо настроить в этом примере кода, чтобы это произошло?

Вот некоторые дополнительные предположения относительно akka и Actors, которые могут привести к моему недоразумению. Я предполагаю, что Actor может быть настроен с помощью Supervisor или faultHandler, чтобы он был перезапущен и оставался доступным при возникновении исключения во время приема. Я предполагаю, что сообщение, которое было отправлено актеру, будет потеряно, если оно вызовет исключение во время приема. Я предполагаю, что будут вызываться preRestart () и postRestart () для актора, который генерирует исключение.

Пример кода представляет то, что я пытаюсь сделать, и основан на Почему моя отправка включена Акторы уменьшены в Akka?

** Другой пример кода **

Вот еще один пример кода, который проще. Я начинаю одного актера, который делает исключения для четных чисел. На пути нет балансировщика нагрузки или прочего. Я пытаюсь распечатать информацию об актере. Я жду выхода из программы в течение минуты после того, как сообщения были отправлены Актеру и наблюдаю за происходящим.

Я ожидаю, что это распечатает нечетные числа, но похоже, что Актер сидит с сообщениями в свой почтовый ящик.

Я неправильно настроил OneForOneStrategy? Мне нужно связать Актера с чем-то? Является ли такая конфигурация в корне неверной с моей стороны? Нужно ли как-то настраивать диспетчер с отказоустойчивостью? Могу ли я испортить потоки в Диспетчере?

import akka.actor.Actor
import akka.actor.Actor._
import akka.actor.ActorRef
import akka.actor.ActorRegistry
import akka.config.Supervision._

class SingleActor(val name: Integer) extends Actor {
    self.lifeCycle = Permanent
    self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 30, 1000)
    def receive = {
       case num: Integer => {  
         if( num % 2 == 0 )
            throw new Exception("This is a simulated failure, where does this get logged?")
         println("Actor: " + name + " Received: " + num)
       }
    }

  override def postStop(){
    println("TestActor post Stop ")
  }

  override def preRestart(reason: Throwable){
    println("TestActor "+ name + " restaring after shutdown because of " + reason)
  }

  override def postRestart(reason: Throwable){
    println("Restaring TestActor "+name+"after shutdown because of " + reason)
  }  
}

object TestSingleActor{

    def main(args : Array[String]) : Unit = {
      println("starting TestSingleActor.main()")

      val testActor = Actor.actorOf( new SingleActor(1) ).start()

      println("number of actors: " + registry.actors.size)
      printAllActorsInfo

      (1 until 20 toList) foreach { testActor ! _ }

      for( i <- 1 until 120 ){
        Thread.sleep(500)
        printAllActorsInfo
      }
    }

  def printAllActorsInfo() ={
    registry.actors.foreach( (a) =>
       println("Actor hash: %d has mailbox %d isRunning: %b isShutdown: %b isBeingRestarted: %b "
               .format(a.hashCode(),a.mailboxSize,a.isRunning,a.isShutdown,a.isBeingRestarted)))
  }
}

Я получаю такой вывод:

[info] Running TestSingleActor 
starting TestSingleActor.main()
Loading config [akka.conf] from the application classpath.
number of actors: 1
Actor hash: -1537745664 has mailbox 0 isRunning: true isShutdown: false isBeingRestarted: false 
Actor: 1 Received: 1
Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false 

... 117 more of these lines repeted ...

Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false 
[info] == run ==
[success] Successful.
[info] 
[info] Total time: 70 s, completed Aug 17, 2011 2:24:49 PM

8
задан Community 23 May 2017 в 12:01
поделиться