Прежде всего, попробуйте избежать оконных функций, если вы не можете предоставить предложение PARTITION BY
. Он перемещает данные в один раздел, поэтому большую часть времени это просто невозможно.
Что вы можете сделать, это заполнить пробелы на RDD
, используя mapPartitionsWithIndex
. Поскольку вы не представили пример данных или ожидаемого вывода, считайте, что это псевдокод не является настоящей программой Scala:
DataFrame
по дате и преобразовать в RDD
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
val rows: RDD[Row] = df.orderBy($"Date").rdd
def notMissing(row: Row): Boolean = ???
val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows
.mapPartitionsWithIndex{ case (i, iter) =>
Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
.collectAsMap
Map
в трансляцию val toCarryBd = sc.broadcast(toCarry)
def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = {
// If it is the beginning of partition and value is missing
// extract value to fill from toCarryBd.value
// Remember to correct for empty / only missing partitions
// otherwise take last not-null from the current partition
}
val imputed: RDD[Row] = rows
.mapPartitionsWithIndex{ case (i, iter) => fill(i, iter) }
дьявол находится в деталях. Если ваши данные разбиты на разделы, тогда вся проблема может быть решена с помощью groupBy
. Предположим, что вы просто разделяете по столбцу «v» типа T
, а Date
- это целая временная метка:
def fill(iter: List[Row]): List[Row] = {
// Just go row by row and fill with last non-empty value
???
}
val groupedAndSorted = df.rdd
.groupBy(_.getAs[T]("k"))
.mapValues(_.toList.sortBy(_.getAs[Int]("Date")))
val rows: RDD[Row] = groupedAndSorted.mapValues(fill).values.flatMap(identity)
val dfFilled = sqlContext.createDataFrame(rows, df.schema)
Таким образом вы можете одновременно заполнять все столбцы.
Можно ли это сделать с помощью DataFrames вместо преобразования взад и вперед в RDD?
blockquote>Это зависит, хотя вряд ли это будет эффективно. Если максимальный зазор относительно невелик, вы можете сделать что-то вроде этого:
import org.apache.spark.sql.functions._ import org.apache.spark.sql.expressions.{WindowSpec, Window} import org.apache.spark.sql.Column val maxGap: Int = ??? // Maximum gap between observations val columnsToFill: List[String] = ??? // List of columns to fill val suffix: String = "_" // To disambiguate between original and imputed // Take lag 1 to maxGap and coalesce def makeCoalesce(w: WindowSpec)(magGap: Int)(suffix: String)(c: String) = { // Generate lag values between 1 and maxGap val lags = (1 to maxGap).map(lag(col(c), _)over(w)) // Add current, coalesce and set alias coalesce(col(c) +: lags: _*).alias(s"$c$suffix") } // For each column you want to fill nulls apply makeCoalesce val lags: List[Column] = columnsToFill.map(makeCoalesce(w)(maxGap)("_")) // Finally select val dfImputed = df.select($"*" :: lags: _*)
Его можно легко настроить, чтобы использовать разный максимальный зазор на столбе.
Простейший способ добиться аналогичного результат последней версии Spark заключается в использовании
last
сignoreNulls
:import org.apache.spark.sql.functions._ import org.apache.spark.sql.expressions.Window val w = Window.partitionBy($"k").orderBy($"Date") .rowsBetween(Window.unboundedPreceding, -1) df.withColumn("value", coalesce($"value", last($"value", true).over(w)))
. Хотя можно отказаться от предложения
partitionBy
и применить этот метод по всему миру, это будет чрезмерно дорогостоящим с большими наборами данных .
Существует также Моно версия библиотек классов, конечно:
Вы посмотрели наборы C5 ? Вы можете загружать источник , который включает хэш-таблицу.
Вы видите, как Хеш-таблица.NET реализована (например, в C#) использование отражателя
Вы могли также посмотреть на реализацию Хеш-таблицы от Моно здесь:
Долгое время после того, как вопрос был задан, так что я не ожидаю, что заработаю много откликов. Однако я решил, что будет интересно написать свой собственный базовый пример (менее чем в 90 строках кода):
public struct KeyValue<K, V>
{
public K Key { get; set; }
public V Value { get; set; }
}
public class FixedSizeGenericHashTable<K,V>
{
private readonly int size;
private readonly LinkedList<KeyValue<K,V>>[] items;
public FixedSizeGenericHashTable(int size)
{
this.size = size;
items = new LinkedList<KeyValue<K,V>>[size];
}
protected int GetArrayPosition(K key)
{
int position = key.GetHashCode() % size;
return Math.Abs(position);
}
public V Find(K key)
{
int position = GetArrayPosition(key);
LinkedList<KeyValue<K, V>> linkedList = GetLinkedList(position);
foreach (KeyValue<K,V> item in linkedList)
{
if (item.Key.Equals(key))
{
return item.Value;
}
}
return default(V);
}
public void Add(K key, V value)
{
int position = GetArrayPosition(key);
LinkedList<KeyValue<K, V>> linkedList = GetLinkedList(position);
KeyValue<K, V> item = new KeyValue<K, V>() { Key = key, Value = value };
linkedList.AddLast(item);
}
public void Remove(K key)
{
int position = GetArrayPosition(key);
LinkedList<KeyValue<K, V>> linkedList = GetLinkedList(position);
bool itemFound = false;
KeyValue<K, V> foundItem = default(KeyValue<K, V>);
foreach (KeyValue<K,V> item in linkedList)
{
if (item.Key.Equals(key))
{
itemFound = true;
foundItem = item;
}
}
if (itemFound)
{
linkedList.Remove(foundItem);
}
}
protected LinkedList<KeyValue<K, V>> GetLinkedList(int position)
{
LinkedList<KeyValue<K, V>> linkedList = items[position];
if (linkedList == null)
{
linkedList = new LinkedList<KeyValue<K, V>>();
items[position] = linkedList;
}
return linkedList;
}
}
Вот небольшое тестовое приложение:
static void Main(string[] args)
{
FixedSizeGenericHashTable<string, string> hash = new FixedSizeGenericHashTable<string, string>(20);
hash.Add("1", "item 1");
hash.Add("2", "item 2");
hash.Add("dsfdsdsd", "sadsadsadsad");
string one = hash.Find("1");
string two = hash.Find("2");
string dsfdsdsd = hash.Find("dsfdsdsd");
hash.Remove("1");
Console.ReadLine();
}
Это не самая лучшая реализация, но она работает для Add, Remove и Find. Оно использует chaining и простой алгоритм modulo, чтобы найти подходящее ведро.