Потребитель производителя с квалификациями

Я плохо знаком с clojure и пытаюсь понять, как правильно использовать его функции параллелизма, таким образом, любой критический анализ/предложения ценится. Таким образом, я пытаюсь записать маленькую тестовую программу в clojure, который работает следующим образом:

  1. там 5 производителей и 2 потребителя
  2. производитель ожидает в течение случайного времени и затем продвигает число на общую очередь.
  3. потребитель должен вытянуть число от очереди, как только очередь непуста, и затем спите в течение короткого времени для моделирования выполнения работы
  4. потребители должны заблокироваться, когда очередь пуста
  5. производители должны заблокироваться, когда у очереди есть больше чем 4 объекта в ней, чтобы препятствовать тому, чтобы он стал огромным

Вот мой план относительно каждого шага выше:

  1. производители и потребители будут агентами, которые действительно не заботятся об их состоянии (просто нулевые значения или что-то); я просто использую агенты для проводов функция "потребителя" или "производителя", чтобы сделать в некоторое время. Затем общая очередь будет (очередь определения (касательно [])). Возможно, это должно быть атомом хотя?
  2. в функции агента "производителя", просто (Поток/сон (интервал рэнда 1000)) и затем (dosync (изменяют союз очереди (интервал рэнда 100))) продвигать на очередь.
  3. Я думаю, чтобы заставить потребительские агенты наблюдать за очередью за изменениями с добавлять-наблюдателем. Не уверенный в этом, хотя.. это разбудит потребителей на любом изменении, даже если изменения произошли от потребителя, осуществляющего что-то (возможно создание его пустой). Возможно, проверка это в функции наблюдателя достаточна. Другая проблема, которую я вижу, состоит в том что, если все потребители заняты, то, что происходит, когда производитель добавляет что-то новое для очереди? Наблюдаемое событие добирается, стоял в очереди на некотором потребительском агенте, или это исчезает?
  4. посмотрите выше
  5. Я действительно не знаю, как сделать это. Я слышал, что переход clojure может быть полезным, но я не мог найти достаточно документа о том, как использовать его, и мое начальное тестирование, казалось, не работало (извините не имеют кода меня больше),
21
задан tgguy 3 May 2010 в 17:46
поделиться

1 ответ

Вот мое мнение. Я решил использовать только структуры данных Clojure, чтобы увидеть, как это сработает. Обратите внимание, что было бы совершенно обычным и идиоматическим решением взять очередь блокировки из набора инструментов Java и использовать ее здесь; я думаю, код будет легко адаптировать. Обновление: Я действительно адаптировал его к java.util.concurrent.LinkedBlockingQueue , см. Ниже.

clojure.lang.PersistentQueue

Вызов (pro-con) , чтобы начать тестовый прогон; затем просмотрите содержимое вывода output , чтобы узнать, произошло ли что-нибудь, и длины очереди , чтобы убедиться, что они остались в заданных пределах.

Обновление: Чтобы объяснить, почему я почувствовал необходимость использования обеспечения ниже (меня спросили об этом в IRC), это сделано для предотвращения перекоса записи (см. Статью в Википедии о Изоляция моментальных снимков для определения). Если бы я заменил @queue на (обеспечить очередь) , два или более производителей могли бы проверить длину очереди, найти, что она меньше 4, затем поместите дополнительные элементы в очереди и, возможно, увеличивают общую длину очереди выше 4, нарушая ограничение. Точно так же два потребителя, выполняющие @queue , могут принять один и тот же элемент для обработки, а затем удалить два элемента из очереди. гарантирует, что предотвращает любой из этих сценариев.

(def go-on? (atom true))
(def queue (ref clojure.lang.PersistentQueue/EMPTY))
(def output (ref ()))
(def queue-lengths (ref ()))
(def *max-queue-length* 4)

(defn overseer
  ([] (overseer 20000))
  ([timeout]
     (Thread/sleep timeout)
     (swap! go-on? not)))

(defn queue-length-watch [_ _ _ new-queue-state]
  (dosync (alter queue-lengths conj (count new-queue-state))))

(add-watch queue :queue-length-watch queue-length-watch)

(defn producer [tag]
  (future
   (while @go-on?
     (if (dosync (let [l (count (ensure queue))]
                   (when (< l *max-queue-length*)
                     (alter queue conj tag)
                     true)))
       (Thread/sleep (rand-int 2000))))))

(defn consumer []
  (future
   (while @go-on?
     (Thread/sleep 100)       ; don't look at the queue too often
     (when-let [item (dosync (let [item (first (ensure queue))]
                               (alter queue pop)
                               item))]
       (Thread/sleep (rand-int 500))         ; do stuff
       (dosync (alter output conj item)))))) ; and let us know

(defn pro-con []
  (reset! go-on? true)
  (dorun (map #(%1 %2)
              (repeat 5 producer)
              (iterate inc 0)))
  (dorun (repeatedly 2 consumer))
  (overseer))

java.util.concurrent.LinkedBlockingQueue

Версия выше, написанная с использованием LinkedBlockingQueue . Обратите внимание, что общая схема кода в основном такая же, а некоторые детали на самом деле немного чище. Я удалил длины очереди из этой версии, поскольку LBQ позаботится об этом ограничении за нас.

(def go-on? (atom true))
(def *max-queue-length* 4)
(def queue (java.util.concurrent.LinkedBlockingQueue. *max-queue-length*))
(def output (ref ()))

(defn overseer
  ([] (overseer 20000))
  ([timeout]
     (Thread/sleep timeout)
     (swap! go-on? not)))

(defn producer [tag]
  (future
   (while @go-on?
     (.put queue tag)
     (Thread/sleep (rand-int 2000)))))

(defn consumer []
  (future
   (while @go-on?
     ;; I'm using .poll on the next line so as not to block
     ;; indefinitely if we're done; note that this has the
     ;; side effect that nulls = nils on the queue will not
     ;; be handled; there's a number of other ways to go about
     ;; this if this is a problem, see docs on LinkedBlockingQueue
     (when-let [item (.poll queue)]
       (Thread/sleep (rand-int 500)) ; do stuff
       (dosync (alter output conj item)))))) ; and let us know

(defn pro-con []
  (reset! go-on? true)
  (dorun (map #(%1 %2)
              (repeat 5 producer)
              (iterate inc 0)))
  (dorun (repeatedly 2 consumer))
  (overseer))
24
ответ дан 29 November 2019 в 21:44
поделиться
Другие вопросы по тегам:

Похожие вопросы: