Скрученный: почему случается так, что передача задержанного обратного вызова к задержанному потоку делает поток, блокирующийся внезапно?

Я неудачно пытался использовать txredis (не блокирование скрутило API для советов) для сохраняющейся очереди сообщений, которую я пытаюсь настроить с пестрым проектом, я продолжаю работать. Я нашел, что, хотя клиент не блокировался, это стало намного медленнее, чем это, возможно, было, потому что то, что должно было быть одним событием в реакторном цикле, было разделено на тысячи шагов.

Таким образом вместо этого, я пытался использовать redis-py (регулярное блокирование скрутило API), и обертывание вызова в задержанном потоке. Это работает отлично, однако я хочу выполнить внутреннее, задержанное, когда я звоню советам, поскольку я хотел бы настроить организацию пула подключений в попытках ускорить вещи далее.

Ниже моя интерпретация некоторого примера кода, взятого из скрученных документов для задержанного потока для иллюстрирования моего варианта использования:

#!/usr/bin/env python
from twisted.internet import reactor,threads
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'


def aBlockingRedisCall():
    print 'doing lookup... this may take a while'
    time.sleep(10)
    return 'results from redis'

def result(res):
    print res

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)
    d = threads.deferToThread(aBlockingRedisCall)
    d.addCallback(result)
    reactor.run()

if __name__=='__main__':
    main()

И вот мое изменение для организации пула подключений, которая делает код в задержанном блокировании потока:

#!/usr/bin/env python
from twisted.internet import reactor,defer
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'

def aBlockingRedisCall(x):
    if x<5: #all connections are busy, try later
        print '%s is less than 5, get a redis client later' % x
        x+=1
        d = defer.Deferred()
        d.addCallback(aBlockingRedisCall)
        reactor.callLater(1.0,d.callback,x)
        return d

    else: 
        print 'got a redis client; doing lookup.. this may take a while'
        time.sleep(10) # this is now blocking.. any ideas?
        d = defer.Deferred()
        d.addCallback(gotFinalResult)
        d.callback(x)
        return d

def gotFinalResult(x):
    return 'final result is %s' % x

def result(res):
    print res

def aBlockingMethod():
    print 'going to sleep...'
    time.sleep(10)
    print 'woke up'

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)


    d = defer.Deferred()
    d.addCallback(aBlockingRedisCall)
    d.addCallback(result)
    reactor.callInThread(d.callback, 1)
    reactor.run()

if __name__=='__main__':
    main()

Таким образом, мой вопрос, кто-либо знает, почему мое изменение заставляет задержанный поток блокироваться, и/или кто-либо может предложить лучшее решение?

7
задан surtyaar 18 March 2010 в 01:03
поделиться

2 ответа

Что ж, как сказано в искаженных документах :

Deferred не заставляют код волшебным образом не блокироваться

Каждый раз, когда вы используете код блокировки, такой как sleep , вы должны передать его новому потоку.

#!/usr/bin/env python
from twisted.internet import reactor,defer, threads
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'

def aBlockingRedisCall(x):
    if x<5: #all connections are busy, try later
        print '%s is less than 5, get a redis client later' % x
        x+=1
        d = defer.Deferred()
        d.addCallback(aBlockingRedisCall)
        reactor.callLater(1.0,d.callback,x)
        return d

    else: 
        print 'got a redis client; doing lookup.. this may take a while'
        def getstuff( x ):
            time.sleep(3)
            return "stuff is %s" % x

        # getstuff is blocking, so you need to push it to a new thread
        d = threads.deferToThread(getstuff, x)
        d.addCallback(gotFinalResult)
        return d

def gotFinalResult(x):
    return 'final result is %s' % x

def result(res):
    print res

def aBlockingMethod():
    print 'going to sleep...'
    time.sleep(10)
    print 'woke up'

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)


    d = defer.Deferred()
    d.addCallback(aBlockingRedisCall)
    d.addCallback(result)
    reactor.callInThread(d.callback, 1)
    reactor.run()

if __name__=='__main__':
    main()

В случае, если api redis не очень сложен, было бы более естественно переписать его с помощью twisted.web, вместо того, чтобы просто вызывать блокирующий api в большом количестве потоков.

12
ответ дан 6 December 2019 в 15:20
поделиться

Попутно замечу, что вы могли бы получить много пользы, используя клиент Redis, созданный специально для Twisted, например, вот этот: http://github.com/deldotdr/txRedis

0
ответ дан 6 December 2019 в 15:20
поделиться
Другие вопросы по тегам:

Похожие вопросы: