Как использовать IObservable/IObserver с ConcurrentQueue или ConcurrentStack

Я понял, что, когда я пытаюсь обработать объекты в параллельной очереди, использующей несколько потоков, в то время как несколько потоков могут помещать объекты в нее, идеальное решение состояло бы в том, чтобы использовать Реактивные Расширения с Параллельными структурами данных.

Мой исходный вопрос в:

При использовании ConcurrentQueue, попытке исключить из очереди, в то время как цикличное выполнение через параллельно

Таким образом, мне любопытно, если существует какой-либо способ иметь LINQ (или PLINQ) запрос, который непрерывно будет исключением из очереди, поскольку объекты помещаются в него.

Я пытаюсь заставить это работать способом, где у меня может быть n число производителей, продвигающих в очередь и ограниченное количество потоков для обработки, таким образом, я не перегружаю базу данных.

Если я мог бы использовать платформу Rx затем, я ожидаю, что мог только запустить ее, и если 100 объектов помещаются в в 100 мс, то 20 потоков, которые являются частью запроса PLINQ, просто обработали бы через очередь.

Существует три технологии, я пытаюсь сотрудничать:

  1. Платформа Rx (реактивный LINQ)
  2. PLING
  3. Система. Наборы. Параллельные структуры

5
задан Community 23 May 2017 в 11:53
поделиться