Файловый фрейм PySpark - Замените последовательные значения NaN в столбце с предыдущим допустимым значением [duplicate]

CGI указан в RFC 3875 , хотя это более поздняя «официальная» кодификация исходного документа NCSA . В принципе, CGI определяет протокол для передачи данных о HTTP-запросе с веб-сервера в программу для обработки - любую программу на любом языке. В то время, когда спецификация была написана (1993), большинство веб-серверов содержали только статические страницы, «веб-приложения» были редкой и новой вещью, поэтому казалось естественным отделить их от «нормального» статического контента, например, в cgi-bin, кроме статического содержимого, и заканчивая их .cgi.

В это время здесь также не было выделенных «языков веб-программирования», таких как PHP, а C было доминирующим переносным программированием язык - так много людей написали свои сценарии CGI в C. Но Perl быстро оказался более подходящим для такого рода вещей, и CGI стал почти синонимом Perl на некоторое время. Затем появились Java Servlets, PHP и множество других и заняли большую часть доли рынка Perl.

9
задан Oleksiy 4 April 2016 в 15:35
поделиться

3 ответа

Это похоже на трюк с использованием функций Window :

import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func

def fill_nulls(df):
    df_na = df.na.fill(-1)
    lag = df_na.withColumn('id_lag', func.lag('id', default=-1)\
                           .over(Window.partitionBy('session')\
                                 .orderBy('timestamp')))

    switch = lag.withColumn('id_change',
                            ((lag['id'] != lag['id_lag']) &
                             (lag['id'] != -1)).cast('integer'))


    switch_sess = switch.withColumn(
        'sub_session',
        func.sum("id_change")
        .over(
            Window.partitionBy("session")
            .orderBy("timestamp")
            .rowsBetween(-sys.maxsize, 0))
    )

    fid = switch_sess.withColumn('nn_id',
                           func.first('id')\
                           .over(Window.partitionBy('session', 'sub_session')\
                                 .orderBy('timestamp')))

    fid_na = fid.replace(-1, 'null')

    ff = fid_na.drop('id').drop('id_lag')\
                          .drop('id_change')\
                          .drop('sub_session').\
                          withColumnRenamed('nn_id', 'id')

    return ff

Вот полный null_test.py .

7
ответ дан Oleksiy 19 August 2018 в 08:10
поделиться
  • 1
    @eliasah: можете ли вы просмотреть ответ? – Oleksiy 4 April 2016 в 15:39
  • 2
    Я сейчас смотрю на это. – eliasah 4 April 2016 в 15:39
  • 3
    добавлен тест, чтобы облегчить жизнь, если он помогает – Oleksiy 4 April 2016 в 15:48
  • 4
    Я писал свой тест! Благодарю. Ответ кажется мне очень чистым. Очень важно иметь сеансы, которые позволили разделить и использовать функцию окна! – eliasah 4 April 2016 в 15:57
  • 5
    Спасибо за обзор кода! – Oleksiy 4 April 2016 в 16:00
Ответ

@Oleksiy велик, но не полностью работает для моих требований. В течение сеанса, если наблюдаются несколько null s, все заполняются первым не null для сеанса. Мне понадобилось значение last non null для распространения вперед.

Следующая настройка была использована для моего использования:

def fill_forward(df, id_column, key_column, fill_column):

    # Fill null's with last *non null* value in the window
    ff = df.withColumn(
        'fill_fwd',
        func.last(fill_column, True) # True: fill with last non-null
        .over(
            Window.partitionBy(id_column)
            .orderBy(key_column)
            .rowsBetween(-sys.maxsize, 0))
        )

    # Drop the old column and rename the new column
    ff_out = ff.drop(fill_column).withColumnRenamed('fill_fwd', fill_column)

    return ff_out
0
ответ дан brett 19 August 2018 в 08:10
поделиться

Я считаю, что у меня гораздо более простое решение, чем принятое. Он также использует функции, но использует функцию, называемую «LAST», и игнорирует нули.

Давайте заново создадим нечто похожее на исходные данные:

import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func

d = [{'session': 1, 'ts': 1}, {'session': 1, 'ts': 2, 'id': 109}, {'session': 1, 'ts': 3}, {'session': 1, 'ts': 4, 'id': 110}, {'session': 1, 'ts': 5},  {'session': 1, 'ts': 6}]
df = spark.createDataFrame(d)

Это означает:

+-------+---+----+
|session| ts|  id|
+-------+---+----+
|      1|  1|null|
|      1|  2| 109|
|      1|  3|null|
|      1|  4| 110|
|      1|  5|null|
|      1|  6|null|
+-------+---+----+

Теперь, если мы используем оконную функцию LAST:

df.withColumn("id", func.last('id', True).over(Window.partitionBy('session').orderBy('ts').rowsBetween(-sys.maxsize, 0))).show()

Мы просто получаем:

+-------+---+----+
|session| ts|  id|
+-------+---+----+
|      1|  1|null|
|      1|  2| 109|
|      1|  3| 109|
|      1|  4| 110|
|      1|  5| 110|
|      1|  6| 110|
+-------+---+----+

Надеемся, что это поможет!

3
ответ дан elmosca 19 August 2018 в 08:10
поделиться
  • 1
    Предупреждение: этот ответ будет собирать все строки каждой сессии на какой-то узел-исполнитель. Это приведет к неудачным заданиям, если количество строк в некотором сеансе больше, чем память ваших узлов-исполнителей. – Jordan P 19 September 2017 в 22:39