Замена groupBykey () с помощью reduceByKey () [duplicate]

Хотя ответ Чанга объясняет, как много раз рисовать на одном рисунке, в этом случае вам может быть лучше в этом случае с помощью groupby и unstack ing:

(Предположим, что у вас есть это в dataframe, с индексом datetime уже)

In [1]: df
Out[1]:
            value  
datetime                         
2010-01-01      1  
2010-02-01      1  
2009-01-01      1  

# create additional month and year columns for convenience
df['Month'] = map(lambda x: x.month, df.index)
df['Year'] = map(lambda x: x.year, df.index)    

In [5]: df.groupby(['Month','Year']).mean().unstack()
Out[5]:
       value      
Year    2009  2010
Month             
1          1     1
2        NaN     1

Теперь легко построить (каждый год как отдельная строка):

df.groupby(['Month','Year']).mean().unstack().plot()

37
задан Community 17 August 2017 в 10:00
поделиться

9 ответов

tl; dr Если вам действительно нужна операция, используйте groupByKey , как предложено , по @MariusIon . Каждое предлагаемое здесь решение является либо неэффективным, либо, по меньшей мере, субоптимальным по сравнению с прямой группировкой.

reduceByKey с конкатенацией списка не является приемлемым решением, потому что:

  • Требуется инициализация из списков O (N) .
  • Каждое приложение + для пары списков требует полной копии обоих списков ( O (N) ), эффективно увеличивая общую сложность до O (N2) .
  • Не затрагивает ни одну из проблем, введенных в groupByKey. Количество данных, которые необходимо перетасовать, а также размер конечной структуры одинаковы.
  • В отличие от , предложенного одним из ответов , нет разницы в уровне параллелизм между реализацией с использованием reduceByKey и groupByKey.

combineByKey с list.extend является субоптимальным решением, потому что:

  • Создает O (N) в MergeValue (это можно оптимизировать, используя list.append непосредственно в новом элементе).
  • Если оптимизировано с помощью list.append, оно в точности эквивалентно (Spark & ​​lt; = 1.3) реализации groupByKey и игнорирует все оптимизации, введенные SPARK-3074, которые позволяют группировать внешние (на диске) структуры с большими размерами.
41
ответ дан Community 15 August 2018 в 20:34
поделиться
  • 1
    Является ли ReduceByKey в этом случае быстрее, чем GroupByKey? Он дает тот же результат, и что лучше? Есть ли способ удалить дубликаты из окончательного списка, созданного компанией ReduceByKey? – syfantid 18 December 2015 в 10:42
  • 2
    @Sofia: Как сказано, GroupByKey уменьшает распараллеливание, но если вы работаете с небольшими наборами данных, это может и не быть проблемой. Только тест производительности может дать вам конкретный ответ. Удаление дублирующихся значений не встроено при использовании ReduceByKey , но вы можете легко добавить еще один шаг, который делает это, или создать свой собственный Create метод, который позаботится об этом. – Christian Strempfer 4 January 2016 в 08:51
  • 3
    К сожалению, я имел в виду & quot; вы можете создать свои собственные методы Combine & quot ;. – Christian Strempfer 7 February 2016 в 08:15
  • 4
    Использование + заставляет растущий список копироваться на каждом добавлении, занимая квадратичное время в конечной длине каждого списка. extend() - правильный ответ - вы завершаете его в функцию, которая возвращает (растущую) левую сторону list. – Davis Herring 16 September 2017 в 02:50
41
ответ дан Community 5 September 2018 в 19:55
поделиться

Я немного опаздываю на разговор, но вот мое предложение:

>>> foo = sc.parallelize([(1, ('a','b')), (2, ('c','d')), (1, ('x','y'))])
>>> foo.map(lambda (x,y): (x, [y])).reduceByKey(lambda p,q: p+q).collect()
[(1, [('a', 'b'), ('x', 'y')]), (2, [('c', 'd')])]
12
ответ дан alreich 15 August 2018 в 20:34
поделиться
  • 1
    Привет, вы также можете помочь с equivalent Java code для этого. Я хочу добиться аналогичной вещи в Java – Mj1992 21 January 2017 в 18:19

Сообщение об ошибке связано с типом «a» в вашем закрытии.

 My_KMV = My_KV.reduce(lambda a, b: a.append([b]))

Пусть pySpark явно оценивает a как список. Например,

My_KMV = My_KV.reduceByKey(lambda a,b:[a].extend([b]))

Во многих случаях reduceByKey будет предпочтительнее groupByKey, обратитесь к: http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/ best_practices / prefer_reducebykey_over_groupbykey.html

1
ответ дан Brian Tompsett - 汤莱恩 15 August 2018 в 20:34
поделиться

Хорошо. Надеюсь, у меня все получилось. Ваш вход выглядит примерно так:

kv_input = [("a", 1), ("a", 2), ("a", 3), ("b", 1), ("b", 5)]

, и вы хотите получить что-то вроде этого:

kmv_output = [("a", [1, 2, 3]), ("b", [1, 5])]

Тогда это может выполнить задание (см. здесь ):

d = dict()
for k, v in kv_input:
    d.setdefault(k, list()).append(v)
kmv_output = list(d.items())

Если я ошибаюсь, скажите, пожалуйста, чтобы я мог настроить это на ваши нужды.

PS: a.append([b]) всегда возвращается None , Возможно, вы захотите наблюдать либо [b], либо a, но не результат append.

1
ответ дан Dave J 15 August 2018 в 20:34
поделиться
  • 1
    Таким образом, у вас есть правильная идея для того, что у меня есть, с точки зрения kv_input и того, что я хочу, kmv_output. Я считаю, что ваш код будет работать для серийного питона, но поскольку я использую Spark для параллельной работы, у моего kv_input есть тип RDD (Resilient Distributed Data) ... который не является итерируемым (поэтому я не могу делать что-то вроде k , v в kv_input). – TravisJ 18 November 2014 в 22:48
  • 2
    ааа. ОК. моя вина, не знаю искры. Я даю ответ здесь тем, кто этого не знает / замечает. как я: P – Dave J 18 November 2014 в 22:52
  • 3
    Не беспокойся. Я совершенно новичок в этом, и я ценю, что вы нашли время, чтобы продемонстрировать это решение. – TravisJ 18 November 2014 в 23:00
  • 4
    P.S. очень полезно. Я сделал быстрое изменение retList = a.append ([b]), а затем возвратил retList, и это исправляет первую проблему, но у меня есть новая небольшая проблема, которую я должен исправить (код генерирует список, содержащий оба кортежа и списки). – TravisJ 18 November 2014 в 23:29
  • 5
    @TravisJ: Вам нужно использовать extend вместо append, как и в моем ответе. См. Также Python - append vs. extend . – Christian Strempfer 19 November 2014 в 08:30

Я попытался с помощью combByKey, вот мои шаги

combineddatardd=sc.parallelize([("A", 3), ("A", 9), ("A", 12),("B", 4), ("B", 10), ("B", 11)])

combineddatardd.combineByKey(lambda v:[v],lambda x,y:x+[y],lambda x,y:x+y).collect()

Выход:

[('A', [3, 9, 12]), ('B', [4, 10, 11])]
  1. Определить функцию для объединителя, которая устанавливает аккумулятор в первую пару значений ключа который он встречает внутри раздела, преобразует значение, указанное на этом шаге
  2. Определяет функцию, которая слияния нового значения того же ключа с значением аккумулятора, захваченным на шаге 1. Примечание: -конвертировать значение для списка в эта функция как значение аккумулятора была преобразована в список на первом этапе
  3. Определить функцию для объединения выходов комбинированных отдельных разделов.
1
ответ дан krishna rachur 15 August 2018 в 20:34
поделиться

Вы можете использовать метод RDD groupByKey .

Вход:

data = [(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd'), (2, 'e'), (3, 'f')]
rdd = sc.parallelize(data)
result = rdd.groupByKey().collect()

Выход:

[(1, ['a', 'b']), (2, ['c', 'd', 'e']), (3, ['f'])]
11
ответ дан Marius Ion 15 August 2018 в 20:34
поделиться
  • 1
    Использование groupByKey не рекомендуется, потому что это приводит к чрезмерному перетасовке. Вы должны использовать reduceByKey ( см. Эту ссылку ) или combineByKey вместо этого, как было предложено @Christian_Strempfer – nikosd 26 June 2015 в 05:11

Я ударил эту страницу, ища пример Java для той же проблемы. (Если ваш случай похож, вот мой пример)

Трюк - вам нужно группировать ключи.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class SparkMRExample {

    public static void main(String[] args) {
        // spark context initialisation
        SparkConf conf = new SparkConf()
                .setAppName("WordCount")
                .setMaster("local");
        JavaSparkContext context = new JavaSparkContext(conf);

        //input for testing;
        List<String> input = Arrays.asList("Lorem Ipsum is simply dummy text of the printing and typesetting industry.",
                "Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, when an unknown printer took a galley of type and scrambled it to make a type specimen book.",
                "It has survived not only for centuries, but also the leap into electronic typesetting, remaining essentially unchanged.",
                "It was popularised in the 1960s with the release of Letraset sheets containing Lorem Ipsum passages, and more recently with desktop publishing");
        JavaRDD<String> inputRDD = context.parallelize(input);


        // the map phase of word count example
        JavaPairRDD<String, Integer> mappedRDD =
                inputRDD.flatMapToPair( line ->                      // for this input, each string is a line
                        Arrays.stream(line.split("\\s+"))            // splitting into words, converting into stream
                                .map(word -> new Tuple2<>(word, 1))  // each word is assigned with count 1
                                .collect(Collectors.toList()));      // stream to iterable

        // group the tuples by key
        // (String,Integer) -> (String, Iterable<Integer>)
        JavaPairRDD<String, Iterable<Integer>> groupedRDD = mappedRDD.groupByKey();

        // the reduce phase of word count example
        //(String, Iterable<Integer>) -> (String,Integer)
        JavaRDD<Tuple2<String, Integer>> resultRDD =
                groupedRDD.map(group ->                                      //input is a tuple (String, Iterable<Integer>)
                        new Tuple2<>(group._1,                              // the output key is same as input key
                        StreamSupport.stream(group._2.spliterator(), true)  // converting to stream
                                .reduce(0, (f, s) -> f + s)));              // the sum of counts
        //collecting the RRD so that we can print
        List<Tuple2<String, Integer>> result = resultRDD.collect();
        // print each tuple
        result.forEach(System.out::println);
    }
}
0
ответ дан Thamme Gowda 15 August 2018 в 20:34
поделиться

Если вы хотите сделать reduceByKey, где тип в приведенных парах KV отличается от типа исходных пар KV, то можно использовать функцию combineByKey. То, что делает функция, - это взять пары KV и объединить их (по Key) в пары KC, где C - это другой тип, чем V.

В одном задаются 3 функции, createCombiner, mergeValue, mergeCombiners. Первый указывает, как преобразовать тип V в тип C, второй описывает, как объединить тип C с типом V, а последний указывает, как объединить тип C с другим типом C. Мой код создает пары KV:

Определите 3 функции следующим образом:

def Combiner(a):    #Turns value a (a tuple) into a list of a single tuple.
    return [a]

def MergeValue(a, b): #a is the new type [(,), (,), ..., (,)] and b is the old type (,)
    a.extend([b])
    return a

def MergeCombiners(a, b): #a is the new type [(,),...,(,)] and so is b, combine them
    a.extend(b)
    return a

Затем My_KMV = My_KV.combineByKey(Combiner, MergeValue, MergeCombiners)

Лучший ресурс, который я нашел при использовании этой функции: http://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/

Как указывали другие, a.append(b) или a.extend(b) return None. Таким образом, reduceByKey(lambda a, b: a.append(b)) возвращает None в первой паре пар KV, а затем не работает во второй паре, потому что None.append (b) терпит неудачу. Вы можете обойти это, указав отдельную функцию:

 def My_Extend(a,b):
      a.extend(b)
      return a

Затем вызовите reduceByKey(lambda a, b: My_Extend(a,b)) (использование лямбда-функции здесь может быть ненужным, но я не проверял этот случай.)

3
ответ дан TravisJ 15 August 2018 в 20:34
поделиться
Другие вопросы по тегам:

Похожие вопросы: