python: чтение вывода подпроцесса в потоках

У меня есть исполняемый файл, который я вызываю с помощью subprocess.Popen. Затем я намерен передать ему некоторые данные через стандартный ввод, используя поток, который считывает его значение из очереди, которая позже будет заполнена в другом потоке.Вывод должен быть прочитан с использованием канала stdout в другом потоке и снова отсортирован в очереди.

Насколько я понял из своего предыдущего исследования, использование потоков с Queue является хорошей практикой.

Внешний исполняемый файл, к сожалению, не даст мне быстрого ответа для каждой переданной строки, так что простые циклы записи, чтения не вариант. Исполняемый файл реализует некоторую внутреннюю многопоточность, и мне нужен вывод, как только он станет доступен, поэтому дополнительный поток чтения.

В качестве примера для тестирования исполняемый файл будет просто перемешивать каждую строку (shuffleline.py):

#!/usr/bin/python -u
import sys
from random import shuffle

for line in sys.stdin:
    line = line.strip()

    # shuffle line
    line = list(line)
    shuffle(line)
    line = "".join(line)

    sys.stdout.write("%s\n"%(line))
    sys.stdout.flush() # avoid buffers

Обратите внимание, что это уже настолько небуферизовано, насколько это возможно. Или нет? Это моя урезанная тестовая программа:

#!/usr/bin/python -u
import sys
import Queue
import threading
import subprocess

class WriteThread(threading.Thread):
    def __init__(self, p_in, source_queue):
        threading.Thread.__init__(self)
        self.pipe = p_in
        self.source_queue = source_queue

    def run(self):
        while True:
            source = self.source_queue.get()
            print "writing to process: ", repr(source)
            self.pipe.write(source)
            self.pipe.flush()
            self.source_queue.task_done()

class ReadThread(threading.Thread):
    def __init__(self, p_out, target_queue):
        threading.Thread.__init__(self)
        self.pipe = p_out
        self.target_queue = target_queue

    def run(self):
        while True:
            line = self.pipe.readline() # blocking read
            if line == '':
                break
            print "reader read: ", line.rstrip()
            self.target_queue.put(line)

if __name__ == "__main__":

    cmd = ["python", "-u", "./shuffleline.py"] # unbuffered
    proc = subprocess.Popen(cmd, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    source_queue = Queue.Queue()
    target_queue = Queue.Queue()

    writer = WriteThread(proc.stdin, source_queue)
    writer.setDaemon(True)
    writer.start()

    reader = ReadThread(proc.stdout, target_queue)
    reader.setDaemon(True)
    reader.start()

    # populate queue
    for i in range(10):
        source_queue.put("string %s\n" %i)
    source_queue.put("")

    print "source_queue empty: ", source_queue.empty()
    print "target_queue empty: ", target_queue.empty()

    import time
    time.sleep(2) # expect some output from reader thread

    source_queue.join() # wait until all items in source_queue are processed
    proc.stdin.close()  # should end the subprocess
    proc.wait()

это дает следующий вывод (python2.7):

writing to process:  'string 0\n'
writing to process:  'string 1\n'
writing to process:  'string 2\n'
writing to process:  'string 3\n'
writing to process:  'string 4\n'
writing to process:  'string 5\n'
writing to process:  'string 6\n'
source_queue empty: writing to process:  'string 7\n'
writing to process:  'string 8\n'
writing to process:  'string 9\n'
writing to process:  ''
 True
target_queue empty:  True

затем ничего в течение 2 секунд ...

reader read:  rgsn0i t
reader read:  nrg1sti
reader read:  tis n2rg
reader read:  snt gri3
reader read:  nsri4 tg
reader read:  stir5 gn
reader read:   gnri6ts
reader read:   ngrits7
reader read:  8nsrt ig
reader read:  sg9 nitr

Чередование в начале ожидается. Однако выходные данные подпроцесса не появляются до после завершения подпроцесса. С большим количеством переданных строк я получаю некоторый вывод, поэтому я предполагаю проблему с кэшированием в канале stdout. Согласно другим вопросам, размещенным здесь, очистка стандартного вывода (в подпроцессе) должна работать, по крайней мере, в Linux.

10
задан muckl 21 March 2012 в 19:19
поделиться