Справа от этот самый абзац связан с:
Устанавливать объекты должны быть реализованы с использованием [механизмов], которые в среднем предоставляют время доступа, которое являются сублинейными по количеству элементов в коллекции.
blockquote>Вы найдете то же предложение для Maps , WeakMaps и WeakSets .
Похоже, спецификация ECMA указывает, что реализации (например, Set.prototype.has) должны использовать алгоритм линейного времени (
blockquote>O(n)
).Нет:
Структуры данных, используемые в этой спецификации объектов
Set
, предназначены только для описания требуемой наблюдаемой семантики объектов Set. blockquote>Наблюдаемая семантика в основном связана с предсказуемым порядком итерации (который все еще может быть реализован эффективным и быстрым ). В спецификации предполагается, что реализация использует хеш-таблицу или что-то подобное с постоянным доступом, хотя деревья (с логарифмической степенью доступа) также разрешены.
Единственный способ иметь дело с этим ненавязчиво к:
select
от дескрипторов файлов каналов, выполните сортировку с объединением на доступных записях в журнале и сброс к централизованному журналу. Повториться.) Одна из альтернатив должна записать многопроцессорную обработку, регистрирующуюся в известный файл, и зарегистрироваться atexit
, обработчик для присоединения на тех процессах считал его назад на stderr; однако, Вы не получите поток в реальном времени к выходным сигналам на stderr тот путь.
просто опубликуйте где-нибудь свой экземпляр регистратора. тем путем другие модули и клиенты могут использовать Ваш API для получения регистратора, не имея необходимость к import multiprocessing
.
Еще одна альтернатива могла бы быть различным non-file-based регистрирующиеся обработчики в logging
пакет :
SocketHandler
DatagramHandler
SyslogHandler
(и другие)
Таким образом, у Вас мог легко быть регистрирующийся демон где-нибудь, что Вы могли записать в безопасно и обработаете результаты правильно. (Например, простой сервер сокета, который просто не солит сообщение и испускает его к его собственному обработчику файлов вращения.)
Эти SyslogHandler
заботился бы об этом для Вас, также. Конечно, Вы могли использовать свой собственный экземпляр syslog
, не система один.
Самая простая идея, как упомянуто:
[WatchedFileHandler][1]
. Причины этого обработчика обсуждены подробно здесь , но короче говоря существуют определенные худшие условия состязания с другими обработчиками входов. У этого есть самое короткое окно для состояния состязания. I только что написал собственный обработчик журнала, который просто передает все родительскому процессу через канал. Я тестировал его всего десять минут, но, похоже, он работает очень хорошо.
( Примечание: Это жестко запрограммировано на RotatingFileHandler
, что является моим собственным вариантом использования.)
Теперь он использует очередь для правильной обработки параллелизма, а также правильно восстанавливается после ошибок. Я уже несколько месяцев использую это в производстве, и текущая версия ниже работает без проблем.
from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback
class MultiProcessingLog(logging.Handler):
def __init__(self, name, mode, maxsize, rotate):
logging.Handler.__init__(self)
self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
self.queue = multiprocessing.Queue(-1)
t = threading.Thread(target=self.receive)
t.daemon = True
t.start()
def setFormatter(self, fmt):
logging.Handler.setFormatter(self, fmt)
self._handler.setFormatter(fmt)
def receive(self):
while True:
try:
record = self.queue.get()
self._handler.emit(record)
except (KeyboardInterrupt, SystemExit):
raise
except EOFError:
break
except:
traceback.print_exc(file=sys.stderr)
def send(self, s):
self.queue.put_nowait(s)
def _format_record(self, record):
# ensure that exc_info and args
# have been stringified. Removes any chance of
# unpickleable things inside and possibly reduces
# message size sent over the pipe
if record.args:
record.msg = record.msg % record.args
record.args = None
if record.exc_info:
dummy = self.format(record)
record.exc_info = None
return record
def emit(self, record):
try:
s = self._format_record(record)
self.send(s)
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)
def close(self):
self._handler.close()
logging.Handler.close(self)
Мне понравился ответ zzzeek. Я бы просто заменил очередь конвейером, поскольку, если несколько потоков / процессов используют один и тот же конец конвейера для генерации сообщений журнала, они будут искажены.
Мне также нравится ответ zzzeek, но Андре прав в том, что для предотвращения искажения требуется очередь. Мне немного повезло с трубкой, но я заметил искажения, чего можно было ожидать. Реализовать это оказалось сложнее, чем я думал, особенно из-за работы в Windows, где есть некоторые дополнительные ограничения на глобальные переменные и прочее (см .: Как многопроцессорная обработка Python реализована в Windows? )
Но , Наконец-то он заработал. Этот пример, вероятно, не идеален, так что комментарии и предложения приветствуются. Он также не поддерживает настройку форматера или чего-либо кроме корневого регистратора. По сути, вы должны повторно инициализировать регистратор в каждом из процессов пула с очередью и настроить другие атрибуты регистратора.
Опять же, любые предложения о том, как улучшить код, приветствуются. Я, конечно, еще не знаю всех уловок Python: -)
import multiprocessing, logging, sys, re, os, StringIO, threading, time, Queue
class MultiProcessingLogHandler(logging.Handler):
def __init__(self, handler, queue, child=False):
logging.Handler.__init__(self)
self._handler = handler
self.queue = queue
# we only want one of the loggers to be pulling from the queue.
# If there is a way to do this without needing to be passed this
# information, that would be great!
if child == False:
self.shutdown = False
self.polltime = 1
t = threading.Thread(target=self.receive)
t.daemon = True
t.start()
def setFormatter(self, fmt):
logging.Handler.setFormatter(self, fmt)
self._handler.setFormatter(fmt)
def receive(self):
#print "receive on"
while (self.shutdown == False) or (self.queue.empty() == False):
# so we block for a short period of time so that we can
# check for the shutdown cases.
try:
record = self.queue.get(True, self.polltime)
self._handler.emit(record)
except Queue.Empty, e:
pass
def send(self, s):
# send just puts it in the queue for the server to retrieve
self.queue.put(s)
def _format_record(self, record):
ei = record.exc_info
if ei:
dummy = self.format(record) # just to get traceback text into record.exc_text
record.exc_info = None # to avoid Unpickleable error
return record
def emit(self, record):
try:
s = self._format_record(record)
self.send(s)
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)
def close(self):
time.sleep(self.polltime+1) # give some time for messages to enter the queue.
self.shutdown = True
time.sleep(self.polltime+1) # give some time for the server to time out and see the shutdown
def __del__(self):
self.close() # hopefully this aids in orderly shutdown when things are going poorly.
def f(x):
# just a logging command...
logging.critical('function number: ' + str(x))
# to make some calls take longer than others, so the output is "jumbled" as real MP programs are.
time.sleep(x % 3)
def initPool(queue, level):
"""
This causes the logging module to be initialized with the necessary info
in pool threads to work correctly.
"""
logging.getLogger('').addHandler(MultiProcessingLogHandler(logging.StreamHandler(), queue, child=True))
logging.getLogger('').setLevel(level)
if __name__ == '__main__':
stream = StringIO.StringIO()
logQueue = multiprocessing.Queue(100)
handler= MultiProcessingLogHandler(logging.StreamHandler(stream), logQueue)
logging.getLogger('').addHandler(handler)
logging.getLogger('').setLevel(logging.DEBUG)
logging.debug('starting main')
# when bulding the pool on a Windows machine we also have to init the logger in all the instances with the queue and the level of logging.
pool = multiprocessing.Pool(processes=10, initializer=initPool, initargs=[logQueue, logging.getLogger('').getEffectiveLevel()] ) # start worker processes
pool.map(f, range(0,50))
pool.close()
logging.debug('done')
logging.shutdown()
print "stream output is:"
print stream.getvalue()
Вариант остальных, в котором потоки протоколирования и очереди разделяются.
"""sample code for logging in subprocesses using multiprocessing
* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
process.
* As in the other implementations, a thread reads the queue and calls the
handlers. Except in this implementation, the thread is defined outside of a
handler, which makes the logger definitions simpler.
* Works with multiple handlers. If the logger in the main process defines
multiple handlers, they will all be fed records generated by the
subprocesses loggers.
tested with Python 2.5 and 2.6 on Linux and Windows
"""
import os
import sys
import time
import traceback
import multiprocessing, threading, logging, sys
DEFAULT_LEVEL = logging.DEBUG
formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")
class SubProcessLogHandler(logging.Handler):
"""handler used by subprocesses
It simply puts items on a Queue for the main process to log.
"""
def __init__(self, queue):
logging.Handler.__init__(self)
self.queue = queue
def emit(self, record):
self.queue.put(record)
class LogQueueReader(threading.Thread):
"""thread to write subprocesses log records to main process log
This thread reads the records written by subprocesses and writes them to
the handlers defined in the main process's handlers.
"""
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
self.daemon = True
def run(self):
"""read from the queue and write to the log handlers
The logging documentation says logging is thread safe, so there
shouldn't be contention between normal logging (from the main
process) and this thread.
Note that we're using the name of the original logger.
"""
# Thanks Mike for the error checking code.
while True:
try:
record = self.queue.get()
# get the logger for this record
logger = logging.getLogger(record.name)
logger.callHandlers(record)
except (KeyboardInterrupt, SystemExit):
raise
except EOFError:
break
except:
traceback.print_exc(file=sys.stderr)
class LoggingProcess(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def _setupLogger(self):
# create the logger to use.
logger = logging.getLogger('test.subprocess')
# The only handler desired is the SubProcessLogHandler. If any others
# exist, remove them. In this case, on Unix and Linux the StreamHandler
# will be inherited.
for handler in logger.handlers:
# just a check for my sanity
assert not isinstance(handler, SubProcessLogHandler)
logger.removeHandler(handler)
# add the handler
handler = SubProcessLogHandler(self.queue)
handler.setFormatter(formatter)
logger.addHandler(handler)
# On Windows, the level will not be inherited. Also, we could just
# set the level to log everything here and filter it in the main
# process handlers. For now, just set it from the global default.
logger.setLevel(DEFAULT_LEVEL)
self.logger = logger
def run(self):
self._setupLogger()
logger = self.logger
# and here goes the logging
p = multiprocessing.current_process()
logger.info('hello from process %s with pid %s' % (p.name, p.pid))
if __name__ == '__main__':
# queue used by the subprocess loggers
queue = multiprocessing.Queue()
# Just a normal logger
logger = logging.getLogger('test')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(DEFAULT_LEVEL)
logger.info('hello from the main process')
# This thread will read from the subprocesses and write to the main log's
# handlers.
log_queue_reader = LogQueueReader(queue)
log_queue_reader.start()
# create the processes.
for i in range(10):
p = LoggingProcess(queue)
p.start()
# The way I read the multiprocessing warning about Queue, joining a
# process before it has finished feeding the Queue can cause a deadlock.
# Also, Queue.empty() is not realiable, so just make sure all processes
# are finished.
# active_children joins subprocesses when they're finished.
while multiprocessing.active_children():
time.sleep(.1)
У меня есть решение, похожее на решение ironhacker, за исключением того, что я использую logging.exception в некоторых частях своего кода и обнаружил, что мне нужно отформатировать исключение, прежде чем передавать его обратно в очередь, поскольку трассировки не могут быть обработаны:
class QueueHandler(logging.Handler):
def __init__(self, queue):
logging.Handler.__init__(self)
self.queue = queue
def emit(self, record):
if record.exc_info:
# can't pass exc_info across processes so just format now
record.exc_text = self.formatException(record.exc_info)
record.exc_info = None
self.queue.put(record)
def formatException(self, ei):
sio = cStringIO.StringIO()
traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
s = sio.getvalue()
sio.close()
if s[-1] == "\n":
s = s[:-1]
return s