Прежде всего, это, вероятно, не очень хорошая идея, потому что вы не получаете никакой дополнительной информации, но вы привязываетесь к фиксированной схеме (то есть вам нужно знать, сколько стран вы ожидаете, и, конечно же, дополнительная страна означает изменение кода)
Сказав это, это проблема SQL, которая показана ниже. Но если вы полагаете, что это не слишком «программное обеспечение» (серьезно, я это слышал !!), то вы можете отсылать первое решение.
Решение 1:
def reshape(t):
out = []
out.append(t[0])
out.append(t[1])
for v in brc.value:
if t[2] == v:
out.append(t[3])
else:
out.append(0)
return (out[0],out[1]),(out[2],out[3],out[4],out[5])
def cntryFilter(t):
if t[2] in brc.value:
return t
else:
pass
def addtup(t1,t2):
j=()
for k,v in enumerate(t1):
j=j+(t1[k]+t2[k],)
return j
def seq(tIntrm,tNext):
return addtup(tIntrm,tNext)
def comb(tP,tF):
return addtup(tP,tF)
countries = ['CA', 'UK', 'US', 'XX']
brc = sc.broadcast(countries)
reshaped = calls.filter(cntryFilter).map(reshape)
pivot = reshaped.aggregateByKey((0,0,0,0),seq,comb,1)
for i in pivot.collect():
print i
Теперь решение 2: Конечно, лучше, поскольку SQL является правильным инструментом для этого
callRow = calls.map(lambda t:
Row(userid=t[0],age=int(t[1]),country=t[2],nbrCalls=t[3]))
callsDF = ssc.createDataFrame(callRow)
callsDF.printSchema()
callsDF.registerTempTable("calls")
res = ssc.sql("select userid,age,max(ca),max(uk),max(us),max(xx)\
from (select userid,age,\
case when country='CA' then nbrCalls else 0 end ca,\
case when country='UK' then nbrCalls else 0 end uk,\
case when country='US' then nbrCalls else 0 end us,\
case when country='XX' then nbrCalls else 0 end xx \
from calls) x \
group by userid,age")
res.show()
данных:
data=[('X01',41,'US',3),('X01',41,'UK',1),('X01',41,'CA',2),('X02',72,'US',4),('X02',72,'UK',6),('X02',72,'CA',7),('X02',72,'XX',8)]
calls = sc.parallelize(data,1)
countries = ['CA', 'UK', 'US', 'XX']
Результат:
Из 1-го решения
(('X02', 72), (7, 6, 4, 8))
(('X01', 41), (2, 1, 3, 0))
Из второго решения:
root |-- age: long (nullable = true)
|-- country: string (nullable = true)
|-- nbrCalls: long (nullable = true)
|-- userid: string (nullable = true)
userid age ca uk us xx
X02 72 7 6 4 8
X01 41 2 1 3 0
Пожалуйста, дайте мне знать, если это работает, или нет:)
Best Ayan
Одним из способов перехвата необработанного сообщения является использование SoapExtensions .
Альтернативой SoapExtensions является реализация IHttpModule и захват входящего потока по мере его поступления.
public class LogModule : IHttpModule
{
public void Init(HttpApplication context)
{
context.BeginRequest += this.OnBegin;
}
private void OnBegin(object sender, EventArgs e)
{
HttpApplication app = (HttpApplication)sender;
HttpContext context = app.Context;
byte[] buffer = new byte[context.Request.InputStream.Length];
context.Request.InputStream.Read(buffer, 0, buffer.Length);
context.Request.InputStream.Position = 0;
string soapMessage = Encoding.ASCII.GetString(buffer);
// Do something with soapMessage
}
public void Dispose()
{
throw new NotImplementedException();
}
}
Нет простых способов сделать это. Вам нужно будет реализовать SoapExtension . В примере по предыдущей ссылке показано расширение, которое можно использовать для регистрации данных.
Если бы вы использовали WCF, вы могли бы просто настроить конфигурацию для создания журналов сообщений.
Согласно Стивену де Саласу , вы можете использовать свойство Request.InputStream
в веб-методе. Я не пробовал, но он говорит, что работает.
Я бы хотел протестировать это как с http, так и с https, а также с другими SoapExtensions, работающими одновременно, и без них. Это вещи, которые могут повлиять на тип потока, для которого установлен InputStream
. Например, некоторые потоки не могут выполнять поиск, что может привести к тому, что у вас будет поток, расположенный после конца данных, который вы не можете переместить в начало.