Как я должен обработать операции блокирования при использовании scala агентов?

Я начал изучать scala платформу агентов приблизительно два дня назад. Для создания бетона идей в моем уме я решил реализовать основанный на TCP сервер эха, который мог обработать несколько одновременных соединений.

Вот код для сервера эха (обработка ошибок, не включенная):

class EchoServer extends Actor {
  private var connections = 0

  def act() {
    val serverSocket = new ServerSocket(6789)

    val echoServer = self
    actor { while (true) echoServer ! ("Connected", serverSocket.accept) }

    while (true) {
      receive {
        case ("Connected", connectionSocket: Socket) =>
          connections += 1
          (new ConnectionHandler(this, connectionSocket)).start
        case "Disconnected" =>
          connections -= 1
      }
    }
  }
}

В основном сервером является Агент, который обрабатывает "Связанные" и "Разъединенные" сообщения. Это делегирует соединение, слушая анонимного агента, который вызывает принятие () метод (операция блокирования) на serverSocket. Когда соединение прибывает, оно сообщает серверу с помощью "Связанного" сообщения и передает его сокет для использования для связи с недавно связанным клиентом. Экземпляр класса ConnectionHandler обрабатывает фактическую связь с клиентом.

Вот код для обработчика соединений (некоторая включенная обработка ошибок):

class ConnectionHandler(server: EchoServer, connectionSocket: Socket)
    extends Actor {

  def act() {
    for (input <- getInputStream; output <- getOutputStream) {
      val handler = self
      actor {
        var continue = true
        while (continue) {
          try {
            val req = input.readLine
            if (req != null) handler ! ("Request", req)
            else continue = false
          } catch {
            case e: IOException => continue = false
          }
        }

        handler ! "Disconnected"
      }

      var connected = true
      while (connected) {
        receive {
          case ("Request", req: String) =>
            try {
              output.writeBytes(req + "\n")
            } catch {
              case e: IOException => connected = false
            }
          case "Disconnected" =>
            connected = false
        }
      }
    }

    close()
    server ! "Disconnected"
  }

  // code for getInputStream(), getOutputStream() and close() methods
}

Обработчик соединений использует анонимного агента, который ожидает запросов, которые будут отправлены на сокет путем называния readLine () методом (операция блокирования) на входном потоке сокета. Когда запрос получен, сообщение "Запроса" отправляется на обработчик, который затем просто повторяет запрос назад клиенту. Если обработчик или анонимный агент испытывают проблемы с базовым сокетом затем, сокет закрывается, и сообщение "Разъединения" отправляется на сервер эха, указывающий, что клиент был разъединен от сервера.

Так, я могу включить сервер эха и позволить ему ожидать соединений. Затем я могу открыть новый терминал и подключение к серверу с помощью telnet. Я могу отправить, это запрашивает, и это отвечает правильно. Теперь, если я открываю другой терминал и подключение к серверу, сервер регистрирует соединение, но не удается запустить обработчик соединений для этого нового соединения. Когда я отправляю, это обменивается сообщениями через любое из существующих соединений, я не получаю непосредственного ответа. Вот интересная часть. Когда я завершаю всех кроме одного из существующих соединений клиента и оставляю клиент X открытым, затем все ответы на запрос, который я отправил через клиент X, возвращаются. Я сделал некоторые тесты и пришел к заключению, что действие () метод не называют на последующих соединениях клиента даже при том, что я называю запуск () методом при создании обработчика соединений.

Я предполагаю, что обрабатываю операции блокирования неправильно в моем обработчике соединений. Так как предыдущее соединение обрабатывается обработчиком соединений, который имеет заблокированное ожидание анонимного агента запроса, я думаю, что этот заблокированный агент предотвращает других агентов (обработчики соединений) от запуска.

Как я должен обработать операции блокирования при использовании scala агентов?

Любая справка значительно ценилась бы.

7
задан Dwayne Crooks 27 July 2010 в 17:23
поделиться

1 ответ

Из scaladoc для scala.actors.Actor:

Примечание: необходимо соблюдать осторожность при вызове методов блокировки потоков, отличных от тех, которые предоставляются признаком Actor или его объектом-компаньоном (например, receive). Блокирование основного потока внутри актора может привести к голоданию других акторов. Это также относится к акторам, которые долгое время используют свой поток между вызовами receive/react.

Если агенты используют блокирующие операции (например, методы для блокировки ввода-вывода), есть несколько вариантов:

  • Система времени выполнения может быть настроена на использование большего размера пула потоков (например, путем установки свойства actors.corePoolSize JVM).
  • Метод scheduler свойства Actor может быть переопределен для возврата ResizableThreadPoolScheduler, который изменяет размер пула потоков, чтобы избежать голодания, вызванного акторами, вызывающими произвольные блокирующие методы.
  • Свойство actors.enableForkJoin JVM может быть установлено в false, в этом случае по умолчанию для выполнения акторов используется ResizableThreadPoolScheduler.
4
ответ дан 7 December 2019 в 14:26
поделиться
Другие вопросы по тегам:

Похожие вопросы: