Clojure агенты, потребляющие из очереди

Когда вы объявляете ссылочную переменную (т. е. объект), вы действительно создаете указатель на объект. Рассмотрим следующий код, в котором вы объявляете переменную примитивного типа int:

int x;
x = 10;

В этом примере переменная x является int, и Java инициализирует ее для 0. Когда вы назначаете его 10 во второй строке, ваше значение 10 записывается в ячейку памяти, на которую указывает x.

Но когда вы пытаетесь объявить ссылочный тип, произойдет что-то другое. Возьмите следующий код:

Integer num;
num = new Integer(10);

Первая строка объявляет переменную с именем num, но она не содержит примитивного значения. Вместо этого он содержит указатель (потому что тип Integer является ссылочным типом). Поскольку вы еще не указали, что указать на Java, он устанавливает значение null, что означает «Я ничего не указываю».

Во второй строке ключевое слово new используется для создания экземпляра (или создания ) объекту типа Integer и переменной указателя num присваивается этот объект. Теперь вы можете ссылаться на объект, используя оператор разыменования . (точка).

Exception, о котором вы просили, возникает, когда вы объявляете переменную, но не создавали объект. Если вы попытаетесь разыменовать num. Перед созданием объекта вы получите NullPointerException. В самых тривиальных случаях компилятор поймает проблему и сообщит вам, что «num не может быть инициализирован», но иногда вы пишете код, который непосредственно не создает объект.

Например, вы можете имеют следующий метод:

public void doSomething(SomeObject obj) {
   //do something to obj
}

В этом случае вы не создаете объект obj, скорее предполагая, что он был создан до вызова метода doSomething. К сожалению, этот метод можно вызвать следующим образом:

doSomething(null);

В этом случае obj имеет значение null. Если метод предназначен для того, чтобы что-то сделать для переданного объекта, целесообразно бросить NullPointerException, потому что это ошибка программиста, и программисту понадобится эта информация для целей отладки.

Альтернативно, там могут быть случаи, когда цель метода заключается не только в том, чтобы работать с переданным в объекте, и поэтому нулевой параметр может быть приемлемым. В этом случае вам нужно будет проверить нулевой параметр и вести себя по-другому. Вы также должны объяснить это в документации. Например, doSomething может быть записано как:

/**
  * @param obj An optional foo for ____. May be null, in which case 
  *  the result will be ____.
  */
public void doSomething(SomeObject obj) {
    if(obj != null) {
       //do something
    } else {
       //do something else
    }
}

Наконец, Как определить исключение & amp; причина использования Трассировки стека

26
задан erikcw 8 April 2010 в 19:28
поделиться

4 ответа

Возможно, вы могли бы использовать функцию seque? Цитируя (doc seque):

clojure.core/seque
([s] [n-or-q s])
  Creates a queued seq on another (presumably lazy) seq s. The queued
  seq will produce a concrete seq in the background, and can get up to
  n items ahead of the consumer. n-or-q can be an integer n buffer
  size, or an instance of java.util.concurrent BlockingQueue. Note
  that reading from a seque can block if the reader gets ahead of the
  producer.

Я имею в виду ленивую последовательность получения элементов очереди по сети; Вы обернули бы это в seque, поместили это в Ссылку и попросили рабочих Агентов потреблять предметы из этого seque. seque возвращает что-то, похожее на обычный seq с точки зрения вашего кода, с магией очереди, происходящей прозрачным образом. Обратите внимание, что если последовательность, которую вы помещаете внутрь, является чанковой, то она все равно будет принудительно чанковой за один раз. Также обратите внимание, что первоначальный вызов самого seque, кажется, блокируется до тех пор, пока не будет получен один или два начальных элемента (или порция, в зависимости от обстоятельств; я думаю, что это больше связано с тем, как работают ленивые последовательности, чем с самим seque). хотя).

Набросок кода ( действительно схематичный, совсем не тестировался):

(defn get-queue-items-seq []
  (lazy-seq
   (cons (get-queue-item)
         (get-queue-items-seq))))

(def task-source (ref (seque (get-queue-items-seq))))

(defn do-stuff []
  (let [worker (agent nil)]
    (if-let [result
             (dosync
               (when-let [task (first @task-source)]
                (send worker (fn [_] (do-stuff-with task)))))]
      (do (await worker)
          ;; maybe do something with worker's state
          (do-stuff))))) ;; continue working

(defn do-lots-of-stuff []
  (let [fs (doall (repeatedly 20 #(future (do-stuff))))]
    fs)))

На самом деле вам, вероятно, понадобится более сложный производитель элемента очереди seq так что вы можете попросить его прекратить производство новых предметов (необходимость, чтобы все было в состоянии изящно завершить работу; фьючерсы умрут, когда источник задачи иссякнет, используйте future-done?, чтобы увидеть, выполнили ли они так уже). И это только то, что я вижу на первый взгляд ... Я уверен, что здесь есть еще кое-что для полировки. Я думаю, что общий подход будет работать, хотя.

4
ответ дан Michał Marczyk 8 April 2010 в 19:28
поделиться
  • 1
    Для хранения больше, чем наборы данных RAM, disk.frame может быть альтернатива MonetDbLite. Я надеюсь, что это делает к CRAN рано. – San 25 November 2018 в 07:33
  • 2
    Для хранения больше, чем наборы данных RAM, disk.frame может быть альтернатива MonetDbLite. Я надеюсь, что это делает к CRAN рано. – San 25 November 2018 в 07:33
  • 3
    Для хранения больше, чем наборы данных RAM, disk.frame может быть альтернатива MonetDbLite. Я надеюсь, что это делает к CRAN рано. – San 25 November 2018 в 07:33
  • 4
    Для хранения больше, чем наборы данных RAM, disk.frame может быть альтернатива MonetDbLite. Я надеюсь, что это делает к CRAN рано. – San 25 November 2018 в 07:33
  • 5
    Для хранения больше, чем наборы данных RAM, disk.frame может быть альтернатива MonetDbLite. Я надеюсь, что это делает к CRAN рано. – San 25 November 2018 в 07:33

Не уверен, насколько это идиоматично, так как я все еще новичок с языком, но мне подходит следующее решение:

(let [number-of-messages-per-time 2
      await-timeout 1000]
  (doseq [p-messages (partition number-of-messages-per-time messages)]
    (let [agents (map agent p-messages)]
      (doseq [a agents] (send-off a process))
      (apply await-for await-timeout agents)
      (map deref agents))))
0
ответ дан Marco Lazzeri 8 April 2010 в 19:28
поделиться
(let [switch (atom true) ; a switch to stop workers
      workers (doall 
                (repeatedly 20 ; 20 workers pulling and processing items from SQS
                  #(future (while @switch 
                             (retrieve item from Amazon SQS and process)))))]
  (Thread/sleep 100000) ; arbitrary rule to decide when to stop ;-)
  (reset! switch false) ; stop !
  (doseq [worker workers] @worker)) ; waiting for all workers to be done
23
ответ дан cgrand 8 April 2010 в 19:28
поделиться
  • 1
    disk.frame большие взгляды и это включает поддержку двух из моих любимых пакетов - data.table и fst, которые являются среди самого эффективного из их вида. Можно ли любезно указать на дальнейшую документацию/примеры disk.frame кроме этого доступного на странице GitHub. – San 24 November 2018 в 16:24
  • 2
    disk.frame большие взгляды и это включает поддержку двух из моих любимых пакетов - data.table и fst, которые являются среди самого эффективного из их вида. Можно ли любезно указать на дальнейшую документацию/примеры disk.frame кроме этого доступного на странице GitHub. – San 24 November 2018 в 16:24
  • 3
    disk.frame большие взгляды и это включает поддержку двух из моих любимых пакетов - data.table и fst, которые являются среди самого эффективного из их вида. Можно ли любезно указать на дальнейшую документацию/примеры disk.frame кроме этого доступного на странице GitHub. – San 24 November 2018 в 16:24
  • 4
    disk.frame большие взгляды и это включает поддержку двух из моих любимых пакетов - data.table и fst, которые являются среди самого эффективного из их вида. Можно ли любезно указать на дальнейшую документацию/примеры disk.frame кроме этого доступного на странице GitHub. – San 24 November 2018 в 16:24
  • 5
    disk.frame большие взгляды и это включает поддержку двух из моих любимых пакетов - data.table и fst, которые являются среди самого эффективного из их вида. Можно ли любезно указать на дальнейшую документацию/примеры disk.frame кроме этого доступного на странице GitHub. – San 24 November 2018 в 16:24

То, что вы просите, - это способ раздачи заданий, но с некоторым верхним пределом. Один простой подход к этому - использовать семафор для координации предела. Вот как я бы подошел к этому:

(let [limit (.availableProcessors (Runtime/getRuntime))
      ; note: you might choose limit 20 based upon your problem description
      sem (java.util.concurrent.Semaphore. limit)]
  (defn submit-future-call
    "Takes a function of no args and yields a future object that will
    invoke the function in another thread, and will cache the result and
    return it on all subsequent calls to deref/@. If the computation has
    not yet finished, calls to deref/@ will block. 
    If n futures have already been submitted, then submit-future blocks
    until the completion of another future, where n is the number of
    available processors."  
    [#^Callable task]
    ; take a slot (or block until a slot is free)
    (.acquire sem)
    (try
      ; create a future that will free a slot on completion
      (future (try (task) (finally (.release sem))))
      (catch java.util.concurrent.RejectedExecutionException e
        ; no task was actually submitted
        (.release sem)
        (throw e)))))

(defmacro submit-future
  "Takes a body of expressions and yields a future object that will
  invoke the body in another thread, and will cache the result and
  return it on all subsequent calls to deref/@. If the computation has
  not yet finished, calls to deref/@ will block.
  If n futures have already been submitted, then submit-future blocks
  until the completion of another future, where n is the number of
  available processors."  
  [& body] `(submit-future-call (fn [] ~@body)))

#_(example
    user=> (submit-future (reduce + (range 100000000)))
    #<core$future_call$reify__5782@6c69d02b: :pending>
    user=> (submit-future (reduce + (range 100000000)))
    #<core$future_call$reify__5782@38827968: :pending>
    user=> (submit-future (reduce + (range 100000000)))
    ;; blocks at this point for a 2 processor PC until the previous
    ;; two futures complete
    #<core$future_call$reify__5782@214c4ac9: :pending>
    ;; then submits the job

Теперь, когда вам это нужно, вам просто нужно скоординировать выполнение самих задач. Похоже, у вас уже есть механизмы для этого. Цикл (submit-future (process-queue-item))

6
ответ дан Timothy Pratley 8 April 2010 в 19:28
поделиться
  • 1
    @san я пишу им в данный момент. Можно проверить папку виньетки или войти в inst/fannie_mae для большего количества примеров – xiaodai 24 November 2018 в 20:46
  • 2
    @san я пишу им в данный момент. Можно проверить папку виньетки или войти в inst/fannie_mae для большего количества примеров – xiaodai 24 November 2018 в 20:46
  • 3
    @san я пишу им в данный момент. Можно проверить папку виньетки или войти в inst/fannie_mae для большего количества примеров – xiaodai 24 November 2018 в 20:46
  • 4
    @san я пишу им в данный момент. Можно проверить папку виньетки или войти в inst/fannie_mae для большего количества примеров – xiaodai 24 November 2018 в 20:46
  • 5
    @san я пишу им в данный момент. Можно проверить папку виньетки или войти в inst/fannie_mae для большего количества примеров – xiaodai 24 November 2018 в 20:46
Другие вопросы по тегам:

Похожие вопросы: