Я считаю, что у меня гораздо более простое решение, чем принятое. Он также использует функции, но использует функцию, называемую «LAST», и игнорирует нули.
Давайте заново создадим нечто похожее на исходные данные:
import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func
d = [{'session': 1, 'ts': 1}, {'session': 1, 'ts': 2, 'id': 109}, {'session': 1, 'ts': 3}, {'session': 1, 'ts': 4, 'id': 110}, {'session': 1, 'ts': 5}, {'session': 1, 'ts': 6}]
df = spark.createDataFrame(d)
Это означает:
+-------+---+----+
|session| ts| id|
+-------+---+----+
| 1| 1|null|
| 1| 2| 109|
| 1| 3|null|
| 1| 4| 110|
| 1| 5|null|
| 1| 6|null|
+-------+---+----+
Теперь, если мы используем оконную функцию LAST:
df.withColumn("id", func.last('id', True).over(Window.partitionBy('session').orderBy('ts').rowsBetween(-sys.maxsize, 0))).show()
Мы просто получаем:
+-------+---+----+
|session| ts| id|
+-------+---+----+
| 1| 1|null|
| 1| 2| 109|
| 1| 3| 109|
| 1| 4| 110|
| 1| 5| 110|
| 1| 6| 110|
+-------+---+----+
Надеемся, что это поможет!