блоки - отправляют вход на конвейер подпроцесса Python

Я просто попытался измерить времена:

class Foo(object):
    @classmethod
    def singleton(self):
        if not hasattr(self, 'instance'):
            self.instance = Foo()
        return self.instance



class Bar(object):
    @classmethod
    def singleton(self):
        try:
            return self.instance
        except AttributeError:
            self.instance = Bar()
            return self.instance



from time import time

n = 1000000
foo = [Foo() for i in xrange(0,n)]
bar = [Bar() for i in xrange(0,n)]

print "Objs created."
print


for times in xrange(1,4):
    t = time()
    for d in foo: d.singleton()
    print "#%d Foo pass in %f" % (times, time()-t)

    t = time()
    for d in bar: d.singleton()
    print "#%d Bar pass in %f" % (times, time()-t)

    print

На моей машине:

Objs created.

#1 Foo pass in 1.719000
#1 Bar pass in 1.140000

#2 Foo pass in 1.750000
#2 Bar pass in 1.187000

#3 Foo pass in 1.797000
#3 Bar pass in 1.203000

кажется, что попытка/кроме быстрее. Это кажется также более читаемым мне, так или иначе зависит от случая, этот тест был очень прост, возможно, Вам будет нужен более сложный.

34
задан 3 revs 23 October 2009 в 02:06
поделиться

9 ответов

I found out how to do it.

It is not about threads, and not about select().

When I run the first process (grep), it creates two low-level file descriptors, one for each pipe. Lets call those a and b.

When I run the second process, b gets passed to cut sdtin. But there is a brain-dead default on Popen - close_fds=False.

The effect of that is that cut also inherits a. So grep can't die even if I close a, because stdin is still open on cut's process (cut ignores it).

The following code now runs perfectly.

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n')
p1.stdin.close()
result = p2.stdout.read() 
assert result == "Hello Worl\n"

close_fds=True SHOULD BE THE DEFAULT on unix systems. On windows it closes all fds, so it prevents piping.

EDIT:

PS: For people with a similar problem reading this answer: As pooryorick said in a comment, that also could block if data written to p1.stdin is bigger than the buffers. In that case you should chunk the data into smaller pieces, and use select.select() to know when to read/write. The code in the question should give a hint on how to implement that.

EDIT2: Found another solution, with more help from pooryorick - instead of using close_fds=True and close ALL fds, one could close the fds that belongs to the first process, when executing the second, and it will work. The closing must be done in the child so the preexec_fn function from Popen comes very handy to do just that. On executing p2 you can do:

p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=devnull, preexec_fn=p1.stdin.close)
21
ответ дан 27 November 2019 в 17:10
поделиться

Это нужно делать в несколько потоков. В противном случае вы попадете в ситуацию, когда вы не сможете отправить данные: дочерний элемент p1 не будет читать ваш ввод, поскольку p2 не читает вывод p1, потому что вы не читаете вывод p2.

Итак, вам нужен фоновый поток, который читает то, что записывает p2. Это позволит p2 продолжить работу после записи некоторых данных в канал, чтобы он мог прочитать следующую строку ввода из p1, что снова позволяет p1 обрабатывать данные, которые вы ему отправляете.

В качестве альтернативы вы можете отправить данные на p1 с фоновым потоком и считайте вывод p2 в основном потоке. Но обе стороны должны быть нитью.

2
ответ дан 27 November 2019 в 17:10
поделиться

Я думаю, вы исследуете неправильную проблему. Конечно, как говорит Аарон, если вы пытаетесь быть и производителем в начале конвейера, и потребителем в конце конвейера, легко попасть в тупиковую ситуацию. Это проблема, которую решает общение ().

Коммуникация () не совсем подходит для вас, поскольку stdin и stdout находятся на разных объектах подпроцесса; но если вы посмотрите на реализацию в subprocess.py, вы увидите, что она делает именно то, что предлагал Аарон.

Как только вы увидите, что связь и чтение, и запись, вы увидите, что во второй попытке подключения () конкурирует с p2 за вывод p1:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
# ...
p1.communicate('data\n')       # reads from p1.stdout, as does p2

Я работаю на win32,

1
ответ дан 27 November 2019 в 17:10
поделиться

Как насчет использования SpooledTemporaryFile? Это обходит (но, возможно, не решает) проблему:

http://docs.python.org/library/tempfile.html#tempfile.SpooledTemporaryFile

Вы можете писать в него как в файл, но на самом деле это блок памяти.

Или я совершенно не понимаю ...

0
ответ дан 27 November 2019 в 17:10
поделиться

Открытый универсальный тип - это только универсальный тип, для которого еще не задан тип (например, CargoCrate < T > ). Он становится «закрытым» после назначения конкретного типа (например, CargoCrate < виджет > ).

Например, предположим, что у вас есть что-то подобное:

public class Basket<T> {
  T[] basketItems;
}

public class PicnicBlanket<T> {
  Basket<T> picnicBasket;   // Open type here. We don't know what T is.
}

                                 // Closed type here: T is Food.
public class ParkPicnicBlanket : PicnicBlanket<Food> {
}

Здесь открыт тип picnicBasket : T еще ничего не назначено. Когда вы делаете бетонный PicnicBlanket с определенным типом - например, написав PicnicBlanket < Food > p = new PicnicBlanket < Food > () - теперь мы называем его закрытым .

-121--701028-

Потому что они еще не изучили этот трюк:

class ExceptionUtils {
    public static RuntimeException cloak(Throwable t) {
        return ExceptionUtils.<RuntimeException>castAndRethrow(t);
    }

    @SuppressWarnings("unchecked")
    private static <X extends Throwable> X castAndRethrow(Throwable t) throws X {
        throw (X) t;
    }
}

class Main {
    public static void main(String[] args) { // Note no "throws" declaration
        try {
            // Do stuff that can throw IOException
        } catch (IOException ex) {
            // Pretend to throw RuntimeException, but really rethrowing the IOException
            throw ExceptionUtils.cloak(ex);
        }
    }
}
-121--850777-

Существует три основных метода, позволяющих сделать трубы работающими, как ожидалось

  1. Убедитесь, что каждый конец трубы используется в различных резьбах/процессах (некоторые из примеров вблизи вершины страдают от этой проблемы).

  2. явно закрыть неиспользуемый конец канала в каждом процессе

  3. справиться с буферизацией, либо отключив его (параметр Python -u), используя pty's, или просто заполнение буфера чем-то, что не повлияет на данные, (возможно, '\n ', но все, что подходит).

Примеры в «конвейерном» модуле Python (я автор) соответствуют вашему сценарию точно, и сделать низкоуровневые шаги достаточно ясными.

http://pypi.python.org/pypi/pipeline/

Недавно я использовал модуль подпроцесса как часть потребительский диспетчер процессора производителя образец:

http://www.darkarchive.org/w/Pub/PythonInteract

Этот пример имеет дело с буферизированным stdin, не обращаясь к использованию имущества, и также показывает, какие концы труб должны быть закрыты, где. Я предпочитаю процессы многопоточность, но принцип тот же. Кроме того, он иллюстрирует синхронизация очередей, которые подают производителю и собирают выходные данные от потребителя, и как закрыть их чисто (следите за дозорными, вставленными в очереди). Этот образец позволяет генерировать новые входные данные на основе последних выходных данных. обеспечение рекурсивного обнаружения и обработки.

3
ответ дан 27 November 2019 в 17:10
поделиться

Предложенное Носкло решение быстро сломается, если на принимающий конец трубы будет записано слишком много данных:


from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n' * 20000)
p1.stdin.close()
result = p2.stdout.read() 
assert result == "Hello Worl\n"

Если этот скрипт не зависнет на вашей машине, просто увеличьте "20000" до размера, превышающего размер буфера трубы вашей операционной системы.

Это происходит потому, что операционная система буферизирует входные данные для "grep", но когда этот буфер заполнится, вызов p1.stdin.write будет блокироваться, пока что-нибудь не прочитает из p2.stdout. В игрушечных сценариях можно обойтись записью в трубу и чтением из трубы в одном и том же процессе, но при обычном использовании необходимо писать из одного потока/процесса и читать из отдельного потока/процесса. Это справедливо для subprocess.popen, os.pipe, os.popen* и т.д.

Другая изюминка заключается в том, что иногда вы хотите продолжать кормить трубу элементами, сгенерированными из предыдущего вывода той же трубы. Решение состоит в том, чтобы сделать как податчик, так и считыватель трубок асинхронными по отношению к программе man, и реализовать две очереди: одну между основной программой и податчиком трубок, другую между основной программой и считывателем трубок. PythonInteract является примером этого.

Subprocess - хорошая удобная модель, но поскольку она скрывает детали вызовов os.popen и os.fork, которые она выполняет под капотом, с ней иногда бывает сложнее работать, чем с используемыми ею вызовами более низкого уровня. По этой причине subprocess не является хорошим способом узнать, как на самом деле работают межпроцессные каналы.

3
ответ дан 27 November 2019 в 17:10
поделиться

С помощью прямой сериализации можно выполнить следующие действия:

http://social.msdn.microsoft.com/forums/en-US/adodotnetentityframework/thread/a967b44b-c85c-4afd-a499-f6ff604e2139/

С помощью отражения, но с большим количеством кода можно выполнить следующие действия: http://msmvps.com/blogs/matthieu/archive/2008/05/31/entity-cloner.aspx

-121--1040611-

Это копии файлов перед их возвратом. Если они вам не нужны, вы можете удалить их либо вручную, либо с помощью расширения Очистить :

hg clean
-121--940703-

В одном из комментариев выше я попросил nosklo либо опубликовать какой-либо код, чтобы подкрепить его утверждения о select.select , либо повысить мои ответы, которые у него были Он ответил следующим кодом:

from subprocess import Popen, PIPE
import select

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)

data_to_write = 100000 * 'hello world\n'
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
written = 0


while to_read or to_write:
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        data = p2.stdout.read(1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        if written < len(data_to_write):
            part = data_to_write[written:written+1024]
            written += len(part)
            p1.stdin.write(part); p1.stdin.flush()
        else:
            p1.stdin.close()
            to_write = []

print b

Одна проблема с этим сценарием заключается в том, что он второй догадывается размер/характер буферы системных каналов. При удалении скрипта произойдет меньше сбоев магические числа, такие как 1024.

Большая проблема заключается в том, что этот код скрипта работает только в соответствии с правом сочетание ввода данных и внешних программ. grep и сократить обе работы с линии, и поэтому их внутренние буферы ведут себя немного иначе. Если мы используем более универсальная команда, такая как «cat», и записывать меньшие биты данных в канал, фатальное состояние гонки будет всплывать чаще:

from subprocess import Popen, PIPE
import select
import time

p1 = Popen(["cat"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cat"], stdin=p1.stdout, stdout=PIPE, close_fds=True)

data_to_write = 'hello world\n'
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
written = 0


while to_read or to_write:
    time.sleep(1)
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        print 'I am reading now!'
        data = p2.stdout.read(1024)
        if not data:
            p1.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        print 'I am writing now!'
        if written < len(data_to_write):
            part = data_to_write[written:written+1024]
            written += len(part)
            p1.stdin.write(part); p1.stdin.flush()
        else:
            print 'closing file'
            p1.stdin.close()
            to_write = []

print b

В этом случае будут проявляться два разных результата:

write, write, close file, read -> success
write, read -> hang

Так что опять же, я вызываю nosklo к любому почтовому коду, показывающему использование select.выберите для обработки произвольного ввода и буферизации каналов из одиночный поток или для активизации моих ответов.

Итог: не пытайтесь манипулировать обоими концами трубы из одной резьбы. Это просто не стоит того. Посмотрите трубопровод для хорошего низкого уровня пример того, как это сделать правильно.

1
ответ дан 27 November 2019 в 17:10
поделиться

Ответ на утверждение nosklo (см. Другие комментарии к этому вопросу) о том, что это невозможно сделать без close_fds = True :

close_fds = True необходимо только в том случае, если вы ушли открываются другие дескрипторы файлов . При открытии нескольких дочерних процессов всегда полезно отслеживать открытые файлы, которые могут быть унаследованы, и явно закрывать все , которые не нужны:

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p1.stdin.write('Hello World\n')
p1.stdin.close()
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
result = p2.stdout.read() 
assert result == "Hello Worl\n"

close_fds по умолчанию False , потому что подпроцесс предпочитает доверять вызывающей программе, чтобы знать, что она делает с дескрипторами открытых файлов , и просто предоставляет вызывающей стороне простой способ закрыть их все {{ 1}}, если он этого хочет.

Но настоящая проблема в том, что буферы для трубок будут укусить вас за все, кроме игрушечных примеров. Как я уже говорил в других моих ответах на этот вопрос, практическое правило - не иметь ваш читатель и ваш писатель открываются в одном процессе / потоке. Любой , кто хочет использовать модуль подпроцесса для двусторонней связи, будет хорошо подготовлен для изучения os.pipe и os.fork в первую очередь. На самом деле их не так использовать, если у вас есть хороший пример , на который стоит взглянуть.

2
ответ дан 27 November 2019 в 17:10
поделиться

Вот пример использования Popen вместе с os.fork для достижения того же самого того же самого. Вместо использования close_fds он просто закрывает трубы в нужных местах. Это намного проще, чем пытаться использовать select.select, и использует все преимущества системных буферов труб.

from subprocess import Popen, PIPE
import os
import sys

p1 = Popen(["cat"], stdin=PIPE, stdout=PIPE)

pid = os.fork()

if pid: #parent
    p1.stdin.close()
    p2 = Popen(["cat"], stdin=p1.stdout, stdout=PIPE)
    data = p2.stdout.read()
    sys.stdout.write(data)
    p2.stdout.close()

else: #child
    data_to_write = 'hello world\n' * 100000
    p1.stdin.write(data_to_write)
    p1.stdin.close()
-1
ответ дан 27 November 2019 в 17:10
поделиться
Другие вопросы по тегам:

Похожие вопросы: