Rx: Как я могу ответить немедленно и ограничить последующие запросы

Я хотел бы настроить подписку Rx, которая может сразу же реагировать на событие, а затем игнорировать последующие события, которые происходят в течение указанного периода «восстановления».

Стандартные методы Throttle / Buffer реагируют только по истечении тайм-аута, что не совсем то, что мне нужно.

Вот код, который настраивает сценарий и использует Throttle (это не то решение, которое мне нужно):

class Program
{
    static Stopwatch sw = new Stopwatch();

    static void Main(string[] args)
    {
        var subject = new Subject<int>();
        var timeout = TimeSpan.FromMilliseconds(500);

        subject
            .Throttle(timeout)
            .Subscribe(DoStuff);

        var factory = new TaskFactory();

        sw.Start();

        factory.StartNew(() =>
        {
            Console.WriteLine("Batch 1 (no delay)");
            subject.OnNext(1);
        });

        factory.StartNewDelayed(1000, () =>
        {
            Console.WriteLine("Batch 2 (1s delay)");
            subject.OnNext(2);
        });

        factory.StartNewDelayed(1300, () =>
        {
            Console.WriteLine("Batch 3 (1.3s delay)");
            subject.OnNext(3);
        });

        factory.StartNewDelayed(1600, () =>
        {
            Console.WriteLine("Batch 4 (1.6s delay)");
            subject.OnNext(4);
        });

        Console.ReadKey();
        sw.Stop();
    }

    private static void DoStuff(int i)
    {
        Console.WriteLine("Handling {0} at {1}ms", i, sw.ElapsedMilliseconds);
    }
}

Результат выполнения этого прямо сейчас:

Пакет 1 (без задержки)

Обработка 1 при 508 мс

Пакет 2 (задержка 1 с)

Пакет 3 (задержка 1,3 с)

Пакет 4 (1.Задержка 6 с)

Обработка 4 при 2114 мс

Обратите внимание, что пакет 2 не обрабатывается (и это нормально!), Потому что мы ждем 500 мс, чтобы пройти между запросами из-за природы дросселирования. Пакет 3 также не обрабатывается (что менее нормально, потому что это произошло более чем через 500 мс от пакета 2) из-за его близости к пакету 4.

Я ищу что-то вроде этого:

Пакет 1 (без задержки)

Обработка 1 при ~ 0 мс

Пакет 2 (задержка 1 с)

Обработка 2 при ~ 1000 с

Пакет 3 (задержка 1,3 с)

Пакет 4 (задержка 1,6 с)

Обработка 4 при ~ 1600 с

Обратите внимание, что пакет 3 не будет обрабатываться в этом сценарии (и это нормально!), Потому что это происходит в пределах 500 мс от пакета 2.

EDIT :

Вот это реализация для метода расширения "StartNewDelayed", который я использую:

/// <summary>Creates a Task that will complete after the specified delay.</summary>
/// <param name="factory">The TaskFactory.</param>
/// <param name="millisecondsDelay">The delay after which the Task should transition to RanToCompletion.</param>
/// <returns>A Task that will be completed after the specified duration.</returns>
public static Task StartNewDelayed(
    this TaskFactory factory, int millisecondsDelay)
{
    return StartNewDelayed(factory, millisecondsDelay, CancellationToken.None);
}

/// <summary>Creates a Task that will complete after the specified delay.</summary>
/// <param name="factory">The TaskFactory.</param>
/// <param name="millisecondsDelay">The delay after which the Task should transition to RanToCompletion.</param>
/// <param name="cancellationToken">The cancellation token that can be used to cancel the timed task.</param>
/// <returns>A Task that will be completed after the specified duration and that's cancelable with the specified token.</returns>
public static Task StartNewDelayed(this TaskFactory factory, int millisecondsDelay, CancellationToken cancellationToken)
{
    // Validate arguments
    if (factory == null) throw new ArgumentNullException("factory");
    if (millisecondsDelay < 0) throw new ArgumentOutOfRangeException("millisecondsDelay");

    // Create the timed task
    var tcs = new TaskCompletionSource<object>(factory.CreationOptions);
    var ctr = default(CancellationTokenRegistration);

    // Create the timer but don't start it yet.  If we start it now,
    // it might fire before ctr has been set to the right registration.
    var timer = new Timer(self =>
    {
        // Clean up both the cancellation token and the timer, and try to transition to completed
        ctr.Dispose();
        ((Timer)self).Dispose();
        tcs.TrySetResult(null);
    });

    // Register with the cancellation token.
    if (cancellationToken.CanBeCanceled)
    {
        // When cancellation occurs, cancel the timer and try to transition to cancelled.
        // There could be a race, but it's benign.
        ctr = cancellationToken.Register(() =>
        {
            timer.Dispose();
            tcs.TrySetCanceled();
        });
    }

    if (millisecondsDelay > 0)
    {
        // Start the timer and hand back the task...
        timer.Change(millisecondsDelay, Timeout.Infinite);
    }
    else
    {
        // Just complete the task, and keep execution on the current thread.
        ctr.Dispose();
        tcs.TrySetResult(null);
        timer.Dispose();
    }

    return tcs.Task;
}
27
задан Martin Liversage 10 February 2015 в 08:09
поделиться