Прежде всего, попробуйте избежать оконных функций, если вы не можете предоставить предложение PARTITION BY
. Он перемещает данные в один раздел, поэтому большую часть времени это просто невозможно.
Что вы можете сделать, это заполнить пробелы на RDD
, используя mapPartitionsWithIndex
. Поскольку вы не представили пример данных или ожидаемого вывода, считайте, что это псевдокод не является настоящей программой Scala:
DataFrame
по дате и преобразовать в RDD
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
val rows: RDD[Row] = df.orderBy($"Date").rdd
def notMissing(row: Row): Boolean = ???
val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows
.mapPartitionsWithIndex{ case (i, iter) =>
Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
.collectAsMap
Map
в трансляцию val toCarryBd = sc.broadcast(toCarry)
def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = {
// If it is the beginning of partition and value is missing
// extract value to fill from toCarryBd.value
// Remember to correct for empty / only missing partitions
// otherwise take last not-null from the current partition
}
val imputed: RDD[Row] = rows
.mapPartitionsWithIndex{ case (i, iter) => fill(i, iter) }
дьявол находится в деталях. Если ваши данные разбиты на разделы, тогда вся проблема может быть решена с помощью groupBy
. Предположим, что вы просто разделяете по столбцу «v» типа T
, а Date
- это целая временная метка:
def fill(iter: List[Row]): List[Row] = {
// Just go row by row and fill with last non-empty value
???
}
val groupedAndSorted = df.rdd
.groupBy(_.getAs[T]("k"))
.mapValues(_.toList.sortBy(_.getAs[Int]("Date")))
val rows: RDD[Row] = groupedAndSorted.mapValues(fill).values.flatMap(identity)
val dfFilled = sqlContext.createDataFrame(rows, df.schema)
Таким образом вы можете одновременно заполнять все столбцы.
Можно ли это сделать с помощью DataFrames вместо преобразования взад и вперед в RDD?
blockquote>Это зависит, хотя вряд ли это будет эффективно. Если максимальный зазор относительно невелик, вы можете сделать что-то вроде этого:
import org.apache.spark.sql.functions._ import org.apache.spark.sql.expressions.{WindowSpec, Window} import org.apache.spark.sql.Column val maxGap: Int = ??? // Maximum gap between observations val columnsToFill: List[String] = ??? // List of columns to fill val suffix: String = "_" // To disambiguate between original and imputed // Take lag 1 to maxGap and coalesce def makeCoalesce(w: WindowSpec)(magGap: Int)(suffix: String)(c: String) = { // Generate lag values between 1 and maxGap val lags = (1 to maxGap).map(lag(col(c), _)over(w)) // Add current, coalesce and set alias coalesce(col(c) +: lags: _*).alias(s"$c$suffix") } // For each column you want to fill nulls apply makeCoalesce val lags: List[Column] = columnsToFill.map(makeCoalesce(w)(maxGap)("_")) // Finally select val dfImputed = df.select($"*" :: lags: _*)
Его можно легко настроить, чтобы использовать разный максимальный зазор на столбе.
Простейший способ добиться аналогичного результат последней версии Spark заключается в использовании
last
сignoreNulls
:import org.apache.spark.sql.functions._ import org.apache.spark.sql.expressions.Window val w = Window.partitionBy($"k").orderBy($"Date") .rowsBetween(Window.unboundedPreceding, -1) df.withColumn("value", coalesce($"value", last($"value", true).over(w)))
. Хотя можно отказаться от предложения
partitionBy
и применить этот метод по всему миру, это будет чрезмерно дорогостоящим с большими наборами данных .
От Википедия :
Одна интересная причуда MFC является использованием "Afx" как префикс для многих функций, макросы и стандарт предварительно скомпилировали название заголовка "stdafx.h". Во время ранней разработки, что стало MFC, был назван "Расширениями Среды разработки приложения" и сокращен "Afx". Microsoft Foundation Classes (MFC) имени была принята слишком поздно в цикле выпуска для изменения этих ссылок.
старый скат MFC FAQ (больше не найденный на их веб-сайте, ссылка к древней версии):
В начале, Microsoft создала группу, названную группой
AFX (обозначает (A) складчатость (F) ramework (X)).
[...]
группа AFX была на самом деле ответственна за две вещи: библиотека MFC и поддержка IDE MFC (а именно, редактор ресурса и мастера). Имя AFX было отброшено в апреле 1994, и члены группы просто стали частью меньших команд в группе Visual C++. Одна из тех меньших команд является сегодняшней командой MFC.
http://en.wikipedia.org/wiki/Stdafx.h
"AFX в stdafx.h обозначает расширения Среды разработки приложения. AFX был исходным сокращением для Microsoft Foundation Classes (MFC)".