F # async stack overflow

Я удивлен переполнением стека в моей программе, основанной на асинхронности. Я подозреваю, что основная проблема связана со следующей функцией, которая должна составлять два асинхронных вычисления для параллельного выполнения и ожидания завершения обоих:

let ( <|> ) (a: Async<unit>) (b: Async<unit>) =
    async {
        let! x = Async.StartChild a
        let! y = Async.StartChild b
        do! x
        do! y
    }

После этого у меня есть следующая программа mapReduce , которая пытается использовать параллелизм как в map , так и в части reduce . Неформально идея состоит в том, чтобы активировать N мапперы и N-1 редукторы с использованием общего канала, дождаться их завершения и прочитать результат из канала. У меня была собственная реализация Channel , здесь была заменена на ConcurrentBag для более короткого кода (проблема затрагивает оба):

let mapReduce (map    : 'T1 -> Async<'T2>)
              (reduce : 'T2 -> 'T2 -> Async<'T2>)
              (input  : seq<'T1>) : Async<'T2> =
    let bag = System.Collections.Concurrent.ConcurrentBag()

    let rec read () =
        async {
            match bag.TryTake() with
            | true, value -> return value
            | _           -> do! Async.Sleep 100
                             return! read ()
        }

    let write x =
        bag.Add x
        async.Return ()

    let reducer =
        async {
            let! x = read ()
            let! y = read ()
            let! r = reduce x y
            return bag.Add r
        }

    let work =
        input
        |> Seq.map (fun x -> async.Bind(map x, write))
        |> Seq.reduce (fun m1 m2 -> m1 <|> m2 <|> reducer)

    async {
        do! work
        return! read ()
    }

Теперь следующий базовый тест начинает генерировать исключение StackOverflowException при n = 10000 :

let test n  =
    let map x      = async.Return x
    let reduce x y = async.Return (x + y)
    mapReduce map reduce [0..n]
    |> Async.RunSynchronously

РЕДАКТИРОВАТЬ: Альтернативная реализация комбинатора <|> делает тест успешным на N = 10000:

let ( <|> ) (a: Async<unit>) (b: Async<unit>) =
  Async.FromContinuations(fun (ok, _, _) ->
    let count = ref 0
    let ok () =
        lock count (fun () ->
            match !count with
            | 0 -> incr count
            | _ -> ok ())
    Async.Start <|
        async {
            do! a
            return ok ()
        }
    Async.Start <|
        async {
            do! b
            return ok ()
        })

Это действительно удивительно для меня, потому что это то, что я предполагал Async. StartChild делает. Есть мысли о том, какое решение было бы оптимальным?

9
задан t0yv0 6 August 2011 в 22:52
поделиться