Сила Скрученных (для Python) является своей асинхронной платформой (я думаю). Я записал сервер обработки изображений, который берет запросы через Перспективного Брокера. Это работает отлично, пока я подаю его меньше чем пара сотни изображений за один раз. Однако иногда это пронзено с сотнями изображений в фактически то же время. Поскольку это пытается обработать их всех одновременно катастрофические отказы сервера.
Как решение я хотел бы стоять в очереди remote_calls на сервере так, чтобы это только обработало ~100 изображений за один раз. Кажется, что это могло бы быть чем-то, что Скрученный уже делает, но я, может казаться, не нахожу его. Какие-либо идеи о том, как начать реализовывать это? Точка в правильном направлении?Спасибо!
Один из готовых вариантов, который может помочь в этом, - это twisted.internet.defer.DeferredSemaphore
. Это асинхронная версия обычного (подсчитывающего) семафора, который вы, возможно, уже знаете, если вы много работали с многопоточным программированием.
(счетный) семафор во многом похож на мьютекс (блокировку). Но там, где мьютекс может быть получен только один раз до соответствующего выпуска, (счетный) семафор может быть настроен так, чтобы разрешить произвольное (но заданное) количество захватов до того, как потребуются какие-либо соответствующие выпуски.
Вот пример использования DeferredSemaphore
для выполнения десяти асинхронных операций, но не более трех из них одновременно:
from twisted.internet.defer import DeferredSemaphore, gatherResults
from twisted.internet.task import deferLater
from twisted.internet import reactor
def async(n):
print 'Starting job', n
d = deferLater(reactor, n, lambda: None)
def cbFinished(ignored):
print 'Finishing job', n
d.addCallback(cbFinished)
return d
def main():
sem = DeferredSemaphore(3)
jobs = []
for i in range(10):
jobs.append(sem.run(async, i))
d = gatherResults(jobs)
d.addCallback(lambda ignored: reactor.stop())
reactor.run()
if __name__ == '__main__':
main()
DeferredSemaphore
также имеет явное получение
и release
, но метод run
настолько удобен, что почти всегда то, что вам нужно. Он вызывает метод приобретения
, который возвращает Deferred
. К этому первому Deferred
он добавляет обратный вызов, который вызывает переданную вами функцию (вместе с любыми позиционными или ключевыми аргументами). Если эта функция возвращает Deferred
, то к этому второму Deferred
добавляется обратный вызов, который вызывает метод release
.
Синхронный случай также обрабатывается путем немедленного вызова выпуска
. Ошибки также обрабатываются, позволяя им распространяться, но проверяя, что необходим релиз
, чтобы оставить DeferredSemaphore
в согласованном состоянии.Результат функции, переданной в run
(или результат Deferred
, который она возвращает)становится результатом Deferred
, возвращенного прогоном
.
Другой возможный подход может быть основан на DeferredQueue
и кооперация
. DeferredQueue
в основном похожа на обычную очередь, но ее метод get
возвращает Deferred
. Если во время вызова в очереди нет элементов, Deferred
не сработает, пока элемент не будет добавлен.
Вот пример:
from random import randrange
from twisted.internet.defer import DeferredQueue
from twisted.internet.task import deferLater, cooperate
from twisted.internet import reactor
def async(n):
print 'Starting job', n
d = deferLater(reactor, n, lambda: None)
def cbFinished(ignored):
print 'Finishing job', n
d.addCallback(cbFinished)
return d
def assign(jobs):
# Create new jobs to be processed
jobs.put(randrange(10))
reactor.callLater(randrange(10), assign, jobs)
def worker(jobs):
while True:
yield jobs.get().addCallback(async)
def main():
jobs = DeferredQueue()
for i in range(10):
jobs.put(i)
assign(jobs)
for i in range(3):
cooperate(worker(jobs))
reactor.run()
if __name__ == '__main__':
main()
Обратите внимание, что рабочая функция async
такая же, как и в первом примере. Однако на этот раз есть также функция worker
, которая явно извлекает задания из DeferredQueue
и обрабатывает их с помощью async
(путем добавления async
как обратный вызов Deferred
, возвращенный get
). Генератор worker
управляется кооперативом
, который повторяет его один раз после каждого отложенного
возгорания. Затем основной цикл запускает три из этих генераторов рабочих, так что три задания будут выполняться в любой момент времени.
Этот подход включает немного больше кода, чем подход DeferredSemaphore
, но имеет некоторые преимущества, которые могут быть интересны. Во-первых, cooperate
возвращает экземпляр CooperativeTask
, который имеет полезные методы, такие как pause
, resume
, и несколько других.Кроме того, все задания, назначенные одному и тому же кооператору, будут взаимодействовать друг с другом при планировании, чтобы не перегружать цикл событий (и это то, что дает API его имя). На стороне DeferredQueue
также можно установить ограничения на количество элементов, ожидающих обработки, чтобы избежать полной перегрузки сервера (например, если процессоры изображений застревают и перестают выполнять задачи). Если код, вызывающий put
, обрабатывает исключение переполнения очереди, вы можете использовать это как давление, чтобы попытаться прекратить прием новых заданий (возможно, перенаправив их на другой сервер или предупредив администратора). Выполнение аналогичных действий с DeferredSemaphore
немного сложнее, поскольку нет способа ограничить количество заданий, ожидающих получения семафора.
Вам также может понравиться написанная мной txRDQ (Resizable Dispatch Queue). Погуглите, он находится в коллекции tx на LaunchPad. Извините, у меня нет времени ответить - собираюсь выйти на сцену.
Терри