|
using System;
using System.ComponentModel;
using System.ComponentModel.Design;
using System.Collections;
using System.Drawing;
using System.Linq;
using System.Workflow.ComponentModel.Compiler;
using System.Workflow.ComponentModel.Serialization;
using System.Workflow.ComponentModel;
using System.Workflow.ComponentModel.Design;
using System.Workflow.Runtime;
using System.Workflow.Activities;
using System.Workflow.Activities.Rules;
using System.Threading;
using System.Workflow.Runtime.Hosting;
using System.Reflection;
namespace CommonWFLibrary
{
/// <summary>
/// The base class which you can inherrit from to implement the long
/// running acitivity pattern found on Paul Andrew's blog:
/// http://blogs.msdn.com/pandrew/archive/2007/11/13/patterns-for-long-running-activities-in-windows-workflow-foundation.aspx
///
/// This implements a service and an activity that talks to the service.
///
/// </summary>
public partial class LongRunningActivityBase : Activity
{
/// <summary>
/// Call to register this service with the WF runtime.
/// If you don't, then nothing will work!! And you have to
/// call it for every single long-running activity you have! :-)
/// Call this only once per runtime -- there is no checking!
/// </summary>
public static void RegisterService(WorkflowRuntime rt)
{
rt.AddService(new LongRunningActivityBaseService());
}
/// <summary>
/// Add the service, but include save and restore functions.
/// </summary>
/// <param name="rt"></param>
/// <param name="?"></param>
/// <param name="?"></param>
public static void RegisterService(WorkflowRuntime rt, LongRunningActivityBaseService.SaveLRAState sav, LongRunningActivityBaseService.RestoreLRAState res)
{
rt.AddService(new LongRunningActivityBaseService(sav, res));
}
/// <summary>
/// The callback we are using for this guy.
/// </summary>
private MethodArgumentInterface _callback;
/// <summary>
/// Number of times to re-try this long running activity if the Host or machine crashes while
/// this long running activity is in progress. Note this makes no sense if you don't have a persistance
/// serice for the WF _and_ for the long running activity service!! Specify zero (the default) if you
/// don't want any retries. A crash then will cause the system to throw an exception in the WF.
/// </summary>
public int TimesToRetry { get; set; }
/// <summary>
/// Init the actual code activity
/// </summary>
public LongRunningActivityBase()
{
_callback = CreateActivityCallback();
///
/// Default to no retries when this guy crashes!
///
TimesToRetry = 0;
}
/// <summary>
/// Figure out what the callback activity pattern should be.
/// </summary>
/// <returns></returns>
private MethodArgumentInterface CreateActivityCallback()
{
///
/// We will be creating delegates for this - which are structured by
/// type. So we need to figure out what the types are of what we are calling
/// and then go from there. This is all goverened by the Run method, so find that first.
///
var all = from m in this.GetType().GetMethods(BindingFlags.Static | BindingFlags.Public | BindingFlags.FlattenHierarchy)
where m.GetParameters().Length <= 1
from a in m.GetCustomAttributes(false)
where a.GetType() == typeof(LongRunningMethodAttribute)
select m;
if (all.Count() > 1)
{
throw new LongRunningException("Found more than one method tagged with the LongRunningMethod attribute; only one is allowed");
}
if (all.Count() == 1)
{
MethodInfo mi = all.First();
///
/// Got the method. Does it take an argument? If so, then we need to be able to get the
/// argument from somewhere!
///
if (mi.GetParameters().Length == 1)
{
return CreateActivityWithArg(mi);
}
else
{
return CreateActivitySimple(mi);
}
}
///
/// Finally, it looks like nothing is there. Punt; we will bomb when we are actually
/// needed.
///
var all_async = from m in this.GetType().GetMethods(BindingFlags.Static | BindingFlags.Public | BindingFlags.FlattenHierarchy)
where m.GetParameters().Length <= 3
from a in m.GetCustomAttributes(false)
where a.GetType() == typeof(LongRunningMethodStarterAttribute)
select m;
if (all_async.Count() > 1)
{
throw new LongRunningException("Found more than one method tagged with the LongRunningMethodStarter attribute; only one is allowed");
}
if (all_async.Count() == 1)
{
MethodInfo mi = all_async.First();
return CreateAsyncActivity(mi);
}
return null;
}
/// <summary>
/// Creates the arg interface that will actually run the call-back.
/// </summary>
/// <param name="mi"></param>
/// <returns></returns>
private MethodArgumentInterface CreateAsyncActivity(MethodInfo asyncRunner)
{
///
/// First, do some argument checking. The first argument must be the LongRunningContext
/// guy!
///
if (asyncRunner.GetParameters().Length < 1 || asyncRunner.GetParameters().Length > 3)
{
throw new ArgumentException("The long running method must have one or two parameters");
}
if (asyncRunner.GetParameters()[0].ParameterType != typeof(LongRunningContext))
{
throw new ArgumentException("The first parameter for a async long running method must be LongRunningContext.");
}
Type argsType = null;
if (asyncRunner.GetParameters().Length > 1)
{
argsType = asyncRunner.GetParameters()[1].ParameterType;
}
Type contextType = null;
if (asyncRunner.GetParameters().Length == 3)
{
contextType = asyncRunner.GetParameters()[2].ParameterType;
if (contextType != asyncRunner.ReturnParameter.ParameterType)
{
throw new ArgumentException("The return parameter on the activity starter must be the same type as the third argument - the context type!");
}
}
///
/// Create the call back guy and load up the simple gather and run methods.
///
MAIAsyncWithArg result = new MAIAsyncWithArg();
result._gather_callback = GetGatherMethodInfo(argsType);
result._distribute_callback = GetDistributeMethodInfo(null);
result._run_async_callback = asyncRunner;
return result;
}
/// <summary>
/// We have to collect and return arguments. The gathering of arguments is
/// mandatory. We don't have to return them unless it is marked to.
/// </summary>
/// <param name="mi"></param>
/// <returns></returns>
private MethodArgumentInterface CreateActivityWithArg(MethodInfo mi)
{
MAIWithArg s = new MAIWithArg();
s._run_callback = mi;
Type ptype = mi.GetParameters()[0].ParameterType;
Type rtype = mi.ReturnParameter.ParameterType;
s._gather_callback = GetGatherMethodInfo(ptype);
s._distribute_callback = GetDistributeMethodInfo(rtype);
return s;
}
/// <summary>
/// Find the gather method and return a pointer to it.
/// </summary>
/// <param name="ptype"></param>
/// <returns></returns>
private MethodInfo GetGatherMethodInfo(Type ptype)
{
var gather = from m in this.GetType().GetMethods()
where m.GetParameters().Length == 0 && m.ReturnParameter.ParameterType == ptype
from a in m.GetCustomAttributes(false)
where a.GetType() == typeof(LongRunningGatherArgumentsAttribute)
select m;
MethodInfo gatherme = null;
if (gather.Count() != 0)
{
gatherme = gather.First();
}
return gatherme;
}
/// <summary>
/// Fetch the MethodInfo for the distribute callback.
/// </summary>
/// <param name="s"></param>
/// <param name="rtype"></param>
private MethodInfo GetDistributeMethodInfo(Type rtype)
{
if (rtype == typeof(void))
{
return null;
}
var dist = from m in this.GetType().GetMethods()
from a in m.GetCustomAttributes(false)
where a.GetType() == typeof(LongRunningDistributeArgumentsAttribute)
select m;
if (dist.Count() != 0)
{
MethodInfo mi = dist.First();
if (rtype != null)
{
if (mi.GetParameters()[0].ParameterType != rtype)
{
return null;
}
}
return mi;
}
return null;
}
/// <summary>
/// A simple Run call, with no extra work. Cool!
/// </summary>
/// <param name="mi"></param>
/// <returns></returns>
private MethodArgumentInterface CreateActivitySimple(MethodInfo mi)
{
MAIWithArg s = new MAIWithArg();
s._run_callback = mi;
Type rType = mi.ReturnParameter.ParameterType;
s._distribute_callback = GetDistributeMethodInfo(rType);
return s;
}
private static LongRunningActivityBaseService _defaultService = null;
/// <summary>
/// Execute the long running activity. Setup a queue to pass messages
/// back and forth on, and then call the service to get everything up
/// and going.
/// </summary>
/// <param name="executionContext"></param>
/// <returns></returns>
protected override ActivityExecutionStatus Execute(ActivityExecutionContext context)
{
///
/// Check that we are going to be able to do anything!
///
if (_callback == null)
{
throw new LongRunningException("This long running activity has no method to execute the long running task! (public, static, with the LongRunningMethod attribute)");
}
///
/// Get the base faculty. If there is none, use a defualt one.
///
LongRunningActivityBaseService factServ = context.GetService<LongRunningActivityBaseService>();
if (factServ == null)
{
throw new LongRunningException("The LongRunningActivityBaseService was not added to the workflow runtime. Please call the register method!");
}
///
/// Try to gather the object that we are going to send off
/// to the activity...
///
object activityArgs = _callback.PrepForCall(this);
///
/// Create a unique queue name. We will await the final result from here.
///
string qName = this.QualifiedName + "ResultQueue";
WorkflowQueuingService qServ = context.GetService<WorkflowQueuingService>();
WorkflowQueue q = qServ.CreateWorkflowQueue(qName, false);
q.QueueItemAvailable += new EventHandler<QueueEventArgs>(q_QueueItemAvailable);
///
/// Great, now start off the long running service!
///
factServ.StartAnActivity(qName, _callback, activityArgs, TimesToRetry);
///
/// Let the run-time know that this activity is "continued"... so this
/// WF should become idle waiting for a message now.
///
return ActivityExecutionStatus.Executing;
}
/// <summary>
/// Remove the item from the queue. This should be the finish state of the
/// long term work item we submitted in the Execute method above.
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
void q_QueueItemAvailable(object sender, QueueEventArgs e)
{
ActivityExecutionContext context = sender as ActivityExecutionContext;
if (context == null)
{
return;
}
///
/// Grab the result and get rid of the queue
///
WorkflowQueuingService qServ = context.GetService<WorkflowQueuingService>();
WorkflowQueue q = qServ.GetWorkflowQueue(e.QueueName);
object result = q.Dequeue();
qServ.DeleteWorkflowQueue(e.QueueName);
///
/// Deliver the arguments to the activity
///
if (typeof(Exception).IsInstanceOfType(result))
{
throw result as Exception;
}
_callback.FinishUpCall(this, result);
///
/// Let the WF engine know that we are now done with this activity and
/// we can move onto the next activity.
///
context.CloseActivity();
}
}
}
|
By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.
If a file you wish to view isn't highlighted, and is a text file (not binary), please
let us know and we'll add colourisation support for it.
I'm a professor of physics at the University of Washington - my field of research is particle physics. I went into this because of the intersection of physics, hardware, and computers. I've written large experiment data aquisition systems (I've done a lot of multi-thread programming). My hobby is writing tools and other things that tend to be off-shoots of work-related projects.