Вы можете попробовать этот код. Я нашел его раньше, когда у меня возникла проблема, похожая на вашу.
if (isset($_GET['q1mrks']) && isset($_GET['marks']) && isset($_GET['qt1'])) {
$Q1mrks = $_GET['q1mrks'];
$marks = $_GET['marks'];
$qt1 = $_GET['qt1'];
$qtype_qry = mysql_query("
SELECT *
FROM s_questiontypes
WHERE quetype_id = '$qt1'
");
$row = mysql_fetch_assoc($qtype_qry);
$qcode = $row['quetype_code'];
$sq_qry = "
SELECT *
FROM s_question
WHERE quetype_code = '$qcode'
ORDER BY RAND() LIMIT $Q1mrks
";
$sq_qry = mysql_query("
SELECT *
FROM s_question
WHERE quetype_code = '$qcode'
LIMIT $Q1mrks
");
while ($qrow = mysql_fetch_array($sq_qry)) {
$qm = $qrow['marks'] . "<br />";
$total += $qm . "<br />";
}
echo $total . "/" . $marks;
}
Начиная с Spark 1.6 вы можете использовать функцию pivot
на GroupedData
и предоставить обобщенное выражение.
pivoted = (df
.groupBy("ID", "Age")
.pivot(
"Country",
['US', 'UK', 'CA']) # Optional list of levels
.sum("Score")) # alternatively you can use .agg(expr))
pivoted.show()
## +---+---+---+---+---+
## | ID|Age| US| UK| CA|
## +---+---+---+---+---+
## |X01| 41| 3| 1| 2|
## |X02| 72| 4| 6| 7|
## +---+---+---+---+---+
Уровни могут быть опущены, но если они могут повысить производительность и служит в качестве внутреннего фильтра.
Этот метод по-прежнему относительно медленный, но, конечно, бит вручную передает данные вручную между JVM и Python.
Прежде всего, это, вероятно, не очень хорошая идея, потому что вы не получаете никакой дополнительной информации, но вы привязываетесь к фиксированной схеме (то есть вам нужно знать, сколько стран вы ожидаете, и, конечно же, дополнительная страна означает изменение кода)
Сказав это, это проблема SQL, которая показана ниже. Но если вы полагаете, что это не слишком «программное обеспечение» (серьезно, я это слышал !!), то вы можете отсылать первое решение.
Решение 1:
def reshape(t):
out = []
out.append(t[0])
out.append(t[1])
for v in brc.value:
if t[2] == v:
out.append(t[3])
else:
out.append(0)
return (out[0],out[1]),(out[2],out[3],out[4],out[5])
def cntryFilter(t):
if t[2] in brc.value:
return t
else:
pass
def addtup(t1,t2):
j=()
for k,v in enumerate(t1):
j=j+(t1[k]+t2[k],)
return j
def seq(tIntrm,tNext):
return addtup(tIntrm,tNext)
def comb(tP,tF):
return addtup(tP,tF)
countries = ['CA', 'UK', 'US', 'XX']
brc = sc.broadcast(countries)
reshaped = calls.filter(cntryFilter).map(reshape)
pivot = reshaped.aggregateByKey((0,0,0,0),seq,comb,1)
for i in pivot.collect():
print i
Теперь решение 2: Конечно, лучше, поскольку SQL является правильным инструментом для этого
callRow = calls.map(lambda t:
Row(userid=t[0],age=int(t[1]),country=t[2],nbrCalls=t[3]))
callsDF = ssc.createDataFrame(callRow)
callsDF.printSchema()
callsDF.registerTempTable("calls")
res = ssc.sql("select userid,age,max(ca),max(uk),max(us),max(xx)\
from (select userid,age,\
case when country='CA' then nbrCalls else 0 end ca,\
case when country='UK' then nbrCalls else 0 end uk,\
case when country='US' then nbrCalls else 0 end us,\
case when country='XX' then nbrCalls else 0 end xx \
from calls) x \
group by userid,age")
res.show()
данных:
data=[('X01',41,'US',3),('X01',41,'UK',1),('X01',41,'CA',2),('X02',72,'US',4),('X02',72,'UK',6),('X02',72,'CA',7),('X02',72,'XX',8)]
calls = sc.parallelize(data,1)
countries = ['CA', 'UK', 'US', 'XX']
Результат:
Из 1-го решения
(('X02', 72), (7, 6, 4, 8))
(('X01', 41), (2, 1, 3, 0))
Из второго решения:
root |-- age: long (nullable = true)
|-- country: string (nullable = true)
|-- nbrCalls: long (nullable = true)
|-- userid: string (nullable = true)
userid age ca uk us xx
X02 72 7 6 4 8
X01 41 2 1 3 0
Пожалуйста, дайте мне знать, если это работает, или нет:)
Best Ayan
Итак, сначала я должен был внести эту коррекцию в ваш RDD (который соответствует вашему фактическому выходу):
rdd = sc.parallelize([('X01',41,'US',3),
('X01',41,'UK',1),
('X01',41,'CA',2),
('X02',72,'US',4),
('X02',72,'UK',6),
('X02',72,'CA',7),
('X02',72,'XX',8)])
Как только я сделал эту коррекцию, это сделало трюк:
df.select($"ID", $"Age").groupBy($"ID").agg($"ID", first($"Age") as "Age")
.join(
df.select($"ID" as "usID", $"Country" as "C1",$"Score" as "US"),
$"ID" === $"usID" and $"C1" === "US"
)
.join(
df.select($"ID" as "ukID", $"Country" as "C2",$"Score" as "UK"),
$"ID" === $"ukID" and $"C2" === "UK"
)
.join(
df.select($"ID" as "caID", $"Country" as "C3",$"Score" as "CA"),
$"ID" === $"caID" and $"C3" === "CA"
)
.select($"ID",$"Age",$"US",$"UK",$"CA")
Не так элегантно, как ваш стержень, конечно.
Просто некоторые комментарии к очень полезному ответу patricksurry:
Вот немного модифицированный код:
from pyspark.sql.types import *
rdd = sc.parallelize([('X01',41,'US',3),
('X01',41,'UK',1),
('X01',41,'CA',2),
('X02',72,'US',4),
('X02',72,'UK',6),
('X02',72,'CA',7),
('X02',72,'XX',8)])
schema = StructType([StructField('ID', StringType(), True),
StructField('Age', IntegerType(), True),
StructField('Country', StringType(), True),
StructField('Score', IntegerType(), True)])
df = sqlCtx.createDataFrame(rdd, schema)
# u is a dictionarie
# v is a Row
def seqPivot(u, v):
if not u:
u = {}
u[v.Country] = v.Score
# In the original posting the Age column was not specified
u["Age"] = v.Age
return u
# u1
# u2
def cmbPivot(u1, u2):
u1.update(u2)
return u1
pivot = (
rdd
.map(lambda row: Row(ID=row[0], Age=row[1], Country=row[2], Score=row[3]))
.keyBy(lambda row: row.ID)
.aggregateByKey(None, seqPivot, cmbPivot)
)
columns = (
pivot
.values()
.map(lambda u: set(u.keys()))
.reduce(lambda s,t: s.union(t))
)
columns_ord = sorted(columns)
result = sqlCtx.createDataFrame(
pivot
.map(lambda (k, u): [k] + [u.get(c, None) for c in columns_ord]),
schema=StructType(
[StructField('ID', StringType())] +
[StructField(c, IntegerType()) for c in columns_ord]
)
)
print result.show()
Наконец, выход должен быть
+---+---+---+---+---+----+
| ID|Age| CA| UK| US| XX|
+---+---+---+---+---+----+
|X02| 72| 7| 6| 4| 8|
|X01| 41| 2| 1| 3|null|
+---+---+---+---+---+----+
В PIVOT есть JIRA, чтобы сделать это изначально, без огромного оператора CASE для каждого значения:
https://issues.apache.org/jira/browse/HIVE -3776
Прошу проголосовать за JIRA, чтобы она была реализована раньше. Как только в Hive SQL, Spark обычно не слишком сильно отстает, и в конечном итоге он будет реализован и в Spark.