В Django запуск ./ manage.py runserver
действительно удобен для разработчиков, избавляя от хлопот по настройке и запуску настоящий веб-сервер.
Если вы не используете Django, вы все равно можете очень легко настроить сервер-пулемет.
Есть ли что-то подобное для AMQP?
Мне не нужна ни полная реализация, ни что-то надежное, просто то, что легко установить и запустить для разработчиков. Пакет PyPi был бы отличным.
Celery - не ответ. Мне не нужен клиент, мне нужен сервер. Как мини-питон RabbitMq. setInterval
позволяет повторять вызов функции каждые x секунд, не блокируя выполнение другого кода. Я создал этот пример декоратора:
import time, threading
def setInterval(interval, times = -1):
# This will be the actual decorator,
# with fixed interval and times parameter
def outer_wrap(function):
# This will be the function to be
# called
def wrap(*args, **kwargs):
# This is another function to be executed
# in a different thread to simulate setInterval
def inner_wrap():
i = 0
while i != times:
time.sleep(interval)
function(*args, **kwargs)
i += 1
threading.Timer(0, inner_wrap).start()
return wrap
return outer_wrap
, который будет использоваться следующим образом
@setInterval(1, 3)
def foo(a):
print(a)
foo('bar')
# Will print 'bar' 3 times with 1 second delays
, и мне кажется, что он работает нормально. Моя проблема в том, что
sys.exit ()
из основного потока не остановит его, равно как и нажатие Ctrl + c
. Единственный способ остановить это - убить процесс python извне. Я хотел бы иметь возможность отправить сигнал из основного потока, который остановит обратный вызов. Но я новичок в потоках - как я могу общаться между ними? РЕДАКТИРОВАТЬ На всякий случай, это последняя версия декоратора, благодаря помощи jd
import threading
def setInterval(interval, times = -1):
# This will be the actual decorator,
# with fixed interval and times parameter
def outer_wrap(function):
# This will be the function to be
# called
def wrap(*args, **kwargs):
stop = threading.Event()
# This is another function to be executed
# in a different thread to simulate setInterval
def inner_wrap():
i = 0
while i != times and not stop.isSet():
stop.wait(interval)
function(*args, **kwargs)
i += 1
t = threading.Timer(0, inner_wrap)
t.daemon = True
t.start()
return stop
return wrap
return outer_wrap
Его можно использовать с фиксированное количество повторов, как указано выше
@setInterval(1, 3)
def foo(a):
print(a)
foo('bar')
# Will print 'bar' 3 times with 1 second delays
, или их можно оставить для выполнения до тех пор, пока он не получит сигнал остановки
import time
@setInterval(1)
def foo(a):
print(a)
stopper = foo('bar')
time.sleep(5)
stopper.set()
# It will stop here, after printing 'bar' 5 times.