Click here to Skip to main content
15,886,199 members
Articles / All Topics

Investigating Asynchronous Workflows in F#

Rate me:
Please Sign up or sign in to vote.
0.00/5 (No votes)
4 Jan 2011CPOL5 min read 7K   1  
Investigating Asynchronous Workflows in F#

This article examines 4 common basic paradigms for implementing request response behavior using F#. These are the:

  1. Single-threaded synchronous model
  2. Multi-threaded synchronous model
  3. Single-threaded asynchronous model
  4. Multi-threaded asynchronous model.

We use the Net.WebRequest class to exercise the various I/O models. The source code for the article is here.

Single-threaded Synchronous Model

let request_url (url : string) =
  let tid = string(Threading.Thread.CurrentThread.ManagedThreadId)
  Console.WriteLine("Thread id = " + tid + ", Request for " + url)
  let wr = Net.WebRequest.Create(url)
  use response = wr.GetResponse()
  let tid = string(Threading.Thread.CurrentThread.ManagedThreadId)
  Console.WriteLine("Thread id = " + tid + ", Response for " + url)

///run the single threaded synchronous calls
let ss_run urls =
  Console.WriteLine("Single-threaded synchronous requests ...")
  List.iter request_url urls

The fundamental properties to note here are that it’s single-threaded and that the responses to a request happen in sequence. Below is the example output when this code is run.

Single-threaded synchronous requests ...
Thread id = 1, Request for http://www.google.com
Thread id = 1, Response for http://www.google.com
Thread id = 1, Request for http://www.microsoft.com
Thread id = 1, Response for http://www.microsoft.com
Thread id = 1, Request for http://www.yahoo.com
Thread id = 1, Response for http://www.yahoo.com
Thread id = 1, Request for http://www.wordpress.com
Thread id = 1, Response for http://www.wordpress.com
Thread id = 1, Request for http://www.blizzard.com
Thread id = 1, Response for http://www.blizzard.com
Thread id = 1, Request for http://www.valvesoftware.com
Thread id = 1, Response for http://www.valvesoftware.com

Multi-threaded Synchronous Model

In this model, multiple threads handle the initiation of the request. The request is made synchronously and the thread is blocked while waiting for the response.

let request_url (url : string) =
  let tid = string(Threading.Thread.CurrentThread.ManagedThreadId)
  Console.WriteLine("Thread id = " + tid + ", Request for " + url)
  let wr = Net.WebRequest.Create(url)
  use response = wr.GetResponse()
  let tid = string(Threading.Thread.CurrentThread.ManagedThreadId)
  Console.WriteLine("Thread id = " + tid + ", Response for " + url)

let create_thread request url =
  let helper request url =
    let tid = string(Threading.Thread.CurrentThread.ManagedThreadId)
    request url

  let t = new Threading.Thread(Threading.ThreadStart(fun _ -> helper request url))
  t

///run the multi-threaded synchronous version
let ms_run urls =
  Console.WriteLine("Multi-threaded synchronous requests ...")
  let tasks = List.map (fun url -> create_thread request_url url) urls
  List.iter (fun (thread : Threading.Thread) -> thread.Start()) tasks
  //wait for all threads to complete
  List.iter (fun (thread : Threading.Thread) -> thread.Join()) tasks

Single-threaded Asynchronous Model

Here, I attempted to implement a single-threaded process to perform request response behaviour asynchronously and ended up bashing my head up against the problem. Eventually, I gave up and came to the conclusion that one can’t do this easily under .NET. Apparently, you have to invoke some magic using a SynchronizationContext class mixed in with some other .NET voodoo to do it and even then I’m not sure if that would work.

Am I being overly pedantic? No, I don’t think so since single-threaded programs performing asynchronous I/O have been in use for longer than .NET has been in existence. In fact, programs that behave in this fashion are currently in use in network appliances running Linux in various companies in all parts of the world. A use case for .NET needing to support this model is outlined. Imagine you were given the task of porting some code written on Linux platform where this model was employed onto Windows. Typically, that code would be using select or libevent to multiplex input from various sources into a single thread. The safest way to port it would be to keep using the same single-threaded asynchronous model and not introduce a multi-threaded framework for the sake of implementing asynchronous I/O. Other reasons are listed below:

  • More threads increase the amount of time the OS spends context switching thus degrading performance
  • Multi-threaded programs are harder to write correctly because for one your program is now non-deterministic.
  • If .NET supported this model, then they wouldn’t have to advise having to do silly things like switching the context back to the UI thread so that you can update a variable owned by that thread.
  • It’s clean and elegant and aesthetically pleasing.

The example program below was my attempt to put together a single-threaded program that makes multiple requests and handles the response in the same thread. However, the output shows that the response comes back in a different thread from the original. The program also uses asynchronous workflows. This is a neat language feature that no other .NET language currently supports and will be explained in the final section.

let st_request (url : string) max_urls (count : int ref)
  (finished : Threading.ManualResetEvent) =
  async {
    let tid = string(Threading.Thread.CurrentThread.ManagedThreadId)
    let wr = Net.WebRequest.Create url
    Console.WriteLine("Thread id = " + tid + ", Request for " + url)
    use! response = wr.AsyncGetResponse()
    count := !count + 1
    let tid = string(Threading.Thread.CurrentThread.ManagedThreadId)
    Console.WriteLine("Thread id = " + tid + ", Response for " + url)
    if !count = 6 then
      finished.Set() |> ignore
    ()
  }

///run the single threaded asynchronous version
let sa_run =
  let finished = new Threading.ManualResetEvent(false)
  let count = ref 0
  (fun (urls : list<string>) ->
    Console.WriteLine("Single-threaded asynchronous requests ...")
    List.iter (fun url ->
      Async.StartImmediate(st_request url urls.Length count finished)) urls (*** <2> ***)
    finished.WaitOne() |> ignore)

The output from running this code is shown below:

Single-threaded asynchronous requests ...
Thread id = 1, Request for http://www.google.com
Thread id = 1, Request for http://www.microsoft.com
Thread id = 1, Request for http://www.yahoo.com
Thread id = 1, Request for http://www.wordpress.com
Thread id = 1, Request for http://www.blizzard.com
Thread id = 1, Request for http://www.valvesoftware.com
Thread id = 13, Response for http://www.microsoft.com
Thread id = 13, Response for http://www.yahoo.com
Thread id = 11, Response for http://www.google.com
Thread id = 14, Response for http://www.valvesoftware.com
Thread id = 10, Response for http://www.blizzard.com
Thread id = 13, Response for http://www.wordpress.com

It’s clearly not single-threaded.

Multi-threaded Asynchronous Model

Like the multi-threaded synchronous model, a thread is spawned for each request. The important thing to note here is that, because the program is now multi-threaded, we have to take care of shared variables. In the sample below, the count reference is shared between multiple threads therefore increments to it must be atomic. There is nothing inherently special about an async workflow that makes it auto-magically thread safe.

In other languages, handling the response from a request is often the responsibility of a callback function. It’s named callback because it is called back when the response is received and ready for processing. F# brings a sanitized model for dealing with asynchronous I/O through the use of asynchronous workflows to the table. These get rid of the control flow inversion [async_model] associated with the callback style of programming. This is a big thing in the .NET world because no other .NET language can do this although OCaml has had support for this for a while via lwt Anyone that has had to work with programs that have complicated interactions with each other via message passing will see the value in what F# does here.

let mt_request (url : string) max_urls (count : int ref)
  (finished : Threading.ManualResetEvent) =
  async {
    let tid = string(Threading.Thread.CurrentThread.ManagedThreadId)
    let wr = Net.WebRequest.Create url
    Console.WriteLine("Thread id = " + tid + ", Request for " + url)
    use! response = wr.AsyncGetResponse() (*** <1> ***)
    Threading.Interlocked.Increment(count) |> ignore (*** <2> ***)
    let tid = string(Threading.Thread.CurrentThread.ManagedThreadId)
    Console.WriteLine("Thread id = " + tid + ", Response for " + url)
    if !count = 6 then
      finished.Set() |> ignore
    ()
  }

let ma_run =
  let count = ref 0
  let finished = new Threading.ManualResetEvent(false)
  (fun (urls : list<string>) ->
    Console.WriteLine("Multi-threaded asynchronous requests ... ")
    List.iter (fun url ->
      Async.Start(mt_request url urls.Length count finished)) urls (*** <3> ***)
    finished.WaitOne() |> ignore)
  1. This line is the heart and soul of the async workflow and there’s quite a bit going on this single line. There’s the use of the keyword use, a ! and a call to AsyncGetResponse. The use keyword indicates that the resource bound to the name response implements the IDisposable interface. Anything that implements IDisposable needs to call Dispose to release resources allocated to it back to the operating system. Binding it with use indicates we want the lifetime of the resource tied to the lexical scope of the name, effectively freeing the programmer from the need to manually call Dispose since this will be automatically handled for us when the name response goes out of scope. The ! operator here tells it to:
    1. initiate the request, and
    2. wait without blocking the current thread for the response
      Note that there is no registering of a callback function to handle the response. Control flow is not inverted, and variables that were in scope prior to the request being sent are still in scope when the response arrives. These properties are not maintained in a language that requires the use of a callback function to handle the response.
  2. We use Interlocked.Increment to increment the count. This is atomic and therefore thread safe.
  3. Async.Start will spawn a new thread for each async workflow.

Output from running code that uses the functions above is shown below:

Multi-threaded asynchronous requests ...
Thread id = 17, Request for http://www.google.com
Thread id = 20, Request for http://www.blizzard.com
Thread id = 17, Request for http://www.valvesoftware.com
Thread id = 21, Request for http://www.wordpress.com
Thread id = 19, Request for http://www.yahoo.com
Thread id = 18, Request for http://www.microsoft.com
Thread id = 14, Response for http://www.microsoft.com
Thread id = 16, Response for http://www.google.com
Thread id = 16, Response for http://www.yahoo.com
Thread id = 10, Response for http://www.valvesoftware.com
Thread id = 14, Response for http://www.blizzard.com
Thread id = 13, Response for http://www.wordpress.com

Note the thread hopping that occurs. The response is not guaranteed to be in the same thread as the request was made from.

Conclusion

In summary, we’ve shown how to implement various common models using F# for request response style programming. Unfortunately, it’s not straightforward to implement single-threaded asynchronous code in .NET because it keeps wanting to create threads behind your back. Regardless, the neat thing about F# and asynchronous workflows is that they get rid of the flow of control inversion associated with callback style programming resulting in other useful properties such as exception propagation and resource lifetime management via lexical scoping being maintained.

References

This article was originally posted at http://www.zenskg.net/wordpress?p=79

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Australia Australia
This member has not yet provided a Biography. Assume it's interesting and varied, and probably something to do with programming.

Comments and Discussions

 
-- There are no messages in this forum --