Просто некоторые комментарии к очень полезному ответу patricksurry:
Вот немного модифицированный код:
from pyspark.sql.types import *
rdd = sc.parallelize([('X01',41,'US',3),
('X01',41,'UK',1),
('X01',41,'CA',2),
('X02',72,'US',4),
('X02',72,'UK',6),
('X02',72,'CA',7),
('X02',72,'XX',8)])
schema = StructType([StructField('ID', StringType(), True),
StructField('Age', IntegerType(), True),
StructField('Country', StringType(), True),
StructField('Score', IntegerType(), True)])
df = sqlCtx.createDataFrame(rdd, schema)
# u is a dictionarie
# v is a Row
def seqPivot(u, v):
if not u:
u = {}
u[v.Country] = v.Score
# In the original posting the Age column was not specified
u["Age"] = v.Age
return u
# u1
# u2
def cmbPivot(u1, u2):
u1.update(u2)
return u1
pivot = (
rdd
.map(lambda row: Row(ID=row[0], Age=row[1], Country=row[2], Score=row[3]))
.keyBy(lambda row: row.ID)
.aggregateByKey(None, seqPivot, cmbPivot)
)
columns = (
pivot
.values()
.map(lambda u: set(u.keys()))
.reduce(lambda s,t: s.union(t))
)
columns_ord = sorted(columns)
result = sqlCtx.createDataFrame(
pivot
.map(lambda (k, u): [k] + [u.get(c, None) for c in columns_ord]),
schema=StructType(
[StructField('ID', StringType())] +
[StructField(c, IntegerType()) for c in columns_ord]
)
)
print result.show()
Наконец, выход должен быть
+---+---+---+---+---+----+
| ID|Age| CA| UK| US| XX|
+---+---+---+---+---+----+
|X02| 72| 7| 6| 4| 8|
|X01| 41| 2| 1| 3|null|
+---+---+---+---+---+----+
Правила.
Вы создаете события в системе и используете правила в процессоре потока событий.
А именно, скажите, что у Вас есть значок, "сделал 10 сообщений". Вы не выполняете "избранное количество (*) из сообщений где пользователь =: пользователь" для каждого сообщения. Скорее у Вас есть простое правило, которое наблюдает, что каждое сообщение появляется, и "считают их", храня состояние правил в профиле пользователя.
Тот путь "сделал 10 сообщений", является столь же дешевым, как "сделано 1 000 000" сообщений.
Это также делает систему намного более расширяемой.
Я соглашаюсь с Желанием на этом.
Создают "события" по страницам так каждый раз, когда случай происходит, т.е. пользователь удаляет сообщение, он запросит модуль события с событием, позволяет, говорят, EVENT_USER_DELETE_POST, и затем можно выбрать то событие и создать запрос на основе его. Можно затем решить, награжден ли значок или нет.
Это сохранит эти две логики отдельными и сохранит модульную конструкцию. Должно быть очень легко реализовать этот путь.
единственный недостаток - то, что, если событие не было "получено" затем, пользователь, возможно, заработал для значка критерии, но оно еще не было вознаграждено. Однако это никогда не должно происходить. Единственная ситуация, о которой я могу думать, состоит в том, если базой данных управляют вручную.