Дамп многопроцессорной обработки. Очередь в список

  • не - volatile переменные могут кэшироваться поток локально, таким образом, различные потоки могут видеть различные значения одновременно; volatile предотвращает это ( источник )
  • , записи к переменным 32 битов или меньший, как гарантируют, будут атомарными ( подразумеваемый здесь ); не так для long и double, хотя JVMs на 64 бита, вероятно, реализуют их как атомарные операции
21
задан pR0Ps 6 April 2016 в 14:37
поделиться

1 ответ

Try this:

import Queue
import time

def dump_queue(queue):
    """
    Empties all pending items in a queue and returns them in a list.
    """
    result = []

    for i in iter(queue.get, 'STOP'):
        result.append(i)
    time.sleep(.1)
    return result

import multiprocessing
q = multiprocessing.Queue()
for i in range(100):
    q.put([range(200) for j in range(100)])
q.put('STOP')
l=dump_queue(q)
print len(l)

Multiprocessing queues have an internal buffer which has a feeder thread which pulls work off a buffer and flushes it to the pipe. If not all of the objects have been flushed, I could see a case where Empty is raised prematurely. Using a sentinel to indicate the end of the queue is safe (and reliable). Also, using the iter(get, sentinel) idiom is just better than relying on Empty.

I don't like that it could raise empty due to flushing timing (I added the time.sleep(.1) to allow a context switch to the feeder thread, you may not need it, it works without it - it's a habit to release the GIL).

25
ответ дан 29 November 2019 в 21:28
поделиться
Другие вопросы по тегам:

Похожие вопросы: