Как регулировать поток событий с помощью RX?

В зависимости от вашей конкретной цели существует способ достижения полезности родительского селектора без использования одного (даже если бы он существовал) ...

Скажем, у нас есть:

<div>
  <ul>
    <li><a>Pants</a></li>
    <li><a>Socks</a></li>
    <ul>
      <li><a>White socks</a></li>
      <li><a>Blue socks</a></li>
    </ul>
  </ul>
</div>

Что мы можем сделать, чтобы блок Socks (включая цвета носка) выделялся визуально с использованием интервала?

Что было бы неплохо, но не существует:

ul li ul:parent {
  margin-top: 15px;
  margin-bottom: 15px;
}

Что существует:

li > a {
  margin-top: 15px;
  display: block;
}
li > a:only-child {
  margin-top: 0px;
}

Это устанавливает, что все привязные ссылки имеют верхний край 15px и сбрасывают его обратно на 0 для тех, у которых нет элементов UL (или других тегов) внутри LI.

30
задан Daniel Fortunov 2 August 2010 в 08:04
поделиться

6 ответов

Вот что я получил с некоторой помощью от RX Forum:

Идея состоит в том, чтобы выпустить серию «билетов» для запуска исходной последовательности. Эти «билеты» задерживаются на время ожидания, за исключением самого первого, который сразу добавляется к последовательности билетов. Когда приходит событие и есть ожидающий тикет, событие запускается немедленно, в противном случае оно дожидается тикета, а затем запускается. Когда он срабатывает, выдается следующий билет и так далее ...

Чтобы объединить билеты и оригинальные мероприятия, нам понадобится комбинатор. К сожалению, "стандартный" .CombineLatest нельзя использовать здесь, потому что он срабатывает на тикетах и ​​событиях, которые использовались ранее. Поэтому мне пришлось создать свой собственный комбинатор, который в основном представляет собой отфильтрованный .CombineLatest, который срабатывает только тогда, когда оба элемента в комбинации «свежие» - никогда не возвращались раньше. Я называю это .CombineVeryLatest или .BrokenZip;)

Используя .CombineVeryLatest, вышеупомянутая идея может быть реализована как таковая:

    public static IObservable<T> SampleResponsive<T>(
        this IObservable<T> source, TimeSpan delay)
    {
        return source.Publish(src =>
        {
            var fire = new Subject<T>();

            var whenCanFire = fire
                .Select(u => new Unit())
                .Delay(delay)
                .StartWith(new Unit());

            var subscription = src
                .CombineVeryLatest(whenCanFire, (x, flag) => x)
                .Subscribe(fire);

            return fire.Finally(subscription.Dispose);
        });
    }

    public static IObservable<TResult> CombineVeryLatest
        <TLeft, TRight, TResult>(this IObservable<TLeft> leftSource,
        IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector)
    {
        var ls = leftSource.Select(x => new Used<TLeft>(x));
        var rs = rightSource.Select(x => new Used<TRight>(x));
        var cmb = ls.CombineLatest(rs, (x, y) => new { x, y });
        var fltCmb = cmb
            .Where(a => !(a.x.IsUsed || a.y.IsUsed))
            .Do(a => { a.x.IsUsed = true; a.y.IsUsed = true; });
        return fltCmb.Select(a => selector(a.x.Value, a.y.Value));
    }

    private class Used<T>
    {
        internal T Value { get; private set; }
        internal bool IsUsed { get; set; }

        internal Used(T value)
        {
            Value = value;
        }
    }

Изменить: вот еще один более компактный вариант CombineVeryLatest, предложенный Андреасом Кёпфом на форуме:

public static IObservable<TResult> CombineVeryLatest
  <TLeft, TRight, TResult>(this IObservable<TLeft> leftSource,
  IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector)
{
    return Observable.Defer(() =>
    {
        int l = -1, r = -1;
        return Observable.CombineLatest(
            leftSource.Select(Tuple.Create<TLeft, int>),
            rightSource.Select(Tuple.Create<TRight, int>),
                (x, y) => new { x, y })
            .Where(t => t.x.Item2 != l && t.y.Item2 != r)
            .Do(t => { l = t.x.Item2; r = t.y.Item2; })
            .Select(t => selector(t.x.Item1, t.y.Item1));
    });
}
13
ответ дан 28 November 2019 в 00:09
поделиться

Хорошо,

у вас есть 3 сценария:

1) Я хотел бы получать одно значение потока событий каждую секунду. Это означает: если он производит больше событий в секунду, вы получите всегда больший буфер.

observableStream.Throttle(timeSpan)

2) Я хотел бы получить последнее событие, которое было произведено до того, как произойдет второе. означает: другие события отбрасываются.

observableStream.Sample(TimeSpan.FromSeconds(1))

3) вы хотите получить все события, которые произошли в последнюю секунду. и так каждую секунду

observableStream.BufferWithTime(timeSpan)

4) вы хотите выбрать то, что происходит между секундами со всеми значениями, пока не пройдет секунда, и ваш результат будет возвращен

observableStream.CombineLatest(Observable.Interval(1000), selectorOnEachEvent)
15
ответ дан 28 November 2019 в 00:09
поделиться

Это то, что я опубликовал в качестве ответа на этот вопрос на форуме Rx :

ОБНОВЛЕНИЕ : Вот новая версия, которая больше не задерживает пересылку событий, когда события происходят с разницей во времени более одной секунды:

public static IObservable<T> ThrottleResponsive3<T>(this IObservable<T> source, TimeSpan minInterval)
{
    return Observable.CreateWithDisposable<T>(o =>
    {
        object gate = new Object();
        Notification<T> last = null, lastNonTerminal = null;
        DateTime referenceTime = DateTime.UtcNow - minInterval;
        var delayedReplay = new MutableDisposable();
        return new CompositeDisposable(source.Materialize().Subscribe(x =>
        {
            lock (gate)
            {
                var elapsed = DateTime.UtcNow - referenceTime;
                if (elapsed >= minInterval && delayedReplay.Disposable == null)
                {
                    referenceTime = DateTime.UtcNow;
                    x.Accept(o);
                }
                else
                {
                    if (x.Kind == NotificationKind.OnNext)
                        lastNonTerminal = x;
                    last = x;
                    if (delayedReplay.Disposable == null)
                    {
                        delayedReplay.Disposable = Scheduler.ThreadPool.Schedule(() =>
                        {
                            lock (gate)
                            {
                                referenceTime = DateTime.UtcNow;
                                if (lastNonTerminal != null && lastNonTerminal != last)
                                    lastNonTerminal.Accept(o);
                                last.Accept(o);
                                last = lastNonTerminal = null;
                                delayedReplay.Disposable = null;
                            }
                        }, minInterval - elapsed);
                    }
                }
            }
        }), delayedReplay);
    });
}

Это была моя предыдущая попытка:

var source = Observable.GenerateWithTime(1, 
    x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1)
    .Timestamp();

source.Publish(o =>
    o.Take(1).Merge(o.Skip(1).Sample(TimeSpan.FromSeconds(1)))
).Run(x => Console.WriteLine(x));
5
ответ дан 28 November 2019 в 00:09
поделиться

Пробовали ли вы метод расширения Throttle?

Из документации:

Игнорирует значения из наблюдаемой последовательности, за которыми следует другое значение до dueTime

Мне не совсем ясно, будет ли это делать то, что вы хотите, или нет - в том смысле, что вы хотите игнорировать следующие значения, а не первое... но я бы ожидал, что это будет то, что вы хотите. Попробуйте :)

EDIT: Хммм... нет, я не думаю, что Throttle это то, что нужно, в конце концов. Мне кажется, я понимаю, что вы хотите сделать, но я не вижу ничего в фреймворке для этого. Хотя, вполне возможно, я что-то упустил. Вы спрашивали на форуме Rx? Вполне возможно, что если этого нет сейчас, то они с радостью добавят это :)

Я подозреваю, что вы можете сделать это хитро с SkipUntil и SelectMany каким-то образом... но я думаю, что это должно быть в собственном методе.

0
ответ дан 28 November 2019 в 00:09
поделиться

Хорошо, вот одно решение. Мне оно не очень нравится, но... ладно.

Спасибо Джону за то, что указал мне на SkipWhile, и cRichter за BufferWithTime. Спасибо, ребята.

static void Main(string[] args)
{
    Console.WriteLine("Running...");

    var generator = Observable
        .GenerateWithTime(1, x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1)
        .Timestamp();

    var bufferedAtOneSec = generator.BufferWithTime(TimeSpan.FromSeconds(1));

    var action = new Action<Timestamped<int>>(
        feed => Console.WriteLine("Observed {0:000}, generated at {1}, observed at {2}",
                                  feed.Value,
                                  feed.Timestamp.ToString("mm:ss.fff"),
                                  DateTime.Now.ToString("mm:ss.fff")));

    var reactImmediately = true;
    bufferedAtOneSec.Subscribe(list =>
                                   {
                                       if (list.Count == 0)
                                       {
                                           reactImmediately = true;
                                       }
                                       else
                                       {
                                           action(list.Last());
                                       }
                                   });
    generator
        .SkipWhile(item => reactImmediately == false)
        .Subscribe(feed =>
                       {
                           if(reactImmediately)
                           {
                               reactImmediately = false;
                               action(feed);
                           }
                       });

    Console.ReadKey();
}
2
ответ дан 28 November 2019 в 00:09
поделиться

То, что вы ищете, это CombineLatest.

public static IObservable<TResult> CombineLatest<TLeft, TRight, TResult>(
    IObservable<TLeft> leftSource,
    IObservable<TRight> rightSource,
    Func<TLeft, TRight, TResult> selector
)

который объединяет 2 наблюдаемые таблицы и возвращает все значения, когда селектор (время) имеет значение.

edit: john is right, that is perhaps not the preferred solution

0
ответ дан 28 November 2019 в 00:09
поделиться
Другие вопросы по тегам:

Похожие вопросы: