Общая очередь в C++

Я просто получаю пакеты из сети и ставлю их в очередь в одном потоке, а затем использую эти пакеты (Извлекаю из очереди )в другом потоке.

Поэтому я решил использовать библиотеку boost для создания общей очереди на основеhttps://www.quantnet.com/cplusplus-multithreading-boost/

template 
class SynchronisedQueue
{
private:
    std::queue m_queue;  // Use STL queue to store data
    boost::mutex m_mutex;   // The mutex to synchronise on
    boost::condition_variable m_cond;// The condition to wait for

public:

    // Add data to the queue and notify others
    void Enqueue(const T& data)
    {
        // Acquire lock on the queue
        boost::unique_lock lock(m_mutex);

        // Add the data to the queue
        m_queue.push(data);

        // Notify others that data is ready
        m_cond.notify_one();

    } // Lock is automatically released here

    // Get data from the queue. Wait for data if not available
    T Dequeue()
    {

        // Acquire lock on the queue
        boost::unique_lock lock(m_mutex);

        // When there is no data, wait till someone fills it.
        // Lock is automatically released in the wait and obtained 
        // again after the wait
        while (m_queue.size()==0) m_cond.wait(lock);

        // Retrieve the data from the queue
        T result=m_queue.front(); m_queue.pop();
        return result;

    } // Lock is automatically released here
};

Проблема в том, что при отсутствии данных метод Dequeue ()блокирует мой потребительский поток, и когда я хочу завершить работу с потребителем поток я не могу закончить его или остановить его иногда.

Каков предлагаемый способ прекращения блокировки Dequeue (), чтобы я мог безопасно завершить поток, потребляющий пакеты? Любые идеи предложения?

PS :Сайтhttps://www.quantnet.com/cplusplus-multithreading-boost/использует "ускорить ::этот _поток ::прерывание _точку ();" для остановки потребительского потока... Из-за моей устаревшей структуры кода это невозможно для меня...

На основе ответа я обновляю общую очередь следующим образом:

#include 
 #include   

template 
class SynchronisedQueue
{
public:

    SynchronisedQueue()
    {
        RequestToEnd = false;  
        EnqueueData = true;
    }
    void Enqueue(const T& data)
    {
        boost::unique_lock lock(m_mutex);

        if(EnqueueData)
        {
            m_queue.push(data);
            m_cond.notify_one();
        }

    } 


    bool TryDequeue(T& result)
    {
        boost::unique_lock lock(m_mutex);

        while (m_queue.empty() && (! RequestToEnd)) 
        { 
            m_cond.wait(lock);
        }

        if( RequestToEnd )
        {
             DoEndActions();
             return false;
        }

        result= m_queue.front(); m_queue.pop();

        return true;
    }

    void StopQueue()
    {
        RequestToEnd =  true;
        Enqueue(NULL);        
    }

    int Size()
    {
        boost::unique_lock lock(m_mutex);
        return m_queue.size();

    }

private:

    void DoEndActions()
    {
        EnqueueData = false;

        while (!m_queue.empty())  
        {
            m_queue.pop();
        }
    }



    std::queue m_queue;              // Use STL queue to store data
    boost::mutex m_mutex;               // The mutex to synchronise on
    boost::condition_variable m_cond;            // The condition to wait for

    bool RequestToEnd;
    bool EnqueueData;
};

И вот мой тест-драйв:

#include 
#include 

#include "SynchronisedQueue.h"

using namespace std;

SynchronisedQueue MyQueue;

void InsertToQueue()
{
    int i= 0;

    while(true)
    {
        MyQueue.Enqueue(++i);
    }

}

void ConsumeFromQueue()
{
    while(true)
    {
        int number;

        cout << "Now try to dequeue" << endl;

        bool success = MyQueue.TryDequeue(number);

        if(success)
        {

            cout << "value is " << number << endl;

        }

        else
        {
            cout << " queue is stopped" << endl;
            break;

        }
    }


    cout << "Que size is : " << MyQueue.Size() <<  endl;
}



int main()
{

    cout << "Test Started" << endl;

    boost::thread startInsertIntoQueue = boost::thread(InsertToQueue);
    boost::thread consumeFromQueue = boost::thread(ConsumeFromQueue);

    boost::this_thread::sleep(boost::posix_time::seconds(5)); //After 5 seconds

    MyQueue.StopQueue();

    int endMain;

    cin >> endMain;


    return 0;
}

На данный момент кажется работать... На основе новых предложений:

я изменил метод остановки на:

void StopQueue()
    {
        boost::unique_lock lock(m_mutex);
        RequestToEnd =  true;
        m_cond.notify_one();          
    }

6
задан MPelletier 14 February 2014 в 14:18
поделиться