Чистый Ruby параллельный Хеш

Аргументы ВМ работали для меня в затмении. Если вы используете eclipse версии 3.4, выполните следующие действия

, перейдите к Run --> Run Configurations -->, затем выберите проект в maven build ->, затем выберите вкладку «JRE» -> и введите -Xmx1024m.

В качестве альтернативы вы можете сделать Run --> Run Configurations --> select the "JRE" tab -->, затем ввести - Xmx1024m

Это должно увеличить кучу памяти для всех сборок / проектов. Приведенный выше объем памяти составляет 1 ГБ. Вы можете оптимизировать, как вы хотите.

60
задан Yehuda Katz 3 July 2009 в 22:05
поделиться

8 ответов

Хорошо, теперь, когда вы указали фактическое значение слова «потокобезопасность», вот две возможные реализации. Следующий код будет работать вечно в MRI и JRuby. Реализация без блокировки следует модели конечной согласованности, в которой каждый поток использует свое собственное представление хэша, если мастер находится в движении. Чтобы убедиться, что при хранении всей информации в потоке не происходит утечки памяти, требуется небольшая хитрость, но это обрабатывается и проверяется - размер процесса не увеличивается при выполнении этого кода. Обе реализации потребуют дополнительной работы, чтобы быть «завершенными», а это означает, что удаление, обновление и т. Д. Потребует некоторого размышления, но любая из двух концепций, представленных ниже, будет соответствовать вашим требованиям.

Это ' очень важно для людей, читающих эту ветку, чтобы понять, что вся проблема связана исключительно с JRuby - в MRI достаточно встроенного хэша.

module Cash
  def Cash.new(*args, &block)
    env = ENV['CASH_IMPL']
    impl = env ? Cash.const_get(env) : LocklessImpl
    klass = defined?(JRUBY_VERSION) ? impl : ::Hash
    klass.new(*args)
  end

  class LocklessImpl
    def initialize
      @hash = {}
    end

    def thread_hash
      thread = Thread.current
      thread[:cash] ||= {}
      hash = thread[:cash][thread_key]
      if hash
        hash
      else
        hash = thread[:cash][thread_key] = {}
        ObjectSpace.define_finalizer(self){ thread[:cash].delete(thread_key) }
        hash
      end
    end

    def thread_key
      [Thread.current.object_id, object_id]
    end

    def []=(key, val)
      time = Time.now.to_f
      tuple = [time, val]
      @hash[key] = tuple
      thread_hash[key] = tuple
      val
    end

    def [](key)
    # check the master value
    #
      val = @hash[key]

    # someone else is either writing the key or it has never been set.  we
    # need to invalidate our own copy in either case
    #
      if val.nil?
        thread_val = thread_hash.delete(key)
        return(thread_val ? thread_val.last : nil)
      end

    # check our own thread local value
    #
      thread_val = thread_hash[key]

    # in this case someone else has written a value that we have never seen so
    # simply return it
    #
      if thread_val.nil?
        return(val.last)
      end

    # in this case there is a master *and* a thread local value, if the master
    # is newer juke our own cached copy
    #
      if val.first > thread_val.first
        thread_hash.delete(key)
        return val.last
      else
        return thread_val.last
      end
    end
  end

  class LockingImpl < ::Hash
    require 'sync'

    def initialize(*args, &block)
      super
    ensure
      extend Sync_m
    end

    def sync(*args, &block)
      sync_synchronize(*args, &block)
    end

    def [](key)
      sync(:SH){ super }
    end

    def []=(key, val)
      sync(:EX){ super }
    end
  end
end



if $0 == __FILE__
  iteration = 0

  loop do
    n = 42
    hash = Cash.new

    threads =
      Array.new(10) {
        Thread.new do
          Thread.current.abort_on_exception = true
          n.times do |key|
            hash[key] = key
            raise "#{ key }=nil" if hash[key].nil?
          end
        end
      }

    threads.map{|thread| thread.join}

    puts "THREADSAFE: #{ iteration += 1 }"
  end
end
22
ответ дан 24 November 2019 в 17:54
поделиться

Базовое / простое решение для публикации, просто чтобы повысить мою репутацию за переполнение стека:

require 'thread'

class ConcurrentHash < Hash
  def initialize
    super
    @mutex = Mutex.new
  end

  def [](*args)
    @mutex.synchronize { super }
  end

  def []=(*args)
    @mutex.synchronize { super }
  end
end
10
ответ дан 24 November 2019 в 17:54
поделиться

Это класс-оболочка вокруг Hash, который разрешает одновременное считывание, но блокирует все для всех других типов доступа (включая итерационные чтения).

class LockedHash
  def initialize
    @hash = Hash.new
    @lock = ThreadAwareLock.new()
    @reader_count = 0
  end

  def [](key)
    @lock.lock_read
    ret = @hash[key]
    @lock.unlock_read
    ret
  end

  def []=(key, value)
    @lock.lock_write
    @hash[key] = value
    @lock.unlock_write
  end

  def method_missing(method_sym, *arguments, &block)
    if @hash.respond_to? method_sym
      @lock.lock_block
      val = lambda{@hash.send(method_sym,*arguments, &block)}.call
      @lock.unlock_block
      return val
    end
    super
  end
end

Вот код блокировки, который он использует:

class RWLock
  def initialize
    @outer = Mutex.new
    @inner = Mutex.new
    @reader_count = 0
  end
  def lock_read
    @outer.synchronize{@inner.synchronize{@reader_count += 1}}
  end
  def unlock_read
    @inner.synchronize{@reader_count -= 1}
  end
  def lock_write
    @outer.lock
    while @reader_count > 0 ;end
  end
  def unlock_write
    @outer.unlock
  end
end

class ThreadAwareLock < RWLock
  def initialize
    @owner = nil
    super
  end
  def lock_block
    lock_write
    @owner = Thread.current.object_id
  end
  def unlock_block
    @owner = nil
    unlock_write
  end
  def lock_read
    super unless my_block?
  end
  def unlock_read
    super unless my_block?
  end
  def lock_write
    super unless my_block?
  end
  def unlock_write
    super unless my_block?
  end
  def my_block?
    @owner == Thread.current.object_id
  end
end

Блокировка с поддержкой потоков позволяет вам заблокировать класс один раз, а затем вызвать методы, которые обычно блокируются, и не блокировать их. Вам это нужно, потому что вы переходите в блоки внутри некоторых методов, и эти блоки могут вызывать методы блокировки для объекта, и вам не нужна тупиковая ситуация или ошибка двойной блокировки. Вместо этого вы можете использовать блокировку подсчета.

Вот попытка реализовать блокировки чтения-записи на уровне сегмента:

class SafeBucket
  def initialize
    @lock = RWLock.new()
    @value_pairs = []
  end

  def get(key)
    @lock.lock_read
    pair = @value_pairs.select{|p| p[0] == key}
    unless pair && pair.size > 0
      @lock.unlock_read
      return nil
    end
    ret = pair[0][1]
    @lock.unlock_read
    ret
  end

  def set(key, value)
    @lock.lock_write
    pair = @value_pairs.select{|p| p[0] == key}
    if pair && pair.size > 0
      pair[0][1] = value
      @lock.unlock_write
      return
    end
    @value_pairs.push [key, value]
    @lock.unlock_write
    value
  end

  def each
    @value_pairs.each{|p| yield p[0],p[1]}
  end

end

class MikeConcurrentHash
  def initialize
    @buckets = []
    100.times {@buckets.push SafeBucket.new}
  end

  def [](key)
    bucket(key).get(key)
  end

  def []=(key, value)
    bucket(key).set(key, value)
  end

  def each
    @buckets.each{|b| b.each{|key, value| yield key, value}}
  end

  def bucket(key)
    @buckets[key.hash % 100]
  end
end

Я прекратил работать над этим, потому что он слишком медленный, поэтому каждый метод небезопасен (допускает мутации другими потоков во время итерации), и он не поддерживает большинство методов хеширования.

И здесь ' Jruby

Writiness      0.0   0.1   0.2   0.3   0.4   0.5   0.6   0.7   0.8   0.9   1.0
ConcurrentHash 2.098 3.179 2.971 3.083 2.731 2.941 2.564 2.480 2.369 1.862 1.881
LockedHash     1.873 1.896 2.085 2.058 2.001 2.055 1.904 1.921 1.873 1.841 1.630
Hash           0.530 0.672 0.685 0.822 0.719 0.877 0.901 0.931 0.942 0.950 1.001

И МРТ

Writiness      0.0    0.1    0.2    0.3    0.4    0.5    0.6    0.7    0.8    0.9    1.0
ConcurrentHash  9.214  9.913  9.064 10.112 10.240 10.574 10.566 11.027 11.323 11.837 13.036
LockedHash     19.593 17.712 16.998 17.045 16.687 16.609 16.647 15.307 14.464 13.931 14.146
Hash            0.535  0.537  0.534  0.599  0.594  0.676  0.635  0.650  0.654  0.661  0.692

Результаты МРТ поразительны. Записывать МРТ - отстой.

4
ответ дан 24 November 2019 в 17:54
поделиться

Иегуда, я думаю, вы упомянули, что настройка ivar была атомарной? А как насчет простого копирования и обмена?

require 'thread'

class ConcurrentHash
  def initialize
    @reader, @writer = {}, {}
    @lock = Mutex.new
  end

  def [](key)
    @reader[key]
  end

  def []=(key, value)
    @lock.synchronize {
      @writer[key] = value
      @reader, @writer = @writer, @reader
      @writer[key] = value
    }
  end
end
7
ответ дан 24 November 2019 в 17:54
поделиться

это ( видео , pdf ) о хэш-таблице без блокировки, реализованной в Java.

спойлер: используется атомарный Сравнить Операции -And-Swap (CAS) , если они недоступны в Ruby, вы можете эмулировать их с помощью блокировок. не уверен, даст ли это какое-либо преимущество перед простыми хэш-таблицами с защитой блокировкой

1
ответ дан 24 November 2019 в 17:54
поделиться

я не совсем понимаю, что это означает. Я думаю, что самая простая реализация - это просто

Hash

, то есть встроенный рубиновый хеш является потокобезопасным, если под потокобезопасностью вы имеете в виду, не взорвется, если> 1 потока попытается получить к нему доступ. этот код будет безопасно работать вечно

n = 4242
hash = {}

loop do
  a =
    Thread.new do
      n.times do
        hash[:key] = :val
      end
    end

  b =
    Thread.new do
      n.times do
        hash.delete(:key)
      end
    end

  c =
    Thread.new do
      n.times do
        val = hash[:key]
        raise val.inspect unless [nil, :val].include?(val)
      end
    end

  a.join
  b.join
  c.join
  p :THREADSAFE
end

Я подозреваю, что под потокобезопасностью вы действительно имеете в виду ACID - например, запись типа hash [: key] =: val с последующим чтением, если has [: key] вернет: val. но никакие хитрости с блокировкой не могут этого обеспечить - всегда побеждает последний. например, скажем, у вас есть 42 потока, все обновляющие потокобезопасный хэш - какое значение должно быть прочитано 43-м ?? конечно, под словом «трехбезопасность» вы не подразумеваете своего рода тотальный порядок записи - поэтому, если 42 потока активно записывали, «правильным» значением будет любое верно? но встроенный в Ruby Hash работает именно так ...

возможно, вы имеете в виду что-то вроде

hash.each do ...

в одном потоке, а

hash.delete(key)

не будет мешать друг другу? Я могу представить, чтобы это было потокобезопасным, но это даже небезопасно в единственном потоке с MRI ruby ​​(очевидно, вы не можете изменять хеш во время итерации по нему)

, так что можете быть более конкретными что вы имеете в виду под «потокобезопасным» ??

единственный способ предоставить семантику ACID - это грубая блокировка (конечно, это может быть метод, который взял блок, но все же внешнюю блокировку).

1
ответ дан 24 November 2019 в 17:54
поделиться

Не тестировалось, и наивная попытка оптимизации для чтения. Предполагается, что в большинстве случаев значение не будет заблокировано. Если это так, тугая петля будет пытаться, пока это не произойдет. Я поместил туда Thread.critical , чтобы гарантировать, что потоки чтения не будут запущены до завершения записи. Не уверен, нужна ли критическая часть, это действительно зависит от того, насколько тяжелое для чтения вы имеете в виду, поэтому некоторый бенчмаркинг в порядке.

class ConcurrentHash < Hash

  def initialize(*args)
    @semaphore = Mutex.new
    super
  end

  def []=(k,v)
    begin
      old_crit = Thread.critical
      Thread.critical = true unless old_crit
      @semaphore.synchronize { super }
    ensure
      Thread.critical = old_crit
    end
  end

  def [](k)
    while(true)
      return super unless @semaphore.locked?
    end
  end

end

Может быть несколько других методов чтения, которым потребуется проверить блокировку @semaphore, я не знаю, реализовано ли все остальное в терминах # [].

Предполагается, что в большинстве случаев значение не будет заблокировано. Если это так, тугая петля будет пытаться, пока это не произойдет. Я поместил туда Thread.critical , чтобы гарантировать, что потоки чтения не будут запущены до завершения записи. Не уверен, нужна ли критическая часть, это действительно зависит от того, насколько тяжелое для чтения вы имеете в виду, поэтому некоторый бенчмаркинг в порядке.

class ConcurrentHash < Hash

  def initialize(*args)
    @semaphore = Mutex.new
    super
  end

  def []=(k,v)
    begin
      old_crit = Thread.critical
      Thread.critical = true unless old_crit
      @semaphore.synchronize { super }
    ensure
      Thread.critical = old_crit
    end
  end

  def [](k)
    while(true)
      return super unless @semaphore.locked?
    end
  end

end

Может быть несколько других методов чтения, которым потребуется проверить блокировку @semaphore, я не знаю, реализовано ли все остальное в терминах # [].

Предполагается, что в большинстве случаев значение не будет заблокировано. Если это так, тугая петля будет пытаться, пока это не произойдет. Я поместил туда Thread.critical , чтобы гарантировать, что потоки чтения не будут запущены до завершения записи. Не уверен, нужна ли критическая часть, это действительно зависит от того, насколько тяжелое для чтения вы имеете в виду, поэтому некоторый бенчмаркинг в порядке.

class ConcurrentHash < Hash

  def initialize(*args)
    @semaphore = Mutex.new
    super
  end

  def []=(k,v)
    begin
      old_crit = Thread.critical
      Thread.critical = true unless old_crit
      @semaphore.synchronize { super }
    ensure
      Thread.critical = old_crit
    end
  end

  def [](k)
    while(true)
      return super unless @semaphore.locked?
    end
  end

end

Может быть несколько других методов чтения, которым потребуется проверить блокировку @semaphore, я не знаю, реализовано ли все остальное в терминах # [].

так что некоторый сравнительный анализ в порядке.

class ConcurrentHash < Hash

  def initialize(*args)
    @semaphore = Mutex.new
    super
  end

  def []=(k,v)
    begin
      old_crit = Thread.critical
      Thread.critical = true unless old_crit
      @semaphore.synchronize { super }
    ensure
      Thread.critical = old_crit
    end
  end

  def [](k)
    while(true)
      return super unless @semaphore.locked?
    end
  end

end

Может быть несколько других методов чтения, которым потребуется проверить блокировку @semaphore, я не знаю, реализовано ли все остальное в терминах # [].

так что некоторый сравнительный анализ в порядке.

class ConcurrentHash < Hash

  def initialize(*args)
    @semaphore = Mutex.new
    super
  end

  def []=(k,v)
    begin
      old_crit = Thread.critical
      Thread.critical = true unless old_crit
      @semaphore.synchronize { super }
    ensure
      Thread.critical = old_crit
    end
  end

  def [](k)
    while(true)
      return super unless @semaphore.locked?
    end
  end

end

Может быть несколько других методов чтения, которым потребуется проверить блокировку @semaphore, я не знаю, реализовано ли все остальное в терминах # [].

1
ответ дан 24 November 2019 в 17:54
поделиться

Поскольку вы упомянули, что хэш будет тяжелым для чтения, наличие одной блокировки мьютекса для чтения и записи приведет к условиям гонки которые, скорее всего, выиграют чтения. Если вас это устраивает, проигнорируйте ответ.

Если вы хотите дать приоритет записи, вам поможет блокировка чтения-записи. Следующий код основан на каком-то старом назначении C ++ для класса операционных систем, поэтому может быть не самого лучшего качества, но дает общее представление.

require 'thread'

class ReadWriteLock
  def initialize
    @critical_section = Mutex.new
    @are_writers_finished = ConditionVariable.new
    @are_readers_finished = ConditionVariable.new
    @readers = 0
    @writers = 0
    @writer_locked = false
  end

  def read
    begin
      start_read
      yield
    ensure
      end_read
    end
  end

  def start_read
    @critical_section.lock
    while (@writers != 0 || @writer_locked)
      @are_writers_finished.wait(@critical_section)
    end
    @readers += 1
    @critical_section.unlock
  end

  def end_read
    @critical_section.lock
    if (@readers -= 1) == 0
      @are_readers_finished.broadcast
    end
    @critical_section.unlock
  end

  def write
    begin
      start_write
      yield
    ensure
      end_write
    end
  end

  def start_write
    @critical_section.lock
    @writers += 1
    while @readers > 0
      @are_readers_finished.wait(@critical_section)
    end
    while @writer_locked
      @are_writers_finished.wait(@critical_section)
    end
    @writers -= 1
    @writer_locked = true
    @critical_section.unlock
  end

  def end_write
    @critical_section.lock
    @writer_locked = false
    @are_writers_finished.broadcast
    @critical_section.unlock
  end
end

Затем просто оберните [] = и [] в lock.write и lock.read. Может повлиять на производительность, но гарантирует, что операции записи «пройдут» чтения. Его полезность зависит от того, насколько он тяжелый для чтения.

-1
ответ дан 24 November 2019 в 17:54
поделиться
Другие вопросы по тегам:

Похожие вопросы: