Хотя ответ Чанга объясняет, как много раз рисовать на одном рисунке, в этом случае вам может быть лучше в этом случае с помощью groupby
и unstack
ing:
(Предположим, что у вас есть это в dataframe, с индексом datetime уже)
In [1]: df
Out[1]:
value
datetime
2010-01-01 1
2010-02-01 1
2009-01-01 1
# create additional month and year columns for convenience
df['Month'] = map(lambda x: x.month, df.index)
df['Year'] = map(lambda x: x.year, df.index)
In [5]: df.groupby(['Month','Year']).mean().unstack()
Out[5]:
value
Year 2009 2010
Month
1 1 1
2 NaN 1
Теперь легко построить (каждый год как отдельная строка):
df.groupby(['Month','Year']).mean().unstack().plot()
tl; dr Если вам действительно нужна операция, используйте groupByKey
, как предложено , по @MariusIon . Каждое предлагаемое здесь решение является либо неэффективным, либо, по меньшей мере, субоптимальным по сравнению с прямой группировкой.
reduceByKey
с конкатенацией списка не является приемлемым решением, потому что:
+
для пары списков требует полной копии обоих списков ( O (N) ), эффективно увеличивая общую сложность до O (N2) . groupByKey
. Количество данных, которые необходимо перетасовать, а также размер конечной структуры одинаковы. reduceByKey
и groupByKey
. combineByKey
с list.extend
является субоптимальным решением, потому что:
MergeValue
(это можно оптимизировать, используя list.append
непосредственно в новом элементе). list.append
, оно в точности эквивалентно (Spark & lt; = 1.3) реализации groupByKey
и игнорирует все оптимизации, введенные SPARK-3074, которые позволяют группировать внешние (на диске) структуры с большими размерами. Я немного опаздываю на разговор, но вот мое предложение:
>>> foo = sc.parallelize([(1, ('a','b')), (2, ('c','d')), (1, ('x','y'))])
>>> foo.map(lambda (x,y): (x, [y])).reduceByKey(lambda p,q: p+q).collect()
[(1, [('a', 'b'), ('x', 'y')]), (2, [('c', 'd')])]
equivalent Java code
для этого. Я хочу добиться аналогичной вещи в Java
– Mj1992
21 January 2017 в 18:19
Сообщение об ошибке связано с типом «a» в вашем закрытии.
My_KMV = My_KV.reduce(lambda a, b: a.append([b]))
Пусть pySpark явно оценивает a как список. Например,
My_KMV = My_KV.reduceByKey(lambda a,b:[a].extend([b]))
Во многих случаях reduceByKey будет предпочтительнее groupByKey, обратитесь к: http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/ best_practices / prefer_reducebykey_over_groupbykey.html
Хорошо. Надеюсь, у меня все получилось. Ваш вход выглядит примерно так:
kv_input = [("a", 1), ("a", 2), ("a", 3), ("b", 1), ("b", 5)]
, и вы хотите получить что-то вроде этого:
kmv_output = [("a", [1, 2, 3]), ("b", [1, 5])]
Тогда это может выполнить задание (см. здесь ):
d = dict()
for k, v in kv_input:
d.setdefault(k, list()).append(v)
kmv_output = list(d.items())
Если я ошибаюсь, скажите, пожалуйста, чтобы я мог настроить это на ваши нужды.
PS: a.append([b])
всегда возвращается None
, Возможно, вы захотите наблюдать либо [b]
, либо a
, но не результат append
.
extend
вместо append
, как и в моем ответе. См. Также Python - append vs. extend .
– Christian Strempfer
19 November 2014 в 08:30
Я попытался с помощью combByKey, вот мои шаги
combineddatardd=sc.parallelize([("A", 3), ("A", 9), ("A", 12),("B", 4), ("B", 10), ("B", 11)])
combineddatardd.combineByKey(lambda v:[v],lambda x,y:x+[y],lambda x,y:x+y).collect()
Выход:
[('A', [3, 9, 12]), ('B', [4, 10, 11])]
Вы можете использовать метод RDD groupByKey .
Вход:
data = [(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd'), (2, 'e'), (3, 'f')]
rdd = sc.parallelize(data)
result = rdd.groupByKey().collect()
Выход:
[(1, ['a', 'b']), (2, ['c', 'd', 'e']), (3, ['f'])]
groupByKey
не рекомендуется, потому что это приводит к чрезмерному перетасовке. Вы должны использовать reduceByKey
( см. Эту ссылку ) или combineByKey
вместо этого, как было предложено @Christian_Strempfer
– nikosd
26 June 2015 в 05:11
Я ударил эту страницу, ища пример Java для той же проблемы. (Если ваш случай похож, вот мой пример)
Трюк - вам нужно группировать ключи.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
public class SparkMRExample {
public static void main(String[] args) {
// spark context initialisation
SparkConf conf = new SparkConf()
.setAppName("WordCount")
.setMaster("local");
JavaSparkContext context = new JavaSparkContext(conf);
//input for testing;
List<String> input = Arrays.asList("Lorem Ipsum is simply dummy text of the printing and typesetting industry.",
"Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, when an unknown printer took a galley of type and scrambled it to make a type specimen book.",
"It has survived not only for centuries, but also the leap into electronic typesetting, remaining essentially unchanged.",
"It was popularised in the 1960s with the release of Letraset sheets containing Lorem Ipsum passages, and more recently with desktop publishing");
JavaRDD<String> inputRDD = context.parallelize(input);
// the map phase of word count example
JavaPairRDD<String, Integer> mappedRDD =
inputRDD.flatMapToPair( line -> // for this input, each string is a line
Arrays.stream(line.split("\\s+")) // splitting into words, converting into stream
.map(word -> new Tuple2<>(word, 1)) // each word is assigned with count 1
.collect(Collectors.toList())); // stream to iterable
// group the tuples by key
// (String,Integer) -> (String, Iterable<Integer>)
JavaPairRDD<String, Iterable<Integer>> groupedRDD = mappedRDD.groupByKey();
// the reduce phase of word count example
//(String, Iterable<Integer>) -> (String,Integer)
JavaRDD<Tuple2<String, Integer>> resultRDD =
groupedRDD.map(group -> //input is a tuple (String, Iterable<Integer>)
new Tuple2<>(group._1, // the output key is same as input key
StreamSupport.stream(group._2.spliterator(), true) // converting to stream
.reduce(0, (f, s) -> f + s))); // the sum of counts
//collecting the RRD so that we can print
List<Tuple2<String, Integer>> result = resultRDD.collect();
// print each tuple
result.forEach(System.out::println);
}
}
Если вы хотите сделать reduceByKey, где тип в приведенных парах KV отличается от типа исходных пар KV, то можно использовать функцию combineByKey
. То, что делает функция, - это взять пары KV и объединить их (по Key) в пары KC, где C - это другой тип, чем V.
В одном задаются 3 функции, createCombiner, mergeValue, mergeCombiners. Первый указывает, как преобразовать тип V в тип C, второй описывает, как объединить тип C с типом V, а последний указывает, как объединить тип C с другим типом C. Мой код создает пары KV:
Определите 3 функции следующим образом:
def Combiner(a): #Turns value a (a tuple) into a list of a single tuple.
return [a]
def MergeValue(a, b): #a is the new type [(,), (,), ..., (,)] and b is the old type (,)
a.extend([b])
return a
def MergeCombiners(a, b): #a is the new type [(,),...,(,)] and so is b, combine them
a.extend(b)
return a
Затем My_KMV = My_KV.combineByKey(Combiner, MergeValue, MergeCombiners)
Лучший ресурс, который я нашел при использовании этой функции: http://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/
Как указывали другие, a.append(b)
или a.extend(b)
return None
. Таким образом, reduceByKey(lambda a, b: a.append(b))
возвращает None в первой паре пар KV, а затем не работает во второй паре, потому что None.append (b) терпит неудачу. Вы можете обойти это, указав отдельную функцию:
def My_Extend(a,b):
a.extend(b)
return a
Затем вызовите reduceByKey(lambda a, b: My_Extend(a,b))
(использование лямбда-функции здесь может быть ненужным, но я не проверял этот случай.)
+
заставляет растущий список копироваться на каждом добавлении, занимая квадратичное время в конечной длине каждого списка.extend()
- правильный ответ - вы завершаете его в функцию, которая возвращает (растущую) левую сторонуlist
. – Davis Herring 16 September 2017 в 02:50