Как я должен зарегистрироваться при использовании многопроцессорной обработки в Python?

Справа от этот самый абзац связан с:

Устанавливать объекты должны быть реализованы с использованием [механизмов], которые в среднем предоставляют время доступа, которое являются сублинейными по количеству элементов в коллекции.

blockquote>

Вы найдете то же предложение для Maps , WeakMaps и WeakSets .

Похоже, спецификация ECMA указывает, что реализации (например, Set.prototype.has) должны использовать алгоритм линейного времени (O(n)).

blockquote>

Нет:

Структуры данных, используемые в этой спецификации объектов Set, предназначены только для описания требуемой наблюдаемой семантики объектов Set. blockquote>

Наблюдаемая семантика в основном связана с предсказуемым порядком итерации (который все еще может быть реализован эффективным и быстрым ). В спецификации предполагается, что реализация использует хеш-таблицу или что-то подобное с постоянным доступом, хотя деревья (с логарифмической степенью доступа) также разрешены.

209
задан cdleary 13 March 2009 в 04:02
поделиться

10 ответов

Единственный способ иметь дело с этим ненавязчиво к:

  1. Икра каждый рабочий процесс, таким образом, что его журнал переходит в другой дескриптор файла (к диску или передавать по каналу.) Идеально, ко всем записям в журнале нужно добавить метку времени.
  2. Ваш процесс контроллера может тогда сделать один из следующего:
    • При использовании дисковых файлов: Объединяют файлы журнала в конце выполнения, отсортированного по метке времени
    • При использовании (рекомендуемых) каналов: Объединяют записи в журнале на лету от всех каналов, в центральный файл журнала. (Например, Периодически select от дескрипторов файлов каналов, выполните сортировку с объединением на доступных записях в журнале и сброс к централизованному журналу. Повториться.)
62
ответ дан Zearin 4 November 2019 в 14:13
поделиться

Одна из альтернатив должна записать многопроцессорную обработку, регистрирующуюся в известный файл, и зарегистрироваться atexit, обработчик для присоединения на тех процессах считал его назад на stderr; однако, Вы не получите поток в реальном времени к выходным сигналам на stderr тот путь.

0
ответ дан cdleary 4 November 2019 в 14:13
поделиться

просто опубликуйте где-нибудь свой экземпляр регистратора. тем путем другие модули и клиенты могут использовать Ваш API для получения регистратора, не имея необходимость к import multiprocessing.

3
ответ дан Javier 4 November 2019 в 14:13
поделиться

Еще одна альтернатива могла бы быть различным non-file-based регистрирующиеся обработчики в logging пакет :

  • SocketHandler
  • DatagramHandler
  • SyslogHandler

(и другие)

Таким образом, у Вас мог легко быть регистрирующийся демон где-нибудь, что Вы могли записать в безопасно и обработаете результаты правильно. (Например, простой сервер сокета, который просто не солит сообщение и испускает его к его собственному обработчику файлов вращения.)

Эти SyslogHandler заботился бы об этом для Вас, также. Конечно, Вы могли использовать свой собственный экземпляр syslog, не система один.

20
ответ дан Zearin 4 November 2019 в 14:13
поделиться

Самая простая идея, как упомянуто:

  • Захват имя файла и идентификатор процесса текущего процесса.
  • Настроенный [WatchedFileHandler][1]. Причины этого обработчика обсуждены подробно здесь , но короче говоря существуют определенные худшие условия состязания с другими обработчиками входов. У этого есть самое короткое окно для состояния состязания.
    • Выбирают путь для сохранения журналов к такому как/var/log/...
0
ответ дан 23 November 2019 в 04:29
поделиться

I только что написал собственный обработчик журнала, который просто передает все родительскому процессу через канал. Я тестировал его всего десять минут, но, похоже, он работает очень хорошо.

( Примечание: Это жестко запрограммировано на RotatingFileHandler , что является моим собственным вариантом использования.)


Обновление: @javier теперь поддерживает этот подход как пакет, доступный на Pypi - см. многопроцессорное ведение журнала на Pypi, github на https: // github. com / jruere / multiprocessing-logging


Обновление: реализация!

Теперь он использует очередь для правильной обработки параллелизма, а также правильно восстанавливается после ошибок. Я уже несколько месяцев использую это в производстве, и текущая версия ниже работает без проблем.

from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback

class MultiProcessingLog(logging.Handler):
    def __init__(self, name, mode, maxsize, rotate):
        logging.Handler.__init__(self)

        self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
        self.queue = multiprocessing.Queue(-1)

        t = threading.Thread(target=self.receive)
        t.daemon = True
        t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        while True:
            try:
                record = self.queue.get()
                self._handler.emit(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

    def send(self, s):
        self.queue.put_nowait(s)

    def _format_record(self, record):
        # ensure that exc_info and args
        # have been stringified.  Removes any chance of
        # unpickleable things inside and possibly reduces
        # message size sent over the pipe
        if record.args:
            record.msg = record.msg % record.args
            record.args = None
        if record.exc_info:
            dummy = self.format(record)
            record.exc_info = None

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        self._handler.close()
        logging.Handler.close(self)
118
ответ дан 23 November 2019 в 04:29
поделиться

Мне понравился ответ zzzeek. Я бы просто заменил очередь конвейером, поскольку, если несколько потоков / процессов используют один и тот же конец конвейера для генерации сообщений журнала, они будут искажены.

3
ответ дан 23 November 2019 в 04:29
поделиться

Мне также нравится ответ zzzeek, ​​но Андре прав в том, что для предотвращения искажения требуется очередь. Мне немного повезло с трубкой, но я заметил искажения, чего можно было ожидать. Реализовать это оказалось сложнее, чем я думал, особенно из-за работы в Windows, где есть некоторые дополнительные ограничения на глобальные переменные и прочее (см .: Как многопроцессорная обработка Python реализована в Windows? )

Но , Наконец-то он заработал. Этот пример, вероятно, не идеален, так что комментарии и предложения приветствуются. Он также не поддерживает настройку форматера или чего-либо кроме корневого регистратора. По сути, вы должны повторно инициализировать регистратор в каждом из процессов пула с очередью и настроить другие атрибуты регистратора.

Опять же, любые предложения о том, как улучшить код, приветствуются. Я, конечно, еще не знаю всех уловок Python: -)

import multiprocessing, logging, sys, re, os, StringIO, threading, time, Queue

class MultiProcessingLogHandler(logging.Handler):
    def __init__(self, handler, queue, child=False):
        logging.Handler.__init__(self)

        self._handler = handler
        self.queue = queue

        # we only want one of the loggers to be pulling from the queue.
        # If there is a way to do this without needing to be passed this
        # information, that would be great!
        if child == False:
            self.shutdown = False
            self.polltime = 1
            t = threading.Thread(target=self.receive)
            t.daemon = True
            t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        #print "receive on"
        while (self.shutdown == False) or (self.queue.empty() == False):
            # so we block for a short period of time so that we can
            # check for the shutdown cases.
            try:
                record = self.queue.get(True, self.polltime)
                self._handler.emit(record)
            except Queue.Empty, e:
                pass

    def send(self, s):
        # send just puts it in the queue for the server to retrieve
        self.queue.put(s)

    def _format_record(self, record):
        ei = record.exc_info
        if ei:
            dummy = self.format(record) # just to get traceback text into record.exc_text
            record.exc_info = None  # to avoid Unpickleable error

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        time.sleep(self.polltime+1) # give some time for messages to enter the queue.
        self.shutdown = True
        time.sleep(self.polltime+1) # give some time for the server to time out and see the shutdown

    def __del__(self):
        self.close() # hopefully this aids in orderly shutdown when things are going poorly.

def f(x):
    # just a logging command...
    logging.critical('function number: ' + str(x))
    # to make some calls take longer than others, so the output is "jumbled" as real MP programs are.
    time.sleep(x % 3)

def initPool(queue, level):
    """
    This causes the logging module to be initialized with the necessary info
    in pool threads to work correctly.
    """
    logging.getLogger('').addHandler(MultiProcessingLogHandler(logging.StreamHandler(), queue, child=True))
    logging.getLogger('').setLevel(level)

if __name__ == '__main__':
    stream = StringIO.StringIO()
    logQueue = multiprocessing.Queue(100)
    handler= MultiProcessingLogHandler(logging.StreamHandler(stream), logQueue)
    logging.getLogger('').addHandler(handler)
    logging.getLogger('').setLevel(logging.DEBUG)

    logging.debug('starting main')

    # when bulding the pool on a Windows machine we also have to init the logger in all the instances with the queue and the level of logging.
    pool = multiprocessing.Pool(processes=10, initializer=initPool, initargs=[logQueue, logging.getLogger('').getEffectiveLevel()] ) # start worker processes
    pool.map(f, range(0,50))
    pool.close()

    logging.debug('done')
    logging.shutdown()
    print "stream output is:"
    print stream.getvalue()
6
ответ дан 23 November 2019 в 04:29
поделиться

Вариант остальных, в котором потоки протоколирования и очереди разделяются.

"""sample code for logging in subprocesses using multiprocessing

* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
  process.
* As in the other implementations, a thread reads the queue and calls the
  handlers. Except in this implementation, the thread is defined outside of a
  handler, which makes the logger definitions simpler.
* Works with multiple handlers.  If the logger in the main process defines
  multiple handlers, they will all be fed records generated by the
  subprocesses loggers.

tested with Python 2.5 and 2.6 on Linux and Windows

"""

import os
import sys
import time
import traceback
import multiprocessing, threading, logging, sys

DEFAULT_LEVEL = logging.DEBUG

formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")

class SubProcessLogHandler(logging.Handler):
    """handler used by subprocesses

    It simply puts items on a Queue for the main process to log.

    """

    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue

    def emit(self, record):
        self.queue.put(record)

class LogQueueReader(threading.Thread):
    """thread to write subprocesses log records to main process log

    This thread reads the records written by subprocesses and writes them to
    the handlers defined in the main process's handlers.

    """

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.daemon = True

    def run(self):
        """read from the queue and write to the log handlers

        The logging documentation says logging is thread safe, so there
        shouldn't be contention between normal logging (from the main
        process) and this thread.

        Note that we're using the name of the original logger.

        """
        # Thanks Mike for the error checking code.
        while True:
            try:
                record = self.queue.get()
                # get the logger for this record
                logger = logging.getLogger(record.name)
                logger.callHandlers(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

class LoggingProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def _setupLogger(self):
        # create the logger to use.
        logger = logging.getLogger('test.subprocess')
        # The only handler desired is the SubProcessLogHandler.  If any others
        # exist, remove them. In this case, on Unix and Linux the StreamHandler
        # will be inherited.

        for handler in logger.handlers:
            # just a check for my sanity
            assert not isinstance(handler, SubProcessLogHandler)
            logger.removeHandler(handler)
        # add the handler
        handler = SubProcessLogHandler(self.queue)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # On Windows, the level will not be inherited.  Also, we could just
        # set the level to log everything here and filter it in the main
        # process handlers.  For now, just set it from the global default.
        logger.setLevel(DEFAULT_LEVEL)
        self.logger = logger

    def run(self):
        self._setupLogger()
        logger = self.logger
        # and here goes the logging
        p = multiprocessing.current_process()
        logger.info('hello from process %s with pid %s' % (p.name, p.pid))


if __name__ == '__main__':
    # queue used by the subprocess loggers
    queue = multiprocessing.Queue()
    # Just a normal logger
    logger = logging.getLogger('test')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(DEFAULT_LEVEL)
    logger.info('hello from the main process')
    # This thread will read from the subprocesses and write to the main log's
    # handlers.
    log_queue_reader = LogQueueReader(queue)
    log_queue_reader.start()
    # create the processes.
    for i in range(10):
        p = LoggingProcess(queue)
        p.start()
    # The way I read the multiprocessing warning about Queue, joining a
    # process before it has finished feeding the Queue can cause a deadlock.
    # Also, Queue.empty() is not realiable, so just make sure all processes
    # are finished.
    # active_children joins subprocesses when they're finished.
    while multiprocessing.active_children():
        time.sleep(.1)
13
ответ дан 23 November 2019 в 04:29
поделиться

У меня есть решение, похожее на решение ironhacker, за исключением того, что я использую logging.exception в некоторых частях своего кода и обнаружил, что мне нужно отформатировать исключение, прежде чем передавать его обратно в очередь, поскольку трассировки не могут быть обработаны:

class QueueHandler(logging.Handler):
    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue
    def emit(self, record):
        if record.exc_info:
            # can't pass exc_info across processes so just format now
            record.exc_text = self.formatException(record.exc_info)
            record.exc_info = None
        self.queue.put(record)
    def formatException(self, ei):
        sio = cStringIO.StringIO()
        traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
        s = sio.getvalue()
        sio.close()
        if s[-1] == "\n":
            s = s[:-1]
        return s
1
ответ дан 23 November 2019 в 04:29
поделиться
Другие вопросы по тегам:

Похожие вопросы: