Простой пример пула потоков Windows API

[EDIT: благодаря ответу MSalters и ответу Раймонда Чена на InterlockedIncrement vs EnterCriticalSection / counter ++ / LeaveCriticalSection , проблема решена, и приведенный ниже код работает правильно. Это должно предоставить интересный простой пример использования пула потоков в Windows]

Мне не удалось найти простой пример следующей задачи. Моя программа, например, должна увеличивать значения в огромном std :: vector на единицу, поэтому я хочу сделать это параллельно. Это необходимо делать несколько раз за время существования программы. Я знаю, как это сделать, используя CreateThread при каждом вызове подпрограммы, но мне не удается избавиться от CreateThread с помощью ThreadPool.

Вот что я делаю:

class Thread {
public:
    Thread(){}
    virtual void run() = 0 ; // I can inherit an "IncrementVectorThread"
};
class IncrementVectorThread: public Thread {
public:
   IncrementVectorThread(int threadID, int nbThreads, std::vector &vec) : id(threadID), nb(nbThreads), myvec(vec) { };

   virtual void run() {
        for (int i=(myvec.size()*id)/nb; i<(myvec.size()*(id+1))/nb; i++)
          myvec[i]++; //and let's assume myvec is properly sized
    }
   int id, nb;
   std::vector &myvec;
};

class ThreadGroup : public std::vector {
public:
    ThreadGroup() { 
         pool = CreateThreadpool(NULL);
         InitializeThreadpoolEnvironment(&cbe);
         cleanupGroup = CreateThreadpoolCleanupGroup();
         SetThreadpoolCallbackPool(&cbe, pool);
         SetThreadpoolCallbackCleanupGroup(&cbe, cleanupGroup, NULL);
         threadCount = 0;
    }
    ~ThreadGroup() {
         CloseThreadpool(pool);
}
    PTP_POOL pool;
    TP_CALLBACK_ENVIRON cbe;
    PTP_CLEANUP_GROUP cleanupGroup;
    volatile long threadCount;
} ;


static VOID CALLBACK runFunc(
                PTP_CALLBACK_INSTANCE Instance,
                PVOID Context,
                PTP_WORK Work) {

   ThreadGroup &thread = *((ThreadGroup*) Context);
   long id = InterlockedIncrement(&(thread.threadCount));
   DWORD tid = (id-1)%thread.size();
   thread[tid]->run();
}

void run_threads(ThreadGroup* thread_group) {
    SetThreadpoolThreadMaximum(thread_group->pool, thread_group->size());
    SetThreadpoolThreadMinimum(thread_group->pool, thread_group->size());

    TP_WORK *worker = CreateThreadpoolWork(runFunc, (void*) thread_group, &thread_group->cbe);
    thread_group->threadCount = 0;
    for (int i=0; isize(); i++) {
        SubmitThreadpoolWork(worker);
     }  
     WaitForThreadpoolWorkCallbacks(worker,FALSE);  
     CloseThreadpoolWork(worker);   
}       

void main() {

   ThreadGroup group;
   std::vector vec(10000, 0);
   for (int i=0; i<10; i++)
      group.push_back(new IncrementVectorThread(i, 10, vec));

   run_threads(&group);
   run_threads(&group);
   run_threads(&group);

   // now, vec should be == std::vector(10000, 3);       
}

Итак, если я хорошо понял:
- команда CreateThreadpool создает группу потоков (следовательно, вызов CreateThreadpoolWork дешев, поскольку не call CreateThread)
- я могу иметь столько пулов потоков, сколько хочу (если я хочу создать пул потоков для «IncrementVector» и один для моих потоков «DecrementVector», я могу).
- если мне нужно разделить мою задачу «вектор приращения» на 10 потоков, вместо того, чтобы вызывать 10 раз CreateThread, я создаю одного «worker» и отправляю его 10 раз в ThreadPool с тем же параметр (следовательно, мне нужен идентификатор потока в обратном вызове, чтобы знать, какую часть моего std :: vector нужно увеличивать). Здесь я не смог найти идентификатор потока, так как функция GetCurrentThreadId () возвращает реальный идентификатор потока (т.е. что-то вроде 1528, а не что-то между 0..nb_launched_threads).

Наконец, я не уверен, что хорошо понял концепцию: действительно ли мне нужен один рабочий процесс, а не 10, если я разделю свой std :: vector на 10 потоков?

Спасибо!

6
задан 15 revs 23 May 2017 в 12:33
поделиться