Облегченная реализация сервера Python AMQP для разработки и имитации?

В Django запуск ./ manage.py runserver действительно удобен для разработчиков, избавляя от хлопот по настройке и запуску настоящий веб-сервер.

Если вы не используете Django, вы все равно можете очень легко настроить сервер-пулемет.

Есть ли что-то подобное для AMQP?

Мне не нужна ни полная реализация, ни что-то надежное, просто то, что легко установить и запустить для разработчиков. Пакет PyPi был бы отличным.

Celery - не ответ. Мне не нужен клиент, мне нужен сервер. Как мини-питон RabbitMq. setInterval позволяет повторять вызов функции каждые x секунд, не блокируя выполнение другого кода. Я создал этот пример декоратора:

import time, threading

def setInterval(interval, times = -1):
    # This will be the actual decorator,
    # with fixed interval and times parameter
    def outer_wrap(function):
        # This will be the function to be
        # called
        def wrap(*args, **kwargs):
            # This is another function to be executed
            # in a different thread to simulate setInterval
            def inner_wrap():
                i = 0
                while i != times:
                    time.sleep(interval)
                    function(*args, **kwargs)
                    i += 1
            threading.Timer(0, inner_wrap).start()
        return wrap
    return outer_wrap

, который будет использоваться следующим образом

@setInterval(1, 3)
def foo(a):
    print(a)

foo('bar')
# Will print 'bar' 3 times with 1 second delays

, и мне кажется, что он работает нормально. Моя проблема в том, что

  • это кажется слишком сложным, и я боюсь, что, возможно, пропустил более простой / лучший механизм
  • , декоратор может быть вызван без второго параметра, и в этом случае он будет продолжаться вечно. Когда я говорю «раньше», я имею в виду навсегда - даже вызов sys.exit () из основного потока не остановит его, равно как и нажатие Ctrl + c . Единственный способ остановить это - убить процесс python извне. Я хотел бы иметь возможность отправить сигнал из основного потока, который остановит обратный вызов. Но я новичок в потоках - как я могу общаться между ними?

РЕДАКТИРОВАТЬ На всякий случай, это последняя версия декоратора, благодаря помощи jd

import threading

def setInterval(interval, times = -1):
    # This will be the actual decorator,
    # with fixed interval and times parameter
    def outer_wrap(function):
        # This will be the function to be
        # called
        def wrap(*args, **kwargs):
            stop = threading.Event()

            # This is another function to be executed
            # in a different thread to simulate setInterval
            def inner_wrap():
                i = 0
                while i != times and not stop.isSet():
                    stop.wait(interval)
                    function(*args, **kwargs)
                    i += 1

            t = threading.Timer(0, inner_wrap)
            t.daemon = True
            t.start()
            return stop
        return wrap
    return outer_wrap

Его можно использовать с фиксированное количество повторов, как указано выше

@setInterval(1, 3)
def foo(a):
    print(a)

foo('bar')
# Will print 'bar' 3 times with 1 second delays

, или их можно оставить для выполнения до тех пор, пока он не получит сигнал остановки

import time

@setInterval(1)
def foo(a):
    print(a)

stopper = foo('bar')

time.sleep(5)
stopper.set()
# It will stop here, after printing 'bar' 5 times.
20
задан Community 23 May 2017 в 12:08
поделиться