Многопоточная обработка файла с.NET

Существует папка, которая содержит 1000-е небольших текстовых файлов. Я стремлюсь анализировать и обрабатывать всех их, в то время как больше файлов заполняется в папку. Мое намерение состоит в том, чтобы мультираспараллелить эту операцию, поскольку единственный потоковый прототип занял шесть минут для обработки 1 000 файлов.

Мне нравится иметь поток (потоки) средства чтения и устройства записи как следующее. В то время как поток (потоки) читателя читает файлы, я хотел бы иметь поток (потоки) устройства записи для обработки их. После того как средство чтения запущено, читая файл, я хотел бы отметить его как обрабатываемый, такой как путем переименования его. После того как это читается, переименуйте его к завершенному.

Как я приближаюсь к такому многопоточному приложению?

Лучше использовать распределенную хеш-таблицу или очередь?

Какую структуру данных я использую, который избежал бы блокировок?

Существует ли лучший подход к этой схеме?

16
задан Peter Mortensen 11 November 2011 в 09:49
поделиться

6 ответов

Поскольку в комментариях есть любопытство по поводу того, как .NET 4 работает с этим, вот этот подход. Извините, скорее всего, это не вариант для ОП. Отказ от ответственности: Это не высоконаучный анализ, просто показано, что есть явное преимущество в производительности. В зависимости от аппаратного обеспечения, ваш пробег может сильно отличаться.

Вот быстрый тест (если вы видите большую ошибку в этом простом тесте, это просто пример. Пожалуйста, прокомментируйте, и мы сможем исправить его, чтобы сделать более полезным/точным). Для этого я просто бросил 12 000 файлов размером ~60 КБ в каталог в качестве образца (запустите LINQPad; вы можете поиграть с ним сами, бесплатно! - обязательно возьмите LINQPad 4):

var files = 
Directory.GetFiles("C:\\temp", "*.*", SearchOption.AllDirectories).ToList();

var sw = Stopwatch.StartNew(); //start timer
files.ForEach(f => File.ReadAllBytes(f).GetHashCode()); //do work - serial
sw.Stop(); //stop
sw.ElapsedMilliseconds.Dump("Run MS - Serial"); //display the duration

sw.Restart();
files.AsParallel().ForAll(f => File.ReadAllBytes(f).GetHashCode()); //parallel
sw.Stop();
sw.ElapsedMilliseconds.Dump("Run MS - Parallel");

Небольшое изменение цикла для распараллеливания запроса - это все, что нужно в most simple situations. Под "простыми" я в основном подразумеваю, что результат одного действия не влияет на следующее. Чаще всего нужно помнить о том, что некоторые коллекции, например, наш удобный List не является потокобезопасным, поэтому использовать его в параллельном сценарии - не лучшая идея :) К счастью, в .NET 4 были добавлены параллельные коллекции, которые являются потокобезопасными. Также имейте в виду, если вы используете блокирующую коллекцию, это тоже может быть узким местом, в зависимости от ситуации.

Здесь используются расширения .AsParallel(IEnumeable) и .ForAll(ParallelQuery) доступные в .NET 4.0. Вызов .AsParallel() оборачивает IEnumerable в ParallelEnumerableWrapper (внутренний класс), который реализует ParallelQuery. Теперь это позволяет использовать методы параллельного расширения, в данном случае мы используем .ForAll().

.ForAll() внутренне создает ForAllOperator(query, action) и запускает его синхронно. Это обрабатывает потоки и объединение потоков после его выполнения... Там довольно много всего, я бы посоветовал начать здесь, если вы хотите узнать больше, включая дополнительные опции.


Результаты (Компьютер 1 - Физический жесткий диск):

  • Серийный: 1288 - 1333 мс
  • Параллельный: 461 - 503 мс

Спецификации компьютера - для сравнения:

Результаты (Компьютер 2 - твердотельный диск):

  • Последовательный: 545 - 601 мс
  • Параллельный: 248 - 278 мс

Спецификации компьютера - для сравнения:

  • Quad Core 2 Quad Q9100 @ 2.26 GHz
  • 8 GB RAM (DDR 1333)
  • 120 GB OCZ Vertex SSD (Стандартная версия - 1.4 Firmware)

В этот раз у меня нет ссылок на CPU/RAM, они пришли установленными. Это ноутбук Dell M6400 (вот ссылка на M6500... собственные ссылки Dell на 6400 являются нерабочими).


Эти цифры получены в результате 10 прогонов, взяты минимальные/максимальные значения из внутренних 8 результатов (удалены исходные минимальные/максимальные значения для каждого из них как возможные отклонения). Здесь мы столкнулись с узким местом ввода-вывода, особенно на физическом диске, но подумайте о том, что делает последовательный метод. Он читает, обрабатывает, читает, обрабатывает, повторяет. При параллельном подходе вы (даже при наличии узкого места ввода-вывода) читаете и обрабатываете одновременно. В наихудшей ситуации с узким местом вы обрабатываете один файл, читая другой. Уже одно это (на любом современном компьютере!) должно привести к некоторому приросту производительности. В приведенных выше результатах видно, что мы можем обрабатывать несколько файлов одновременно, что дает нам здоровый прирост.

Еще одна оговорка: Четырехъядерный процессор + параллельная .NET 4 не дадут вам четырехкратный прирост производительности, он не масштабируется линейно... Есть и другие соображения и узкие места.

Надеюсь, это было интересно, чтобы показать подход и возможные преимущества. Не стесняйтесь критиковать или улучшать... Этот ответ существует исключительно для любопытных, как указано в комментариях :)

26
ответ дан 30 November 2019 в 16:41
поделиться

У вас может быть центральная очередь, потокам чтения нужен доступ на запись во время передачи содержимого в памяти в очередь. Потокам обработки нужен доступ на чтение к этой центральной очереди, чтобы вытащить следующий поток памяти, который нужно обработать. Таким образом, вы минимизируете время, проведенное в блокировках, и не будете иметь дело со сложностями кода без блокировок.

EDIT: В идеале, вы должны изящно обрабатывать все исключения/ошибки (если они есть), чтобы у вас не было точек отказа.

Как альтернатива, вы можете иметь несколько потоков, каждый из которых "претендует" на файл, переименовывая его перед обработкой, таким образом, файловая система становится реализацией заблокированного доступа. Не знаю, будет ли это более производительным, чем мой первоначальный ответ, только тестирование покажет.

1
ответ дан 30 November 2019 в 16:41
поделиться

Дизайн

Шаблон "Производитель / Потребитель", вероятно, будет наиболее полезным в этой ситуации. Вы должны создать достаточно потоков, чтобы максимизировать пропускную способность.

Вот несколько вопросов о шаблоне «Производитель / Потребитель», чтобы дать вам представление о том, как он работает:

Вы должны использовать очередь блокировки и производитель должны добавлять файлы в очередь, пока потребители обрабатывают файлы из очереди. Очередь с блокировкой не требует блокировки, поэтому это наиболее эффективный способ решения вашей проблемы.

Если вы используете .NET 4.0 есть несколько параллельных коллекций , которые вы можете использовать из коробки:

Threading

Один поток производителя вероятно, будет наиболее эффективным способом загрузить файлы с диска и поместить их в очередь; впоследствии несколько потребителей будут извлекать элементы из очереди и обрабатывать их. Я бы посоветовал вам попробовать 2-4 потребительских потока на ядро ​​и провести некоторые измерения производительности, чтобы определить, какой из них наиболее оптимален (то есть количество потоков, обеспечивающих максимальную пропускную способность). Я бы не рекомендовал использовать ThreadPool для этого конкретного примера.

P.S. Я не понимаю, в чем проблема единой точки отказа и использования распределенных хеш-таблиц? Я знаю, что DHT звучат как действительно крутая вещь в использовании, но я бы сначала попробовал обычные методы, если у вас нет конкретной проблемы, которую вы пытаетесь решить.

6
ответ дан 30 November 2019 в 16:41
поделиться

Вообще говоря, 1000 маленьких файлов (насколько маленьких, кстати?) не должны обрабатываться шесть минут. В качестве быстрого теста выполните find "foobar" * в каталоге, содержащем файлы (первый аргумент в кавычках не имеет значения; он может быть любым), и посмотрите, сколько времени потребуется на обработку каждого файла. Если это займет более одной секунды, я буду разочарован.

Если этот тест подтвердит мои подозрения, то процесс работает на CPU, и вы не получите никакого улучшения от разделения чтения на отдельный поток. Вам следует:

  1. Выяснить, почему обработка небольшого ввода занимает в среднем более 350 мс, и, надеюсь, улучшить алгоритм.
  2. Если нет способа ускорить алгоритм и у вас есть многоядерная машина (в наши дни почти у всех), используйте пул потоков, чтобы назначить 1000 задач на чтение одного файла.
1
ответ дан 30 November 2019 в 16:41
поделиться

Я рекомендую вам ставить в очередь поток для каждого файла и отслеживать запущенные потоки в словаре, запускать новый поток, когда поток завершается, вплоть до максимального предела. Я предпочитаю создавать свои собственные потоки, когда они могут быть длительными, и использовать обратные вызовы, чтобы сигнализировать, когда они завершены или столкнулись с исключением. В приведенном ниже примере я использую словарь для отслеживания запущенных рабочих экземпляров. Таким образом, я могу позвонить в инстанс, если хочу преждевременно прекратить работу. Обратные вызовы также могут использоваться для обновления пользовательского интерфейса с указанием хода выполнения и пропускной способности. Вы также можете динамически регулировать лимит запущенного потока для добавления точек.

Код примера является сокращенным демонстратором, но он работает.

class Program
{
    static void Main(string[] args)
    {
        Supervisor super = new Supervisor();
        super.LaunchWaitingThreads();

        while (!super.Done) { Thread.Sleep(200); }
        Console.WriteLine("\nDone");
        Console.ReadKey();
    }
}

public delegate void StartCallbackDelegate(int idArg, Worker workerArg);
public delegate void DoneCallbackDelegate(int idArg);

public class Supervisor
{
    Queue<Thread> waitingThreads = new Queue<Thread>();
    Dictionary<int, Worker> runningThreads = new Dictionary<int, Worker>();
    int maxThreads = 20;
    object locker = new object();

    public bool Done { 
        get { 
            lock (locker) {
                return ((waitingThreads.Count == 0) && (runningThreads.Count == 0)); 
            } 
        } 
    }

    public Supervisor()
    {
        // queue up a thread for each file
        Directory.GetFiles("C:\\folder").ToList().ForEach(n => waitingThreads.Enqueue(CreateThread(n)));
    }

    Thread CreateThread(string fileNameArg)
    {
        Thread thread = new Thread(new Worker(fileNameArg, WorkerStart, WorkerDone).ProcessFile);
        thread.IsBackground = true;
        return thread;
    }

    // called when a worker starts
    public void WorkerStart(int threadIdArg, Worker workerArg)
    {
        lock (locker)
        {
            // update with worker instance
            runningThreads[threadIdArg] = workerArg;
        }
    }

    // called when a worker finishes
    public void WorkerDone(int threadIdArg)
    {
        lock (locker)
        {
            runningThreads.Remove(threadIdArg);
        }
        Console.WriteLine(string.Format("  Thread {0} done", threadIdArg.ToString()));
        LaunchWaitingThreads();
    }

    // launches workers until max is reached
    public void LaunchWaitingThreads()
    {
        lock (locker)
        {
            while ((runningThreads.Count < maxThreads) && (waitingThreads.Count > 0))
            {
                Thread thread = waitingThreads.Dequeue();
                runningThreads.Add(thread.ManagedThreadId, null); // place holder so count is accurate
                thread.Start();
            }
        }
    }
}

public class Worker
{
    string fileName;
    StartCallbackDelegate startCallback;
    DoneCallbackDelegate doneCallback;
    public Worker(string fileNameArg, StartCallbackDelegate startCallbackArg, DoneCallbackDelegate doneCallbackArg)
    {
        fileName = fileNameArg;
        startCallback = startCallbackArg;
        doneCallback = doneCallbackArg;
    }

    public void ProcessFile()
    {
        startCallback(Thread.CurrentThread.ManagedThreadId, this);
        Console.WriteLine(string.Format("Reading file {0} on thread {1}", fileName, Thread.CurrentThread.ManagedThreadId.ToString()));
        File.ReadAllBytes(fileName);
        doneCallback(Thread.CurrentThread.ManagedThreadId);
    }
}
3
ответ дан 30 November 2019 в 16:41
поделиться

Вы можете рассмотреть очередь файлов для обработки. Заполните очередь один раз, сканируя каталог при запуске и обновив очередь с помощью FileSystemWatcher , чтобы эффективно добавлять новые файлы в очередь без постоянного повторного сканирования каталога.

По возможности считывайте и записывайте на разные физические диски. Это даст вам максимальную производительность ввода-вывода.

Если у вас есть первоначальный пакет из большого количества файлов для обработки, а затем неравномерная скорость добавления новых файлов, и все это происходит на одном диске (чтение / запись), вы можете рассмотреть возможность буферизации обработанных файлы в память, пока не будет выполнено одно из двух условий:

  • Нет (временно) новых файлов
  • Вы поместили в буфер так много файлов, что вы не хотите использовать больше памяти для {{1} } буферизация (в идеале настраиваемый порог)

Если ваша фактическая обработка файлов требует интенсивной работы ЦП, вы можете подумать о наличии одного потока обработки на ядро ​​ЦП. Однако для «нормальной» обработки время ЦП будет тривиальным по сравнению с временем ввода-вывода, и сложность не будет стоить каких-либо незначительных выигрышей.

0
ответ дан 30 November 2019 в 16:41
поделиться
Другие вопросы по тегам:

Похожие вопросы: