Как правильно использовать TryScan в F #

Я пытался найти пример использования TryScan , но не нашел ни одного , не могли бы вы мне помочь?

Что я хотел бы сделать (довольно упрощенный пример): у меня есть MailboxProcessor , который принимает два типа сообщений.

  • Первый GetState возвращает текущее состояние. GetState сообщения отправляются довольно часто

  • Другое UpdateState очень дорого (требует много времени), например загружает что-то из Интернета, а затем обновляет состояние соответствующим образом. UpdateState вызывается очень редко.

Моя проблема в том, что сообщения GetState заблокированы и ждут, пока не будут обслужены предыдущие UpdateState . Вот почему я попытался использовать TryScan для обработки всех сообщений GetState , но безуспешно.

Мой пример кода:

type Msg = GetState  of AsyncReplyChannel<int> | UpdateState
let mbox = MailboxProcessor.Start(fun mbox ->
             let rec loop state = async {
                // this TryScan doesn't work as expected
                // it should process GetState messages and then continue
                mbox.TryScan(fun m ->
                    match m with 
                    | GetState(chnl) -> 
                        printfn "G processing TryScan"
                        chnl.Reply(state)
                        Some(async { return! loop state})
                    | _ -> None
                ) |> ignore

                let! msg = mbox.Receive()
                match msg with
                | UpdateState ->
                    printfn "U processing"
                    // something very time consuming here...
                    async { do! Async.Sleep(1000) } |> Async.RunSynchronously
                    return! loop (state+1)
                | GetState(chnl) ->
                    printfn "G processing"
                    chnl.Reply(state)
                    return! loop state
             }
             loop 0
)

[async { for i in 1..10 do 
          printfn " U"
          mbox.Post(UpdateState)
          async { do! Async.Sleep(200) } |> Async.RunSynchronously
};
async { // wait some time so that several `UpdateState` messages are fired
        async { do! Async.Sleep(500) } |> Async.RunSynchronously
        for i in 1..20 do 
          printfn "G"
          printfn "%d" (mbox.PostAndReply(GetState))
}] |> Async.Parallel |> Async.RunSynchronously

Если вы попытаетесь запустить код, вы увидите, что сообщение GetState почти не обрабатывается, потому что оно ожидает результата. С другой стороны, UpdateState только запускает и забывает, тем самым эффективно блокируя получение состояния.

Править

Текущее решение, которое мне подходит:

type Msg = GetState  of AsyncReplyChannel<int> | UpdateState
let mbox = MailboxProcessor.Start(fun mbox ->
             let rec loop state = async {
                // this TryScan doesn't work as expected
                // it should process GetState messages and then continue
                let! res = mbox.TryScan((function
                    | GetState(chnl) -> Some(async {
                            chnl.Reply(state)
                            return state
                        })
                    | _ -> None
                ), 5)

                match res with
                | None ->
                    let! msg = mbox.Receive()
                    match msg with
                        | UpdateState ->
                            async { do! Async.Sleep(1000) } |> Async.RunSynchronously
                            return! loop (state+1)
                        | _ -> return! loop state
                | Some n -> return! loop n
             }
             loop 0
)

Реакции на комментарии: идея с другим MailboxProcessor или ThreadPool , который выполняет UpdateState параллельно - это здорово, но в настоящее время мне это не нужно. Все, что я хотел сделать, это обработать все сообщения GetState , а затем и другие сообщения. Меня не волнует, что при обработке UpdateState агент заблокирован.

Я покажу вам, в чем проблема на выходе:

// GetState messages are delayed 500 ms - see do! Async.Sleep(500)
// each UpdateState is sent after 200ms
// each GetState is sent immediatelly! (not real example, but illustrates the problem)
 U            200ms   <-- issue UpdateState
U processing          <-- process UpdateState, it takes 1sec, so other 
 U            200ms       5 requests are sent; sent means, that it is
 U            200ms       fire-and-forget message - it doesn't wait for any result
                          and therefore it can send every 200ms one UpdateState message
G                     <-- first GetState sent, but waiting for reply - so all 
                          previous UpdateState messages have to be processed! = 3 seconds
                          and AFTER all the UpdateState messages are processed, result
                          is returned and new GetState can be sent. 
 U            200ms
 U            200ms       because each UpdateState takes 1 second
 U            200ms
U processing
 U
 U
 U
 U
U processing
G processing          <-- now first GetState is processed! so late? uh..
U processing          <-- takes 1sec
3
G
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
G processing          <-- after MANY seconds, second GetState is processed!
10
G
G processing
// from this line, only GetState are issued and processed, because 
// there is no UpdateState message in the queue, neither it is sent
5
задан stej 3 February 2011 в 17:46
поделиться