Я просто получаю пакеты из сети и ставлю их в очередь в одном потоке, а затем использую эти пакеты (Извлекаю из очереди )в другом потоке.
Поэтому я решил использовать библиотеку boost для создания общей очереди на основеhttps://www.quantnet.com/cplusplus-multithreading-boost/
template
class SynchronisedQueue
{
private:
std::queue m_queue; // Use STL queue to store data
boost::mutex m_mutex; // The mutex to synchronise on
boost::condition_variable m_cond;// The condition to wait for
public:
// Add data to the queue and notify others
void Enqueue(const T& data)
{
// Acquire lock on the queue
boost::unique_lock lock(m_mutex);
// Add the data to the queue
m_queue.push(data);
// Notify others that data is ready
m_cond.notify_one();
} // Lock is automatically released here
// Get data from the queue. Wait for data if not available
T Dequeue()
{
// Acquire lock on the queue
boost::unique_lock lock(m_mutex);
// When there is no data, wait till someone fills it.
// Lock is automatically released in the wait and obtained
// again after the wait
while (m_queue.size()==0) m_cond.wait(lock);
// Retrieve the data from the queue
T result=m_queue.front(); m_queue.pop();
return result;
} // Lock is automatically released here
};
Проблема в том, что при отсутствии данных метод Dequeue ()блокирует мой потребительский поток, и когда я хочу завершить работу с потребителем поток я не могу закончить его или остановить его иногда.
Каков предлагаемый способ прекращения блокировки Dequeue (), чтобы я мог безопасно завершить поток, потребляющий пакеты? Любые идеи предложения?
PS :Сайтhttps://www.quantnet.com/cplusplus-multithreading-boost/использует "ускорить ::этот _поток ::прерывание _точку ();" для остановки потребительского потока... Из-за моей устаревшей структуры кода это невозможно для меня...
На основе ответа я обновляю общую очередь следующим образом:
#include
#include
template
class SynchronisedQueue
{
public:
SynchronisedQueue()
{
RequestToEnd = false;
EnqueueData = true;
}
void Enqueue(const T& data)
{
boost::unique_lock lock(m_mutex);
if(EnqueueData)
{
m_queue.push(data);
m_cond.notify_one();
}
}
bool TryDequeue(T& result)
{
boost::unique_lock lock(m_mutex);
while (m_queue.empty() && (! RequestToEnd))
{
m_cond.wait(lock);
}
if( RequestToEnd )
{
DoEndActions();
return false;
}
result= m_queue.front(); m_queue.pop();
return true;
}
void StopQueue()
{
RequestToEnd = true;
Enqueue(NULL);
}
int Size()
{
boost::unique_lock lock(m_mutex);
return m_queue.size();
}
private:
void DoEndActions()
{
EnqueueData = false;
while (!m_queue.empty())
{
m_queue.pop();
}
}
std::queue m_queue; // Use STL queue to store data
boost::mutex m_mutex; // The mutex to synchronise on
boost::condition_variable m_cond; // The condition to wait for
bool RequestToEnd;
bool EnqueueData;
};
И вот мой тест-драйв:
#include
#include
#include "SynchronisedQueue.h"
using namespace std;
SynchronisedQueue MyQueue;
void InsertToQueue()
{
int i= 0;
while(true)
{
MyQueue.Enqueue(++i);
}
}
void ConsumeFromQueue()
{
while(true)
{
int number;
cout << "Now try to dequeue" << endl;
bool success = MyQueue.TryDequeue(number);
if(success)
{
cout << "value is " << number << endl;
}
else
{
cout << " queue is stopped" << endl;
break;
}
}
cout << "Que size is : " << MyQueue.Size() << endl;
}
int main()
{
cout << "Test Started" << endl;
boost::thread startInsertIntoQueue = boost::thread(InsertToQueue);
boost::thread consumeFromQueue = boost::thread(ConsumeFromQueue);
boost::this_thread::sleep(boost::posix_time::seconds(5)); //After 5 seconds
MyQueue.StopQueue();
int endMain;
cin >> endMain;
return 0;
}
На данный момент кажется работать... На основе новых предложений:
я изменил метод остановки на:
void StopQueue()
{
boost::unique_lock lock(m_mutex);
RequestToEnd = true;
m_cond.notify_one();
}