Как ускорить обработку большого CSV с помощью ruby ​​

Для проекта мне нужно проанализировать несколько довольно больших файлов CSV. Содержимое некоторых записей хранится в базе данных MySQL. Я пытаюсь ускорить это с помощью многопоточности, но до сих пор это только замедляло работу.

Я анализирую файл CSV (до 10 ГБ) и некоторые из этих записей (примерно 5 МБ из 20 млн + записей CSV) необходимо вставить в базу данных MySQL. Чтобы определить, какую запись необходимо вставить, мы используем сервер Redis с наборами, которые содержат правильные идентификаторы / ссылки.

Поскольку мы обрабатываем около 30 из этих файлов в любой момент времени, и есть некоторые зависимости, мы сохраняем каждый файл на a Resque queue и иметь несколько серверов, обрабатывающих эти (с приоритетом) очереди.

В двух словах:

class Worker
  def self.perform(file)
    CsvParser.each(file) do |line|
      next unless check_line_with_redis(line)
      a = ObjectA.find_or_initialize_by_reference(line[:reference])
      a.object_bs.destroy_all
      a.update_attributes(line)
    end
  end

Это работает, хорошо масштабируется по горизонтали (больше файлов CSV = больше серверов), но большие файлы CSV создают проблему. В настоящее время у нас есть файлы, на синтаксический анализ которых уходит более 75 часов. Я уже придумал несколько оптимизаций:

Одна заключается в сокращении количества запросов MySQL; мы создаем экземпляры объектов AR, в то время как вставка с помощью простого SQL, если мы знаем идентификатор объекта, выполняется намного быстрее. Таким образом, мы, вероятно, сможем избавиться от большей части AR и, возможно, даже от Rails, чтобы таким образом удалить накладные расходы. Мы можем' t использовать простые данные загрузки MySQL, поскольку мы должны сопоставить записи CSV с другими объектами, у которых к настоящему времени могут быть другие идентификаторы (мы объединяем дюжину устаревших баз данных в новую базу данных).

Другой пытается сделать больше в то же время. Существует некоторое время ожидания ввода-вывода, время ожидания сети как для Redis, так и для MySQL, и даже если MRI использует зеленые потоки, это может позволить нам планировать наши запросы MySQL одновременно с чтением операций ввода-вывода и т. Д. Но используя следующий код:

class Worker
  def self.perform(file)
    CsvParser.each(file) do |line|
      next unless check_line_with_redis(line)
      create_or_join_thread(line) do |myLine|
        a = ObjectA.find_or_initialize_by_reference(myLine[:reference])
        a.object_bs.destroy_all
        a.update_attributes(myLine)
      end
    end

    def self.create_or_join_thread(line)
      @thread.join if @thread.present?
      @thread = Thread.new(line) do |myLine|
        yield myLine
      end
    end
  end

Это медленно замедляет процесс. Когда я ps au , он запускается на 100% ЦП, но с течением времени он снижается до 2-3%. В этот момент он вообще не вставляет новые записи, он просто зависает.

У меня есть strace d процесс, и сначала я вижу, что запросы MySQL проходят, через некоторое время кажется, что это вообще не выполняет мой рубиновый код. Может быть тупик (он завис после разбора последней строки CSV, но процесс продолжал работать на 5% ЦП и не завершился) или что-то, что я прочитал здесь: http: / /timetobleed.com/ruby-threading-bugfix-small-fix-goes-a-long-way/

Я использую Rails 2.3.8, REE, 1.8.7-2010.02 в Ubuntu 10.10. Приветствуются любые идеи о том, как обрабатывать большое количество потоков (или, может быть, почему бы не использовать потоки здесь вообще)!

7
задан Kamiel Wanrooij 24 March 2011 в 22:54
поделиться