В зависимости от вашей конкретной цели существует способ достижения полезности родительского селектора без использования одного (даже если бы он существовал) ...
Скажем, у нас есть:
<div>
<ul>
<li><a>Pants</a></li>
<li><a>Socks</a></li>
<ul>
<li><a>White socks</a></li>
<li><a>Blue socks</a></li>
</ul>
</ul>
</div>
Что мы можем сделать, чтобы блок Socks (включая цвета носка) выделялся визуально с использованием интервала?
Что было бы неплохо, но не существует:
ul li ul:parent {
margin-top: 15px;
margin-bottom: 15px;
}
Что существует:
li > a {
margin-top: 15px;
display: block;
}
li > a:only-child {
margin-top: 0px;
}
Это устанавливает, что все привязные ссылки имеют верхний край 15px и сбрасывают его обратно на 0 для тех, у которых нет элементов UL (или других тегов) внутри LI.
Вот что я получил с некоторой помощью от RX Forum:
Идея состоит в том, чтобы выпустить серию «билетов» для запуска исходной последовательности. Эти «билеты» задерживаются на время ожидания, за исключением самого первого, который сразу добавляется к последовательности билетов. Когда приходит событие и есть ожидающий тикет, событие запускается немедленно, в противном случае оно дожидается тикета, а затем запускается. Когда он срабатывает, выдается следующий билет и так далее ...
Чтобы объединить билеты и оригинальные мероприятия, нам понадобится комбинатор. К сожалению, "стандартный" .CombineLatest нельзя использовать здесь, потому что он срабатывает на тикетах и событиях, которые использовались ранее. Поэтому мне пришлось создать свой собственный комбинатор, который в основном представляет собой отфильтрованный .CombineLatest, который срабатывает только тогда, когда оба элемента в комбинации «свежие» - никогда не возвращались раньше. Я называю это .CombineVeryLatest или .BrokenZip;)
Используя .CombineVeryLatest, вышеупомянутая идея может быть реализована как таковая:
public static IObservable<T> SampleResponsive<T>(
this IObservable<T> source, TimeSpan delay)
{
return source.Publish(src =>
{
var fire = new Subject<T>();
var whenCanFire = fire
.Select(u => new Unit())
.Delay(delay)
.StartWith(new Unit());
var subscription = src
.CombineVeryLatest(whenCanFire, (x, flag) => x)
.Subscribe(fire);
return fire.Finally(subscription.Dispose);
});
}
public static IObservable<TResult> CombineVeryLatest
<TLeft, TRight, TResult>(this IObservable<TLeft> leftSource,
IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector)
{
var ls = leftSource.Select(x => new Used<TLeft>(x));
var rs = rightSource.Select(x => new Used<TRight>(x));
var cmb = ls.CombineLatest(rs, (x, y) => new { x, y });
var fltCmb = cmb
.Where(a => !(a.x.IsUsed || a.y.IsUsed))
.Do(a => { a.x.IsUsed = true; a.y.IsUsed = true; });
return fltCmb.Select(a => selector(a.x.Value, a.y.Value));
}
private class Used<T>
{
internal T Value { get; private set; }
internal bool IsUsed { get; set; }
internal Used(T value)
{
Value = value;
}
}
Изменить: вот еще один более компактный вариант CombineVeryLatest, предложенный Андреасом Кёпфом на форуме:
public static IObservable<TResult> CombineVeryLatest
<TLeft, TRight, TResult>(this IObservable<TLeft> leftSource,
IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector)
{
return Observable.Defer(() =>
{
int l = -1, r = -1;
return Observable.CombineLatest(
leftSource.Select(Tuple.Create<TLeft, int>),
rightSource.Select(Tuple.Create<TRight, int>),
(x, y) => new { x, y })
.Where(t => t.x.Item2 != l && t.y.Item2 != r)
.Do(t => { l = t.x.Item2; r = t.y.Item2; })
.Select(t => selector(t.x.Item1, t.y.Item1));
});
}
Хорошо,
у вас есть 3 сценария:
1) Я хотел бы получать одно значение потока событий каждую секунду. Это означает: если он производит больше событий в секунду, вы получите всегда больший буфер.
observableStream.Throttle(timeSpan)
2) Я хотел бы получить последнее событие, которое было произведено до того, как произойдет второе. означает: другие события отбрасываются.
observableStream.Sample(TimeSpan.FromSeconds(1))
3) вы хотите получить все события, которые произошли в последнюю секунду. и так каждую секунду
observableStream.BufferWithTime(timeSpan)
4) вы хотите выбрать то, что происходит между секундами со всеми значениями, пока не пройдет секунда, и ваш результат будет возвращен
observableStream.CombineLatest(Observable.Interval(1000), selectorOnEachEvent)
Это то, что я опубликовал в качестве ответа на этот вопрос на форуме Rx :
ОБНОВЛЕНИЕ : Вот новая версия, которая больше не задерживает пересылку событий, когда события происходят с разницей во времени более одной секунды:
public static IObservable<T> ThrottleResponsive3<T>(this IObservable<T> source, TimeSpan minInterval)
{
return Observable.CreateWithDisposable<T>(o =>
{
object gate = new Object();
Notification<T> last = null, lastNonTerminal = null;
DateTime referenceTime = DateTime.UtcNow - minInterval;
var delayedReplay = new MutableDisposable();
return new CompositeDisposable(source.Materialize().Subscribe(x =>
{
lock (gate)
{
var elapsed = DateTime.UtcNow - referenceTime;
if (elapsed >= minInterval && delayedReplay.Disposable == null)
{
referenceTime = DateTime.UtcNow;
x.Accept(o);
}
else
{
if (x.Kind == NotificationKind.OnNext)
lastNonTerminal = x;
last = x;
if (delayedReplay.Disposable == null)
{
delayedReplay.Disposable = Scheduler.ThreadPool.Schedule(() =>
{
lock (gate)
{
referenceTime = DateTime.UtcNow;
if (lastNonTerminal != null && lastNonTerminal != last)
lastNonTerminal.Accept(o);
last.Accept(o);
last = lastNonTerminal = null;
delayedReplay.Disposable = null;
}
}, minInterval - elapsed);
}
}
}
}), delayedReplay);
});
}
Это была моя предыдущая попытка:
var source = Observable.GenerateWithTime(1,
x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1)
.Timestamp();
source.Publish(o =>
o.Take(1).Merge(o.Skip(1).Sample(TimeSpan.FromSeconds(1)))
).Run(x => Console.WriteLine(x));
Пробовали ли вы метод расширения Throttle
?
Из документации:
Игнорирует значения из наблюдаемой последовательности, за которыми следует другое значение до dueTime
Мне не совсем ясно, будет ли это делать то, что вы хотите, или нет - в том смысле, что вы хотите игнорировать следующие значения, а не первое... но я бы ожидал, что это будет то, что вы хотите. Попробуйте :)
EDIT: Хммм... нет, я не думаю, что Throttle
это то, что нужно, в конце концов. Мне кажется, я понимаю, что вы хотите сделать, но я не вижу ничего в фреймворке для этого. Хотя, вполне возможно, я что-то упустил. Вы спрашивали на форуме Rx? Вполне возможно, что если этого нет сейчас, то они с радостью добавят это :)
Я подозреваю, что вы можете сделать это хитро с SkipUntil
и SelectMany
каким-то образом... но я думаю, что это должно быть в собственном методе.
Хорошо, вот одно решение. Мне оно не очень нравится, но... ладно.
Спасибо Джону за то, что указал мне на SkipWhile, и cRichter за BufferWithTime. Спасибо, ребята.
static void Main(string[] args)
{
Console.WriteLine("Running...");
var generator = Observable
.GenerateWithTime(1, x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1)
.Timestamp();
var bufferedAtOneSec = generator.BufferWithTime(TimeSpan.FromSeconds(1));
var action = new Action<Timestamped<int>>(
feed => Console.WriteLine("Observed {0:000}, generated at {1}, observed at {2}",
feed.Value,
feed.Timestamp.ToString("mm:ss.fff"),
DateTime.Now.ToString("mm:ss.fff")));
var reactImmediately = true;
bufferedAtOneSec.Subscribe(list =>
{
if (list.Count == 0)
{
reactImmediately = true;
}
else
{
action(list.Last());
}
});
generator
.SkipWhile(item => reactImmediately == false)
.Subscribe(feed =>
{
if(reactImmediately)
{
reactImmediately = false;
action(feed);
}
});
Console.ReadKey();
}
То, что вы ищете, это CombineLatest.
public static IObservable<TResult> CombineLatest<TLeft, TRight, TResult>(
IObservable<TLeft> leftSource,
IObservable<TRight> rightSource,
Func<TLeft, TRight, TResult> selector
)
который объединяет 2 наблюдаемые таблицы и возвращает все значения, когда селектор (время) имеет значение.
edit: john is right, that is perhaps not the preferred solution