Есть ли какие-либо образцовые примеры Наблюдателя GoF, реализованного в Python? У меня есть немного кода, который в настоящее время имеет биты отладки кода, приданного остроту через ключевой класс (в настоящее время генерирующий сообщения к stderr, если волшебный ENV установлен). Кроме того, класс имеет интерфейс для инкрементно результатов возврата, а также хранения их (в памяти) для обработки сообщения. (Сам класс является диспетчером заданий для того, чтобы одновременно выполнить команды на удаленных машинах по ssh).
В настоящее время использование класса смотрит что-то как:
job = SSHJobMan(hostlist, cmd)
job.start()
while not job.done():
for each in job.poll():
incrementally_process(job.results[each])
time.sleep(0.2) # or other more useful work
post_process(job.results)
alernative модель использования:
job = SSHJobMan(hostlist, cmd)
job.wait() # implicitly performs a start()
process(job.results)
Это все хорошо работает для текущей утилиты. Однако это действительно испытывает недостаток в гибкости. Например, я в настоящее время поддерживаю краткий выходной формат или индикатор выполнения как возрастающие результаты, я также поддерживаю резюме, полное и "объединенное сообщение" выводы для post_process()
функция.
Однако я хотел бы поддерживать несколько результатов/потоков вывода (индикатор выполнения к терминалу, отладке и предупреждениям файлу журнала, выводам от успешных заданий до одного файла/каталога, сообщений об ошибках и других результатов неуспешных заданий другому, и т.д.).
Это походит на ситуацию, которая зовет Наблюдателя... имеют экземпляры моего класса, принимают регистрацию от других объектов и отзывают их с определенными типами событий, как они происходят.
Я смотрю на PyPubSub, так как я видел несколько ссылок на это в ТАК связанных вопросах. Я не уверен, что я готов добавить внешнюю зависимость к своей утилите, но я видел значение в использовании их интерфейса как модель для моего, если это собирается помочь другим использовать. (Проект предназначается и как автономная утилита командной строки и как класс для записи других сценариев/утилит).
Короче говоря я знаю, как сделать то, что я хочу..., но существуют многочисленные способы выполнить его. Я хочу предложения на том, что, скорее всего, будет работать на других пользователей кода в конечном счете.
Сам код в: столкновение.
However it does lack flexibility.
Well... actually, this looks like a good design to me if an asynchronous API is what you want. It usually is. Maybe all you need is to switch from stderr to Python's logging
module, which has a sort of publish/subscribe model of its own, what with Logger.addHandler()
and so on.
If you do want to support observers, my advice is to keep it simple. You really only need a few lines of code.
class Event(object):
pass
class Observable(object):
def __init__(self):
self.callbacks = []
def subscribe(self, callback):
self.callbacks.append(callback)
def fire(self, **attrs):
e = Event()
e.source = self
for k, v in attrs.iteritems():
setattr(e, k, v)
for fn in self.callbacks:
fn(e)
Your Job class can subclass Observable
. When something of interest happens, call self.fire(type="progress", percent=50)
or the like.
Из википедии :
from collections import defaultdict
class Observable (defaultdict):
def __init__ (self):
defaultdict.__init__(self, object)
def emit (self, *args):
'''Pass parameters to all observers and update states.'''
for subscriber in self:
response = subscriber(*args)
self[subscriber] = response
def subscribe (self, subscriber):
'''Add a new subscriber to self.'''
self[subscriber]
def stat (self):
'''Return a tuple containing the state of each observer.'''
return tuple(self.values())
Observable используется следующим образом.
myObservable = Observable ()
# subscribe some inlined functions.
# myObservable[lambda x, y: x * y] would also work here.
myObservable.subscribe(lambda x, y: x * y)
myObservable.subscribe(lambda x, y: float(x) / y)
myObservable.subscribe(lambda x, y: x + y)
myObservable.subscribe(lambda x, y: x - y)
# emit parameters to each observer
myObservable.emit(6, 2)
# get updated values
myObservable.stat() # returns: (8, 3.0, 4, 12)
To register an observer yourCallable()
(a callable that accepts a dictionary) to receive all log events (in addition to any other observers):
twisted.python.log.addObserver(yourCallable)
From Twisted-Python mailing list:
#!/usr/bin/env python
"""Serve as a sample implementation of a twisted producer/consumer
system, with a simple TCP server which asks the user how many random
integers they want, and it sends the result set back to the user, one
result per line."""
import random
from zope.interface import implements
from twisted.internet import interfaces, reactor
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver
class Producer:
"""Send back the requested number of random integers to the client."""
implements(interfaces.IPushProducer)
def __init__(self, proto, cnt):
self._proto = proto
self._goal = cnt
self._produced = 0
self._paused = False
def pauseProducing(self):
"""When we've produced data too fast, pauseProducing() will be
called (reentrantly from within resumeProducing's transport.write
method, most likely), so set a flag that causes production to pause
temporarily."""
self._paused = True
print('pausing connection from %s' % (self._proto.transport.getPeer()))
def resumeProducing(self):
self._paused = False
while not self._paused and self._produced < self._goal:
next_int = random.randint(0, 10000)
self._proto.transport.write('%d\r\n' % (next_int))
self._produced += 1
if self._produced == self._goal:
self._proto.transport.unregisterProducer()
self._proto.transport.loseConnection()
def stopProducing(self):
pass
class ServeRandom(LineReceiver):
"""Serve up random data."""
def connectionMade(self):
print('connection made from %s' % (self.transport.getPeer()))
self.transport.write('how many random integers do you want?\r\n')
def lineReceived(self, line):
cnt = int(line.strip())
producer = Producer(self, cnt)
self.transport.registerProducer(producer, True)
producer.resumeProducing()
def connectionLost(self, reason):
print('connection lost from %s' % (self.transport.getPeer()))
factory = Factory()
factory.protocol = ServeRandom
reactor.listenTCP(1234, factory)
print('listening on 1234...')
reactor.run()
Еще несколько подходов ...
Возможно, все, что вам нужно, это переключиться со stderr на модуль logging
Python, в котором есть мощная модель публикации / подписки.
Начать создание записей журнала несложно.
# producer
import logging
log = logging.getLogger("myjobs") # that's all the setup you need
class MyJob(object):
def run(self):
log.info("starting job")
n = 10
for i in range(n):
log.info("%.1f%% done" % (100.0 * i / n))
log.info("work complete")
Со стороны потребителя есть немного больше работы. К сожалению, для настройки вывода логгера требуется около 7 целых строк кода. ;)
# consumer
import myjobs, sys, logging
if user_wants_log_output:
ch = logging.StreamHandler(sys.stderr)
ch.setLevel(logging.INFO)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s")
ch.setFormatter(formatter)
myjobs.log.addHandler(ch)
myjobs.log.setLevel(logging.INFO)
myjobs.MyJob().run()
С другой стороны, в пакете регистрации есть удивительное количество вещей. Если вам когда-нибудь понадобится отправить данные журнала в изменяющийся набор файлов, на адрес электронной почты и журнал событий Windows, все готово.
Но вам не нужно ничего использовать библиотека вообще. Чрезвычайно простой способ поддержать наблюдателей - вызвать метод, который ничего не делает.
# producer
class MyJob(object):
def on_progress(self, pct):
"""Called when progress is made. pct is the percent complete.
By default this does nothing. The user may override this method
or even just assign to it."""
pass
def run(self):
n = 10
for i in range(n):
self.on_progress(100.0 * i / n)
self.on_progress(100.0)
# consumer
import sys, myjobs
job = myjobs.MyJob()
job.on_progress = lambda pct: sys.stdout.write("%.1f%% done\n" % pct)
job.run()
Иногда вместо написания лямбда-выражения вы можете просто сказать job.on_progress = progressBar.update
, что приятно.
Это очень просто. Одним из недостатков является то, что он, естественно, не поддерживает несколько слушателей, подписывающихся на одни и те же события.
С небольшим количеством кода поддержки вы можете получать события, подобные C #, в Python. Вот код:
# glue code
class event(object):
def __init__(self, func):
self.__doc__ = func.__doc__
self._key = ' ' + func.__name__
def __get__(self, obj, cls):
try:
return obj.__dict__[self._key]
except KeyError, exc:
be = obj.__dict__[self._key] = boundevent()
return be
class boundevent(object):
def __init__(self):
self._fns = []
def __iadd__(self, fn):
self._fns.append(fn)
return self
def __isub__(self, fn):
self._fns.remove(fn)
return self
def __call__(self, *args, **kwargs):
for f in self._fns[:]:
f(*args, **kwargs)
Производитель объявляет событие с помощью декоратора:
# producer
class MyJob(object):
@event
def progress(pct):
"""Called when progress is made. pct is the percent complete."""
def run(self):
n = 10
for i in range(n+1):
self.progress(100.0 * i / n)
#consumer
import sys, myjobs
job = myjobs.MyJob()
job.progress += lambda pct: sys.stdout.write("%.1f%% done\n" % pct)
job.run()
Это работает точно так же, как приведенный выше код «простого наблюдателя», но вы можете добавить столько слушателей, сколько захотите, используя + =
. (В отличие от C #, здесь нет типов обработчиков событий, вам не нужно new EventHandler (foo.bar)
при подписке на событие, и вам не нужно проверять значение null перед запуском события Как и C #, события не подавляют исключения. )
Если запись в журнал
делает все, что вам нужно, используйте это. В противном случае сделайте самое простое, что работает для вас. Важно отметить, что вам не нужно брать на себя большую внешнюю зависимость.