F#27 : Asynchronous Workflows





Last time we looked at reactive programming. This time we will look at a very cool feature of F# called asynchronous workflows. It is fair to say that the newer async/await syntax in C# bears more than a passing resemblance to F# async workflows.
Async workflows offer an easy way to write asynchronous code in F# that performs some background task, and the Async class provides a lot of helper functions to manage them.
The Async Class
The Async class has a whole load of functions that allow you to write async code. Here is a table which shows the functions that you are free to use. We will be looking at some of these.
Here are an example or two of how you might use some of these functions.
Async.AwaitEvent
In this small example, we create a Timer and then call Async.AwaitEvent(..) on its Elapsed event, which returns an Async<T>. Creating this value does not start any work by itself; it describes a computation that, once running, completes when the event next fires. We then carry on and do some more stuff, and finally we use Async.RunSynchronously to block until the Async<T> created by the Async.AwaitEvent(..) call has completed.
Here is the code:
open System
open Microsoft.FSharp.Control

[<EntryPoint>]
let main argv =

    let waitForTimerEvent (timeout) =
        // create a timer and associated async event
        let timer = new System.Timers.Timer(timeout)
        // async computation that waits for a particular CLI event to occur
        let timerEvent = Async.AwaitEvent (timer.Elapsed) |> Async.Ignore

        printfn "Time now: %O" DateTime.Now.TimeOfDay
        timer.Start()
        printfn "do other stuff"

        // blocks waiting for the timer tick, this is pretty
        // much the same as await someTask in async/await
        Async.RunSynchronously timerEvent
        printfn "Time Is Done at time: %O" DateTime.Now.TimeOfDay
        Console.ReadLine() |> ignore

    waitForTimerEvent 5000.0

    //return 0 for main method
    0
The output may help to clarify this a bit:
Async.AwaitIAsyncResult
Here is another example (from MSDN) that shows how the Async class also offers functions for creating Async<T> values from the common APM pattern (BeginSomeMethod()/EndSomeMethod(IAsyncResult…)). This example starts an asynchronous write to a text file using BeginWrite, uses Async.AwaitIAsyncResult to wait for that write to complete, and then reads the file back asynchronously. It does introduce a bit of extra syntax by way of "let!", which I will be getting to in just a minute.
open System
open System.IO
open Microsoft.FSharp.Control

[<EntryPoint>]
let main argv =

    let streamWriter1 = File.CreateText("test1.txt")
    let count = 10000000
    let buffer = Array.init count (fun index -> byte (index % 256))

    printfn "Writing to file test1.txt."
    let asyncResult = streamWriter1.BaseStream.BeginWrite(buffer, 0, count, null, null)

    // Read a file, but use AwaitIAsyncResult to wait for the write operation
    // to be completed before reading.
    let readFile filename asyncResult count =
        async {
            // wait for the file to be written before we read it, again this is like
            // await File.WriteAllTextAsync(…) in async/await land
            let! returnValue = Async.AwaitIAsyncResult(asyncResult)
            printfn "Reading from file test1.txt."
            // Close the file.
            streamWriter1.Close()
            // Now open the same file for reading.
            let streamReader1 = File.OpenText(filename)
            let! newBuffer = streamReader1.BaseStream.AsyncRead(count)
            return newBuffer
        }

    let bufferResult = readFile "test1.txt" asyncResult count |> Async.RunSynchronously

    Console.ReadLine() |> ignore

    //return 0 for main method
    0
Which when run will give the following results:
A Rather Nice UI Example
Credit where credit is due: I do a lot of UI work, and I have to say that the MSDN example on how to use some of the Async class functions in this area is first rate. This small example shows how to use the following Async class functions:
Async.SwitchToThreadPool()
Async.SwitchToContext()
Async.StartImmediate(…)
Here is the relevant code:
open System.Windows.Forms

[<EntryPoint>]
let main argv =

    let bufferData = Array.zeroCreate<byte> 100000000

    let async1 (button : Button) =
        async {
            button.Text <- "Busy"
            button.Enabled <- false
            do! Async.Sleep 5000
            let context = System.Threading.SynchronizationContext.Current
            do! Async.SwitchToThreadPool()
            use outputFile = System.IO.File.Create("longoutput.dat")
            do! outputFile.AsyncWrite(bufferData)
            do! Async.SwitchToContext(context)
            button.Text <- "Start"
            button.Enabled <- true
            MessageBox.Show("Done") |> ignore
        }

    let form = new Form(Text = "Test Form")
    let button = new Button(Text = "Start")
    form.Controls.Add(button)
    button.Click.Add(fun args -> Async.StartImmediate(async1 button))
    Application.Run(form)

    //return 0 for main method
    0
So what exactly is going on here? Well, as any WinForms/WPF/Silverlight or Windows 8 developer knows, you must update UI controls on the thread that created them, so this code deals with that by way of a SynchronizationContext, which is the standard WinForms way of posting delegates/actions to the correct thread. Let's walk through it:
- Button is clicked on the UI thread, which starts the workflow using Async.StartImmediate, which runs on the current thread (the UI thread)
- Async workflow starts, and sets some form control values, which is ok as it is still the UI thread at this point
- We then store the Windows Forms SynchronizationContext to allow us to marshal calls back to the UI thread later
- We then switch to a thread pool thread using Async.SwitchToThreadPool()
- We do some async work where we write to a file
- We then switch back to the UI thread using the SynchronizationContext we stored earlier, and use the Async.SwitchToContext() function
- We then set some form control values, which is ok at this point as we are now back on the UI thread thanks to the switch to the previously stored SynchronizationContext
Anyway, here are the results of this code. After we click the button and trigger the async workflow:
And when the async workflow has completed
This example did introduce a bit too much syntax, but we are just about to look into this so I hope you can forgive me that slight indulgence.
Starting Async Workflows Manually
As well as the inbuilt functions of the Async class, you can also manually create your own async workflows. Async workflows generally follow this syntax when created manually.
async { expression }
Using this syntax, the expression is some computation that will run asynchronously once the workflow is started, so it does not need to block the current computation/thread. The type of the expression is Async<'a>. There are many different ways of creating asynchronous code in F#, and you may use any of the types/functions which are available. The Async class is the best place to start.
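One detail worth seeing in code: an async { } block only describes a computation, and nothing in its body runs until it is started, for example with Async.RunSynchronously. The following is a minimal sketch of that behaviour; the names runs, work, runsBeforeStart, answer and runsAfterStart are made up for this illustration.

```fsharp
open System

// Hypothetical counter, used only to observe when the workflow body runs.
let runs = ref 0

// Building the workflow does not execute its body.
let work : Async<int> =
    async {
        runs.Value <- runs.Value + 1
        return 21 * 2
    }

let runsBeforeStart = runs.Value          // nothing has executed yet
let answer = Async.RunSynchronously work  // runs the body once and returns its result
let runsAfterStart = runs.Value

printfn "before=%d answer=%d after=%d" runsBeforeStart answer runsAfterStart
// prints: before=0 answer=42 after=1
```

Because the value is only a description, the same workflow can be started more than once, and each start runs the body again.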
F# async workflows allow synchronous and asynchronous operations to be used within the same workflow. They also come with their own set of syntax that can be used in the construction of workflows. Using the following keywords (be careful though, they are quite similar to the non-async versions that you have already seen), you are able to create complete workflows, which may or may not contain a mixture of async code and synchronous code.
let!
Allows you to effectively wait for async results. The right hand side of the let! must complete before the rest of the async workflow continues, and its result is bound to the name on the left.
use!
Like let!, but the bound result is also disposed of at the close of the current scope.
do!
The same as its counterpart "do", but intended to be used inside async workflows: it runs an asynchronous operation that returns unit and waits for it to complete.
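The three keywords above can be seen together in one small sketch. Everything here is illustrative rather than taken from the original samples: openResource, fetchNumber, workflow and the events log are hypothetical names, and the "resource" is just an IDisposable that records when it is released.

```fsharp
open System

// Hypothetical log, used only to record the order in which things happen.
let events = ResizeArray<string>()

// Hypothetical async "open" operation returning something disposable.
let openResource (name: string) : Async<IDisposable> =
    async {
        events.Add("open " + name)
        return
            { new IDisposable with
                member this.Dispose() = events.Add("dispose " + name) }
    }

// Hypothetical async operation that produces a value.
let fetchNumber : Async<int> =
    async {
        do! Async.Sleep 10
        return 42
    }

let workflow : Async<int> =
    async {
        use! res = openResource "r1"   // use!: bind the result, dispose it at end of scope
        let! n = fetchNumber           // let!: wait for an Async<int> and bind its result
        do! Async.Sleep 10             // do!: wait for an Async<unit>, nothing to bind
        events.Add("computed")
        return n + 1
    }

let result = Async.RunSynchronously workflow
printfn "result=%d events=%A" result (List.ofSeq events)
```

After the run, result is 43 and the log reads open r1, computed, dispose r1, which shows the resource being released only when the workflow's scope ends.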
Here is a small manually created async workflow, that does nothing more than sleep inside of it
open System

[<EntryPoint>]
let main argv =

    let sleepAsync (timeout) =
        //this is the manual async workflow
        let sleeper =
            async {
                printfn "Before sleep %O" (DateTime.Now.ToLongTimeString())
                do! Async.Sleep timeout
                printfn "After sleep %O" (DateTime.Now.ToLongTimeString())
            }

        //wait on the sleeper (where sleeper is Async<T>)
        Async.RunSynchronously sleeper
        printfn "Async workflow completed"

    //call the function that contains the async workflow
    sleepAsync(5000)
    Console.ReadLine() |> ignore

    //return 0 for main method
    0
Which produces the following results when run
You may also nest workflows, which means that you may manually create or trigger async workflows from within workflows (kind of like inception, if you have seen it). Anyway here is an example of that, where we create a child workflow using our previous timer example, and then have a parent workflow use this child workflow.
open System

[<EntryPoint>]
let main argv =

    //Child workflow
    let timerWorkflow =
        async {
            let timer = new System.Timers.Timer(5000.0)
            let timerEvent = Async.AwaitEvent (timer.Elapsed) |> Async.Ignore
            timer.Start()
            //wait for the timer Async<T>; do! avoids blocking a thread
            //the way Async.RunSynchronously would inside a workflow
            do! timerEvent
        }

    //Parent workflow
    let parentWorkflow =
        async {
            printfn "Starting parent at : %O" (DateTime.Now.ToLongTimeString())
            let! childTimerWorkflow = Async.StartChild timerWorkflow

            printfn "parentWorkflow is about to wait for a bit at : %O" (DateTime.Now.ToLongTimeString())
            do! Async.Sleep 2000
            printfn "parentWorkflow continues to do stuff at : %O" (DateTime.Now.ToLongTimeString())

            // use let! to wait for the childTimerWorkflow
            let! result = childTimerWorkflow
            printfn "parentWorkflow completed at : %O" (DateTime.Now.ToLongTimeString())
        }

    // run the parentWorkflow
    Async.RunSynchronously parentWorkflow

    Console.ReadLine() |> ignore

    //return 0 for main method
    0
Which when run gives the following result
Cancellation Of Workflows
You may also use the standard TPL type of cancellation to cancel an async workflow. That is, you use a CancellationTokenSource to provide a CancellationToken to the async workflow, and you may then use the CancellationTokenSource to cancel the workflow.
Here is some code that demonstrates this technique.
open System
open System.Threading

[<EntryPoint>]
let main argv =

    //Child workflow
    let timerWorkflow =
        async {
            for i in 0..9999 do
                printfn "Start is %d" i
                do! Async.Sleep(1000)
                printfn "Done is %d" i
        }

    let cts = new CancellationTokenSource()

    // start the timerWorkflow but pass in the CancellationToken
    Async.Start (timerWorkflow, cts.Token)

    //wait 2 seconds then cancel the token
    Thread.Sleep(2000)
    printfn "Cancelling"
    cts.Cancel()
    printfn "Cancelled"

    Console.ReadLine() |> ignore

    //return 0 for main method
    0
Which when run gives the following results
The eagle-eyed amongst you will notice that I did not have to do anything specific with the CancellationToken inside the workflow itself; in F# this is all just handled for you. If you compare that to the C# equivalent (OK, we could use Parallel.For(..), but for this demo I did not), you will see there is a lot more C# code:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication5
{
    class Program
    {
        static void Main(string[] args)
        {
            var tcs = new CancellationTokenSource();
            var token = tcs.Token;

            var task = Task.Factory.StartNew(() =>
            {
                token.ThrowIfCancellationRequested();
                for (int i = 0; i < 9999; i++)
                {
                    Console.WriteLine(string.Format("Start is : {0}", i));
                    Thread.Sleep(1000);
                    Console.WriteLine(string.Format("Done is : {0}", i));
                    if (token.IsCancellationRequested)
                        token.ThrowIfCancellationRequested();
                }
            }, token);

            tcs.Cancel();

            try
            {
                task.Wait();
            }
            catch (AggregateException aex)
            {
                foreach (var ex in aex.InnerExceptions)
                {
                    Console.WriteLine(aex.Message + " " + ex.Message);
                }
            }

            Console.ReadKey();
        }
    }
}
Waiting For Cancelled Async WorkFlow
One thing that struck me as very odd is that if I was using C# I would still be able to know about a cancelled Task in TPL land, by way of a continuation that only runs on the Canceled state, or I could use a Wait with a try-catch which caught AggregateException (much like the sample shown above), and using async/await it is even easier.
Anyway, my point is that in C# I would have no problems waiting on a cancellable task; there are many ways. But I could not seem to find a way to do that in F#. So I did some googling, and lo and behold, Tomas Petricek has written some code that allows you to do that, which you can find at this url: http://www.fssnip.net/d4. I got to this link from this Stack Overflow discussion: http://stackoverflow.com/questions/11609089/waiting-on-the-cancellation-of-an-asynchronous-workflow
His code is complex and probably not best suited to a beginners guide, but it is worth looking at if that is what you are after.
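For the simpler case of just being told that a workflow was cancelled, one option that ships with the Async class is Async.StartWithContinuations, which takes separate continuations for success, failure and cancellation. The following is a minimal sketch under that assumption. It is not Tomas Petricek's snippet and does not cover everything his code does; the names worker, finish, outcome, finished and signalled are hypothetical.

```fsharp
open System
open System.Threading

// Hypothetical long-running workflow, similar in shape to the loop above.
let worker : Async<unit> =
    async {
        for i in 0 .. 9999 do
            do! Async.Sleep 100
    }

let cts = new CancellationTokenSource()

// Hypothetical helpers: record which continuation ran and signal the waiter.
let outcome = ref "pending"
let finished = new ManualResetEventSlim(false)

let finish (text: string) =
    outcome.Value <- text
    finished.Set()

// Start the workflow with one continuation per possible ending.
Async.StartWithContinuations(
    worker,
    (fun () -> finish "completed"),
    (fun (ex: exn) -> finish ("failed: " + ex.Message)),
    (fun (_: OperationCanceledException) -> finish "cancelled"),
    cts.Token)

Thread.Sleep 300
cts.Cancel()

// Block until one of the continuations has run (or give up after 5 seconds).
let signalled = finished.Wait(TimeSpan.FromSeconds 5.0)
printfn "signalled=%b outcome=%s" signalled outcome.Value
```

In this sketch the cancellation continuation is the one expected to run, since the token is cancelled while the workflow is inside Async.Sleep.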
Serial Workflows
You can of course run async workflows in series, where you just wait for one workflow to complete using "let!". Here is a small example:
open System
open System.Threading

[<EntryPoint>]
let main argv =

    let looper =
        async {
            for i in 0..3 do
                printfn "Start is %d" i
                do! Async.Sleep(500)
                printfn "Done is %d" i
        }

    //runs 2 async workflows one after another
    let run2WorkflowsInSeries =
        async {
            let! loop1 = looper
            printfn "Done loop1"
            let! loop2 = looper
            printfn "Done loop2"
        }

    Async.RunSynchronously run2WorkflowsInSeries

    printfn "All done"
    Console.ReadLine() |> ignore

    //return 0 for main method
    0
Which when run gives the following output:
Parallel Workflows
It is also entirely possible to run async workflows using a parallel type arrangement. If I was using TPL in C#, this would be the equivalent of Task.WhenAll(..) / Task.WaitAll(..). I have shamelessly stolen the following code snippet from Tomas Petricek's blog, which you can find right here. I urge you to read this blog, it is very, very interesting: http://tomasp.net/blog/csharp-fsharp-async-intro.aspx/
Here is the code, which demonstrates how to run async workflows in parallel:
open System
open System.Threading
open System.Text.RegularExpressions
open System.Net
open System.IO

[<EntryPoint>]
let main argv =

    let regTitle = new Regex(@"\<title\>([^\<]+)\</title\>")

    /// Asynchronously downloads a web page and returns
    /// title of the page and size in bytes
    let downloadPage(url:string) =
        async {
            let request = HttpWebRequest.Create(url)
            // Asynchronously get response and dispose it when we're done
            use! response = request.AsyncGetResponse()
            use stream = response.GetResponseStream()
            let temp = new MemoryStream()
            let buffer = Array.zeroCreate 4096

            // Loop that downloads page into a buffer (could use 'while'
            // but recursion is more typical for functional language)
            let rec download() =
                async {
                    let! count = stream.AsyncRead(buffer, 0, buffer.Length)
                    do! temp.AsyncWrite(buffer, 0, count)
                    if count > 0 then return! download()
                }

            // Start the download asynchronously and handle results
            do! download()
            temp.Seek(0L, SeekOrigin.Begin) |> ignore
            let html = (new StreamReader(temp)).ReadToEnd()
            return regTitle.Match(html).Groups.[1].Value, html.Length
        }

    // Downloads pages in parallel and prints all results
    let comparePages =
        async {
            //wait for all results, similar to Task.WhenAll
            let! results =
                [| "http://www.google.com"; "http://www.bing.com"; "http://www.yahoo.com" |]
                |> Array.map downloadPage
                |> Async.Parallel
            for title, length in results do
                Console.WriteLine("{0} (length {1})", title, length)
        }

    // Start the computation on current thread
    do comparePages |> Async.RunSynchronously

    Console.ReadLine() |> ignore

    //return 0 for main method
    0
Which when run produces the following results:
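The same Async.Parallel pattern can be tried without any network access. The following is a minimal sketch in which slowSquare is a hypothetical stand-in for a slow operation such as the page download; it is not part of the blog code referenced above.

```fsharp
open System

// Hypothetical stand-in for a slow operation such as a download.
let slowSquare (n: int) : Async<int> =
    async {
        do! Async.Sleep 200
        return n * n
    }

// Build one workflow per input, combine them, then wait for all results.
let squares : int[] =
    [| 1; 2; 3; 4 |]
    |> Array.map slowSquare
    |> Async.Parallel
    |> Async.RunSynchronously

printfn "%A" squares
// prints: [|1; 4; 9; 16|]
```

Async.Parallel returns the results in the same order as the input workflows, regardless of which one happens to finish first.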
Further Reading : The F# Asynchronous Programming Model
To read more from the F# language creator and main researcher, Don Syme, and from Tomas Petricek, I urge you all to go off and read the PDF at this link: http://research.microsoft.com/apps/pubs/default.aspx?id=147194