Шаблон "наблюдатель" Python: примеры, подсказки? [закрытый]

Есть ли какие-либо образцовые примеры Наблюдателя GoF, реализованного в Python? У меня есть немного кода, который в настоящее время имеет биты отладки кода, приданного остроту через ключевой класс (в настоящее время генерирующий сообщения к stderr, если волшебный ENV установлен). Кроме того, класс имеет интерфейс для инкрементно результатов возврата, а также хранения их (в памяти) для обработки сообщения. (Сам класс является диспетчером заданий для того, чтобы одновременно выполнить команды на удаленных машинах по ssh).

В настоящее время использование класса смотрит что-то как:

job = SSHJobMan(hostlist, cmd)
job.start()
while not job.done():
    for each in job.poll():
        incrementally_process(job.results[each])
        time.sleep(0.2) # or other more useful work
post_process(job.results)

alernative модель использования:

job = SSHJobMan(hostlist, cmd)
job.wait()  # implicitly performs a start()
process(job.results)

Это все хорошо работает для текущей утилиты. Однако это действительно испытывает недостаток в гибкости. Например, я в настоящее время поддерживаю краткий выходной формат или индикатор выполнения как возрастающие результаты, я также поддерживаю резюме, полное и "объединенное сообщение" выводы для post_process() функция.

Однако я хотел бы поддерживать несколько результатов/потоков вывода (индикатор выполнения к терминалу, отладке и предупреждениям файлу журнала, выводам от успешных заданий до одного файла/каталога, сообщений об ошибках и других результатов неуспешных заданий другому, и т.д.).

Это походит на ситуацию, которая зовет Наблюдателя... имеют экземпляры моего класса, принимают регистрацию от других объектов и отзывают их с определенными типами событий, как они происходят.

Я смотрю на PyPubSub, так как я видел несколько ссылок на это в ТАК связанных вопросах. Я не уверен, что я готов добавить внешнюю зависимость к своей утилите, но я видел значение в использовании их интерфейса как модель для моего, если это собирается помочь другим использовать. (Проект предназначается и как автономная утилита командной строки и как класс для записи других сценариев/утилит).

Короче говоря я знаю, как сделать то, что я хочу..., но существуют многочисленные способы выполнить его. Я хочу предложения на том, что, скорее всего, будет работать на других пользователей кода в конечном счете.

Сам код в: столкновение.

43
задан riven 17 January 2012 в 02:52
поделиться

4 ответа

However it does lack flexibility.

Well... actually, this looks like a good design to me if an asynchronous API is what you want. It usually is. Maybe all you need is to switch from stderr to Python's logging module, which has a sort of publish/subscribe model of its own, what with Logger.addHandler() and so on.

If you do want to support observers, my advice is to keep it simple. You really only need a few lines of code.

class Event(object):
    pass

class Observable(object):
    def __init__(self):
        self.callbacks = []
    def subscribe(self, callback):
        self.callbacks.append(callback)
    def fire(self, **attrs):
        e = Event()
        e.source = self
        for k, v in attrs.iteritems():
            setattr(e, k, v)
        for fn in self.callbacks:
            fn(e)

Your Job class can subclass Observable. When something of interest happens, call self.fire(type="progress", percent=50) or the like.

49
ответ дан 26 November 2019 в 22:42
поделиться

Из википедии :

from collections import defaultdict

class Observable (defaultdict):

  def __init__ (self):
      defaultdict.__init__(self, object)

  def emit (self, *args):
      '''Pass parameters to all observers and update states.'''
      for subscriber in self:
          response = subscriber(*args)
          self[subscriber] = response

  def subscribe (self, subscriber):
      '''Add a new subscriber to self.'''
      self[subscriber]

  def stat (self):
      '''Return a tuple containing the state of each observer.'''
      return tuple(self.values())

Observable используется следующим образом.

myObservable = Observable ()

# subscribe some inlined functions.
# myObservable[lambda x, y: x * y] would also work here.
myObservable.subscribe(lambda x, y: x * y)
myObservable.subscribe(lambda x, y: float(x) / y)
myObservable.subscribe(lambda x, y: x + y)
myObservable.subscribe(lambda x, y: x - y)

# emit parameters to each observer
myObservable.emit(6, 2)

# get updated values
myObservable.stat()         # returns: (8, 3.0, 4, 12)
4
ответ дан 26 November 2019 в 22:42
поделиться

Example: twisted log observers

To register an observer yourCallable() (a callable that accepts a dictionary) to receive all log events (in addition to any other observers):

twisted.python.log.addObserver(yourCallable)

Example: complete producer/consumer example

From Twisted-Python mailing list:

#!/usr/bin/env python
"""Serve as a sample implementation of a twisted producer/consumer
system, with a simple TCP server which asks the user how many random
integers they want, and it sends the result set back to the user, one
result per line."""

import random

from zope.interface import implements
from twisted.internet import interfaces, reactor
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver

class Producer:
    """Send back the requested number of random integers to the client."""
    implements(interfaces.IPushProducer)
    def __init__(self, proto, cnt):
        self._proto = proto
        self._goal = cnt
        self._produced = 0
        self._paused = False
    def pauseProducing(self):
        """When we've produced data too fast, pauseProducing() will be
called (reentrantly from within resumeProducing's transport.write
method, most likely), so set a flag that causes production to pause
temporarily."""
        self._paused = True
        print('pausing connection from %s' % (self._proto.transport.getPeer()))
    def resumeProducing(self):
        self._paused = False
        while not self._paused and self._produced < self._goal:
            next_int = random.randint(0, 10000)
            self._proto.transport.write('%d\r\n' % (next_int))
            self._produced += 1
        if self._produced == self._goal:
            self._proto.transport.unregisterProducer()
            self._proto.transport.loseConnection()
    def stopProducing(self):
        pass

class ServeRandom(LineReceiver):
    """Serve up random data."""
    def connectionMade(self):
        print('connection made from %s' % (self.transport.getPeer()))
        self.transport.write('how many random integers do you want?\r\n')
    def lineReceived(self, line):
        cnt = int(line.strip())
        producer = Producer(self, cnt)
        self.transport.registerProducer(producer, True)
        producer.resumeProducing()
    def connectionLost(self, reason):
        print('connection lost from %s' % (self.transport.getPeer()))
factory = Factory()
factory.protocol = ServeRandom
reactor.listenTCP(1234, factory)
print('listening on 1234...')
reactor.run()
2
ответ дан 26 November 2019 в 22:42
поделиться

Еще несколько подходов ...

Пример: модуль регистрации

Возможно, все, что вам нужно, это переключиться со stderr на модуль logging Python, в котором есть мощная модель публикации / подписки.

Начать создание записей журнала несложно.

# producer
import logging

log = logging.getLogger("myjobs")  # that's all the setup you need

class MyJob(object):
    def run(self):
        log.info("starting job")
        n = 10
        for i in range(n):
            log.info("%.1f%% done" % (100.0 * i / n))
        log.info("work complete")

Со стороны потребителя есть немного больше работы. К сожалению, для настройки вывода логгера требуется около 7 целых строк кода. ;)

# consumer
import myjobs, sys, logging

if user_wants_log_output:
    ch = logging.StreamHandler(sys.stderr)
    ch.setLevel(logging.INFO)
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch.setFormatter(formatter)
    myjobs.log.addHandler(ch)
    myjobs.log.setLevel(logging.INFO)

myjobs.MyJob().run()

С другой стороны, в пакете регистрации есть удивительное количество вещей. Если вам когда-нибудь понадобится отправить данные журнала в изменяющийся набор файлов, на адрес электронной почты и журнал событий Windows, все готово.

Пример: самый простой из возможных наблюдателей

Но вам не нужно ничего использовать библиотека вообще. Чрезвычайно простой способ поддержать наблюдателей - вызвать метод, который ничего не делает.

# producer
class MyJob(object):
    def on_progress(self, pct):
        """Called when progress is made. pct is the percent complete.
        By default this does nothing. The user may override this method
        or even just assign to it."""
        pass

    def run(self):
        n = 10
        for i in range(n):
            self.on_progress(100.0 * i / n)
        self.on_progress(100.0)

# consumer
import sys, myjobs
job = myjobs.MyJob()
job.on_progress = lambda pct: sys.stdout.write("%.1f%% done\n" % pct)
job.run()

Иногда вместо написания лямбда-выражения вы можете просто сказать job.on_progress = progressBar.update , что приятно.

Это очень просто. Одним из недостатков является то, что он, естественно, не поддерживает несколько слушателей, подписывающихся на одни и те же события.

Пример: события, подобные C #

С небольшим количеством кода поддержки вы можете получать события, подобные C #, в Python. Вот код:

# glue code
class event(object):
    def __init__(self, func):
        self.__doc__ = func.__doc__
        self._key = ' ' + func.__name__
    def __get__(self, obj, cls):
        try:
            return obj.__dict__[self._key]
        except KeyError, exc:
            be = obj.__dict__[self._key] = boundevent()
            return be

class boundevent(object):
    def __init__(self):
        self._fns = []
    def __iadd__(self, fn):
        self._fns.append(fn)
        return self
    def __isub__(self, fn):
        self._fns.remove(fn)
        return self
    def __call__(self, *args, **kwargs):
        for f in self._fns[:]:
            f(*args, **kwargs)

Производитель объявляет событие с помощью декоратора:

# producer
class MyJob(object):
    @event
    def progress(pct):
        """Called when progress is made. pct is the percent complete."""

    def run(self):
        n = 10
        for i in range(n+1):
            self.progress(100.0 * i / n)

#consumer
import sys, myjobs
job = myjobs.MyJob()
job.progress += lambda pct: sys.stdout.write("%.1f%% done\n" % pct)
job.run()

Это работает точно так же, как приведенный выше код «простого наблюдателя», но вы можете добавить столько слушателей, сколько захотите, используя + = . (В отличие от C #, здесь нет типов обработчиков событий, вам не нужно new EventHandler (foo.bar) при подписке на событие, и вам не нужно проверять значение null перед запуском события Как и C #, события не подавляют исключения. )

Как выбрать

Если запись в журнал делает все, что вам нужно, используйте это. В противном случае сделайте самое простое, что работает для вас. Важно отметить, что вам не нужно брать на себя большую внешнюю зависимость.

13
ответ дан 26 November 2019 в 22:42
поделиться
Другие вопросы по тегам:

Похожие вопросы: