Я удивлен переполнением стека в моей программе, основанной на асинхронности. Я подозреваю, что основная проблема связана со следующей функцией, которая должна составлять два асинхронных вычисления для параллельного выполнения и ожидания завершения обоих:
let ( <|> ) (a: Async<unit>) (b: Async<unit>) =
async {
let! x = Async.StartChild a
let! y = Async.StartChild b
do! x
do! y
}
После этого у меня есть следующая программа mapReduce
, которая пытается использовать параллелизм как в map
, так и в части reduce
. Неформально идея состоит в том, чтобы активировать N
мапперы и N-1
редукторы с использованием общего канала, дождаться их завершения и прочитать результат из канала. У меня была собственная реализация Channel
, здесь была заменена на ConcurrentBag
для более короткого кода (проблема затрагивает оба):
let mapReduce (map : 'T1 -> Async<'T2>)
(reduce : 'T2 -> 'T2 -> Async<'T2>)
(input : seq<'T1>) : Async<'T2> =
let bag = System.Collections.Concurrent.ConcurrentBag()
let rec read () =
async {
match bag.TryTake() with
| true, value -> return value
| _ -> do! Async.Sleep 100
return! read ()
}
let write x =
bag.Add x
async.Return ()
let reducer =
async {
let! x = read ()
let! y = read ()
let! r = reduce x y
return bag.Add r
}
let work =
input
|> Seq.map (fun x -> async.Bind(map x, write))
|> Seq.reduce (fun m1 m2 -> m1 <|> m2 <|> reducer)
async {
do! work
return! read ()
}
Теперь следующий базовый тест начинает генерировать исключение StackOverflowException при n = 10000 :
let test n =
let map x = async.Return x
let reduce x y = async.Return (x + y)
mapReduce map reduce [0..n]
|> Async.RunSynchronously
РЕДАКТИРОВАТЬ: Альтернативная реализация комбинатора <|>
делает тест успешным на N = 10000:
let ( <|> ) (a: Async<unit>) (b: Async<unit>) =
Async.FromContinuations(fun (ok, _, _) ->
let count = ref 0
let ok () =
lock count (fun () ->
match !count with
| 0 -> incr count
| _ -> ok ())
Async.Start <|
async {
do! a
return ok ()
}
Async.Start <|
async {
do! b
return ok ()
})
Это действительно удивительно для меня, потому что это то, что я предполагал Async. StartChild
делает. Есть мысли о том, какое решение было бы оптимальным?