Я столкнулся с интересным ограничением в хранилище данных App Engine. Я создаю обработчик, который поможет нам проанализировать некоторые данные об использовании на одном из наших рабочих серверов. Чтобы выполнить анализ, мне нужно запросить и обобщить более 10 000 объектов, извлеченных из хранилища данных. Расчет не сложный, это просто гистограмма элементов, которые проходят определенный фильтр образцов использования. Проблема, с которой я столкнулся, заключается в том, что я не могу получить данные из хранилища данных достаточно быстро, чтобы выполнить какую-либо обработку до наступления крайнего срока запроса.
Я пробовал все, что мог придумать, чтобы разбить запрос на параллельные вызовы RPC для повышения производительности, но, согласно статистике приложений, я не могу заставить запросы выполняться параллельно. Какой бы метод я ни пробовал (, см. ниже ), всегда кажется, что RPC откатывается к водопаду последовательных следующих запросов.
Обратите внимание :код запроса и анализа работает, просто он работает слишком медленно, потому что я не могу достаточно быстро получить данные из хранилища данных.
У меня нет живой версии, которой я мог бы поделиться, но вот базовая модель той части системы, о которой я говорю:
class Session(ndb.Model):
""" A tracked user session. (customer account (company), version, OS, etc) """
data = ndb.JsonProperty(required = False, indexed = False)
class Sample(ndb.Model):
name = ndb.StringProperty (required = True, indexed = True)
session = ndb.KeyProperty (required = True, kind = Session)
timestamp = ndb.DateTimeProperty(required = True, indexed = True)
tags = ndb.StringProperty (repeated = True, indexed = True)
Вы можете думать о примерах как о моментах, когда пользователь использует возможность с заданным именем. (ex :'systemA.feature _x' ). Теги основаны на данных клиента, информации о системе и функции. ex :['winxp', '2.5.1', 'systemA', 'функция _x', 'учетная запись премиум-класса _'] ). Таким образом, теги образуют денормализованный набор токенов, который можно использовать для поиска интересующих образцов.
Анализ, который я пытаюсь провести, состоит в том, чтобы взять диапазон дат и задать вопрос, сколько раз использовалась функция набора функций (, возможно, все функции )в день (или в час )для каждой учетной записи клиента (компания, а не на пользователя ).
Таким образом, ввод обработчика будет примерно таким::
Вывод будет:
[{
'company_account': <string>,
'counts': [
{'timeperiod': <iso8601 date>, 'count': <int>},...
]
},...
]
Вот некоторый код, общий для всех запросов. Общая структура обработчика представляет собой простой обработчик получения с использованием webapp2, который устанавливает параметры запроса, выполняет запрос, обрабатывает результаты, создает данные для возврата.
# -- Build Query Object --- #
query_opts = {}
query_opts['batch_size'] = 500 # Bring in large groups of entities
q = Sample.query()
q = q.order(Sample.timestamp)
# Tags
tag_args = [(Sample.tags == t) for t in tags]
q = q.filter(ndb.query.AND(*tag_args))
def handle_sample(sample):
session_obj = sample.session.get() # Usually found in local or memcache thanks to ndb
count_key = session_obj.data['customer']
addCountForPeriod(count_key, sample.timestamp)
Я испробовал различные методы, чтобы извлечь данные из хранилища данных как можно быстрее и параллельно. Методы, которые я пробовал до сих пор, включают:
Это более простой базовый случай для сравнения с другими методами. Я просто создаю запрос и перебираю все элементы, позволяя ndb делать то, что он делает, чтобы извлекать их один за другим.
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
q_iter = q.iter(**query_opts)
for sample in q_iter:
handle_sample(sample)
Идея здесь заключалась в том, чтобы посмотреть, смогу ли я сделать одну очень большую выборку.
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
samples = q.fetch(20000, **query_opts)
for sample in samples:
handle_sample(sample)
Идея здесь состоит в том, чтобы признать, что выборки довольно хорошо разнесены по времени, поэтому я могу создать набор независимых запросов, которые разбивают весь временной регион на куски, и попытаться запустить каждый из них параллельно, используя асинхронность :
# split up timestamp space into 20 equal parts and async query each of them
ts_delta = (end_time - start_time) / 20
cur_start_time = start_time
q_futures = []
for x in range(ts_intervals):
cur_end_time = (cur_start_time + ts_delta)
if x == (ts_intervals-1): # Last one has to cover full range
cur_end_time = end_time
f = q.filter(Sample.timestamp >= cur_start_time,
Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
q_futures.append(f)
cur_start_time = cur_end_time
# Now loop through and collect results
for f in q_futures:
samples = f.get_result()
for sample in samples:
handle_sample(sample)
Я попробовал этот метод, потому что в документации было сказано, что ndb может автоматически использовать некоторый параллелизм при использовании асинхронного метода Query.map _.
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
@ndb.tasklet
def process_sample(sample):
period_ts = getPeriodTimestamp(sample.timestamp)
session_obj = yield sample.session.get_async() # Lookup the session object from cache
count_key = session_obj.data['customer']
addCountForPeriod(count_key, sample.timestamp)
raise ndb.Return(None)
q_future = q.map_async(process_sample, **query_opts)
res = q_future.get_result()
Я протестировал один пример запроса для сбора данных об общем времени отклика и статистике приложений. Результаты:
реальный :15,645 с
Этот последовательно извлекает пакеты один за другим, а затем извлекает каждый сеанс из кэша памяти.
реальный :12,12 с
Фактически то же самое, что и вариант А, но по какой-то причине немного быстрее.
реальный :15,251 с
По-видимому, обеспечивает больший параллелизм в начале, но, похоже, замедляется из-за последовательности вызовов next во время итерации результатов. Также, похоже, не удается перекрыть поиск кэша памяти сеанса ожидающими запросами.
реальное :13,752 с
Это мне труднее всего понять. Похоже, что q много перекрытий, но кажется, что все вытягивается водопадом, а не параллельно.
Исходя из всего этого, что я упускаю? Я просто достиг предела в App Engine или есть лучший способ параллельного извлечения большого количества сущностей?
Я в недоумении, что попробовать дальше. Я думал о том, чтобы переписать клиент, чтобы он делал несколько запросов к движку приложения параллельно, но это кажется довольно грубой силой. Я действительно ожидаю, что движок приложения сможет справиться с этим вариантом использования, поэтому я предполагаю, что мне чего-то не хватает.
В конце концов я обнаружил, что вариант C был лучшим для моего случая. Я смог оптимизировать его, чтобы завершить за 6,1 секунды. Все еще не идеально, но намного лучше.
Получив совет от нескольких человек, я обнаружил, что следующие пункты являются ключевыми для понимания и сохранения в памяти:
Итак, что я сделал, чтобы сделать это быстрее:
Я все еще не получаю той производительности, которую ожидал или хотел бы, но пока это работает. Я просто хотел бы, чтобы это был лучший способ быстро загружать большое количество последовательных объектов в память в обработчиках.