Неблокирование параллельного набора?

Система. Наборы. Параллельный имеет некоторые новые наборы, которые работают очень хорошо в многопоточных средах. Однако они немного ограничены. Или они блокируются, пока объект не становится доступным, или они возвращаются default(T) (Методы TryXXX).

Мне нужен набор, который ориентирован на многопотоковое исполнение, но вместо того, чтобы блокировать вызывающий поток он использует обратный вызов, чтобы сообщить мне, что по крайней мере один объект доступен.

Мое текущее решение состоит в том, чтобы использовать BlockingCollection, но использовать APM с делегатом для получения следующего элемента. Другими словами, я создаю делегата в методе это Takes от набора, и выполняют того делегата, использующего BeginInvoke.

К сожалению, я должен сохранить много состояния в моем классе для выполнения этого. Хуже, класс не ориентирован на многопотоковое исполнение; это может только использоваться единственным потоком. Я окаймляю край пригодности для обслуживания, которую я предпочел бы не делать.

Я знаю, что существуют некоторые библиотеки там, которые делают то, что я делаю здесь довольно простой (я полагаю, что Реактивная Платформа является одним из них), но я хотел бы выполнить свои цели, не добавляя ссылок за пределами версии 4 платформы.

Есть ли какие-либо лучшие шаблоны, которые я могу использовать, которые не требуют внешних ссылок, которые выполняют мою цель?


tl; доктор:

Есть ли любые шаблоны, которые удовлетворяют требование:

"Я должен предупредить о наборе, что я готов к следующему элементу и имею набор, выполняют обратный вызов, когда тот следующий элемент прибыл без любых заблокированных потоков".

7
задан 20 July 2010 в 13:44
поделиться

2 ответа

Я думаю, что у меня есть два возможных решения. Я не очень доволен ими, но они, по крайней мере, предоставляют разумную альтернативу подходу APM.

Первый не соответствует вашему требованию об отсутствии блокирующего потока, но я думаю, что он довольно элегантен, потому что вы можете регистрировать обратные вызовы, и они будут вызываться циклически, но у вас все еще есть возможность вызвать Take или TryTake , как обычно для BlockingCollection . Этот код заставляет регистрировать обратные вызовы каждый раз, когда запрашивается элемент. Это сигнальный механизм для сбора. Хорошая вещь в этом подходе заключается в том, что вызовы Take не прерываются, как в моем втором решении.

public class NotifyingBlockingCollection<T> : BlockingCollection<T>
{
    private Thread m_Notifier;
    private BlockingCollection<Action<T>> m_Callbacks = new BlockingCollection<Action<T>>();

    public NotifyingBlockingCollection()
    {
        m_Notifier = new Thread(Notify);
        m_Notifier.IsBackground = true;
        m_Notifier.Start();
    }

    private void Notify()
    {
        while (true)
        {
            Action<T> callback = m_Callbacks.Take();
            T item = Take();
            callback.BeginInvoke(item, null, null); // Transfer to the thread pool.
        }
    }

    public void RegisterForTake(Action<T> callback)
    {
        m_Callbacks.Add(callback);
    }
}

Второй вариант соответствует вашему требованию об отсутствии блокирующего потока. Обратите внимание, как он передает вызов обратного вызова в пул потоков. Я сделал это, потому что думаю, что если он будет выполняться синхронно, то блокировки будут удерживаться дольше, что приведет к узким местам Add и RegisterForTake . Я внимательно просмотрел его и не думаю, что он может быть заблокирован в реальном времени (доступны и элемент, и обратный вызов, но обратный вызов никогда не выполняется), но вы можете просмотреть его самостоятельно, чтобы проверить.Единственная проблема здесь заключается в том, что вызов Take будет зависать, поскольку обратные вызовы всегда имеют приоритет.

public class NotifyingBlockingCollection<T>
{
    private BlockingCollection<T> m_Items = new BlockingCollection<T>();
    private Queue<Action<T>> m_Callbacks = new Queue<Action<T>>();

    public NotifyingBlockingCollection()
    {
    }

    public void Add(T item)
    {
        lock (m_Callbacks)
        {
            if (m_Callbacks.Count > 0)
            {
                Action<T> callback = m_Callbacks.Dequeue();
                callback.BeginInvoke(item, null, null); // Transfer to the thread pool.
            }
            else
            {
                m_Items.Add(item);
            }
        }
    }

    public T Take()
    {
        return m_Items.Take();
    }

    public void RegisterForTake(Action<T> callback)
    {
        lock (m_Callbacks)
        {
            T item;
            if (m_Items.TryTake(out item))
            {
                callback.BeginInvoke(item, null, null); // Transfer to the thread pool.
            }
            else
            {
                m_Callbacks.Enqueue(callback);
            }
        }
    }
}
4
ответ дан 7 December 2019 в 07:39
поделиться

Как насчет чего-то подобного? (Над именованием, вероятно, стоит поработать. И учтите, что это не проверено)

public class CallbackCollection<T>
{
    // Sychronization object to prevent race conditions.
    private object _SyncObject = new object();

    // A queue for callbacks that are waiting for items.
    private ConcurrentQueue<Action<T>> _Callbacks = new ConcurrentQueue<Action<T>>();

    // A queue for items that are waiting for callbacks.
    private ConcurrentQueue<T> _Items = new ConcurrentQueue<T>();

    public void Add(T item)
    {
        Action<T> callback;
        lock (_SyncObject)
        {
            // Try to get a callback. If no callback is available,
            // then enqueue the item to wait for the next callback
            // and return.
            if (!_Callbacks.TryDequeue(out callback))
            {
                _Items.Enqueue(item);
                return;
            }
        }

        ExecuteCallback(callback, item);
    }

    public void TakeAndCallback(Action<T> callback)
    {
        T item;
        lock(_SyncObject)
        {
            // Try to get an item. If no item is available, then
            // enqueue the callback to wait for the next item
            // and return.
            if (!_Items.TryDequeue(out item))
            {
                _Callbacks.Enqueue(callback);
                return;
            }
        }
        ExecuteCallback(callback, item);
    }

    private void ExecuteCallback(Action<T> callback, T item)
    {
        // Use a new Task to execute the callback so that we don't
        // execute it on the current thread.
        Task.Factory.StartNew(() => callback.Invoke(item));
    }
}
3
ответ дан 7 December 2019 в 07:39
поделиться
Другие вопросы по тегам:

Похожие вопросы: