Многие объяснения уже присутствуют, чтобы объяснить, как это происходит и как это исправить, но вы также должны следовать рекомендациям, чтобы избежать NullPointerException
вообще.
См. также: A хороший список лучших практик
Я бы добавил, очень важно, хорошо использовать модификатор final
. Использование "окончательной" модификатор, когда это применимо в Java
Сводка:
final
для обеспечения хорошей инициализации. @NotNull
и @Nullable
if("knownObject".equals(unknownObject)
valueOf()
поверх toString (). StringUtils
StringUtils.isEmpty(null)
. Как и было предложено, используйте TPL Dataflow.
Возможно, вы TransformBlock<TInput, TOutput>
.
Вы определяете MaxDegreeOfParallelism
для ограничения того, сколько строк может быть преобразовано (т. е. сколько URL-адресов может быть загружено) параллельно. Затем вы отправляете URL-адреса блоку, и когда вы закончите, вы сообщите блоку, что вы закончили добавление элементов, и вы получите ответы.
var downloader = new TransformBlock<string, HttpResponse>(
url => Download(url),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 }
);
var buffer = new BufferBlock<HttpResponse>();
downloader.LinkTo(buffer);
foreach(var url in urls)
downloader.Post(url);
//or await downloader.SendAsync(url);
downloader.Complete();
await downloader.Completion;
IList<HttpResponse> responses;
if (buffer.TryReceiveAll(out responses))
{
//process responses
}
Примечание. Буферы TransformBlock
как его ввод, так и вывод. Почему тогда нам нужно связать его с BufferBlock
?
Поскольку TransformBlock
не будет завершена до тех пор, пока все элементы (HttpResponse
) не будут использованы, а await downloader.Completion
будет висеть , Вместо этого мы позволяем downloader
перенаправлять весь свой вывод на выделенный буферный блок - тогда мы ждем завершения downloader
и проверяем буферный блок.
В соответствии с запросом, вот код, в который я закончил.
Работа настраивается в конфигурации мастер-детали, и каждый мастер обрабатывается как пакет. Каждая единица работы ставится в очередь таким образом:
var success = true;
// Start processing all the master records.
Master master;
while (null != (master = await StoredProcedures.ClaimRecordsAsync(...)))
{
await masterBuffer.SendAsync(master);
}
// Finished sending master records
masterBuffer.Complete();
// Now, wait for all the batches to complete.
await batchAction.Completion;
return success;
Мастера буферизуют по одному, чтобы сохранить работу для других внешних процессов. Детали для каждого мастера отправляются для работы через masterTransform
TransformManyBlock
. A BatchedJoinBlock
также создается для сбора деталей в одной партии.
Фактическая работа выполняется в detailTransform
TransformBlock
асинхронно, 150 за раз. BoundedCapacity
установлено значение 300, чтобы гарантировать, что слишком много мастеров не получают буферизацию в начале цепочки, а также оставляют место для достаточно подробных записей, которые должны быть поставлены в очередь, чтобы позволить 150 записей обрабатываться за один раз. Блок выводит object
на свои цели, потому что он фильтруется по ссылкам в зависимости от того, является ли это Detail
или Exception
.
batchAction
ActionBlock
собирает вывод из всех пакеты и выполняет массовые обновления баз данных, протоколирование ошибок и т. д. для каждой партии.
Будет несколько BatchedJoinBlock
s, по одному для каждого мастера. Поскольку каждый ISourceBlock
выводится последовательно, и каждая партия принимает только количество подробных записей, связанных с одним мастером, партии будут обрабатываться по порядку. Каждый блок выводит только одну группу и при завершении отсоединяется. Только последний пакетный блок передает свое завершение окончательному ActionBlock
.
Сеть потока данных:
// The dataflow network
BufferBlock<Master> masterBuffer = null;
TransformManyBlock<Master, Detail> masterTransform = null;
TransformBlock<Detail, object> detailTransform = null;
ActionBlock<Tuple<IList<object>, IList<object>>> batchAction = null;
// Buffer master records to enable efficient throttling.
masterBuffer = new BufferBlock<Master>(new DataflowBlockOptions { BoundedCapacity = 1 });
// Sequentially transform master records into a stream of detail records.
masterTransform = new TransformManyBlock<Master, Detail>(async masterRecord =>
{
var records = await StoredProcedures.GetObjectsAsync(masterRecord);
// Filter the master records based on some criteria here
var filteredRecords = records;
// Only propagate completion to the last batch
var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0;
// Create a batch join block to encapsulate the results of the master record.
var batchjoinblock = new BatchedJoinBlock<object, object>(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });
// Add the batch block to the detail transform pipeline's link queue, and link the batch block to the the batch action block.
var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail);
var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception);
var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion });
// Unlink batchjoinblock upon completion.
// (the returned task does not need to be awaited, despite the warning.)
batchjoinblock.Completion.ContinueWith(task =>
{
detailLink1.Dispose();
detailLink2.Dispose();
batchLink.Dispose();
});
return filteredRecords;
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
// Process each detail record asynchronously, 150 at a time.
detailTransform = new TransformBlock<Detail, object>(async detail => {
try
{
// Perform the action for each detail here asynchronously
await DoSomethingAsync();
return detail;
}
catch (Exception e)
{
success = false;
return e;
}
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 });
// Perform the proper action for each batch
batchAction = new ActionBlock<Tuple<IList<object>, IList<object>>>(async batch =>
{
var details = batch.Item1.Cast<Detail>();
var errors = batch.Item2.Cast<Exception>();
// Do something with the batch here
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true });
masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });
Скажите, что у вас 1000 URL-адресов, и вы хотите, чтобы одновременно открывалось 50 запросов; но как только один запрос завершается, вы открываете соединение со следующим URL-адресом в списке. Таким образом, всегда открыто ровно 50 подключений за раз, пока список URL не исчерпан.
blockquote>Следующее простое решение появилось много раз здесь, на SO. Он не использует блокирующий код и не создает потоки явно, поэтому он очень хорошо масштабируется:
const int MAX_DOWNLOADS = 50; static async Task DownloadAsync(string[] urls) { using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS)) using (var httpClient = new HttpClient()) { var tasks = urls.Select(async url => { await semaphore.WaitAsync(); try { var data = await httpClient.GetStringAsync(url); Console.WriteLine(data); } finally { semaphore.Release(); } }); await Task.WhenAll(tasks); } }
Дело в том, что обработка загруженных данных должна быть выполненный на другом конвейере, с другим уровнем параллелизма, особенно если это обработка, связанная с процессором.
Например, вы, вероятно, захотите иметь 4 потока одновременно с обработкой данных (количество ядер ЦП) и до 50 ожидающих запросов для большего количества данных (которые вообще не используют потоки). AFAICT, это не то, что делает ваш код в настоящее время.
Вот где TPL Dataflow или Rx может пригодиться в качестве предпочтительного решения. Однако, безусловно, можно реализовать что-то подобное с помощью простого TPL. Примечание. Единственным кодом блокировки является тот, который выполняет фактическую обработку данных внутри
Task.Run
:const int MAX_DOWNLOADS = 50; const int MAX_PROCESSORS = 4; // process data class Processing { SemaphoreSlim _semaphore = new SemaphoreSlim(MAX_PROCESSORS); HashSet<Task> _pending = new HashSet<Task>(); object _lock = new Object(); async Task ProcessAsync(string data) { await _semaphore.WaitAsync(); try { await Task.Run(() => { // simuate work Thread.Sleep(1000); Console.WriteLine(data); }); } finally { _semaphore.Release(); } } public async void QueueItemAsync(string data) { var task = ProcessAsync(data); lock (_lock) _pending.Add(task); try { await task; } catch { if (!task.IsCanceled && !task.IsFaulted) throw; // not the task's exception, rethrow // don't remove faulted/cancelled tasks from the list return; } // remove successfully completed tasks from the list lock (_lock) _pending.Remove(task); } public async Task WaitForCompleteAsync() { Task[] tasks; lock (_lock) tasks = _pending.ToArray(); await Task.WhenAll(tasks); } } // download data static async Task DownloadAsync(string[] urls) { var processing = new Processing(); using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS)) using (var httpClient = new HttpClient()) { var tasks = urls.Select(async (url) => { await semaphore.WaitAsync(); try { var data = await httpClient.GetStringAsync(url); // put the result on the processing pipeline processing.QueueItemAsync(data); } finally { semaphore.Release(); } }); await Task.WhenAll(tasks.ToArray()); await processing.WaitForCompleteAsync(); } }
BlockingCollection
. Я просто не понимал, что могу использовать WaitAsync
таким образом. Благодарю вас @ Noseratio.
– Josh Wyant
19 March 2014 в 03:04
async
на несколько ядер, что было моей другой целью. Этот ответ касается моей первой цели, и Dataflow обращается к ним обоим. @Noseratio
– Josh Wyant
20 March 2014 в 17:25
MaxDegreeOfParallelism
– dcastro 19 March 2014 в 01:03SendAsync
). Теза будет буферизирована блоком. Блок будет продолжать брать URL-адреса из буфера и обрабатывать не более 50 за раз. Затем результаты будут помещены в другой буфер. ATransformBlock
буферизует как входной, так и выходной. – dcastro 19 March 2014 в 01:05MaxDegreeOfParallelism
работал точно так же, какParallel.ForEach
, просто создавая произвольное количество потоков для достижения параллелизма. Я ошибался, и параметр очень хорошо работает сasync
. Tpl.Dataflow работает красиво. Благодаря! – Josh Wyant 20 March 2014 в 17:21