Я пытаюсь записать простую программу пула потоков в pthread. Однако это кажется этим pthread_cond_signal
не блокируется, который создает проблему. Например, скажем, у меня есть программа "производителя-потребителя":
pthread_cond_t my_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t my_cond_m = PTHREAD_MUTEX_INITIALIZER;
void * liberator(void * arg)
{
// XXX make sure he is ready to be freed
sleep(1);
pthread_mutex_lock(&my_cond_m);
pthread_cond_signal(&my_cond);
pthread_mutex_unlock(&my_cond_m);
return NULL;
}
int main()
{
pthread_t t1;
pthread_create(&t1, NULL, liberator, NULL);
// XXX Don't take too long to get ready. Otherwise I'll miss
// the wake up call forever
//sleep(3);
pthread_mutex_lock(&my_cond_m);
pthread_cond_wait(&my_cond, &my_cond_m);
pthread_mutex_unlock(&my_cond_m);
pthread_join(t1, NULL);
return 0;
}
Как описано в двух XXX
метки, если я устраняю sleep
вызовы, затем main()
может остановиться, потому что это пропустило вызов пробуждения от liberator()
. Конечно, sleep
очень устойчивый путь не состоит в том, чтобы гарантировать это также.
В реальной ситуации это было бы рабочим потоком, говоря потоку менеджера, что это готово к работе или потоку менеджера, объявляющему, что новая работа доступна.
Как Вы сделали бы это надежно в pthread?
Вид ответа @Borealid работ, но его объяснение проблемы мог быть лучше. Я предлагаю любого смотрящего на этот вопрос считать обсуждение в комментариях для понимания то, что продолжается.
В частности, я сам исправил бы его ответ и пример кода как это, для создания этого более ясным. (Так как исходный ответ Borealid, в то время как скомпилировано и работается, смутил меня много),
// In main
pthread_mutex_lock(&my_cond_m);
// If the flag is not set, it means liberator has not
// been run yet. I'll wait for him through pthread's signaling
// mechanism
// If it _is_ set, it means liberator has been run. I'll simply
// skip waiting since I've already synchronized. I don't need to
// use pthread's signaling mechanism
if(!flag) pthread_cond_wait(&my_cond, &my_cond_m);
pthread_mutex_unlock(&my_cond_m);
// In liberator thread
pthread_mutex_lock(&my_cond_m);
// Signal anyone who's sleeping. If no one is sleeping yet,
// they should check this flag which indicates I have already
// sent the signal. This is needed because pthread's signals
// is not like a message queue -- a sent signal is lost if
// nobody's waiting for a condition when it's sent.
// You can think of this flag as a "persistent" signal
flag = 1;
pthread_cond_signal(&my_cond);
pthread_mutex_unlock(&my_cond_m);
Использовать переменную синхронизации.
В main
:
pthread_mutex_lock(&my_cond_m);
while (!flag) {
pthread_cond_wait(&my_cond, &my_cond_m);
}
pthread_mutex_unlock(&my_cond_m);
В потоке:
pthread_mutex_lock(&my_cond_m);
flag = 1;
pthread_cond_broadcast(&my_cond);
pthread_mutex_unlock(&my_cond_m);
Для проблемы производитель-потребитель, это будет потребитель спит, когда буфер пуст, а производитель спит, когда он заполнен. Не забудьте получить блокировку перед обращением к глобальной переменной.
Я нашел решение здесь . Для меня сложность понимания проблемы заключается в следующем:
Чтобы проиллюстрировать, упомянутая выше запись в блоге продемонстрировала, что это действительно значимое и желательное поведение:
pthread_mutex_lock(&cond_mutex);
pthread_cond_broadcast(&cond):
pthread_cond_wait(&cond, &cond_mutex);
pthread_mutex_unlock(&cond_mutex);
Идея состоит в том, что, если и производители, и потребители используют эту логику, для любого из них будет безопасно сначала спать , так как каждый сможет разбудить другую роль. Другими словами, в типичном сценарии производитель-потребитель: если потребителю нужно спать, это потому, что производитель должен проснуться, и наоборот. Упаковать эту логику в одно условие pthread имеет смысл.
Конечно, приведенный выше код имеет непредвиденное поведение: рабочий поток также разбудит другой спящий рабочий поток, когда он на самом деле просто хочет разбудить производителя. Это может быть решено простой проверкой переменных, как предлагает @Borealid:
while(!work_available) pthread_cond_wait(&cond, &cond_mutex);
После рабочего вещания все рабочие потоки будут пробуждены, но один за другим (из-за неявной блокировки мьютекса в pthread_cond_wait
). Поскольку один из рабочих потоков будет использовать работу (установка work_available
обратно на false
), когда другие рабочие потоки проснутся и фактически приступят к работе, работа будет недоступна, поэтому рабочий будет спать снова.
Вот некоторые закомментированные коды, которые я тестировал, для всех, кто интересуется:
// gcc -Wall -pthread threads.c -lpthread
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <assert.h>
pthread_cond_t my_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t my_cond_m = PTHREAD_MUTEX_INITIALIZER;
int * next_work = NULL;
int all_work_done = 0;
void * worker(void * arg)
{
int * my_work = NULL;
while(!all_work_done)
{
pthread_mutex_lock(&my_cond_m);
if(next_work == NULL)
{
// Signal producer to give work
pthread_cond_broadcast(&my_cond);
// Wait for work to arrive
// It is wrapped in a while loop because the condition
// might be triggered by another worker thread intended
// to wake up the producer
while(!next_work && !all_work_done)
pthread_cond_wait(&my_cond, &my_cond_m);
}
// Work has arrived, cache it locally so producer can
// put in next work ASAP
my_work = next_work;
next_work = NULL;
pthread_mutex_unlock(&my_cond_m);
if(my_work)
{
printf("Worker %d consuming work: %d\n", (int)(pthread_self() % 100), *my_work);
free(my_work);
}
}
return NULL;
}
int * create_work()
{
int * ret = (int *)malloc(sizeof(int));
assert(ret);
*ret = rand() % 100;
return ret;
}
void * producer(void * arg)
{
int i;
for(i = 0; i < 10; i++)
{
pthread_mutex_lock(&my_cond_m);
while(next_work != NULL)
{
// There's still work, signal a worker to pick it up
pthread_cond_broadcast(&my_cond);
// Wait for work to be picked up
pthread_cond_wait(&my_cond, &my_cond_m);
}
// No work is available now, let's put work on the queue
next_work = create_work();
printf("Producer: Created work %d\n", *next_work);
pthread_mutex_unlock(&my_cond_m);
}
// Some workers might still be waiting, release them
pthread_cond_broadcast(&my_cond);
all_work_done = 1;
return NULL;
}
int main()
{
pthread_t t1, t2, t3, t4;
pthread_create(&t1, NULL, worker, NULL);
pthread_create(&t2, NULL, worker, NULL);
pthread_create(&t3, NULL, worker, NULL);
pthread_create(&t4, NULL, worker, NULL);
producer(NULL);
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);
return 0;
}