Так что это продолжение моего последнего вопроса - поэтому вопрос был «Каков наилучший способ создания программы, которая является поточно-ориентированной, с точки зрения необходимости записи двойных значений в файл. Если функция, сохраняющая значения через streamwriter, вызывается несколькими потоками? Каков наилучший способ сделать это? ? "
И я изменил некоторый код, найденный в MSDN, как насчет следующего? Этот файл корректно записывает все в файл.
namespace SafeThread
{
class Program
{
static void Main()
{
Threading threader = new Threading();
AutoResetEvent autoEvent = new AutoResetEvent(false);
Thread regularThread =
new Thread(new ThreadStart(threader.ThreadMethod));
regularThread.Start();
ThreadPool.QueueUserWorkItem(new WaitCallback(threader.WorkMethod),
autoEvent);
// Wait for foreground thread to end.
regularThread.Join();
// Wait for background thread to end.
autoEvent.WaitOne();
}
}
class Threading
{
List<double> Values = new List<double>();
static readonly Object locker = new Object();
StreamWriter writer = new StreamWriter("file");
static int bulkCount = 0;
static int bulkSize = 100000;
public void ThreadMethod()
{
lock (locker)
{
while (bulkCount < bulkSize)
Values.Add(bulkCount++);
}
bulkCount = 0;
}
public void WorkMethod(object stateInfo)
{
lock (locker)
{
foreach (double V in Values)
{
writer.WriteLine(V);
writer.Flush();
}
}
// Signal that this thread is finished.
((AutoResetEvent)stateInfo).Set();
}
}
}
Thread
и QueueUserWorkItem
являются самыми низкими доступными API для многопоточности. Я бы не стал их использовать, если бы у меня, наконец, не было другого выбора. Попробуйте класс Task
для абстракции гораздо более высокого уровня. Подробнее см. в моем недавнем сообщении в блоге на эту тему.
Вы также можете использовать BlockingCollection
в качестве правильной очереди производителя/потребителя вместо того, чтобы пытаться создать ее вручную с наименьшим доступным API для синхронизации.
Правильно изобрести эти колеса на удивление сложно. Я настоятельно рекомендую использовать классы, разработанные для этого типа потребностей (Task
и BlockingCollection
, если быть точным). Они встроены в инфраструктуру .NET 4.0 и доступны в виде надстройки для .NET 3.5.
Код, который у вас есть, немного нарушен - в частности, если поставленный в очередь рабочий элемент запускается первым, то он немедленно сбрасывает (пустой) список значений перед завершением, после чего ваш рабочий идет и заполняет список (который в конечном итоге будет проигнорирован). Событие автоматического сброса также ничего не делает, поскольку ничто никогда не запрашивает и не ожидает своего состояния.
Кроме того, поскольку каждый поток использует разные блокировки, блокировки не имеют смысла! Вам нужно убедиться, что вы удерживаете единую общую блокировку при доступе к стримрайтеру. Вам не нужна блокировка между кодом сброса и кодом генерации; вам просто нужно убедиться, что флеш запускается после завершения генерации.
Вы, вероятно, на правильном пути, хотя я бы использовал массив фиксированного размера вместо списка и сбрасывал все записи из массива, когда он заполнялся. Это позволяет избежать возможности исчерпания памяти, если поток долгоживущий.
«Правильный ответ» на самом деле зависит от того, что вы ищете с точки зрения блокирующего/блокирующего поведения. Например, самым простым было бы пропустить промежуточную структуру данных, просто иметь метод WriteValues, чтобы каждый поток, «отчитывающийся» о своих результатах, продолжал и записывал их в файл. Что-то вроде:
StreamWriter writer = new StreamWriter("file");
public void WriteValues(IEnumerable<double> values)
{
lock (writer)
{
foreach (var d in values)
{
writer.WriteLine(d);
}
writer.Flush();
}
}
Конечно, это означает, что рабочие потоки сериализуются во время их фаз «отчета о результатах» — в зависимости от характеристик производительности это может быть вполне нормально (например, 5 минут на генерацию, 500 мс на запись).
С другой стороны, рабочие потоки записывают данные в структуру данных. Если вы используете .NET 4, я бы рекомендовал просто использовать ConcurrentQueue, а не делать эту блокировку самостоятельно.
Кроме того, вы можете захотеть выполнять файловый ввод-вывод большими пакетами, чем те, о которых сообщают рабочие потоки, поэтому вы можете просто выполнять запись в фоновом потоке на некоторой частоте. Этот конец спектра выглядит примерно так, как показано ниже (вы бы удалили вызовы Console.WriteLine в реальном коде, они просто есть, чтобы вы могли увидеть, как он работает в действии)
public class ThreadSafeFileBuffer<T> : IDisposable
{
private readonly StreamWriter m_writer;
private readonly ConcurrentQueue<T> m_buffer = new ConcurrentQueue<T>();
private readonly Timer m_timer;
public ThreadSafeFileBuffer(string filePath, int flushPeriodInSeconds = 5)
{
m_writer = new StreamWriter(filePath);
var flushPeriod = TimeSpan.FromSeconds(flushPeriodInSeconds);
m_timer = new Timer(FlushBuffer, null, flushPeriod, flushPeriod);
}
public void AddResult(T result)
{
m_buffer.Enqueue(result);
Console.WriteLine("Buffer is up to {0} elements", m_buffer.Count);
}
public void Dispose()
{
Console.WriteLine("Turning off timer");
m_timer.Dispose();
Console.WriteLine("Flushing final buffer output");
FlushBuffer(); // flush anything left over in the buffer
Console.WriteLine("Closing file");
m_writer.Dispose();
}
/// <summary>
/// Since this is only done by one thread at a time (almost always the background flush thread, but one time via Dispose), no need to lock
/// </summary>
/// <param name="unused"></param>
private void FlushBuffer(object unused = null)
{
T current;
while (m_buffer.TryDequeue(out current))
{
Console.WriteLine("Buffer is down to {0} elements", m_buffer.Count);
m_writer.WriteLine(current);
}
m_writer.Flush();
}
}
class Program
{
static void Main(string[] args)
{
var tempFile = Path.GetTempFileName();
using (var resultsBuffer = new ThreadSafeFileBuffer<double>(tempFile))
{
Parallel.For(0, 100, i =>
{
// simulate some 'real work' by waiting for awhile
var sleepTime = new Random().Next(10000);
Console.WriteLine("Thread {0} doing work for {1} ms", Thread.CurrentThread.ManagedThreadId, sleepTime);
Thread.Sleep(sleepTime);
resultsBuffer.AddResult(Math.PI*i);
});
}
foreach (var resultLine in File.ReadAllLines(tempFile))
{
Console.WriteLine("Line from result: {0}", resultLine);
}
}
}
То есть вы хотите, чтобы несколько потоков записывали данные в один файл с помощью StreamWriter? Легкий. Просто заблокируйте объект StreamWriter.
Код здесь создаст 5 потоков. Каждый поток будет выполнять 5 «действий», и в конце каждого действия он будет записывать 5 строк в файл с именем «файл».
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
namespace ConsoleApplication1 {
class Program {
static void Main() {
StreamWriter Writer = new StreamWriter("file");
Action<int> ThreadProcedure = (i) => {
// A thread may perform many actions and write out the result after each action
// The outer loop here represents the multiple actions this thread will take
for (int x = 0; x < 5; x++) {
// Here is where the thread would generate the data for this action
// Well simulate work time using a call to Sleep
Thread.Sleep(1000);
// After generating the data the thread needs to lock the Writer before using it.
lock (Writer) {
// Here we'll write a few lines to the Writer
for (int y = 0; y < 5; y++) {
Writer.WriteLine("Thread id = {0}; Action id = {1}; Line id = {2}", i, x, y);
}
}
}
};
//Now that we have a delegate for the thread code lets make a few instances
List<IAsyncResult> AsyncResultList = new List<IAsyncResult>();
for (int w = 0; w < 5; w++) {
AsyncResultList.Add(ThreadProcedure.BeginInvoke(w, null, null));
}
// Wait for all threads to complete
foreach (IAsyncResult r in AsyncResultList) {
r.AsyncWaitHandle.WaitOne();
}
// Flush/Close the writer so all data goes to disk
Writer.Flush();
Writer.Close();
}
}
}
Результатом должен быть файл «файл» со 125 строками, в котором все «действия» выполняются одновременно, а результат каждого действия синхронно записывается в файл.