bool hasTwo = yourSequence.ElementAtOrDefault(1) != default(T);
... в случае класса, где значения могут быть нулевыми, это может быть полезно.
Библиотека Futures перешла в Boost и стандартную библиотеку C ++. В ACE есть что-то похожее, но я бы не хотел рекомендовать его кому-либо (как уже отмечал @lothar, это Active Object.)
Возможно, вас заинтересует Активный объект , один из паттернов ACE структуры ACE .
Как Николай указал фьючерсы , которые запланированы для стандартного C ++ когда-нибудь в будущем (каламбур).
В библиотеке POCO есть что-то похожее на то, что называется ActiveMethod (вместе с некоторыми связанными функциями, например, ActiveResult) в разделе потоков. Исходный код легко доступен и понятен.
You can solve this by using Boost's Thread -library. Something like this (half-pseudo):
class GThreadObject
{
...
public:
GThreadObject()
: _done(false)
, _newJob(false)
, _thread(boost::bind(>hreadObject::workerThread, this))
{
}
~GThreadObject()
{
_done = true;
_thread.join();
}
void functionOne(char *argOne, int argTwo)
{
...
_jobQueue.push(myEvent);
{
boost::lock_guard l(_mutex);
_newJob = true;
}
_cond.notify_one();
}
private:
void workerThread()
{
while (!_done) {
boost::unique_lock l(_mutex);
while (!_newJob) {
cond.wait(l);
}
Event *receivedEvent = _jobQueue.front();
...
}
}
private:
volatile bool _done;
volatile bool _newJob;
boost::thread _thread;
boost::mutex _mutex;
boost::condition_variable _cond;
std::queue<Event*> _jobQueue;
};
Also, please note how RAII allow us to get this code smaller and better to manage.
Для расширяемости и ремонтопригодности (и других возможностей) вы можете определить абстрактный класс (или интерфейс) для «работы», которую должен выполнять поток. Затем пользователи вашего пула потоков будут реализовывать этот интерфейс и давать ссылку на объект пулу потоков. Это очень похоже на дизайн активных объектов Symbian: каждый АО подклассов CActive и должен реализовывать такие методы, как Run () и Cancel ().
Для простоты ваш интерфейс (абстрактный класс) может быть таким простым, как:
class IJob
{
virtual Run()=0;
};
Then пул потоков или запросы, принимающие один поток, будут иметь что-то вроде:
class CThread
{
<...>
public:
void AddJob(IJob* iTask);
<...>
};
Естественно, у вас будет несколько задач, которые могут иметь всевозможные дополнительные сеттеры / получатели / атрибуты и все, что вам нужно в любой сфере жизни. Однако необходимо лишь реализовать метод Run (), который бы выполнял длительные вычисления:
class CDumbLoop : public IJob
{
public:
CDumbJob(int iCount) : m_Count(iCount) {};
~CDumbJob() {};
void Run()
{
// Do anything you want here
}
private:
int m_Count;
};
Вот класс, который я написал для той же цели (я использую его для обработки событий, но вы, конечно, можете переименовать его в ActionQueue - и переименовать его методы).
Вы используете его как это:
С функцией, которую вы хотите вызвать: void foo (const int x, const int y) {/*...*/}
И: EventQueue q;
q .AddEvent (boost :: bind (foo, 10, 20));
В рабочем потоке
q.PlayOutEvents ();
Примечание. Добавление кода для блокировки по условию в избегайте использования циклов ЦП.
Код (Visual Studio 2003 с ускорением 1.34.1):
#pragma once
#include <boost/thread/recursive_mutex.hpp>
#include <boost/function.hpp>
#include <boost/signals.hpp>
#include <boost/bind.hpp>
#include <boost/foreach.hpp>
#include <string>
using std::string;
// Records & plays out actions (closures) in a safe-thread manner.
class EventQueue
{
typedef boost::function <void ()> Event;
public:
const bool PlayOutEvents ()
{
// The copy is there to ensure there are no deadlocks.
const std::vector<Event> eventsCopy = PopEvents ();
BOOST_FOREACH (const Event& e, eventsCopy)
{
e ();
Sleep (0);
}
return eventsCopy.size () > 0;
}
void AddEvent (const Event& event)
{
Mutex::scoped_lock lock (myMutex);
myEvents.push_back (event);
}
protected:
const std::vector<Event> PopEvents ()
{
Mutex::scoped_lock lock (myMutex);
const std::vector<Event> eventsCopy = myEvents;
myEvents.clear ();
return eventsCopy;
}
private:
typedef boost::recursive_mutex Mutex;
Mutex myMutex;
std::vector <Event> myEvents;
};
Надеюсь, это поможет. :)
Мартин Билски
Ниже представлена реализация, для которой не требуется метод functionProxy. Даже несмотря на то, что добавлять новые методы проще, это все равно беспорядок.
Boost :: Bind и «Futures» действительно кажутся такими, как будто они бы многое из этого убрали. Думаю, я посмотрю на код повышения и посмотрю, как он работает. Всем спасибо за ваши предложения.
GThreadObject.h
#include <queue>
using namespace std;
class GThreadObject
{
template <int size>
class VariableSizeContainter
{
char data[size];
};
class event
{
public:
void (GThreadObject::*funcPtr)(void *);
int dataSize;
char * data;
};
public:
void functionOne(char * argOne, int argTwo);
void functionTwo(int argTwo, int arg2);
private:
void newEvent(void (GThreadObject::*)(void*), unsigned int argStart, int argSize);
void workerThread();
queue<GThreadObject::event*> jobQueue;
void functionTwoInternal(int argTwo, int arg2);
void functionOneInternal(char * argOne, int argTwo);
};
GThreadObject.cpp
#include <iostream>
#include "GThreadObject.h"
using namespace std;
/* On a continuous loop, reading tasks from queue
* When a new event is received it executes the attached function pointer
* Thread code removed to decrease clutter
*/
void GThreadObject::workerThread()
{
//New Event added, process it
GThreadObject::event * receivedEvent = jobQueue.front();
/* Create an object the size of the stack the function is expecting, then cast the function to accept this object as an argument.
* This is the bit i would like to remove
* Only supports 8 byte argument size e.g 2 int's OR pointer + int OR myObject8bytesSize
* Subsequent data sizes would need to be added with an else if
* */
if (receivedEvent->dataSize == 8)
{
const int size = 8;
void (GThreadObject::*newFuncPtr)(VariableSizeContainter<size>);
newFuncPtr = (void (GThreadObject::*)(VariableSizeContainter<size>))receivedEvent->funcPtr;
//Execute the function
(*this.*newFuncPtr)(*((VariableSizeContainter<size>*)receivedEvent->data));
}
//Clean up
free(receivedEvent->data);
delete receivedEvent;
}
void GThreadObject::newEvent(void (GThreadObject::*funcPtr)(void*), unsigned int argStart, int argSize)
{
//Malloc an object the size of the function arguments
void * myData = malloc(argSize);
//Copy the data passed to this function into the buffer
memcpy(myData, (char*)argStart, argSize);
//Create the event and push it on to the queue
GThreadObject::event * myEvent = new event;
myEvent->data = (char*)myData;
myEvent->dataSize = argSize;
myEvent->funcPtr = funcPtr;
jobQueue.push(myEvent);
//This would be send a thread condition signal, replaced with a simple call here
this->workerThread();
}
/*
* This is the public interface, Can be called from child threads
* Instead of executing the event directly it adds it to a job queue
* Then the workerThread picks it up and executes all tasks on the same thread
*/
void GThreadObject::functionOne(char * argOne, int argTwo)
{
newEvent((void (GThreadObject::*)(void*))>hreadObject::functionOneInternal, (unsigned int)&argOne, sizeof(char*)+sizeof(int));
}
/*
* This handles the actual event
*/
void GThreadObject::functionOneInternal(char * argOne, int argTwo)
{
cout << "We've made it to functionOne Internal char*:" << argOne << " int:" << argTwo << endl;
//Now do the work
}
void GThreadObject::functionTwo(int argOne, int argTwo)
{
newEvent((void (GThreadObject::*)(void*))>hreadObject::functionTwoInternal, (unsigned int)&argOne, sizeof(int)+sizeof(int));
}
/*
* This handles the actual event
*/
void GThreadObject::functionTwoInternal(int argOne, int argTwo)
{
cout << "We've made it to functionTwo Internal arg1:" << argOne << " int:" << argTwo << endl;
}
main.cpp
#include <iostream>
#include "GThreadObject.h"
int main()
{
GThreadObject myObj;
myObj.functionOne("My Message", 23);
myObj.functionTwo(456, 23);
return 0;
}
Редактировать: Для полноты картины я реализовал с Boost :: bind. Ключевые отличия:
queue<boost::function<void ()> > jobQueue;
void GThreadObjectBoost::functionOne(char * argOne, int argTwo)
{
jobQueue.push(boost::bind(>hreadObjectBoost::functionOneInternal, this, argOne, argTwo));
workerThread();
}
void GThreadObjectBoost::workerThread()
{
boost::function<void ()> func = jobQueue.front();
func();
}
Использование реализации ускорения для 10 000 000 итераций functionOne () заняло ~ 19 секунд. Однако реализация без ускорения заняла всего ~ 6.5 секунд. Так примерно в 3 раза медленнее. Я предполагаю, что поиск хорошей очереди без блокировки будет самым большим препятствием для производительности. Но это все равно довольно большая разница.
Вам следует взглянуть на библиотеку Boost ASIO. Он предназначен для асинхронной отправки событий. Его можно объединить с библиотекой Boost Thread для создания описанной вами системы.
Вам потребуется создать экземпляр одного объекта boost :: asio :: io_service
и запланировать серию асинхронных событий ( boost :: asio :: io_service :: post
или boost :: asio :: io_service :: dispatch
). Затем вы вызываете функцию-член run
из n потоков. Объект io_service
является потокобезопасным и гарантирует, что ваши асинхронные обработчики будут отправляться только в потоке, из которого вы вызвали io_service :: run
.
Повышение : Объект: asio :: strand
также полезен для простой синхронизации потоков.