В pthread, как надежно передать сигнал другому потоку?

Я пытаюсь записать простую программу пула потоков в pthread. Однако это кажется этим pthread_cond_signal не блокируется, который создает проблему. Например, скажем, у меня есть программа "производителя-потребителя":

pthread_cond_t my_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t my_cond_m = PTHREAD_MUTEX_INITIALIZER;

void * liberator(void * arg)
{
    // XXX make sure he is ready to be freed
    sleep(1);

    pthread_mutex_lock(&my_cond_m);
    pthread_cond_signal(&my_cond);
    pthread_mutex_unlock(&my_cond_m);

    return NULL;
}

int main()
{
    pthread_t t1;
    pthread_create(&t1, NULL, liberator, NULL);

    // XXX Don't take too long to get ready. Otherwise I'll miss 
    // the wake up call forever
    //sleep(3);

    pthread_mutex_lock(&my_cond_m);
    pthread_cond_wait(&my_cond, &my_cond_m);
    pthread_mutex_unlock(&my_cond_m);

    pthread_join(t1, NULL);

    return 0;
}

Как описано в двух XXX метки, если я устраняю sleep вызовы, затем main() может остановиться, потому что это пропустило вызов пробуждения от liberator(). Конечно, sleep очень устойчивый путь не состоит в том, чтобы гарантировать это также.

В реальной ситуации это было бы рабочим потоком, говоря потоку менеджера, что это готово к работе или потоку менеджера, объявляющему, что новая работа доступна.

Как Вы сделали бы это надежно в pthread?


Разработка

Вид ответа @Borealid работ, но его объяснение проблемы мог быть лучше. Я предлагаю любого смотрящего на этот вопрос считать обсуждение в комментариях для понимания то, что продолжается.

В частности, я сам исправил бы его ответ и пример кода как это, для создания этого более ясным. (Так как исходный ответ Borealid, в то время как скомпилировано и работается, смутил меня много),

// In main
pthread_mutex_lock(&my_cond_m);

// If the flag is not set, it means liberator has not 
// been run yet. I'll wait for him through pthread's signaling 
// mechanism

// If it _is_ set, it means liberator has been run. I'll simply 
// skip waiting since I've already synchronized. I don't need to 
// use pthread's signaling mechanism
if(!flag) pthread_cond_wait(&my_cond, &my_cond_m);

pthread_mutex_unlock(&my_cond_m);

// In liberator thread
pthread_mutex_lock(&my_cond_m);

// Signal anyone who's sleeping. If no one is sleeping yet, 
// they should check this flag which indicates I have already 
// sent the signal. This is needed because pthread's signals 
// is not like a message queue -- a sent signal is lost if 
// nobody's waiting for a condition when it's sent.
// You can think of this flag as a "persistent" signal
flag = 1;
pthread_cond_signal(&my_cond);
pthread_mutex_unlock(&my_cond_m);
12
задан kizzx2 11 August 2010 в 16:35
поделиться

2 ответа

Использовать переменную синхронизации.

В main:

pthread_mutex_lock(&my_cond_m);
while (!flag) {
    pthread_cond_wait(&my_cond, &my_cond_m);
}
pthread_mutex_unlock(&my_cond_m);

В потоке:

pthread_mutex_lock(&my_cond_m);
flag = 1;
pthread_cond_broadcast(&my_cond);
pthread_mutex_unlock(&my_cond_m);

Для проблемы производитель-потребитель, это будет потребитель спит, когда буфер пуст, а производитель спит, когда он заполнен. Не забудьте получить блокировку перед обращением к глобальной переменной.

7
ответ дан 2 December 2019 в 22:03
поделиться

Я нашел решение здесь . Для меня сложность понимания проблемы заключается в следующем:

  1. Производители и потребители должны иметь возможность общаться в обоих направлениях. В любом случае недостаточно.
  2. Эта двусторонняя связь может быть упакована в одно состояние pthread.

Чтобы проиллюстрировать, упомянутая выше запись в блоге продемонстрировала, что это действительно значимое и желательное поведение:

pthread_mutex_lock(&cond_mutex);
pthread_cond_broadcast(&cond):
pthread_cond_wait(&cond, &cond_mutex);
pthread_mutex_unlock(&cond_mutex);

Идея состоит в том, что, если и производители, и потребители используют эту логику, для любого из них будет безопасно сначала спать , так как каждый сможет разбудить другую роль. Другими словами, в типичном сценарии производитель-потребитель: если потребителю нужно спать, это потому, что производитель должен проснуться, и наоборот. Упаковать эту логику в одно условие pthread имеет смысл.

Конечно, приведенный выше код имеет непредвиденное поведение: рабочий поток также разбудит другой спящий рабочий поток, когда он на самом деле просто хочет разбудить производителя. Это может быть решено простой проверкой переменных, как предлагает @Borealid:

while(!work_available) pthread_cond_wait(&cond, &cond_mutex);

После рабочего вещания все рабочие потоки будут пробуждены, но один за другим (из-за неявной блокировки мьютекса в pthread_cond_wait ). Поскольку один из рабочих потоков будет использовать работу (установка work_available обратно на false ), когда другие рабочие потоки проснутся и фактически приступят к работе, работа будет недоступна, поэтому рабочий будет спать снова.

Вот некоторые закомментированные коды, которые я тестировал, для всех, кто интересуется:

// gcc -Wall -pthread threads.c -lpthread

#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <assert.h>

pthread_cond_t my_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t my_cond_m = PTHREAD_MUTEX_INITIALIZER;

int * next_work = NULL;
int all_work_done = 0;

void * worker(void * arg)
{
    int * my_work = NULL;

    while(!all_work_done)
    {
        pthread_mutex_lock(&my_cond_m);

        if(next_work == NULL)
        {
            // Signal producer to give work
            pthread_cond_broadcast(&my_cond);

            // Wait for work to arrive
            // It is wrapped in a while loop because the condition 
            // might be triggered by another worker thread intended 
            // to wake up the producer
            while(!next_work && !all_work_done)
                pthread_cond_wait(&my_cond, &my_cond_m);
        }

        // Work has arrived, cache it locally so producer can 
        // put in next work ASAP
        my_work = next_work;
        next_work = NULL;
        pthread_mutex_unlock(&my_cond_m);

        if(my_work)
        {
            printf("Worker %d consuming work: %d\n", (int)(pthread_self() % 100), *my_work);
            free(my_work);
        }
    }

    return NULL;
}

int * create_work()
{
    int * ret = (int *)malloc(sizeof(int));
    assert(ret);
    *ret = rand() % 100;
    return ret;
}

void * producer(void * arg)
{
    int i;

    for(i = 0; i < 10; i++)
    {
        pthread_mutex_lock(&my_cond_m);
        while(next_work != NULL)
        {
            // There's still work, signal a worker to pick it up
            pthread_cond_broadcast(&my_cond);

            // Wait for work to be picked up
            pthread_cond_wait(&my_cond, &my_cond_m);
        }

        // No work is available now, let's put work on the queue
        next_work = create_work();
        printf("Producer: Created work %d\n", *next_work);

        pthread_mutex_unlock(&my_cond_m);
    }

    // Some workers might still be waiting, release them
    pthread_cond_broadcast(&my_cond);

    all_work_done = 1;
    return NULL;
}

int main()
{
    pthread_t t1, t2, t3, t4;

    pthread_create(&t1, NULL, worker, NULL);
    pthread_create(&t2, NULL, worker, NULL);
    pthread_create(&t3, NULL, worker, NULL);
    pthread_create(&t4, NULL, worker, NULL);

    producer(NULL);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    pthread_join(t3, NULL);
    pthread_join(t4, NULL);
    return 0;
}
4
ответ дан 2 December 2019 в 22:03
поделиться
Другие вопросы по тегам:

Похожие вопросы: