Исключение нулевого указателя генерируется, когда приложение пытается использовать null в случае, когда требуется объект. К ним относятся:
null
. null
. null
, как если бы это был массив. null
, как если бы это был массив. null
как будто это было значение Throwable. Приложения должны бросать экземпляры этого класса, чтобы указать на другие незаконные использования объекта null
.
Ссылка: http://docs.oracle.com/javase/8/docs/api/java/lang/NullPointerException.html
Хорошо, таким образом, основная проблема с реализацией: как удостовериться, что никакой сигнал не потерян, и избегайте тупиков?
, По моему опыту, этого ДЕЙСТВИТЕЛЬНО трудно достигнуть с условными переменными и взаимным исключением, но легкий с семафорами. Это так происходит, что рубин реализует объект под названием Очередь (или SizedQueue), который должен решить проблему. Вот моя предложенная реализация:
require 'thread'
begin
require 'fasttread'
rescue LoadError
$stderr.puts "Using the ruby-core thread implementation"
end
class ThreadPool
class Worker
def initialize(thread_queue)
@mutex = Mutex.new
@cv = ConditionVariable.new
@queue = thread_queue
@running = true
@thread = Thread.new do
@mutex.synchronize do
while @running
@cv.wait(@mutex)
block = get_block
if block
@mutex.unlock
block.call
@mutex.lock
reset_block
end
@queue << self
end
end
end
end
def name
@thread.inspect
end
def get_block
@block
end
def set_block(block)
@mutex.synchronize do
raise RuntimeError, "Thread already busy." if @block
@block = block
# Signal the thread in this class, that there's a job to be done
@cv.signal
end
end
def reset_block
@block = nil
end
def busy?
@mutex.synchronize { !@block.nil? }
end
def stop
@mutex.synchronize do
@running = false
@cv.signal
end
@thread.join
end
end
attr_accessor :max_size
def initialize(max_size = 10)
@max_size = max_size
@queue = Queue.new
@workers = []
end
def size
@workers.size
end
def busy?
@queue.size < @workers.size
end
def shutdown
@workers.each { |w| w.stop }
@workers = []
end
alias :join :shutdown
def process(block=nil,&blk)
block = blk if block_given?
worker = get_worker
worker.set_block(block)
end
private
def get_worker
if !@queue.empty? or @workers.size == @max_size
return @queue.pop
else
worker = Worker.new(@queue)
@workers << worker
worker
end
end
end
И вот простой тестовый код:
tp = ThreadPool.new 500
(1..1000).each { |i| tp.process { (2..10).inject(1) { |memo,val| sleep(0.1); memo*val }; print "Computation #{i} done. Nb of tasks: #{tp.size}\n" } }
tp.shutdown
Хорошо, проблема, кажется, находится в Вашем методе ThreadPool#signal. То, что может произойти:
1 - Весь Ваш рабочий занят, и Вы пытаетесь обработать новое задание
2 - строка 90 получает нулевого рабочего
3 - рабочий освобожден и предупреждает о нем, но сигнал потерян, поскольку ThreadPool не ожидает его
4 - Вы падаете на строку 95, ожидая даже при том, что существует свободный рабочий.
ошибка здесь состоит в том, что можно предупредить о свободном рабочем, даже когда никто не слушает. Этот метод ThreadPool#signal должен быть:
def signal
@mutex.synchronize { @cv.signal }
end
И проблема то же в объекте Рабочего. То, что могло бы произойти:
1 - Рабочий просто завершил задание
2 - Оно проверяет (строка 17), если существует ожидание задания: нет
3 - пул потоков отправляет новое задание, и сигнализирует о нем..., но сигнал потерян
4 - рабочий ожидает сигнала, даже при том, что это отмечено как занятое
, необходимо поместить Ваш инициализировать метод как:
def initialize(callback)
@mutex = Mutex.new
@cv = ConditionVariable.new
@callback = callback
@mutex.synchronize {@running = true}
@thread = Thread.new do
@mutex.synchronize do
while @running
block = get_block
if block
@mutex.unlock
block.call
@mutex.lock
reset_block
# Signal the ThreadPool that this worker is ready for another job
@callback.signal
else
# Wait for a new job
@cv.wait(@mutex)
end
end
end
end
end
Затем, Worker#get_block и методы Worker#reset_block не должны больше синхронизироваться. Тем путем Вам нельзя было присвоить блок рабочему между тестом для блока и ожиданием сигнала.
Я немного смещаюсь здесь, но я предложил бы моделировать, это на некотором языке процесса и модели проверяет его. Инструменты в свободном доступе являются, например, mCRL2 комплектом инструментальных средств (использующий основанный на ACP язык), Инструментальные средства Мобильности (исчисление пи) и Вращение (PROMELA).
Иначе я предложил бы удалить каждый бит кода, который не важен для проблемы и нахождения минимального случая, где мертвая блокировка происходит. Я сомневаюсь, что 100 потоков и 1 300 задач важно получить мертвую блокировку. С меньшим случаем можно, вероятно, просто добавить некоторую печать отладки, которая предоставляет достаточно информации решение проблемы.
Вы можете попробовать GEM GEM, предназначенный для координации работы между производителем и пулом рабочих потоков.