Мертвая блокировка в ThreadPool

Исключение нулевого указателя генерируется, когда приложение пытается использовать null в случае, когда требуется объект. К ним относятся:

  1. Вызов метода экземпляра объекта null.
  2. Доступ или изменение поля объекта null.
  3. Принимая длину null, как если бы это был массив.
  4. Доступ или изменение слотов null, как если бы это был массив.
  5. Бросок null как будто это было значение Throwable.

Приложения должны бросать экземпляры этого класса, чтобы указать на другие незаконные использования объекта null.

Ссылка: http://docs.oracle.com/javase/8/docs/api/java/lang/NullPointerException.html

13
задан Csaba 15 October 2018 в 14:49
поделиться

4 ответа

Хорошо, таким образом, основная проблема с реализацией: как удостовериться, что никакой сигнал не потерян, и избегайте тупиков?

, По моему опыту, этого ДЕЙСТВИТЕЛЬНО трудно достигнуть с условными переменными и взаимным исключением, но легкий с семафорами. Это так происходит, что рубин реализует объект под названием Очередь (или SizedQueue), который должен решить проблему. Вот моя предложенная реализация:

require 'thread'
begin
  require 'fasttread'
rescue LoadError
  $stderr.puts "Using the ruby-core thread implementation"
end

class ThreadPool
  class Worker
    def initialize(thread_queue)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @queue = thread_queue
      @running = true
      @thread = Thread.new do
        @mutex.synchronize do
          while @running
            @cv.wait(@mutex)
            block = get_block
            if block
              @mutex.unlock
              block.call
              @mutex.lock
              reset_block
            end
            @queue << self
          end
        end
      end
    end

    def name
      @thread.inspect
    end

    def get_block
      @block
    end

    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end

    def reset_block
      @block = nil
    end

    def busy?
      @mutex.synchronize { !@block.nil? }
    end

    def stop
      @mutex.synchronize do
        @running = false
        @cv.signal
      end
      @thread.join
    end
  end

  attr_accessor :max_size

  def initialize(max_size = 10)
    @max_size = max_size
    @queue = Queue.new
    @workers = []
  end

  def size
    @workers.size
  end

  def busy?
    @queue.size < @workers.size
  end

  def shutdown
    @workers.each { |w| w.stop }
    @workers = []
  end

  alias :join :shutdown

  def process(block=nil,&blk)
    block = blk if block_given?
    worker = get_worker
    worker.set_block(block)
  end

  private

  def get_worker
    if !@queue.empty? or @workers.size == @max_size
      return @queue.pop
    else
      worker = Worker.new(@queue)
      @workers << worker
      worker
    end
  end

end

И вот простой тестовый код:

tp = ThreadPool.new 500
(1..1000).each { |i| tp.process { (2..10).inject(1) { |memo,val| sleep(0.1); memo*val }; print "Computation #{i} done. Nb of tasks: #{tp.size}\n" } }
tp.shutdown
11
ответ дан 1 December 2019 в 22:24
поделиться

Хорошо, проблема, кажется, находится в Вашем методе ThreadPool#signal. То, что может произойти:

1 - Весь Ваш рабочий занят, и Вы пытаетесь обработать новое задание

2 - строка 90 получает нулевого рабочего

3 - рабочий освобожден и предупреждает о нем, но сигнал потерян, поскольку ThreadPool не ожидает его

4 - Вы падаете на строку 95, ожидая даже при том, что существует свободный рабочий.

ошибка здесь состоит в том, что можно предупредить о свободном рабочем, даже когда никто не слушает. Этот метод ThreadPool#signal должен быть:

def signal
     @mutex.synchronize { @cv.signal }
end

И проблема то же в объекте Рабочего. То, что могло бы произойти:

1 - Рабочий просто завершил задание

2 - Оно проверяет (строка 17), если существует ожидание задания: нет

3 - пул потоков отправляет новое задание, и сигнализирует о нем..., но сигнал потерян

4 - рабочий ожидает сигнала, даже при том, что это отмечено как занятое

, необходимо поместить Ваш инициализировать метод как:

def initialize(callback)
  @mutex = Mutex.new
  @cv = ConditionVariable.new
  @callback = callback
  @mutex.synchronize {@running = true}
  @thread = Thread.new do
    @mutex.synchronize do
      while @running
        block = get_block
        if block
          @mutex.unlock
          block.call
          @mutex.lock
          reset_block
          # Signal the ThreadPool that this worker is ready for another job
          @callback.signal
        else
          # Wait for a new job
          @cv.wait(@mutex)
        end
      end
    end
  end
end

Затем, Worker#get_block и методы Worker#reset_block не должны больше синхронизироваться. Тем путем Вам нельзя было присвоить блок рабочему между тестом для блока и ожиданием сигнала.

1
ответ дан 1 December 2019 в 22:24
поделиться

Я немного смещаюсь здесь, но я предложил бы моделировать, это на некотором языке процесса и модели проверяет его. Инструменты в свободном доступе являются, например, mCRL2 комплектом инструментальных средств (использующий основанный на ACP язык), Инструментальные средства Мобильности (исчисление пи) и Вращение (PROMELA).

Иначе я предложил бы удалить каждый бит кода, который не важен для проблемы и нахождения минимального случая, где мертвая блокировка происходит. Я сомневаюсь, что 100 потоков и 1 300 задач важно получить мертвую блокировку. С меньшим случаем можно, вероятно, просто добавить некоторую печать отладки, которая предоставляет достаточно информации решение проблемы.

1
ответ дан 1 December 2019 в 22:24
поделиться

Вы можете попробовать GEM GEM, предназначенный для координации работы между производителем и пулом рабочих потоков.

8
ответ дан 1 December 2019 в 22:24
поделиться
Другие вопросы по тегам:

Похожие вопросы: