Эффективные независимые синхронизируемые блоки?

У меня есть сценарий, где в определенные моменты в моей программе поток должен обновить несколько структур совместно используемых данных. Каждая структура данных может быть безопасно обновлена параллельно с любой другой структурой данных, но каждая структура данных может только быть обновлена одним потоком за один раз. Простым, наивным путем я выразил, это в моем коде:

synchronized updateStructure1();
synchronized updateStructure2();
// ...

Это кажется неэффективным, потому что, если несколько потоков пытаются обновить структуру 1, но никакой поток не пытается обновить структуру 2, они все заблокируют ожидание блокировки, которая защищает структуру 1, в то время как блокировка для структуры 2 находится невзятая.

Существует ли "стандартный" способ исправить это? Другими словами, существует ли стандартная поточная обработка, примитивная, который пытается обновить все структуры циклическим способом, блоки, только если все блокировки взяты, и возвраты, когда все структуры обновляются?

Это несколько вопрос об агностике языка, но в случае, если он помогает, язык, который я использую, является D.

5
задан dsimcha 12 July 2010 в 17:26
поделиться

5 ответов

Я не знаю, есть ли стандартный способ сделать это. Однако, я бы реализовал это примерно так:

do
{
    if (!updatedA && mutexA.tryLock())
    {
        scope(exit) mutexA.unlock();
        updateA();
        updatedA = true;
    }

    if (!updatedB && mutexB.tryLock())
    {
        scope(exit) mutexB.unlock();
        updateB();
        updatedB = true;
    }
}
while (!(updatedA && updatedB));

Некоторое умное метапрограммирование, вероятно, могло бы сократить количество повторений, но я оставляю это в качестве упражнения для вас.

1
ответ дан 15 December 2019 в 00:48
поделиться

Если ваш язык поддерживает легковесные потоки или Actors, вы всегда можете заставить поток обновления порождать новый поток для изменения каждого объекта, где каждый поток просто блокирует, изменяет и разблокирует каждый объект. Затем перед возвратом поток обновления присоединяется ко всем своим дочерним потокам. Это перекладывает проблему на расписание среды выполнения, и она вольна планировать эти дочерние потоки любым способом для достижения наилучшей производительности.

Вы можете сделать это в языках с более тяжелыми потоками, но порождение и объединение могут иметь слишком большие накладные расходы (хотя объединение потоков может смягчить некоторые из них).

2
ответ дан 15 December 2019 в 00:48
поделиться

Насколько мне известно, стандартного способа добиться этого не существует, и вам придется запачкать руки.

Если перефразировать ваши требования, у вас есть набор структур данных, и вам нужно выполнять над ними работу, но не в каком-то определенном порядке. Вы хотите блокировать ожидание структуры данных только в том случае, если все остальные объекты заблокированы. Вот псевдокод, на котором я бы основывал свое решение:

work = unshared list of objects that need updating
while work is not empty:
    found = false
    for each obj in work:
        try locking obj
        if successful:
            remove obj from work
            found = true
            obj.update()
            unlock obj
    if !found:
        // Everything is locked, so we have to wait
        obj = randomly pick an object from work
        remove obj from work
        lock obj
        obj.update()
        unlock obj

Обновляющий поток будет блокироваться, только если он обнаружит, что все объекты, которые ему нужно использовать, заблокированы. Тогда он должен чего-то ждать, поэтому он просто выбирает один объект и блокирует его. В идеале, он должен выбрать объект, который будет разблокирован раньше всех, но нет простого способа определить это.

Кроме того, можно предположить, что объект может стать свободным, пока программа обновления находится в цикле try, и тогда программа обновления пропустит его. Но если объем выполняемой вами работы достаточно велик по сравнению с затратами на итерации в этом цикле, то ложный конфликт должен быть редким, и он будет иметь значение только в случаях чрезвычайно высокой конкуренции.

0
ответ дан 15 December 2019 в 00:48
поделиться

Стандартного решения нет, скорее есть класс стандартных решений в зависимости от ваших потребностей.

http://en.wikipedia.org/wiki/Scheduling_algorithm

0
ответ дан 15 December 2019 в 00:48
поделиться

Я не знаю никакого "стандартного" способа сделать это, извините. Итак, это просто ThreadGroup, абстрагированный Swarm-классом, который "рубит" список заданий, пока все не будут выполнены, в стиле round-robin, и следит за тем, чтобы использовалось как можно больше потоков. Я не знаю, как это сделать без списка заданий.

Оговорка: я очень новичок в D и программировании параллелизма, поэтому код довольно дилетантский. Я рассматривал это скорее как забавное упражнение. (Я также понимаю, что это не совсем то, что вы ищете. Если у кого-то есть какие-либо рекомендации, я с удовольствием их выслушаю!

import  core.thread,
        core.sync.mutex,
        std.c.stdio,
        std.stdio;

class Swarm{
    ThreadGroup group;
    Mutex mutex;
    auto numThreads = 1;
    void delegate ()[int] jobs;
    this(void delegate()[int] aJobs, int aNumThreads){
        jobs = aJobs;
        numThreads = aNumThreads;
        group = new ThreadGroup;
        mutex = new Mutex();
    }
    void runBlocking(){
        run();
        group.joinAll();
    }
    void run(){
        foreach(c;0..numThreads)
            group.create( &swarmJobs );
    }
    void swarmJobs(){
        void delegate () myJob;
        do{
            myJob = null;
            synchronized(mutex){
                if(jobs.length > 0)
                    foreach(i,job;jobs){
                        myJob = job;
                        jobs.remove(i);
                        break;
                    }
            }
            if(myJob)
                myJob();
        }while(myJob)
    }
}
class Jobs{
    void job1(){
        foreach(c;0..1000){
            foreach(j;0..2_000_000){}
            writef("1");
            fflush(core.stdc.stdio.stdout);
        }
    }
    void job2(){
        foreach(c;0..1000){
            foreach(j;0..1_000_000){}
            writef("2");
            fflush(core.stdc.stdio.stdout);
        }
    }
}
void main(){
    auto jobs = new Jobs();
    void delegate ()[int] jobsList = 
         [1:&jobs.job1,2:&jobs.job2,3:&jobs.job1,4:&jobs.job2];
    int numThreads = 2;
    auto swarm = new Swarm(jobsList,numThreads);
    swarm.runBlocking();
    writefln("end");
}
0
ответ дан 15 December 2019 в 00:48
поделиться
Другие вопросы по тегам:

Похожие вопросы: