Удаленные вызовы очереди к Python Скрученный перспективный брокер?

Сила Скрученных (для Python) является своей асинхронной платформой (я думаю). Я записал сервер обработки изображений, который берет запросы через Перспективного Брокера. Это работает отлично, пока я подаю его меньше чем пара сотни изображений за один раз. Однако иногда это пронзено с сотнями изображений в фактически то же время. Поскольку это пытается обработать их всех одновременно катастрофические отказы сервера.

Как решение я хотел бы стоять в очереди remote_calls на сервере так, чтобы это только обработало ~100 изображений за один раз. Кажется, что это могло бы быть чем-то, что Скрученный уже делает, но я, может казаться, не нахожу его. Какие-либо идеи о том, как начать реализовывать это? Точка в правильном направлении?Спасибо!

11
задан agartland 18 May 2010 в 23:22
поделиться

2 ответа

Один из готовых вариантов, который может помочь в этом, - это twisted.internet.defer.DeferredSemaphore . Это асинхронная версия обычного (подсчитывающего) семафора, который вы, возможно, уже знаете, если вы много работали с многопоточным программированием.

(счетный) семафор во многом похож на мьютекс (блокировку). Но там, где мьютекс может быть получен только один раз до соответствующего выпуска, (счетный) семафор может быть настроен так, чтобы разрешить произвольное (но заданное) количество захватов до того, как потребуются какие-либо соответствующие выпуски.

Вот пример использования DeferredSemaphore для выполнения десяти асинхронных операций, но не более трех из них одновременно:

from twisted.internet.defer import DeferredSemaphore, gatherResults
from twisted.internet.task import deferLater
from twisted.internet import reactor


def async(n):
    print 'Starting job', n
    d = deferLater(reactor, n, lambda: None)
    def cbFinished(ignored):
        print 'Finishing job', n
    d.addCallback(cbFinished)
    return d


def main():
    sem = DeferredSemaphore(3)

    jobs = []
    for i in range(10):
        jobs.append(sem.run(async, i))

    d = gatherResults(jobs)
    d.addCallback(lambda ignored: reactor.stop())
    reactor.run()


if __name__ == '__main__':
    main()

DeferredSemaphore также имеет явное получение и release , но метод run настолько удобен, что почти всегда то, что вам нужно. Он вызывает метод приобретения , который возвращает Deferred . К этому первому Deferred он добавляет обратный вызов, который вызывает переданную вами функцию (вместе с любыми позиционными или ключевыми аргументами). Если эта функция возвращает Deferred , то к этому второму Deferred добавляется обратный вызов, который вызывает метод release .

Синхронный случай также обрабатывается путем немедленного вызова выпуска . Ошибки также обрабатываются, позволяя им распространяться, но проверяя, что необходим релиз , чтобы оставить DeferredSemaphore в согласованном состоянии.Результат функции, переданной в run (или результат Deferred , который она возвращает)становится результатом Deferred , возвращенного прогоном .

Другой возможный подход может быть основан на DeferredQueue и кооперация . DeferredQueue в основном похожа на обычную очередь, но ее метод get возвращает Deferred . Если во время вызова в очереди нет элементов, Deferred не сработает, пока элемент не будет добавлен.

Вот пример:

from random import randrange

from twisted.internet.defer import DeferredQueue
from twisted.internet.task import deferLater, cooperate
from twisted.internet import reactor


def async(n):
    print 'Starting job', n
    d = deferLater(reactor, n, lambda: None)
    def cbFinished(ignored):
        print 'Finishing job', n
    d.addCallback(cbFinished)
    return d


def assign(jobs):
    # Create new jobs to be processed
    jobs.put(randrange(10))
    reactor.callLater(randrange(10), assign, jobs)


def worker(jobs):
    while True:
        yield jobs.get().addCallback(async)


def main():
    jobs = DeferredQueue()

    for i in range(10):
        jobs.put(i)

    assign(jobs)

    for i in range(3):
        cooperate(worker(jobs))

    reactor.run()


if __name__ == '__main__':
    main()

Обратите внимание, что рабочая функция async такая же, как и в первом примере. Однако на этот раз есть также функция worker , которая явно извлекает задания из DeferredQueue и обрабатывает их с помощью async (путем добавления async как обратный вызов Deferred , возвращенный get ). Генератор worker управляется кооперативом , который повторяет его один раз после каждого отложенного возгорания. Затем основной цикл запускает три из этих генераторов рабочих, так что три задания будут выполняться в любой момент времени.

Этот подход включает немного больше кода, чем подход DeferredSemaphore , но имеет некоторые преимущества, которые могут быть интересны. Во-первых, cooperate возвращает экземпляр CooperativeTask , который имеет полезные методы, такие как pause , resume , и несколько других.Кроме того, все задания, назначенные одному и тому же кооператору, будут взаимодействовать друг с другом при планировании, чтобы не перегружать цикл событий (и это то, что дает API его имя). На стороне DeferredQueue также можно установить ограничения на количество элементов, ожидающих обработки, чтобы избежать полной перегрузки сервера (например, если процессоры изображений застревают и перестают выполнять задачи). Если код, вызывающий put , обрабатывает исключение переполнения очереди, вы можете использовать это как давление, чтобы попытаться прекратить прием новых заданий (возможно, перенаправив их на другой сервер или предупредив администратора). Выполнение аналогичных действий с DeferredSemaphore немного сложнее, поскольку нет способа ограничить количество заданий, ожидающих получения семафора.

29
ответ дан 3 December 2019 в 03:34
поделиться

Вам также может понравиться написанная мной txRDQ (Resizable Dispatch Queue). Погуглите, он находится в коллекции tx на LaunchPad. Извините, у меня нет времени ответить - собираюсь выйти на сцену.

Терри

-2
ответ дан 3 December 2019 в 03:34
поделиться
Другие вопросы по тегам:

Похожие вопросы: