Надежный способ выполнить тысячи независимых транзакций?

Я достиг узкого места в своем приложении и с трудом нахожу решение вокруг него. Немного предыстории:

  • Мое приложение пингует API для сбора информации о сотнях тысяч элементов и сохранения их в хранилище данных
  • . Нам нужно выполнить простые агрегации по сочетанию измерений этих элементов, которые мы пытаемся вычислить во время хранения элементов

. Текущая реализация:

  • Мы запускаем загрузку этих элементов вручную по мере необходимости, что создает задачи на серверной части, предназначенные для загрузки этих элементов. Каждая задача будет запускать больше задач в зависимости от #вызовов API, необходимых для разбиения на страницы и получения каждого элемента.
  • Каждая задача будет загружать, анализировать и массово хранить элементы,сохраняя при этом нужные агрегаты в памяти с помощью словаря.
  • В конце выполнения каждой задачи мы записываем словарь агрегатов в пулл-очередь.
  • Как только мы обнаружим, что мы приближаемся к концу вызовов API, мы запускаем задачу агрегации для второй конфигурации серверной части
  • . Эта «задача агрегирования» извлекает из очереди извлечения (по 20 за раз )и объединяет словари, найденные в каждой задаче (, выполняя агрегацию в памяти )перед попыткой сохранить каждую агрегацию. Эта задача также запускает другие задачи для выполнения агрегирования оставшихся задач в очереди извлечения (сотен)
  • Мы используем подход с сегментированным счетчиком , чтобы облегчить любые конфликты при сохранении в хранилище данных
  • . Каждая задача агрегирования может попытаться сохранить 500 -1500 агрегаций, которые должны быть независимы друг от друга

. Существуют дополнительные проверки и тому подобное, чтобы убедиться, что все задачи очереди извлечения правильно обработаны и все элементы загружены.

Проблема:

Мы хотим загрузить и сохранить все элементы и агрегаты как можно быстрее. У меня есть 20 экземпляров, включенных для каждой конфигурации серверной части, описанной (. Я буду называть их серверной частью «агрегатора» и серверной частью «загрузчика» ). Серверная часть загрузчика, похоже, довольно быстро обрабатывает вызовы API. Я активно использую библиотеку NDB и асинхронные вызовы URL Fetches/Datastore, чтобы получить это. Я также включил threadsafe :true, так что ни один экземпляр не будет ждать завершения вызовов RPC перед запуском следующей задачи (все задачи могут работать независимо друг от друга и являются идемпотентными ).

Серверная часть агрегатора — это место, где вступает в игру большой поглотитель времени. Хранение 500 -1500 таких агрегатов асинхронно посредством транзакций занимает 40 или более секунд (, и я даже не думаю, что все транзакции правильно фиксируются ).Я сохраняю этот бэкенд с threadsafe :false, так как я использую крайний срок истечения очереди pull, равный 300 секундам, но если я разрешаю выполнение более чем одной задачи в одном экземпляре, они могут каскадироваться вниз и подтолкнуть выполнение некоторых задач к 300-секундная отметка, что позволяет другой задаче выполнить ту же задачу во второй раз и, возможно, удвоить -счет.

Журналы показывают BadRequestError: Nested transactions are not supported.с предыдущей ошибкой (в трассировке стека )из TransactionFailedError: too much contention on these datastore entities. please try again.. Другая ошибка, которую я обычно вижу, этоBadRequestError(The referenced transaction has expired or is no longer valid.)

Насколько я понимаю, иногда эти ошибки означают, что транзакция может быть совершена без дальнейшего взаимодействия. Как я узнаю, что это было правильно совершено? Делаю ли я это логично/эффективно или есть больше возможностей для параллелизма без риска все испортить?

Соответствующий код:

class GeneralShardConfig(ndb.Model):
    """Tracks the number of shards for each named counter."""
    name = ndb.StringProperty(required=True)
    num_shards = ndb.IntegerProperty(default=4)

class GeneralAggregateShard(ndb.Model):
    """Shards for each named counter"""
    name = ndb.StringProperty(name='n', required=True)
    count = ndb.FloatProperty(name='c', default=0.00) #acts as a total now

@ndb.tasklet
def increment_batch(data_set):
    def run_txn(name, value):
        @ndb.tasklet
        def txn():
            to_put = []
            dbkey = ndb.Key(GeneralShardConfig, name)
            config = yield dbkey.get_async(use_memcache=False)
            if not config:
                config = GeneralShardConfig(key=dbkey,name=name)
                to_put.append(config)
            index = random.randint(0, config.num_shards-1)
            shard_name =  name + str(index)
            dbkey = ndb.Key(GeneralAggregateShard, shard_name)
            counter = yield dbkey.get_async()
            if not counter:
                counter = GeneralAggregateShard(key=dbkey, name=name)
            counter.count += value
            to_put.append(counter)
            yield ndb.put_multi_async(to_put)
        return ndb.transaction_async(txn, use_memcache=False, xg=True)
    res = yield[run_txn(key, value) for key, value in data_set.iteritems() if value != 0.00]
    raise ndb.Return(res)

Учитывая реализацию, единственное место для «конфликта», которое я вижу, это если 2 или более агрегатных задач должны обновлять одно и то же агрегатное имя, что не должно происходить слишком часто, и с сегментированными счетчиками я ожидаю, что это совпадение будет редко, если вообще когда-либо, происходить. я предполагаю Ошибка BadRequestError(The referenced transaction has expired or is no longer valid.)появляется, когда цикл обработки событий проверяет состояние всех тасклетов и находит ссылку на завершенную транзакцию. Проблема здесь в том, что возникают ошибки, значит ли это, что все транзакции преждевременно обрываются, или я могу предположить, что все транзакции прошли? Я также предполагаю, что эта строка res = yield[run_txn(key, value) for key, value in data_set.iteritems() if value != 0.00]должна быть разбита на try/except для каждого тасклета, чтобы обнаружить эти ошибки.

Прежде чем я сойду с ума из-за этого, я был бы признателен за любое руководство / помощь в том, как оптимизировать этот процесс и сделать это надежным способом.

РЕДАКТИРОВАТЬ 1: Я изменил поведение задачи агрегатора следующим образом:

  • Если из очереди было арендовано более 1 задачи, агрегируйте задачи в памяти, затем сохраните результат в другой задаче в очереди pull -и сразу же запустите другую «задачу-агрегатор»
  • . В противном случае, если 1 задача была сдана в аренду,попробуйте сохранить результаты

Это помогло уменьшить количество ошибок конкуренции, которые я наблюдал, но все еще не очень надежно. Совсем недавно я наткнулся на BadRequestError: Nested transactions are not supported.с трассировкой стека, указывающейRuntimeError: Deadlock waiting for <Future fbf0db50 created by transaction_async(model.py:3345) for tasklet transaction(context.py:806) suspended generator transaction(context.py:876); pending>

Я считаю, что эта модификация должна оптимизировать процесс, позволяя объединять и пробовать все возможные перекрытия в процессе агрегации одновременно в одном экземпляре, а не в нескольких экземплярах, все выполняющие транзакции, которые могут столкнуться. У меня все еще есть проблемы с сохранением результатов надежным способом.

13
задан someone1 28 June 2012 в 21:03
поделиться