Я достиг узкого места в своем приложении и с трудом нахожу решение вокруг него. Немного предыстории:
. Текущая реализация:
. Существуют дополнительные проверки и тому подобное, чтобы убедиться, что все задачи очереди извлечения правильно обработаны и все элементы загружены.
Проблема:
Мы хотим загрузить и сохранить все элементы и агрегаты как можно быстрее. У меня есть 20 экземпляров, включенных для каждой конфигурации серверной части, описанной (. Я буду называть их серверной частью «агрегатора» и серверной частью «загрузчика» ). Серверная часть загрузчика, похоже, довольно быстро обрабатывает вызовы API. Я активно использую библиотеку NDB и асинхронные вызовы URL Fetches/Datastore, чтобы получить это. Я также включил threadsafe :true, так что ни один экземпляр не будет ждать завершения вызовов RPC перед запуском следующей задачи (все задачи могут работать независимо друг от друга и являются идемпотентными ).
Серверная часть агрегатора — это место, где вступает в игру большой поглотитель времени. Хранение 500 -1500 таких агрегатов асинхронно посредством транзакций занимает 40 или более секунд (, и я даже не думаю, что все транзакции правильно фиксируются ).Я сохраняю этот бэкенд с threadsafe :false, так как я использую крайний срок истечения очереди pull, равный 300 секундам, но если я разрешаю выполнение более чем одной задачи в одном экземпляре, они могут каскадироваться вниз и подтолкнуть выполнение некоторых задач к 300-секундная отметка, что позволяет другой задаче выполнить ту же задачу во второй раз и, возможно, удвоить -счет.
Журналы показывают BadRequestError: Nested transactions are not supported.
с предыдущей ошибкой (в трассировке стека )из TransactionFailedError: too much contention on these datastore entities. please try again.
. Другая ошибка, которую я обычно вижу, этоBadRequestError(The referenced transaction has expired or is no longer valid.)
Насколько я понимаю, иногда эти ошибки означают, что транзакция может быть совершена без дальнейшего взаимодействия. Как я узнаю, что это было правильно совершено? Делаю ли я это логично/эффективно или есть больше возможностей для параллелизма без риска все испортить?
Соответствующий код:
class GeneralShardConfig(ndb.Model):
"""Tracks the number of shards for each named counter."""
name = ndb.StringProperty(required=True)
num_shards = ndb.IntegerProperty(default=4)
class GeneralAggregateShard(ndb.Model):
"""Shards for each named counter"""
name = ndb.StringProperty(name='n', required=True)
count = ndb.FloatProperty(name='c', default=0.00) #acts as a total now
@ndb.tasklet
def increment_batch(data_set):
def run_txn(name, value):
@ndb.tasklet
def txn():
to_put = []
dbkey = ndb.Key(GeneralShardConfig, name)
config = yield dbkey.get_async(use_memcache=False)
if not config:
config = GeneralShardConfig(key=dbkey,name=name)
to_put.append(config)
index = random.randint(0, config.num_shards-1)
shard_name = name + str(index)
dbkey = ndb.Key(GeneralAggregateShard, shard_name)
counter = yield dbkey.get_async()
if not counter:
counter = GeneralAggregateShard(key=dbkey, name=name)
counter.count += value
to_put.append(counter)
yield ndb.put_multi_async(to_put)
return ndb.transaction_async(txn, use_memcache=False, xg=True)
res = yield[run_txn(key, value) for key, value in data_set.iteritems() if value != 0.00]
raise ndb.Return(res)
Учитывая реализацию, единственное место для «конфликта», которое я вижу, это если 2 или более агрегатных задач должны обновлять одно и то же агрегатное имя, что не должно происходить слишком часто, и с сегментированными счетчиками я ожидаю, что это совпадение будет редко, если вообще когда-либо, происходить. я предполагаю Ошибка BadRequestError(The referenced transaction has expired or is no longer valid.)
появляется, когда цикл обработки событий проверяет состояние всех тасклетов и находит ссылку на завершенную транзакцию. Проблема здесь в том, что возникают ошибки, значит ли это, что все транзакции преждевременно обрываются, или я могу предположить, что все транзакции прошли? Я также предполагаю, что эта строка res = yield[run_txn(key, value) for key, value in data_set.iteritems() if value != 0.00]
должна быть разбита на try/except для каждого тасклета, чтобы обнаружить эти ошибки.
Прежде чем я сойду с ума из-за этого, я был бы признателен за любое руководство / помощь в том, как оптимизировать этот процесс и сделать это надежным способом.
РЕДАКТИРОВАТЬ 1: Я изменил поведение задачи агрегатора следующим образом:
Это помогло уменьшить количество ошибок конкуренции, которые я наблюдал, но все еще не очень надежно. Совсем недавно я наткнулся на BadRequestError: Nested transactions are not supported.
с трассировкой стека, указывающейRuntimeError: Deadlock waiting for <Future fbf0db50 created by transaction_async(model.py:3345) for tasklet transaction(context.py:806) suspended generator transaction(context.py:876); pending>
Я считаю, что эта модификация должна оптимизировать процесс, позволяя объединять и пробовать все возможные перекрытия в процессе агрегации одновременно в одном экземпляре, а не в нескольких экземплярах, все выполняющие транзакции, которые могут столкнуться. У меня все еще есть проблемы с сохранением результатов надежным способом.