Реализация задания перечисляет с внутренней синхронизацией

Я работаю над простой платформой поточной обработки задания, которая очень похожа на тот, описанный в идентификационной Технологии 5 проблем. На наиболее базовом уровне у меня есть ряд списков заданий, и я хочу запланировать, они перечисляют через набор потоков ЦП (использующий пул стандартной резьбы для фактической диспетчеризации.) Однако интересно, как это предупреждает/ожидает, что материал в списке ожидания может быть эффективно реализован. Насколько я понимаю маркер ожидания блокирует выполнение списка, если маркер сигнала не был выполнен. Это неявно означает, что все перед сигналом должно закончиться, прежде чем сигнал может быть повышен. Так скажем, у нас есть список как это:

J1, J2, S, J3, W, J4

затем диспетчеризация может пойти как это:

#1: J1, J2, J3

#2: J4

Однако это не настолько легко, как это кажется, как данное ряд списков, я должен был бы переместить некоторых из них между ready и waiting и также имейте специальный код, чтобы собрать все задания перед сигналом и отметить что-то на них, так, чтобы они могли инициировать сигнал, если и только если они все закончили (подразумевать, например, что больше не возможно добавить задания к списку, в то время как он выполняется, поскольку после сигналов получают доступ к ранее вставленным заданиям.)

Есть ли какой-либо "стандартный" способ реализовать это эффективно? Я также задаюсь вопросом, как лучше всего запланировать выполнение списка задания, прямо сейчас, каждое ядро захватывает список задания и планирует все задания в нем, который дает довольно хорошее масштабирование (для 32k заданий à 0,7 мс, я получаю 101%, который я предполагаю, частично вследствие того, что единственная потоковая версия планируется на различные ядра несколько раз.)

7
задан Anteru 15 January 2010 в 12:31
поделиться

2 ответа

Это является относительно алгоритмом планирования планирования. Пару вопросов, по-видимому, поначалу сложно, но на самом деле не (Сигнал / ждать и местом кэша). Я объясню методы, а затем дайте какой-нибудь код, который я написал, чтобы проиллюстрировать концепции, а затем дать некоторые окончательные заметки на тюнинг.

Алгоритмы использования

Обработка сигнала / ждать эффективно, поначалу кажется сложно, но на самом деле оказывается чрезвычайно легким. Поскольку пары сигнала / ожидания не могут гнездиться или перекрываться, действительно могут быть только два удовлетворены, и в любое время ждали время. Просто указывая указатель «CurrentGignal» на самый последний неудовлетворенный сигнал, это все, что необходимо сделать для бухгалтерии.

Убедившись, что ядра не прыгают между списками слишком много, и что данный список не совпадает между слишком многими ядрами, также относительно проста: каждое ядро ​​продолжает принимать работу из одного и того же списка до тех пор, пока он не включает в себя. другой список. Чтобы сохранить все ядра от банды в одном списке, для каждого списка хранится рабочая запись, которая сообщает, сколько ядер используют его, и списки организованы, поэтому ядра выбирают списки с меньшим количеством работников.

Блокировка может быть простым путем блокировки только планировщика или списка, на которой вы работаете в любое время, никогда не оба.

Вы выразили обеспокоенность по поводу добавления заданий в список после того, как список уже запущен. Оказывается, что поддерживающее это почти тривиально: все, что ему нужно, это вызов из списка к планировке, когда задание добавляется в список, который в данный момент завершен, поэтому планировщик может запланировать новую работу.

Структуры данных

Вот основные структуры данных, которые вам понадобится:

class Scheduler
{
  LinkedList<JobList>[] Ready; // Indexed by number of cores working on list
  LinkedList<JobList> Blocked;
  int ReadyCount;
  bool Exit;

  public:
    void AddList(JobList* joblist);
    void DoWork();

  internal:
    void UpdateQueues(JobList* joblist);

    void NotifyBlockedCores();
    void WaitForNotifyBlockedCores();
}

class JobList
{
  Scheduler Scheduler;
  LinkedList<JobList> CurrentQueue;

  LinkedList<Job> Jobs;            // All jobs in the job list
  LinkedList<SignalPoint> Signals; // All signal/wait pairs in the job list,
                                      plus a dummy

  Job* NextJob;                    // The next job to schedule, if any
  int NextJobIndex;                // The index of NextJob

  SignalPoint* CurrentSignal;      // First signal not fully satisfied

  int WorkerCount;                 // # of cores executing in this list

  public:
    void AddJob(Job* job);
    void AddSignal();
    void AddWait();

  internal:
    void Ready { get; }
    void GetNextReadyJob(Job& job, int& jobIndex);
    void MarkJobCompleted(Job job, int jobIndex);
}
class SignalPoint
{
  int SignalJobIndex = int.MaxValue;
  int WaitJobIndex = int.MaxValue;
  int IncompleteCount = 0;
}

Обратите внимание, что точки сигнала для данного доступа, наиболее удобно хранятся отдельно от фактического списка рабочих мест.

Реализация планировщика

Планировщик сохраняет отслеживание списков заданий, назначает их к ядрам и выполняет задания из списков заданий.

addList добавляет задание на планировщик. Он должен быть помещен на готовую или заблокированную очередь в зависимости от того, есть ли она какая-либо работа (т. Е. Независимо от того, были ли какие-либо работы были добавлены к нему еще), поэтому просто вызовите Usersqueues.

void Scheduler.AddList(JobList* joblist)
{
  joblist.Scheduler = this;
  UpdateQueues(joblist);
}

UssentQueues централизует логику обновления очереди. Обратите внимание на алгоритм для выбора новой очереди, а также уведомление о простых ядрах, когда работа становится доступной:

void Scheduler.UpdateQueues(JobList* joblist)
{
  lock(this)
  {
    // Remove from prior queue, if any
    if(joblist.CurrentQueue!=null)
    {
      if(joblist.CurrentQueue!=Blocked) ReadyCount--;
      joblist.CurrentQueue.Remove(joblist);
    }

    // Select new queue
    joblist.CurrentQueue = joblist.Ready ? Ready[joblist.WorkerCount] : Blocked;

    // Add to new queue
    joblist.CurrentQueue.Add(joblist);
    if(joblist.CurrentQueue!=Blocked)
      if(++ReadyCount==1) NotifyBlockedCores();
  }
}

Dowork является нормальной работой планировщика, за исключением случаев: 1. Это выбирает Joblist с наименьшим количеством работников, 2. Работает рабочие места от Данный срок, пока он не может больше не может, и 3. Он хранит Jobindex, а также работу, поэтому Joblist может легко обновить состояние завершения (деталь реализации).

void Scheduler.DoWork()
{
  while(!Exit)
  {
    // Get a job list to work on
    JobList *list = null;
    lock(this)
    {
      for(int i=0; i<Ready.Length; i++)
        if(!Ready[i].Empty)
        {
          list = Ready[i].First;
          break;
        }
      if(list==null)  // No work to do
      {
        WaitForNotifyBlockedCores();
        continue;
      }
      list.WorkerCount++;
      UpdateQueues(list);
    }

    // Execute jobs in the list as long as possible
    while(true)
    {
      int jobIndex;
      Job job;
      if(!GetNextReadyJob(&job, &jobIndex)) break;

      job.Execute();

      list.MarkJobCompleted(job, jobIndex);
    }

    // Release the job list
    lock(this)
    {
      list.WorkerCount--;
      UpdateQueues(list);
    }
  }
}

Драйверная реализация

Дрогатор отслеживает, как сигнал / ожидания перемежаются с заданиями и отслеживают, какие пары сигнала / ожидания уже завершили все до их сигнала.

Конструктор создает пункт фиктивного сигнала, чтобы добавить задания. Эта точка сигнала становится реальной сигнальной точкой (и добавляется новый манекен) всякий раз, когда добавляется новый «сигнал».

JobList.JobList()
{
  // Always have a dummy signal point at the end
  Signals.Add(CurrentSignal = new SignalPoint());
}

AddJob добавляет задание в список. Это помечено как неполное на точке сигнализации. Когда работа на самом деле выполняется, неполноцентом одинаковой сигнализации уменьшается. Также необходимо сказать планировцу, что может быть изменено, поскольку новая работа может быть немедленно исполняемой. Обратите внимание, что планировщик вызывается после того, как блокировка на «это» выпущено, чтобы избежать тупика.

void JobList.AddJob(Job job)
{
  lock(this)
  {
    Jobs.Add(job);
    Signals.Last.IncompleteCount++;
    if(NextJob == null)
      NextJob = job;
  }
  if(Scheduler!=null)
    Scheduler.UpdateQueues(this);
}

AddSignal и Addwait добавляют сигналы и ждет списка заданий. Обратите внимание, что AddSignal на самом деле создает новую сигнальную точку, и Addwait просто заполняет информацию о точке ожидания в ранее созданной точке сигнализации.

void JobList.AddSignal()
{
  lock(this)
  {
    Signals.Last.SignalJobIndex = Jobs.Count;  // Reify dummy signal point
    Signals.Add(new SignalPoint());            // Create new dummy signal point
  }
}


void JobList.AddWait()
{
  lock(this)
  {
    Signals.Last.Previous.WaitJobIndex = Jobs.Count;
  }
}

Готовое свойство определяет, готов ли список для дополнительных ядер, назначенных ему. Может быть два или три ядра, работающие в списке без списка «готов», если следующая задача ждет сигнала, прежде чем он может начать.

bool JobList.Ready
{
  get
  {
    lock(this)
    {
      return NextJob!=null &&
        (CurrentSignal==Signals.Last ||
         NextJobIndex < CurrentSignal.WaitJobIndex);
    }
  }
}

getnextreadyjob очень прост: если мы будем готовы, просто верните следующую работу в списке.

void JobList.GetNextReadyJob(Job& job, int& jobIndex)
{
  lock(this)
  {
    if(!Ready) return false;
    jobIndex = list.NextJobIndex++;
    job = list.NextJob; list.NextJob = job.Next;
    return true;

  }
}

MarkJobCompleted, вероятно, самый интересный из всех. Из-за структуры сигналов и ожидания текущая задача представляет собой либо перед CurrentGignal, либо между CurrentSignal и CurrentGignal.Next (если оно после последнего фактического сигнала, он будет учитываться как между CurrentSignal и фиктивной точкой в ​​конце ). Нам нужно уменьшить количество неполных рабочих мест. Мы также можем понадобиться перейти к следующему сигналу, если этот счет идет до нуля. Конечно, мы никогда не проходим фиктивную сигнальную точку в конце.

Обратите внимание, что этот код не имеет звонка для планирования .updateQueue, потому что мы знаем, что планировщик будет вызывать GetNextreadyJob всего за секунду, и если он возвращает FALSE, он будет позвонить в любом случае.

void JobList.MarkJobCompleted(Job job, int jobIndex)
{
  lock(this)
  {
    if(jobIndex >= CurrentSignal.SignalJobIndex)
      CurrentSignal.Next.IncompleteCount--;
    else
    {
      CurrentSignal.IncompleteCount--;
      if(CurrentSignal.IncompleteCount==0)
        if(CurrentSignal.WaitJobIndex < int.MaxValue)
          CurrentSignal = CurrentSignal.Next;
    }
  }
}

Настройка на основе длины списка, оценки длины рабочих мест и т. Д.

Код выше не уделяет никакого внимания к тому, как длились списки рабочих мест,Поэтому, если есть сто крошечных списков рабочих мест и один огромный один, можно принимать отдельный крошечный список работы, а затем все собираются на огромное, что привело к неэффективности. Это можно решить, сделав готовый [] массив очередей приоритетов, приоритетных на (Joblist.jobs.Count - Joblist.nextjobindex) , но при приоритете фактически обновляется только в обычных ситуациях обновления для эффективности.

Это может быть еще более сложным, создавая эвристический, который учитывает количество и интервал комбинаций сигналов / ждать, чтобы определить приоритет. Эта эвристика будет лучше настроена с использованием распределения рабочих долговечников и использование ресурсов.

Если отдельные долгосрочные работы известны, или если для них доступны хорошие оценки, то эвристика может использовать оценочную оставшуюся продолжительность вместо просто длины списка.

Окончательные заметки

Это довольно стандартное решение проблемы, которую вы представляете. Вы можете использовать алгоритмы, которые я дал, и они будут работать, включая блокировку, но вы не сможете скомпилировать код, который я написал выше по нескольким причинам:

  1. Это сумасшедшая смесь синтаксиса C ++ и C #. Я первоначально начал писать в C #, затем изменил кучу синтаксиса в стиль C ++, поскольку я подумал, что это было более вероятным, что вы будете использовать для такого проекта. Но я оставил в довольно нескольких C # мозмах. К счастью, нет LINQ ;-).

  2. Детали LinkedList имеют размахивание рук. Я предполагаю, что список может сделать первое, последнее, добавить и удалять, и что элементы в списке могут делать предыдущие и дальше. Но я не использовал фактическую API для любого настоящего класса связанного списка, о котором я знаю.

  3. Я не скомпилировался и не проверил. Я гарантирую, что там есть ошибка или два.

Нижняя строка: Вам следует обрабатывать код выше как псевдокод, даже если он выглядит как настоящая Маккой.

Наслаждайтесь!

4
ответ дан 7 December 2019 в 12:20
поделиться

Если у вас есть доступ к фреймворку для кражи работы в вашем окружении (например, Cilk, если вы находитесь на C, или fork/join framework Doug Lea на Java), вы можете легко получить простое и чистое решение (по сравнению с низкоуровневыми специальными попытками, которые вам, вероятно, придется сделать, если вы не можете использовать что-то подобное), которое даст вам автоматическую балансировку нагрузки и хорошую локализацию данных.

Вот описание решения на высоком уровне: вы запускаете один поток на ядро. Каждому из них присваивается список до тех пор, пока он не будет исчерпан (много способов сделать это - это задача очень хороших механизмов одновременной очереди, и это причина, по которой Вы хотели бы по возможности обойтись без самостоятельных решений). Каждый работник проходит по строкам списков один за другим: - Поддерживаются две очереди, одна для этих заданий перед токеном сигнала , и одна или те, которые следуют за ним. - Когда встречается задание, оно является вилочным и добавляется в соответствующую очередь (в зависимости от того, видели ли мы маркер сигнала или нет). - Когда встречается токен wait, мы присоединяем все вакансии перед сигналом (это семантика, которую вы описываете, если я правильно понял). Обратите внимание, что в коде я использую helpJoin(), это означает, что поток будет действительно помогать (вытаскивая вилочные задачи и выполняя их до тех пор, пока соединение не сможет продолжаться)

"Вилка" означает помещение задачи в потоковую локальную очередь, которая будет либо выполняться самим потоком позднее, либо может быть украдена другим потоком, который ищет какую-либо работу.

Для иллюстрации приведена рабочая симуляция этого сценария в ~80 строк с использованием вышеупомянутой java-фреймворка. Она создает столько же потоков, сколько и доступных ядер, и некоторые списки, и начинает их выполнять. Обратите внимание, насколько прост метод run() - в то время как он все еще имеет преимущества балансировки нагрузки, и что потоки в основном выполняют задачи из своего собственного списка, если только они не заканчивают работу и не начинают воровать, чтобы получить некоторые из них. Конечно, если вы не работаете на Java или C, вам придется найти аналогичный фреймворк, но тот же самый набор основных идей также упростит ваш код, независимо от языка.

import java.util.*;
import java.util.concurrent.*;
import jsr166y.ForkJoinPool;
import jsr166y.ForkJoinTask;
import jsr166y.RecursiveTask;

public class FJTest {
    public static void main(String[] args) throws Exception {
        Iterable<List<TaskType>> lists = createLists(10);

        ForkJoinPool pool = new ForkJoinPool();

        for (final List<TaskType> list : lists) {
            pool.submit(new Runnable() {
                public void run() {
                    List<ForkJoinTask> beforeSignal = new ArrayList<ForkJoinTask>();
                    List<ForkJoinTask> afterSignal = new ArrayList<ForkJoinTask>();
                    boolean signaled = false;
                    for (TaskType task : list) {
                        switch (task) {
                            case JOB:
                                ForkJoinTask job = new Job();
                                if (signaled == false)
                                    beforeSignal.add(job);
                                else
                                    afterSignal.add(job);
                                job.fork();
                                break;
                            case SIGNAL:
                                signaled = true;
                                break;
                            case WAIT:
                                signaled = false;
                                for (ForkJoinTask t : beforeSignal) {
                                    t.helpJoin();
                                }
                                beforeSignal = afterSignal;
                                afterSignal = new ArrayList<ForkJoinTask>();
                        }
                    }
                }
            });
        }

        pool.shutdown();
        pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    }

    private static Iterable<List<TaskType>> createLists(int size) {
        List<List<TaskType>> tasks = new ArrayList<List<TaskType>>();
        for (int i = 0; i < size; i++) {
            tasks.add(createSomeList());
        }
        return tasks;
    }

    private static List<TaskType> createSomeList() {
        return Arrays.asList(
                TaskType.JOB,
                TaskType.JOB,
                TaskType.SIGNAL,
                TaskType.JOB,
                TaskType.WAIT,
                TaskType.JOB);
    }

}

enum TaskType {
    JOB, SIGNAL, WAIT;
}
class Job extends RecursiveTask<Void> {
    @Override
    protected Void compute() {
        long x = 1;
        for (long i = 1; i < 200000001; i++) {
            x = i * x;
        }
        System.out.println(x); //just to use x
        return null;
    }
}
1
ответ дан 7 December 2019 в 12:20
поделиться
Другие вопросы по тегам:

Похожие вопросы: