Я пытаюсь параллелизировать функцию свертки в C. Вот исходная функция, которая скручивает два массива 64-разрядных плаваний:
void convolve(const Float64 *in1,
UInt32 in1Len,
const Float64 *in2,
UInt32 in2Len,
Float64 *results)
{
UInt32 i, j;
for (i = 0; i < in1Len; i++) {
for (j = 0; j < in2Len; j++) {
results[i+j] += in1[i] * in2[j];
}
}
}
Для обеспечения параллелизма (без семафоров), я создал функцию, которая вычисляет результат для особого положения в results
массив:
void convolveHelper(const Float64 *in1,
UInt32 in1Len,
const Float64 *in2,
UInt32 in2Len,
Float64 *result,
UInt32 outPosition)
{
UInt32 i, j;
for (i = 0; i < in1Len; i++) {
if (i > outPosition)
break;
j = outPosition - i;
if (j >= in2Len)
continue;
*result += in1[i] * in2[j];
}
}
Проблема, с помощью convolveHelper
замедляет код приблизительно 3,5 раза (при работе единственного потока).
Любые идеи о том, как я могу ускорение convolveHelper
, при поддержании потокобезопасности?
Свертки во временной области становятся умножениями в области Фурье. Я предлагаю вам взять быструю библиотеку БПФ (например, FFTW ) и использовать ее. Вы перейдете от O (n ^ 2) к O (n log n).
Алгоритмические оптимизации почти всегда превосходят микрооптимизации.
Если ваши массивы не очень большие, использование потока вряд ли действительно поможет, поскольку накладные расходы на запуск потока будут больше, чем стоимость циклов. Однако предположим, что ваши массивы велики, и многопоточность - это чистый выигрыш. В этом случае я бы сделал следующее:
convolveHelper
, который слишком сложен и мало поможет. Разделите внутреннюю часть цикла на функцию потока. Т.е. просто сделайте
for (j = 0; j
в свою собственную функцию, которая принимает i
в качестве параметра вместе со всем остальным.
convolve
просто запускало потоки. Для максимальной эффективности используйте семафор, чтобы убедиться, что вы никогда не создаете больше потоков, чем у вас есть ядер. Вместо двух if
в цикле, вы можете вычислить правильные минимальные / максимальные значения для i
перед циклом.
Вы рассчитываете каждую позицию результата отдельно. Вместо этого вы можете разделить массив results
на блоки и заставить каждый поток вычислять блок. Расчет для блока будет выглядеть как функция convolve
.
Я наконец понял, как правильно предварительно вычислить начальный / конечный индексы (предложение, данное как Тайлером МакГенри , так и Интерджеем ):
if (in1Len > in2Len) {
if (outPosition < in2Len - 1) {
start = 0;
end = outPosition + 1;
} else if (outPosition >= in1Len) {
start = 1 + outPosition - in2Len;
end = in1Len;
} else {
start = 1 + outPosition - in2Len;
end = outPosition + 1;
}
} else {
if (outPosition < in1Len - 1) {
start = 0;
end = outPosition + 1;
} else if (outPosition >= in2Len) {
start = 1 + outPosition - in2Len;
end = in1Len;
} else {
start = 0;
end = in1Len;
}
}
for (i = start; i < end; i++) {
*result = in1[i] * in2[outPosition - i];
}
К сожалению, предварительное вычисление индексов дает нет заметного уменьшения времени выполнения : (
Вот почему ...
рассмотрите a b + a c
Вы можете оптимизировать это как a * (b + c) (на одно умножение меньше)
В вашем случае есть in2Len ненужных умножений во внутреннем цикле . Что можно устранить.
Следовательно, изменение кода следующим образом должно дать нам требуемую свертку:
( ПРИМЕЧАНИЕ: Следующий код возвращает круговую свертку , которую необходимо развернуть, чтобы получить результат linear-convolution .
void convolve(const Float64 *in1,
UInt32 in1Len,
const Float64 *in2,
UInt32 in2Len,
Float64 *results)
{
UInt32 i, j;
for (i = 0; i < in1Len; i++) {
for (j = 0; j < in2Len; j++) {
results[i+j] += in2[j];
}
results[i] = results[i] * in1[i];
}
}
Это должно дать U максимальный скачок производительности больше, чем что-либо еще. Попробуйте сами и убедитесь !!
GoodLUCK !!
CVS @ 2600Hertz
{{ 1}}Позвольте помощнику convolve работать с большими наборами, вычисляя несколько результатов, используя короткий внешний цикл.
Ключ к распараллеливанию - найти хороший баланс между распределением работы между потоками. Не используйте больше потоков, чем количество ядер ЦП.
Распределите работу поровну между всеми потоками. С такого рода проблемами сложность работы каждого потока должна быть одинаковой.
Самым очевидным, что могло бы помочь, было бы предварительное вычисление начального и конечного индексов цикла и удаление лишних тестов на i
и j
(и связанные с ними скачки). Это:
for (i = 0; i < in1Len; i++) {
if (i > outPosition)
break;
j = outPosition - i;
if (j >= in2Len)
continue;
*result += in1[i] * in2[j];
}
можно переписать как:
UInt32 start_i = (in2Len < outPosition) ? outPosition - in2Len + 1 : 0;
UInt32 end_i = (in1Len < outPosition) ? in1Len : outPosition + 1;
for (i = start_i; i < end_i; i++) {
j = outPosition - i;
*result += in1[i] * in2[j];
}
Таким образом, условие j> = in2Len
никогда не будет истинным, и проверка цикла по сути представляет собой комбинацию тестов i
i
Теоретически вы также можете избавиться от присвоения j
и превратить i ++
в ++ i
, но компилятор, вероятно, выполняет эту оптимизацию для ты уже.