Событие / Многопоточность Очереди Задачи C++

bool hasTwo = yourSequence.ElementAtOrDefault(1) != default(T);

... в случае класса, где значения могут быть нулевыми, это может быть полезно.

8
задан Ben Reeves 29 May 2009 в 00:43
поделиться

8 ответов

Библиотека Futures перешла в Boost и стандартную библиотеку C ++. В ACE есть что-то похожее, но я бы не хотел рекомендовать его кому-либо (как уже отмечал @lothar, это Active Object.)

6
ответ дан 5 December 2019 в 12:12
поделиться

Возможно, вас заинтересует Активный объект , один из паттернов ACE структуры ACE .

Как Николай указал фьючерсы , которые запланированы для стандартного C ++ когда-нибудь в будущем (каламбур).

1
ответ дан 5 December 2019 в 12:12
поделиться

В библиотеке POCO есть что-то похожее на то, что называется ActiveMethod (вместе с некоторыми связанными функциями, например, ActiveResult) в разделе потоков. Исходный код легко доступен и понятен.

2
ответ дан 5 December 2019 в 12:12
поделиться

You can solve this by using Boost's Thread -library. Something like this (half-pseudo):


class GThreadObject
{
        ...

        public:
                GThreadObject()
                : _done(false)
                , _newJob(false)
                , _thread(boost::bind(>hreadObject::workerThread, this))
                {
                }

                ~GThreadObject()
                {
                        _done = true;

                        _thread.join();
                }

                void functionOne(char *argOne, int argTwo)
                {
                        ...

                        _jobQueue.push(myEvent);

                        {
                                boost::lock_guard l(_mutex);

                                _newJob = true;
                        }

                        _cond.notify_one();
                }

        private:
                void workerThread()
                {
                        while (!_done) {
                                boost::unique_lock l(_mutex);

                                while (!_newJob) {
                                        cond.wait(l);
                                }

                                Event *receivedEvent = _jobQueue.front();

                                ...
                        }
                }

        private:
                volatile bool             _done;
                volatile bool             _newJob;
                boost::thread             _thread;
                boost::mutex              _mutex;
                boost::condition_variable _cond;
                std::queue<Event*>        _jobQueue;
};

Also, please note how RAII allow us to get this code smaller and better to manage.

2
ответ дан 5 December 2019 в 12:12
поделиться

Для расширяемости и ремонтопригодности (и других возможностей) вы можете определить абстрактный класс (или интерфейс) для «работы», которую должен выполнять поток. Затем пользователи вашего пула потоков будут реализовывать этот интерфейс и давать ссылку на объект пулу потоков. Это очень похоже на дизайн активных объектов Symbian: каждый АО подклассов CActive и должен реализовывать такие методы, как Run () и Cancel ().

Для простоты ваш интерфейс (абстрактный класс) может быть таким простым, как:

class IJob
{
    virtual Run()=0;
};

Then пул потоков или запросы, принимающие один поток, будут иметь что-то вроде:

class CThread
{
   <...>
public:
   void AddJob(IJob* iTask);
   <...>
};

Естественно, у вас будет несколько задач, которые могут иметь всевозможные дополнительные сеттеры / получатели / атрибуты и все, что вам нужно в любой сфере жизни. Однако необходимо лишь реализовать метод Run (), который бы выполнял длительные вычисления:

class CDumbLoop : public IJob
{
public:
    CDumbJob(int iCount) : m_Count(iCount) {};
    ~CDumbJob() {};
    void Run()
    {
        // Do anything you want here
    }
private:
    int m_Count;
};
1
ответ дан 5 December 2019 в 12:12
поделиться

Вот класс, который я написал для той же цели (я использую его для обработки событий, но вы, конечно, можете переименовать его в ActionQueue - и переименовать его методы).

Вы используете его как это:

С функцией, которую вы хотите вызвать: void foo (const int x, const int y) {/*...*/}

И: EventQueue q;

q .AddEvent (boost :: bind (foo, 10, 20));

В рабочем потоке

q.PlayOutEvents ();

Примечание. Добавление кода для блокировки по условию в избегайте использования циклов ЦП.

Код (Visual Studio 2003 с ускорением 1.34.1):

#pragma once

#include <boost/thread/recursive_mutex.hpp>
#include <boost/function.hpp>
#include <boost/signals.hpp>
#include <boost/bind.hpp>
#include <boost/foreach.hpp>
#include <string>
using std::string;


// Records & plays out actions (closures) in a safe-thread manner.

class EventQueue
{
    typedef boost::function <void ()> Event;

public:

    const bool PlayOutEvents ()
    {
        // The copy is there to ensure there are no deadlocks.
        const std::vector<Event> eventsCopy = PopEvents ();

        BOOST_FOREACH (const Event& e, eventsCopy)
        {
            e ();
            Sleep (0);
        }

        return eventsCopy.size () > 0;
    }

    void AddEvent (const Event& event)
    {
        Mutex::scoped_lock lock (myMutex);

        myEvents.push_back (event);
    }

protected:

    const std::vector<Event> PopEvents ()
    {
        Mutex::scoped_lock lock (myMutex);

        const std::vector<Event> eventsCopy = myEvents;
        myEvents.clear ();

        return eventsCopy;
    }

private:

    typedef boost::recursive_mutex Mutex;
    Mutex myMutex;

    std::vector <Event> myEvents;

};

Надеюсь, это поможет. :)

Мартин Билски

1
ответ дан 5 December 2019 в 12:12
поделиться

Ниже представлена ​​реализация, для которой не требуется метод functionProxy. Даже несмотря на то, что добавлять новые методы проще, это все равно беспорядок.

Boost :: Bind и «Futures» действительно кажутся такими, как будто они бы многое из этого убрали. Думаю, я посмотрю на код повышения и посмотрю, как он работает. Всем спасибо за ваши предложения.

GThreadObject.h

#include <queue>

using namespace std;

class GThreadObject
{

    template <int size>
    class VariableSizeContainter
    {
        char data[size];
    };

    class event
    {
        public:
        void (GThreadObject::*funcPtr)(void *);
        int dataSize;
        char * data;
    };

public:
    void functionOne(char * argOne, int argTwo);
    void functionTwo(int argTwo, int arg2);


private:
    void newEvent(void (GThreadObject::*)(void*), unsigned int argStart, int argSize);
    void workerThread();
    queue<GThreadObject::event*> jobQueue;
    void functionTwoInternal(int argTwo, int arg2);
    void functionOneInternal(char * argOne, int argTwo);

};

GThreadObject.cpp

#include <iostream>
#include "GThreadObject.h"

using namespace std;

/* On a continuous loop, reading tasks from queue
 * When a new event is received it executes the attached function pointer
 * Thread code removed to decrease clutter
 */
void GThreadObject::workerThread()
{
    //New Event added, process it
    GThreadObject::event * receivedEvent = jobQueue.front();

    /* Create an object the size of the stack the function is expecting, then cast the function to accept this object as an argument.
     * This is the bit i would like to remove
     * Only supports 8 byte argument size e.g 2 int's OR pointer + int OR myObject8bytesSize
     * Subsequent data sizes would need to be added with an else if
     * */
    if (receivedEvent->dataSize == 8)
    {
        const int size = 8;

        void (GThreadObject::*newFuncPtr)(VariableSizeContainter<size>);
        newFuncPtr = (void (GThreadObject::*)(VariableSizeContainter<size>))receivedEvent->funcPtr;

        //Execute the function
        (*this.*newFuncPtr)(*((VariableSizeContainter<size>*)receivedEvent->data));
    }

    //Clean up
    free(receivedEvent->data);
    delete receivedEvent;

}

void GThreadObject::newEvent(void (GThreadObject::*funcPtr)(void*), unsigned int argStart, int argSize)
{

    //Malloc an object the size of the function arguments
    void * myData = malloc(argSize);
    //Copy the data passed to this function into the buffer
    memcpy(myData, (char*)argStart, argSize);

    //Create the event and push it on to the queue
    GThreadObject::event * myEvent = new event;
    myEvent->data = (char*)myData;
    myEvent->dataSize = argSize;
    myEvent->funcPtr = funcPtr;
    jobQueue.push(myEvent);

    //This would be send a thread condition signal, replaced with a simple call here
    this->workerThread();

}

/*
 * This is the public interface, Can be called from child threads
 * Instead of executing the event directly it adds it to a job queue
 * Then the workerThread picks it up and executes all tasks on the same thread
 */
void GThreadObject::functionOne(char * argOne, int argTwo)
{
    newEvent((void (GThreadObject::*)(void*))&GThreadObject::functionOneInternal, (unsigned int)&argOne, sizeof(char*)+sizeof(int));
}

/*
 * This handles the actual event
 */
void GThreadObject::functionOneInternal(char * argOne, int argTwo)
{
    cout << "We've made it to functionOne Internal char*:" << argOne << " int:" << argTwo << endl;

    //Now do the work
}

void GThreadObject::functionTwo(int argOne, int argTwo)
{
    newEvent((void (GThreadObject::*)(void*))&GThreadObject::functionTwoInternal, (unsigned int)&argOne, sizeof(int)+sizeof(int));
}

/*
 * This handles the actual event
 */
void GThreadObject::functionTwoInternal(int argOne, int argTwo)
{
    cout << "We've made it to functionTwo Internal arg1:" << argOne << " int:" << argTwo << endl;
}

main.cpp

#include <iostream>
#include "GThreadObject.h"

int main()
{

    GThreadObject myObj;

    myObj.functionOne("My Message", 23);
    myObj.functionTwo(456, 23);


    return 0;
}

Редактировать: Для полноты картины я реализовал с Boost :: bind. Ключевые отличия:

queue<boost::function<void ()> > jobQueue;

void GThreadObjectBoost::functionOne(char * argOne, int argTwo)
{
    jobQueue.push(boost::bind(&GThreadObjectBoost::functionOneInternal, this, argOne, argTwo));

    workerThread();
}

void GThreadObjectBoost::workerThread()
{
    boost::function<void ()> func = jobQueue.front();
    func();
}

Использование реализации ускорения для 10 000 000 итераций functionOne () заняло ~ 19 секунд. Однако реализация без ускорения заняла всего ~ 6.5 секунд. Так примерно в 3 раза медленнее. Я предполагаю, что поиск хорошей очереди без блокировки будет самым большим препятствием для производительности. Но это все равно довольно большая разница.

1
ответ дан 5 December 2019 в 12:12
поделиться

Вам следует взглянуть на библиотеку Boost ASIO. Он предназначен для асинхронной отправки событий. Его можно объединить с библиотекой Boost Thread для создания описанной вами системы.

Вам потребуется создать экземпляр одного объекта boost :: asio :: io_service и запланировать серию асинхронных событий ( boost :: asio :: io_service :: post или boost :: asio :: io_service :: dispatch ). Затем вы вызываете функцию-член run из n потоков. Объект io_service является потокобезопасным и гарантирует, что ваши асинхронные обработчики будут отправляться только в потоке, из которого вы вызвали io_service :: run .

Повышение : Объект: asio :: strand также полезен для простой синхронизации потоков.

0
ответ дан 5 December 2019 в 12:12
поделиться
Другие вопросы по тегам:

Похожие вопросы: