Я просто попытался измерить времена:
class Foo(object):
@classmethod
def singleton(self):
if not hasattr(self, 'instance'):
self.instance = Foo()
return self.instance
class Bar(object):
@classmethod
def singleton(self):
try:
return self.instance
except AttributeError:
self.instance = Bar()
return self.instance
from time import time
n = 1000000
foo = [Foo() for i in xrange(0,n)]
bar = [Bar() for i in xrange(0,n)]
print "Objs created."
print
for times in xrange(1,4):
t = time()
for d in foo: d.singleton()
print "#%d Foo pass in %f" % (times, time()-t)
t = time()
for d in bar: d.singleton()
print "#%d Bar pass in %f" % (times, time()-t)
print
На моей машине:
Objs created.
#1 Foo pass in 1.719000
#1 Bar pass in 1.140000
#2 Foo pass in 1.750000
#2 Bar pass in 1.187000
#3 Foo pass in 1.797000
#3 Bar pass in 1.203000
кажется, что попытка/кроме быстрее. Это кажется также более читаемым мне, так или иначе зависит от случая, этот тест был очень прост, возможно, Вам будет нужен более сложный.
I found out how to do it.
It is not about threads, and not about select().
When I run the first process (grep
), it creates two low-level file descriptors, one for each pipe. Lets call those a
and b
.
When I run the second process, b
gets passed to cut
sdtin
. But there is a brain-dead default on Popen
- close_fds=False
.
The effect of that is that cut
also inherits a
. So grep
can't die even if I close a
, because stdin is still open on cut
's process (cut
ignores it).
The following code now runs perfectly.
from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n')
p1.stdin.close()
result = p2.stdout.read()
assert result == "Hello Worl\n"
close_fds=True
SHOULD BE THE DEFAULT on unix systems. On windows it closes all fds, so it prevents piping.
EDIT:
PS: For people with a similar problem reading this answer: As pooryorick said in a comment, that also could block if data written to p1.stdin
is bigger than the buffers. In that case you should chunk the data into smaller pieces, and use select.select()
to know when to read/write. The code in the question should give a hint on how to implement that.
EDIT2: Found another solution, with more help from pooryorick - instead of using close_fds=True
and close ALL fds, one could close the fd
s that belongs to the first process, when executing the second, and it will work. The closing must be done in the child so the preexec_fn
function from Popen comes very handy to do just that. On executing p2 you can do:
p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=devnull, preexec_fn=p1.stdin.close)
Это нужно делать в несколько потоков. В противном случае вы попадете в ситуацию, когда вы не сможете отправить данные: дочерний элемент p1 не будет читать ваш ввод, поскольку p2 не читает вывод p1, потому что вы не читаете вывод p2.
Итак, вам нужен фоновый поток, который читает то, что записывает p2. Это позволит p2 продолжить работу после записи некоторых данных в канал, чтобы он мог прочитать следующую строку ввода из p1, что снова позволяет p1 обрабатывать данные, которые вы ему отправляете.
В качестве альтернативы вы можете отправить данные на p1 с фоновым потоком и считайте вывод p2 в основном потоке. Но обе стороны должны быть нитью.
Я думаю, вы исследуете неправильную проблему. Конечно, как говорит Аарон, если вы пытаетесь быть и производителем в начале конвейера, и потребителем в конце конвейера, легко попасть в тупиковую ситуацию. Это проблема, которую решает общение ().
Коммуникация () не совсем подходит для вас, поскольку stdin и stdout находятся на разных объектах подпроцесса; но если вы посмотрите на реализацию в subprocess.py, вы увидите, что она делает именно то, что предлагал Аарон.
Как только вы увидите, что связь и чтение, и запись, вы увидите, что во второй попытке подключения () конкурирует с p2 за вывод p1:
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
# ...
p1.communicate('data\n') # reads from p1.stdout, as does p2
Я работаю на win32,
Как насчет использования SpooledTemporaryFile? Это обходит (но, возможно, не решает) проблему:
http://docs.python.org/library/tempfile.html#tempfile.SpooledTemporaryFile
Вы можете писать в него как в файл, но на самом деле это блок памяти.
Или я совершенно не понимаю ...
Открытый универсальный тип - это только универсальный тип, для которого еще не задан тип (например, CargoCrate < T >
). Он становится «закрытым» после назначения конкретного типа (например, CargoCrate < виджет >
).
Например, предположим, что у вас есть что-то подобное:
public class Basket<T> {
T[] basketItems;
}
public class PicnicBlanket<T> {
Basket<T> picnicBasket; // Open type here. We don't know what T is.
}
// Closed type here: T is Food.
public class ParkPicnicBlanket : PicnicBlanket<Food> {
}
Здесь открыт тип picnicBasket
: T
еще ничего не назначено. Когда вы делаете бетонный PicnicBlanket с определенным типом - например, написав PicnicBlanket < Food > p = new PicnicBlanket < Food > ()
- теперь мы называем его закрытым .
Потому что они еще не изучили этот трюк:
class ExceptionUtils {
public static RuntimeException cloak(Throwable t) {
return ExceptionUtils.<RuntimeException>castAndRethrow(t);
}
@SuppressWarnings("unchecked")
private static <X extends Throwable> X castAndRethrow(Throwable t) throws X {
throw (X) t;
}
}
class Main {
public static void main(String[] args) { // Note no "throws" declaration
try {
// Do stuff that can throw IOException
} catch (IOException ex) {
// Pretend to throw RuntimeException, but really rethrowing the IOException
throw ExceptionUtils.cloak(ex);
}
}
}
-121--850777- Существует три основных метода, позволяющих сделать трубы работающими, как ожидалось
Убедитесь, что каждый конец трубы используется в различных резьбах/процессах (некоторые из примеров вблизи вершины страдают от этой проблемы).
явно закрыть неиспользуемый конец канала в каждом процессе
справиться с буферизацией, либо отключив его (параметр Python -u), используя pty's, или просто заполнение буфера чем-то, что не повлияет на данные, (возможно, '\n ', но все, что подходит).
Примеры в «конвейерном» модуле Python (я автор) соответствуют вашему сценарию точно, и сделать низкоуровневые шаги достаточно ясными.
http://pypi.python.org/pypi/pipeline/
Недавно я использовал модуль подпроцесса как часть потребительский диспетчер процессора производителя образец:
http://www.darkarchive.org/w/Pub/PythonInteract
Этот пример имеет дело с буферизированным stdin, не обращаясь к использованию имущества, и также показывает, какие концы труб должны быть закрыты, где. Я предпочитаю процессы многопоточность, но принцип тот же. Кроме того, он иллюстрирует синхронизация очередей, которые подают производителю и собирают выходные данные от потребителя, и как закрыть их чисто (следите за дозорными, вставленными в очереди). Этот образец позволяет генерировать новые входные данные на основе последних выходных данных. обеспечение рекурсивного обнаружения и обработки.
Предложенное Носкло решение быстро сломается, если на принимающий конец трубы будет записано слишком много данных:
from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n' * 20000)
p1.stdin.close()
result = p2.stdout.read()
assert result == "Hello Worl\n"
Если этот скрипт не зависнет на вашей машине, просто увеличьте "20000" до размера, превышающего размер буфера трубы вашей операционной системы.
Это происходит потому, что операционная система буферизирует входные данные для "grep", но когда этот буфер заполнится, вызов p1.stdin.write
будет блокироваться, пока что-нибудь не прочитает из p2.stdout
. В игрушечных сценариях можно обойтись записью в трубу и чтением из трубы в одном и том же процессе, но при обычном использовании необходимо писать из одного потока/процесса и читать из отдельного потока/процесса. Это справедливо для subprocess.popen, os.pipe, os.popen* и т.д.
Другая изюминка заключается в том, что иногда вы хотите продолжать кормить трубу элементами, сгенерированными из предыдущего вывода той же трубы. Решение состоит в том, чтобы сделать как податчик, так и считыватель трубок асинхронными по отношению к программе man, и реализовать две очереди: одну между основной программой и податчиком трубок, другую между основной программой и считывателем трубок. PythonInteract является примером этого.
Subprocess - хорошая удобная модель, но поскольку она скрывает детали вызовов os.popen и os.fork, которые она выполняет под капотом, с ней иногда бывает сложнее работать, чем с используемыми ею вызовами более низкого уровня. По этой причине subprocess не является хорошим способом узнать, как на самом деле работают межпроцессные каналы.
С помощью прямой сериализации можно выполнить следующие действия:
С помощью отражения, но с большим количеством кода можно выполнить следующие действия: http://msmvps.com/blogs/matthieu/archive/2008/05/31/entity-cloner.aspx
-121--1040611-Это копии файлов перед их возвратом. Если они вам не нужны, вы можете удалить их либо вручную, либо с помощью расширения Очистить :
hg clean
-121--940703- В одном из комментариев выше я попросил nosklo либо опубликовать какой-либо код, чтобы подкрепить его утверждения о select.select
, либо повысить мои ответы, которые у него были Он ответил следующим кодом:
from subprocess import Popen, PIPE
import select
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
data_to_write = 100000 * 'hello world\n'
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
written = 0
while to_read or to_write:
read_now, write_now, xlist = select.select(to_read, to_write, [])
if read_now:
data = p2.stdout.read(1024)
if not data:
p2.stdout.close()
to_read = []
else:
b.append(data)
if write_now:
if written < len(data_to_write):
part = data_to_write[written:written+1024]
written += len(part)
p1.stdin.write(part); p1.stdin.flush()
else:
p1.stdin.close()
to_write = []
print b
Одна проблема с этим сценарием заключается в том, что он второй догадывается размер/характер буферы системных каналов. При удалении скрипта произойдет меньше сбоев магические числа, такие как 1024.
Большая проблема заключается в том, что этот код скрипта работает только в соответствии с правом сочетание ввода данных и внешних программ. grep и сократить обе работы с линии, и поэтому их внутренние буферы ведут себя немного иначе. Если мы используем более универсальная команда, такая как «cat», и записывать меньшие биты данных в канал, фатальное состояние гонки будет всплывать чаще:
from subprocess import Popen, PIPE
import select
import time
p1 = Popen(["cat"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cat"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
data_to_write = 'hello world\n'
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
written = 0
while to_read or to_write:
time.sleep(1)
read_now, write_now, xlist = select.select(to_read, to_write, [])
if read_now:
print 'I am reading now!'
data = p2.stdout.read(1024)
if not data:
p1.stdout.close()
to_read = []
else:
b.append(data)
if write_now:
print 'I am writing now!'
if written < len(data_to_write):
part = data_to_write[written:written+1024]
written += len(part)
p1.stdin.write(part); p1.stdin.flush()
else:
print 'closing file'
p1.stdin.close()
to_write = []
print b
В этом случае будут проявляться два разных результата:
write, write, close file, read -> success
write, read -> hang
Так что опять же, я вызываю nosklo к любому почтовому коду, показывающему использование
select.выберите
для обработки произвольного ввода и буферизации каналов из
одиночный поток или для активизации моих ответов.
Итог: не пытайтесь манипулировать обоими концами трубы из одной резьбы. Это просто не стоит того. Посмотрите трубопровод для хорошего низкого уровня пример того, как это сделать правильно.
Ответ на утверждение nosklo (см. Другие комментарии к этому вопросу) о том, что это невозможно сделать без close_fds = True
:
close_fds = True
необходимо только в том случае, если вы ушли открываются другие дескрипторы файлов
. При открытии нескольких дочерних процессов всегда полезно
отслеживать открытые файлы, которые могут быть унаследованы, и явно закрывать все
, которые не нужны:
from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p1.stdin.write('Hello World\n')
p1.stdin.close()
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
result = p2.stdout.read()
assert result == "Hello Worl\n"
close_fds
по умолчанию False
, потому что подпроцесс
предпочитает доверять вызывающей программе, чтобы знать, что она делает с дескрипторами открытых файлов
, и просто предоставляет вызывающей стороне простой способ закрыть их все {{ 1}}, если он этого хочет.
Но настоящая проблема в том, что буферы для трубок будут укусить вас за все, кроме игрушечных примеров. Как я уже говорил в других моих ответах на этот вопрос, практическое правило - не иметь ваш читатель и ваш писатель открываются в одном процессе / потоке. Любой , кто хочет использовать модуль подпроцесса для двусторонней связи, будет хорошо подготовлен для изучения os.pipe и os.fork в первую очередь. На самом деле их не так использовать, если у вас есть хороший пример , на который стоит взглянуть.
Вот пример использования Popen вместе с os.fork для достижения того же самого
того же самого. Вместо использования close_fds
он просто закрывает трубы в
нужных местах. Это намного проще, чем пытаться использовать select.select
, и
использует все преимущества системных буферов труб.
from subprocess import Popen, PIPE
import os
import sys
p1 = Popen(["cat"], stdin=PIPE, stdout=PIPE)
pid = os.fork()
if pid: #parent
p1.stdin.close()
p2 = Popen(["cat"], stdin=p1.stdout, stdout=PIPE)
data = p2.stdout.read()
sys.stdout.write(data)
p2.stdout.close()
else: #child
data_to_write = 'hello world\n' * 100000
p1.stdin.write(data_to_write)
p1.stdin.close()