Как я могу обработать xml асинхронно в Python?

У меня есть большой файл данных XML (> 160M) для обработки, и кажется, что парсинг SAX/expat/pulldom является способом пойти. Я хотел бы иметь поток, который отсеивает через узлы и продвигает узлы быть обработанными на очередь, и затем другие рабочие потоки вытягивают следующий доступный узел от очереди и обрабатывают его.

У меня есть следующее (это должно иметь блокировки, я знаю - это будет, позже),

import sys, time
import xml.parsers.expat
import threading

q = []

def start_handler(name, attrs):
    q.append(name)

def do_expat():
    p = xml.parsers.expat.ParserCreate()
    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")


t = threading.Thread(group=None, target=do_expat)
t.start()

while True:
    print(q)
    time.sleep(1)

Проблема состоит в том что тело while блок называют только однажды, и затем я не могу даже ctrl-C прерывать его. На меньших файлах вывод как ожидалось, но это, кажется, указывает, что обработчик только называют, когда документ полностью анализируется, который, кажется, побеждает цель синтаксического анализатора SAX.

Я уверен, что это - мое собственное незнание, но я не вижу, где я делаю ошибку.

PS: Я также пытался измениться start_handler таким образом:

def start_handler(name, attrs):
    def app():
        q.append(name)
    u = threading.Thread(group=None, target=app)
    u.start()

Никакая любовь, все же.

6
задан decitrig 18 January 2010 в 23:59
поделиться

4 ответа

Джефф Этвуд опубликовал эту тему в «Великий раскол в новой линии»

Википедия охватывает историю новой линии .

Вкратце, CR + LF - это ссылка на дни использования электрической пишущей машинки, когда для перемещения курсора влево сначала используется возврат каретки, а для перемещения вниз по линии - линейная подача. Иногда вы просто используете один или другой для написания специальных символов, таких как подчеркивание или зачеркивание.

CRLF

Компьютеры использовали эту номенклатуру, поскольку она была полезна для взаимодействия с машинами teletype . Поэтому MS-DOS и впоследствии Windows используют CRLF.

LF

Очевидно, что Multics и впоследствии Unix взяли свой ключ из 1963-64 проекта проекта стандартов ISO, который указал, что CRLF или LF могут представлять новую линию, и пошли с одним символом LF.

CR

Я все еще немного озадачен историей принятия CR яблоками. Я смотрю на это... До сих пор, я думаю, что это связано яблоком ранним различением между ввод и возврат клавиши . В первые дни ключ возврата соответствует возврату каретки, а ключ ввода - подаче строки, а не одному ключу «ввода», который выполняет двойную функцию, как в мире Windows. Для пользователей пишущих машинок, которые обычно используют возврат каретки для специального форматирования, это, возможно, имело смысл разделить их, и яблоко фактически использовало ключ возврата для специальных опций форматирования.

Где я не так уверен, как, когда они отказались от этого, они остановились только на использовании CR, но я предполагаю, что это даты этого раннего решения использовать CR для ключа возврата... но я могу ошибаться.

-121--1906228-

ParseFile , как вы заметили, просто «пробивает» все - нет пользы для инкрементного анализа, который вы хотите сделать! Так что, просто подайте файл в синтаксический анализатор немного за раз, убедившись, что условно уступите управление другим потокам, как вы идете - например,

while True:
  data = f.read(BUFSIZE)
  if not data:
    p.Parse('', True)
    break
  p.Parse(data, False)
  time.sleep(0.0)

вызов time.sleep (0.0) это способ Python сказать «уступить другим потокам, если они готовы и ждут»; метод Синтаксический анализ документирован здесь .

Второй пункт - это, забудьте о блокировках для этого использования! -- используйте вместо этого Queue.Queue , это внутренне многопоточный и почти неизменно лучший и простой способ координации нескольких потоков в Python. Просто создайте экземпляр Queue q , q.put (имя) в нем и включите блок рабочих потоков в q.get () , ожидая получения дополнительной работы - это ТАК просто!

(Существует несколько вспомогательных стратегий, которые можно использовать для координации завершения рабочих потоков, когда для них больше нет работы, но простейшие, отсутствующие особые требования заключаются в том, чтобы просто сделать их демонными потоками, так что все они завершатся, когда это сделает основной поток - см. документы ).

7
ответ дан 8 December 2019 в 16:03
поделиться

Я не слишком уверен в этой проблеме. Я предполагаю, что звонок для Parsefile - блокировка, и только поток разборки запускается из-за GIL. Способ этого будет использовать многопроцессорное вместо этого. В любом случае, он предназначен для работы с очередями.

Вы делаете процесс , и вы можете пройти через его очередь :

import sys, time
import xml.parsers.expat
import multiprocessing
import Queue

def do_expat(q):
    p = xml.parsers.expat.ParserCreate()

    def start_handler(name, attrs):
        q.put(name)

    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")

if __name__ == '__main__':
    q = multiprocessing.Queue()
    process = multiprocessing.Process(target=do_expat, args=(q,))
    process.start()

    elements = []
    while True:
        while True:
            try:
                elements.append(q.get_nowait())
            except Queue.Empty:
                break

        print elements
        time.sleep(1)

Я включил список элементов, просто для репликации вашего исходного скрипта. Ваше окончательное решение, вероятно, будет использовать get_nowait и пул или что-то подобное.

7
ответ дан 8 December 2019 в 16:03
поделиться

Единственное, что я вижу, неправильно, это то, что вы получаете доступ Q одновременно из разных потоков - без блокировки, как вы пишете. Это просит неприятностей - и вы, вероятно, получаете проблемы в виде переводчика Python, блокируя на вас. :)

Попробуйте заблокировать, это действительно не очень сложно:

import sys, time
import xml.parsers.expat
import threading

q = []
q_lock = threading.Lock() <---

def start_handler(name, attrs):
    q_lock.acquire() <---
    q.append(name)
    q_lock.release() <---

def do_expat():
    p = xml.parsers.expat.ParserCreate()
    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")


t = threading.Thread(group=None, target=do_expat)
t.start()

while True:
    q_lock.acquire() <---
    print(q)
    q_lock.release() <---
    time.sleep(1)

Видите ли, это было действительно просто, мы только что создали переменную блокировки, чтобы охранять наш объект, и приобрести этот замок каждый раз, прежде чем использовать объект и выпустить каждый раз Время после того, как мы закончили нашу задачу на объекте. Таким образом, мы гарантировали, что Q.append (имя) никогда не перекрываются с помощью Print (Q) .


(с более новыми версиями Python также есть также «с ...» синтаксисом, который поможет вам не отпустить блокировки или закрывать файлы или другие очистки, которые часто забывают.)

1
ответ дан 8 December 2019 в 16:03
поделиться

Я не знаю много о реализации, но если анализ C код, который выполняется до завершения, другие потоки Python не будут работать. Если парсер вызывает обратно в код Python, GIL может быть освобожден для других потоков, но я не уверен в этом. Вы можете проверить эти детали.

0
ответ дан 8 December 2019 в 16:03
поделиться
Другие вопросы по тегам:

Похожие вопросы: