Давайте предположим, что у нас есть большой список точек List<Point> pointList
(уже сохраненный в памяти), где каждый Point
содержит X, Y, и координата Z.
Теперь, я хотел бы выбрать, например, N % точек с самыми большими Z-значениями всех точек, сохраненных в pointList
. Прямо сейчас я делаю его как этот:
N = 0.05; // selecting only 5% of points
double cutoffValue = pointList
.OrderBy(p=> p.Z) // First bottleneck - creates sorted copy of all data
.ElementAt((int) pointList.Count * (1 - N)).Z;
List<Point> selectedPoints = pointList.Where(p => p.Z >= cutoffValue).ToList();
Но у меня есть здесь два узких места использования памяти: сначала во время OrderBy (более важного) и второго во время выбора точек (это менее важно, потому что мы обычно хотим выбрать только небольшое количество точек).
Есть ли какой-либо способ заменить OrderBy (или возможно другой способ найти этот предел) с чем-то, что использует меньше памяти?
Проблема довольно важна, потому что LINQ копирует целый набор данных, и для больших файлов я обрабатываю его, иногда поражает немного сотен MBS.
Вы можете отсортировать список на месте, используя List
, который использует алгоритм быстрой сортировки. Но, конечно, ваш исходный список будет отсортирован, что, возможно, не то, что вам нужно ...
pointList.Sort((a, b) => b.Z.CompareTo(a.Z));
var selectedPoints = pointList.Take((int)(pointList.Count * N)).ToList();
Если вы не возражаете против сортировки исходного списка, это, вероятно, лучший баланс между использованием памяти и скоростью
Напишите метод, который итерирует список один раз и сохраняет набор из M самых больших элементов. Каждый шаг потребует только O(log M) работы для поддержания набора, и вы можете иметь O(M) памяти и O(N log M) времени работы.
public static IEnumerable<TSource> TakeLargest<TSource, TKey>
(this IEnumerable<TSource> items, Func<TSource, TKey> selector, int count)
{
var set = new SortedDictionary<TKey, List<TSource>>();
var resultCount = 0;
var first = default(KeyValuePair<TKey, List<TSource>>);
foreach (var item in items)
{
// If the key is already smaller than the smallest
// item in the set, we can ignore this item
var key = selector(item);
if (first.Value == null ||
resultCount < count ||
Comparer<TKey>.Default.Compare(key, first.Key) >= 0)
{
// Add next item to set
if (!set.ContainsKey(key))
{
set[key] = new List<TSource>();
}
set[key].Add(item);
if (first.Value == null)
{
first = set.First();
}
// Remove smallest item from set
resultCount++;
if (resultCount - first.Value.Count >= count)
{
set.Remove(first.Key);
resultCount -= first.Value.Count;
first = set.First();
}
}
}
return set.Values.SelectMany(values => values);
}
Это будет включать больше, чем count
элементов, если есть связи, как это делает ваша реализация сейчас.
Если ваш список уже находится в памяти, я бы отсортировал его на месте вместо того, чтобы делать копию - если только он вам не понадобится снова несортированный, то есть в этом случае вам придется взвесить наличие двух копий в памяти и загрузка его снова из хранилища):
pointList.Sort((x,y) => y.Z.CompareTo(x.Z)); //this should sort it in desc. order
Также не уверен, насколько это поможет, но похоже, что вы просматриваете свой список дважды - один раз, чтобы найти значение отсечки, и еще раз, чтобы выбрать их. Я предполагаю, что вы делаете это, потому что хотите разрешить все связи, даже если это означает выбор более 5% точек. Однако, поскольку они уже отсортированы, вы можете использовать это в своих интересах и остановиться, когда закончите.
double cutoffValue = pointlist[(int) pointList.Length * (1 - N)].Z;
List<point> selectedPoints = pointlist.TakeWhile(p => p.Z >= cutoffValue)
.ToList();
Вы можете использовать что-то вроде этого:
pointList.Sort(); // Use you own compare here if needed
// Skip OrderBy because the list is sorted (and not copied)
double cutoffValue = pointList.ElementAt((int) pointList.Length * (1 - N)).Z;
// Skip ToList to avoid another copy of the list
IEnumerable<Point> selectedPoints = pointList.Where(p => p.Z >= cutoffValue);
Если только ваш список не чрезвычайно большой, мне кажется гораздо более вероятным, что узким местом вашей производительности является процессорное время. Да, ваш OrderBy()
может использовать много памяти, но это память, которая по большей части простаивает. Время вычислительной машины действительно является более серьезной проблемой.
Чтобы улучшить время работы процессора, наиболее очевидная вещь здесь - не использовать список. Вместо него используйте IEnumerable. Для этого нужно просто не вызывать .ToList()
в конце запроса where. Это позволит фреймворку объединить все в одну итерацию списка, которая выполняется только по мере необходимости. Это также улучшит использование памяти, поскольку позволит избежать загрузки всего запроса в память сразу, а вместо этого отложить его, чтобы загружать только один элемент за раз по мере необходимости. Также используйте .Take()
, а не .ElementAt()
. Это намного эффективнее.
double N = 0.05; // selecting only 5% of points
int count = (1-N) * pointList.Count;
var selectedPoints = pointList.OrderBy(p=>p.Z).Take(count);
На этом закончим, есть три случая, когда использование памяти действительно может быть проблемой:
Обновление:
Перечитав ваш вопрос еще раз, я вижу, что вы читаете очень большие файлы. В таком случае, наилучшую производительность можно получить, написав собственный код для разбора файлов. Если счетчик элементов хранится в верхней части файла, вы можете сделать намного лучше, или даже если вы можете оценить количество записей на основе размера файла (угадайте немного больше, чтобы быть уверенным, и затем усеките все лишнее после завершения), вы можете затем построить вашу окончательную коллекцию по мере чтения. Это значительно улучшит производительность процессора и использование памяти.
Если вам нужен небольшой процент точек, упорядоченных по какому-либо критерию, вам лучше использовать структуру данных Приоритетная очередь; создайте очередь с ограниченным размером (с размером, установленным на любое количество элементов), а затем просто сканируйте список, вставляя каждый элемент. После сканирования вы можете извлечь результаты в отсортированном порядке.
Преимущество этого способа в том, что он O(n log p)
вместо O(n log n)
, где p
- количество нужных вам точек, а дополнительные затраты на хранение также зависят от размера выходных данных, а не от всего списка.
Вы написали, что работаете с DataSet. Если это так, то вы можете использовать DataView для сортировки данных один раз и использовать их для всех последующих обращений к строкам.
Только что попробовал с 50,000 строк и 100-кратным доступом к 30% из них. Мои результаты производительности:
Попробуйте.
[TestClass]
public class UnitTest1 {
class MyTable : TypedTableBase<MyRow> {
public MyTable() {
Columns.Add("Col1", typeof(int));
Columns.Add("Col2", typeof(int));
}
protected override DataRow NewRowFromBuilder(DataRowBuilder builder) {
return new MyRow(builder);
}
}
class MyRow : DataRow {
public MyRow(DataRowBuilder builder) : base(builder) {
}
public int Col1 { get { return (int)this["Col1"]; } }
public int Col2 { get { return (int)this["Col2"]; } }
}
DataView _viewCol1Asc;
DataView _viewCol2Desc;
MyTable _table;
int _countToTake;
[TestMethod]
public void MyTestMethod() {
_table = new MyTable();
int count = 50000;
for (int i = 0; i < count; i++) {
_table.Rows.Add(i, i);
}
_countToTake = _table.Rows.Count / 30;
Console.WriteLine("SortWithLinq");
RunTest(SortWithLinq);
Console.WriteLine("Use DataViews");
RunTest(UseSoredDataViews);
}
private void RunTest(Action method) {
int iterations = 100;
Stopwatch watch = Stopwatch.StartNew();
for (int i = 0; i < iterations; i++) {
method();
}
watch.Stop();
Console.WriteLine(" {0}", watch.Elapsed);
}
private void UseSoredDataViews() {
if (_viewCol1Asc == null) {
_viewCol1Asc = new DataView(_table, null, "Col1 ASC", DataViewRowState.Unchanged);
_viewCol2Desc = new DataView(_table, null, "Col2 DESC", DataViewRowState.Unchanged);
}
var rows = _viewCol1Asc.Cast<DataRowView>().Take(_countToTake).Select(vr => (MyRow)vr.Row);
IterateRows(rows);
rows = _viewCol2Desc.Cast<DataRowView>().Take(_countToTake).Select(vr => (MyRow)vr.Row);
IterateRows(rows);
}
private void SortWithLinq() {
var rows = _table.OrderBy(row => row.Col1).Take(_countToTake);
IterateRows(rows);
rows = _table.OrderByDescending(row => row.Col2).Take(_countToTake);
IterateRows(rows);
}
private void IterateRows(IEnumerable<MyRow> rows) {
foreach (var row in rows)
if (row == null)
throw new Exception("????");
}
}
Вы можете использовать Indexed LINQ, чтобы поставить индекс на данные, которые вы обрабатываете. В некоторых случаях это может привести к заметным улучшениям.
Если вы объедините два, есть шанс, что будет сделано немного меньше работы:
List<Point> selectedPoints = pointList
.OrderByDescending(p=> p.Z) // First bottleneck - creates sorted copy of all data
.Take((int) pointList.Count * N);
Но в основном этот вид ранжирования требует сортировки, ваших самых больших затрат.
Еще несколько идей:
int resultSize = pointList.Count * (1-N);
FixedSizedPriorityQueue<Point> q =
new FixedSizedPriorityQueue<Point>(resultSize, p => p.Z);
q.AddEach(pointList);
List<Point> selectedPoints = q.ToList();
Теперь все, что вам нужно сделать, это реализовать FixedSizedPriorityQueue, которая добавляет элементы по одному за раз и отбрасывает самый большой элемент, когда она заполняется.
Я бы сделал это, реализовав "половину" квиксорта.
Рассмотрим ваш исходный набор точек, P, где вы ищете "верхние" N элементов по координате Z.
Выберите точку x в P.
Разделите P на L = {y в P | y < x} и U = {y в P | x <= y}.
Если N = |U|, то вы закончили.
Если N < |U|, то перебор с P := U.
В противном случае вам нужно добавить некоторые элементы в U: перебор с N := N - |U|, P := L для добавления оставшихся элементов.
Если вы грамотно выберете точку поворота (например, медиану, скажем, пяти случайных выборок), то это будет выполняться за время O(n log n).
Хмммм, подумав еще немного, вы можете вообще избежать создания новых наборов, поскольку, по сути, вы просто ищете O(n log n) способ найти N-ый наибольший элемент из исходного набора. Да, я думаю, это сработает, так что вот предложение номер 2:
Сделайте обход P, найдя наименьший и наибольший элементы, A и Z, соответственно.
Пусть M будет средним значением A и Z (помните, что здесь мы рассматриваем только координаты Z).
Подсчитаем, сколько элементов находится в диапазоне [M, Z], назовем это Q.
Если Q < N, то N-й наибольший элемент в P находится где-то в [A, M]. Пусть M := (A + M)/2.
Если N < Q, то N-й наибольший элемент в P находится где-то в [M, Z]. Попробуйте M := (M + Z)/2.
Повторяем, пока не найдем M такой, что Q = N.
Теперь обходим P, удаляя все элементы больше или равные M.
Это точно O(n log n) и не создает никаких дополнительных структур данных (кроме результата). Как вам?