Передовой опыт для запроса большого количества сущностей ndb из хранилища данных

Я столкнулся с интересным ограничением в хранилище данных App Engine. Я создаю обработчик, который поможет нам проанализировать некоторые данные об использовании на одном из наших рабочих серверов. Чтобы выполнить анализ, мне нужно запросить и обобщить более 10 000 объектов, извлеченных из хранилища данных. Расчет не сложный, это просто гистограмма элементов, которые проходят определенный фильтр образцов использования. Проблема, с которой я столкнулся, заключается в том, что я не могу получить данные из хранилища данных достаточно быстро, чтобы выполнить какую-либо обработку до наступления крайнего срока запроса.

Я пробовал все, что мог придумать, чтобы разбить запрос на параллельные вызовы RPC для повышения производительности, но, согласно статистике приложений, я не могу заставить запросы выполняться параллельно. Какой бы метод я ни пробовал (, см. ниже ), всегда кажется, что RPC откатывается к водопаду последовательных следующих запросов.

Обратите внимание :код запроса и анализа работает, просто он работает слишком медленно, потому что я не могу достаточно быстро получить данные из хранилища данных.

Фон

У меня нет живой версии, которой я мог бы поделиться, но вот базовая модель той части системы, о которой я говорю:

class Session(ndb.Model):
   """ A tracked user session. (customer account (company), version, OS, etc) """
   data = ndb.JsonProperty(required = False, indexed = False)

class Sample(ndb.Model):
   name      = ndb.StringProperty  (required = True,  indexed = True)
   session   = ndb.KeyProperty     (required = True,  kind = Session)
   timestamp = ndb.DateTimeProperty(required = True,  indexed = True)
   tags      = ndb.StringProperty  (repeated = True,  indexed = True)

Вы можете думать о примерах как о моментах, когда пользователь использует возможность с заданным именем. (ex :'systemA.feature _x' ). Теги основаны на данных клиента, информации о системе и функции. ex :['winxp', '2.5.1', ​​'systemA', 'функция _x', 'учетная запись премиум-класса _'] ). Таким образом, теги образуют денормализованный набор токенов, который можно использовать для поиска интересующих образцов.

Анализ, который я пытаюсь провести, состоит в том, чтобы взять диапазон дат и задать вопрос, сколько раз использовалась функция набора функций (, возможно, все функции )в день (или в час )для каждой учетной записи клиента (компания, а не на пользователя ).

Таким образом, ввод обработчика будет примерно таким::

  • Дата начала
  • Дата окончания
  • Тег (с)

Вывод будет:

[{
   'company_account': <string>,
   'counts': [
      {'timeperiod': <iso8601 date>, 'count': <int>},...
   ]
 },...
]

Общий код для запросов

Вот некоторый код, общий для всех запросов. Общая структура обработчика представляет собой простой обработчик получения с использованием webapp2, который устанавливает параметры запроса, выполняет запрос, обрабатывает результаты, создает данные для возврата.

# -- Build Query Object --- #
query_opts = {}
query_opts['batch_size'] = 500   # Bring in large groups of entities

q = Sample.query()
q = q.order(Sample.timestamp)

# Tags
tag_args = [(Sample.tags == t) for t in tags]
q = q.filter(ndb.query.AND(*tag_args))

def handle_sample(sample):
   session_obj = sample.session.get()    # Usually found in local or memcache thanks to ndb
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)

Опробованные методы

Я испробовал различные методы, чтобы извлечь данные из хранилища данных как можно быстрее и параллельно. Методы, которые я пробовал до сих пор, включают:

А.Одна итерация

Это более простой базовый случай для сравнения с другими методами. Я просто создаю запрос и перебираю все элементы, позволяя ndb делать то, что он делает, чтобы извлекать их один за другим.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
q_iter = q.iter(**query_opts)

for sample in q_iter:
   handle_sample(sample)

B. Большой выбор

Идея здесь заключалась в том, чтобы посмотреть, смогу ли я сделать одну очень большую выборку.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
samples = q.fetch(20000, **query_opts)

for sample in samples:
   handle_sample(sample)

C. Асинхронная выборка во временном диапазоне

Идея здесь состоит в том, чтобы признать, что выборки довольно хорошо разнесены по времени, поэтому я могу создать набор независимых запросов, которые разбивают весь временной регион на куски, и попытаться запустить каждый из них параллельно, используя асинхронность :

# split up timestamp space into 20 equal parts and async query each of them
ts_delta       = (end_time - start_time) / 20
cur_start_time = start_time
q_futures = []

for x in range(ts_intervals):
   cur_end_time = (cur_start_time + ts_delta)
   if x == (ts_intervals-1):    # Last one has to cover full range
      cur_end_time = end_time

   f = q.filter(Sample.timestamp >= cur_start_time,
                Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
   q_futures.append(f)
   cur_start_time = cur_end_time

# Now loop through and collect results
for f in q_futures:
   samples = f.get_result()
   for sample in samples:
      handle_sample(sample)

. D. Асинхронное отображение

Я попробовал этот метод, потому что в документации было сказано, что ndb может автоматически использовать некоторый параллелизм при использовании асинхронного метода Query.map _.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)

@ndb.tasklet
def process_sample(sample):
   period_ts   = getPeriodTimestamp(sample.timestamp)
   session_obj = yield sample.session.get_async()    # Lookup the session object from cache
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)
   raise ndb.Return(None)

q_future = q.map_async(process_sample, **query_opts)
res = q_future.get_result()

Результат

Я протестировал один пример запроса для сбора данных об общем времени отклика и статистике приложений. Результаты:

A. Одиночная итерация

реальный :15,645 с

Этот последовательно извлекает пакеты один за другим, а затем извлекает каждый сеанс из кэша памяти.

Method A appstats

B. Большой выбор

реальный :12,12 с

Фактически то же самое, что и вариант А, но по какой-то причине немного быстрее.

Method B appstats

C. Асинхронная выборка во временном диапазоне

реальный :15,251 с

По-видимому, обеспечивает больший параллелизм в начале, но, похоже, замедляется из-за последовательности вызовов next во время итерации результатов. Также, похоже, не удается перекрыть поиск кэша памяти сеанса ожидающими запросами.

Method C appstats

D. Асинхронное отображение

реальное :13,752 с

Это мне труднее всего понять. Похоже, что q много перекрытий, но кажется, что все вытягивается водопадом, а не параллельно.

Method D appstats

Рекомендации

Исходя из всего этого, что я упускаю? Я просто достиг предела в App Engine или есть лучший способ параллельного извлечения большого количества сущностей?

Я в недоумении, что попробовать дальше. Я думал о том, чтобы переписать клиент, чтобы он делал несколько запросов к движку приложения параллельно, но это кажется довольно грубой силой. Я действительно ожидаю, что движок приложения сможет справиться с этим вариантом использования, поэтому я предполагаю, что мне чего-то не хватает.

Обновление

В конце концов я обнаружил, что вариант C был лучшим для моего случая. Я смог оптимизировать его, чтобы завершить за 6,1 секунды. Все еще не идеально, но намного лучше.

Получив совет от нескольких человек, я обнаружил, что следующие пункты являются ключевыми для понимания и сохранения в памяти:

  • Несколько запросов могут выполняться параллельно
  • Одновременно в полете может находиться только 10 RPC
  • Попробуйте денормализировать до такой степени, чтобы не было вторичных запросов
  • Этот тип задачи лучше оставить для сопоставления сокращения и очередей задач, а не запросов реального -времени

Итак, что я сделал, чтобы сделать это быстрее:

  • Я разделил пространство запросов с самого начала на основе времени. (примечание :чем более равны разделы с точки зрения возвращаемых объектов, тем лучше)
  • Я дополнительно денормализовал данные, чтобы исключить необходимость во вторичном запросе сеанса
  • . Я использовал асинхронные операции ndb и ждал _любых (), чтобы перекрыть запросы с обработкой

Я все еще не получаю той производительности, которую ожидал или хотел бы, но пока это работает. Я просто хотел бы, чтобы это был лучший способ быстро загружать большое количество последовательных объектов в память в обработчиках.

62
задан Dan McGrath 2 September 2016 в 22:55
поделиться