Блок ThreadPoolExecutor, когда очередь заполнена?

Проблема в том, что вы вызываете

threadCount = omp_get_num_threads();

вне параллельного блока, так что это 1, и ваш цикл внутри

mulMatrAndVecMP(vec1, A, z, (threadNum * N) / threadCount, N / threadCount, N);

выходит за пределы.

Установка

threadCount = 4 

вместо этого должна решить вашу проблему.

Как вы можете прочитать здесь , вызов возвращает количество потоков в текущей команде, а текущей команды нет.

Редактировать: будьте осторожны, если N не делится на количество потоков: ваш код пропускает некоторые строки умножения.

53
задан Hearen 19 March 2019 в 18:02
поделиться

2 ответа

Довольно простая опция состоит в том, чтобы обернуть Ваш BlockingQueue с реализацией, которая звонит put(..), когда offer(..) вызывается:

public class BlockOnOfferAdapter<T> implements BlockingQueue<T> {

(..)

  public boolean offer(E o) {
        try {
            delegate.put(o);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
  }

(.. implement all other methods simply by delegating ..)

}

Это работает, потому что значением по умолчанию put(..) ожидает, пока нет способность в очереди, когда это полно, см. :

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * for space to become available.
     *
     * @param e the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    void put(E e) throws InterruptedException;

Никакая ловля RejectedExecutionException или сложная необходимая блокировка.

0
ответ дан 7 November 2019 в 08:48
поделиться

В некоторых очень узких обстоятельствах вы можете реализовать java.util.concurrent.RejectedExecutionHandler, который сделает то, что вам нужно.

RejectedExecutionHandler block = new RejectedExecutionHandler() {
  rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
     executor.getQueue().put( r );
  }
};

ThreadPoolExecutor pool = new ...
pool.setRejectedExecutionHandler(block);

Сейчас. Это очень плохая идея по следующим причинам

  • Она подвержена тупиковой ситуации, потому что все потоки в пуле могут умереть до того, как то, что вы поместили в очередь, станет видимым. Чтобы смягчить это, установите разумное время сохранения активности.
  • Задача не упакована так, как может ожидать ваш исполнитель. Многие реализации исполнителей перед выполнением оборачивают свои задачи в какой-то объект отслеживания. Посмотри на свой источник.
  • Добавление через getQueue () категорически не рекомендуется API и может быть запрещено в какой-то момент.

Почти всегда лучшая стратегия - установить ThreadPoolExecutor.CallerRunsPolicy, который будет ограничивать ваше приложение, выполняя задачу в потоке, вызывающем execute ().

Однако иногда стратегия блокировки со всеми присущими ей рисками - это действительно то, что вам нужно. Я бы сказал, что в этих условиях

  • у вас есть только один поток, вызывающий execute ()
  • Вы должны (или хотите) иметь очень маленькую длину очереди
  • Вам абсолютно необходимо ограничить количество потоков, выполняющих это работают (обычно по внешним причинам), и стратегия запуска вызывающего абонента нарушит это.
  • Ваши задачи имеют непредсказуемый размер, поэтому запуски вызывающего объекта могут привести к истощению, если пул на мгновение был занят 4 короткими задачами, а ваш единственный поток, вызывающий выполнение, застрял на большой.

Итак, как я сказал. Это редко нужно и может быть опасно, но вот и все.

Удачи.

62
ответ дан 7 November 2019 в 08:48
поделиться