Если скорость и память не являются никакой проблемой, , dom4j является действительно хорошим вариантом. При необходимости в скорости с помощью синтаксического анализатора StAX как , Woodstox является правильным путем, но необходимо записать больше кода для добиваний цели, и необходимо привыкнуть для обработки XML в потоках.
Это не потокобезопасно; одновременные вызовы могут чередоваться и портить локальные переменные.
Общий подход заключается в использовании шаблона главный-подчиненный (теперь называемого шаблоном фермер-рабочий на ПК). Создайте третий поток, который генерирует данные, и добавьте очередь между ведущим и ведомыми устройствами, где ведомые устройства будут читать из очереди, а ведущее устройство будет писать в нее. Стандартный модуль очереди обеспечивает необходимую безопасность потоков и блокирует ведущее устройство до тех пор, пока ведомые устройства не будут готовы к чтению дополнительных данных.
Отредактировано для добавления теста ниже.
Вы можете обернуть генератор блокировкой. Например,
import threading
class LockedIterator(object):
def __init__(self, it):
self.lock = threading.Lock()
self.it = it.__iter__()
def __iter__(self): return self
def next(self):
self.lock.acquire()
try:
return self.it.next()
finally:
self.lock.release()
gen = [x*2 for x in [1,2,3,4]]
g2 = LockedIterator(gen)
print list(g2)
Блокировка в моей системе занимает 50 мс, очередь - 350 мс. Очередь полезна, когда у вас действительно есть очередь; например, если у вас есть входящие HTTP-запросы и вы хотите поставить их в очередь для обработки рабочими потоками. (Это не вписывается в модель итератора Python - как только в итераторе заканчиваются элементы, все готово.) Если у вас действительно есть итератор, то LockedIterator - более быстрый и простой способ сделать его потокобезопасным.
from datetime import datetime
import threading
num_worker_threads = 4
class LockedIterator(object):
def __init__(self, it):
self.lock = threading.Lock()
self.it = it.__iter__()
def __iter__(self): return self
def next(self):
self.lock.acquire()
try:
return self.it.next()
finally:
self.lock.release()
def test_locked(it):
it = LockedIterator(it)
def worker():
try:
for i in it:
pass
except Exception, e:
print e
raise
threads = []
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
threads.append(t)
t.start()
for t in threads:
t.join()
def test_queue(it):
from Queue import Queue
def worker():
try:
while True:
item = q.get()
q.task_done()
except Exception, e:
print e
raise
q = Queue()
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.setDaemon(True)
t.start()
t1 = datetime.now()
for item in it:
q.put(item)
q.join()
start_time = datetime.now()
it = [x*2 for x in range(1,10000)]
test_locked(it)
#test_queue(it)
end_time = datetime.now()
took = end_time-start_time
print "took %.01f" % ((took.seconds + took.microseconds/1000000.0)*1000)
Это зависит от того, какую реализацию Python вы используете. В CPython GIL делает все операции с объектами python потокобезопасными, поскольку только один поток может выполнять код в любой момент времени.
No, they are not thread-safe. You can find interesting info about generators and multi-threading in: