Click here to Skip to main content
15,897,371 members
Articles / Programming Languages / C#

Long Running Work Flow Activities

Rate me:
Please Sign up or sign in to vote.
5.00/5 (4 votes)
11 Dec 2008CPOL18 min read 45.3K   385   35  
A generic way to write long running work flow activities
using System;
using System.ComponentModel;
using System.ComponentModel.Design;
using System.Collections;
using System.Drawing;
using System.Linq;
using System.Workflow.ComponentModel.Compiler;
using System.Workflow.ComponentModel.Serialization;
using System.Workflow.ComponentModel;
using System.Workflow.ComponentModel.Design;
using System.Workflow.Runtime;
using System.Workflow.Activities;
using System.Workflow.Activities.Rules;
using System.Threading;
using System.Workflow.Runtime.Hosting;
using System.Reflection;

namespace CommonWFLibrary
{
    /// <summary>
    /// The base class which you can inherit from to implement the long
    /// running activity pattern found on Paul Andrew's blog:
    /// http://blogs.msdn.com/pandrew/archive/2007/11/13/patterns-for-long-running-activities-in-windows-workflow-foundation.aspx
    /// 
    /// This implements a service and an activity that talks to the service.
    /// 
    /// </summary>
    public partial class LongRunningActivityBase : Activity
    {


        /// <summary>
        /// Registers a new <see cref="LongRunningActivityBaseService"/> with the WF runtime.
        /// A long-running activity throws a <see cref="LongRunningException"/> from Execute
        /// when it cannot find this service, so register it before starting any workflow
        /// that contains one. Call this only once per runtime -- there is no checking
        /// for an existing registration!
        /// </summary>
        /// <param name="rt">The workflow runtime the service is added to.</param>
        /// <exception cref="ArgumentNullException"><paramref name="rt"/> is null.</exception>
        public static void RegisterService(WorkflowRuntime rt)
        {
            // Fail with a named argument rather than a bare NullReferenceException.
            if (rt == null)
            {
                throw new ArgumentNullException("rt");
            }

            rt.AddService(new LongRunningActivityBaseService());
        }

        /// <summary>
        /// Registers a new <see cref="LongRunningActivityBaseService"/> with the WF runtime,
        /// handing the service a pair of save and restore functions.
        /// </summary>
        /// <param name="rt">The workflow runtime the service is added to.</param>
        /// <param name="sav">Save function passed straight through to the service constructor.</param>
        /// <param name="res">Restore function passed straight through to the service constructor.</param>
        /// <exception cref="ArgumentNullException"><paramref name="rt"/> is null.</exception>
        public static void RegisterService(WorkflowRuntime rt, LongRunningActivityBaseService.SaveLRAState sav, LongRunningActivityBaseService.RestoreLRAState res)
        {
            // Fail with a named argument rather than a bare NullReferenceException.
            // sav and res are not validated here; the service constructor receives them as given.
            if (rt == null)
            {
                throw new ArgumentNullException("rt");
            }

            rt.AddService(new LongRunningActivityBaseService(sav, res));
        }

        /// <summary>
        /// The callback descriptor for this activity. It is built once in the constructor by
        /// CreateActivityCallback and is null when no suitably tagged method was found.
        /// Execute hands it to the service, and the queue handler uses it to deliver the result.
        /// </summary>
        private MethodArgumentInterface _callback;

        /// <summary>
        /// Number of times to retry this long running activity if the host or machine crashes while
        /// this long running activity is in progress. Note this makes no sense if you don't have a persistence
        /// service for the WF _and_ for the long running activity service!! Specify zero (the default) if you
        /// don't want any retries. A crash then will cause the system to throw an exception in the WF.
        /// The value is passed to the service's StartAnActivity call when the activity executes.
        /// </summary>
        public int TimesToRetry { get; set; }

        /// <summary>
        /// Creates the activity: retries are switched off and the callback
        /// descriptor for the tagged long running method is resolved.
        /// </summary>
        public LongRunningActivityBase()
        {
            // No retries after a crash unless the user explicitly asks for them.
            this.TimesToRetry = 0;

            // Resolve (via reflection) which tagged method this activity will run.
            // This may legitimately come back null; Execute reports that case.
            this._callback = this.CreateActivityCallback();
        }

        /// <summary>
        /// Figure out what the callback activity pattern should be. Looks first for a
        /// public static method tagged with LongRunningMethodAttribute (zero or one
        /// parameter), and if none exists, for one tagged with
        /// LongRunningMethodStarterAttribute (up to three parameters).
        /// </summary>
        /// <returns>
        /// The callback descriptor for the tagged method, or null when no tagged
        /// method was found (Execute throws in that case).
        /// </returns>
        /// <exception cref="LongRunningException">
        /// More than one method carries the same tag.
        /// </exception>
        private MethodArgumentInterface CreateActivityCallback()
        {
            //
            // The delegates we build are structured by type, so we need to know the
            // types of what we are calling. That is all governed by the run method,
            // so find that first. The search result is materialized once so the
            // reflection scan is not repeated for every Count()/First().
            //

            MethodInfo[] runners = FindTaggedStaticMethods(typeof(LongRunningMethodAttribute), 1);

            if (runners.Length > 1)
            {
                throw new LongRunningException("Found more than one method tagged with the LongRunningMethod attribute; only one is allowed");
            }

            if (runners.Length == 1)
            {
                MethodInfo mi = runners[0];

                //
                // Got the method. Does it take an argument? If so, then we need to be
                // able to get the argument from somewhere!
                //

                if (mi.GetParameters().Length == 1)
                {
                    return CreateActivityWithArg(mi);
                }

                return CreateActivitySimple(mi);
            }

            //
            // No synchronous run method. Look for an async starter instead.
            //

            MethodInfo[] starters = FindTaggedStaticMethods(typeof(LongRunningMethodStarterAttribute), 3);

            if (starters.Length > 1)
            {
                throw new LongRunningException("Found more than one method tagged with the LongRunningMethodStarter attribute; only one is allowed");
            }

            if (starters.Length == 1)
            {
                return CreateAsyncActivity(starters[0]);
            }

            //
            // Finally, it looks like nothing is there. Punt; we will bomb when we are
            // actually needed.
            //

            return null;
        }

        /// <summary>
        /// Returns the public static methods of this activity's runtime type (inherited
        /// ones included) that take at most <paramref name="maxParameterCount"/> parameters
        /// and carry an attribute whose exact type is <paramref name="attributeType"/>.
        /// A method appears once per matching attribute instance.
        /// </summary>
        private MethodInfo[] FindTaggedStaticMethods(Type attributeType, int maxParameterCount)
        {
            var tagged = from m in this.GetType().GetMethods(BindingFlags.Static | BindingFlags.Public | BindingFlags.FlattenHierarchy)
                         where m.GetParameters().Length <= maxParameterCount
                         from a in m.GetCustomAttributes(false)
                         where a.GetType() == attributeType
                         select m;

            return tagged.ToArray();
        }

        /// <summary>
        /// Creates the arg interface that will actually run the async call-back.
        /// </summary>
        /// <param name="asyncRunner">
        /// The method tagged as the long running starter. It must take between one and
        /// three parameters: a LongRunningContext first, optionally an argument object
        /// second, and optionally a context object third whose type must equal the
        /// method's return type.
        /// </param>
        /// <returns>A descriptor holding the gather, distribute and async run methods.</returns>
        /// <exception cref="ArgumentException">
        /// The method's signature does not follow the rules above.
        /// </exception>
        private MethodArgumentInterface CreateAsyncActivity(MethodInfo asyncRunner)
        {
            // GetParameters() allocates a fresh array on every call; fetch it once.
            ParameterInfo[] parameters = asyncRunner.GetParameters();

            //
            // First, do some argument checking. The first argument must be the
            // LongRunningContext guy!
            //

            if (parameters.Length < 1 || parameters.Length > 3)
            {
                // The message now matches the range that is actually accepted.
                throw new ArgumentException("The long running method must have between one and three parameters");
            }
            if (parameters[0].ParameterType != typeof(LongRunningContext))
            {
                throw new ArgumentException("The first parameter for a async long running method must be LongRunningContext.");
            }

            // The optional second parameter is the argument object to gather.
            Type argsType = null;
            if (parameters.Length > 1)
            {
                argsType = parameters[1].ParameterType;
            }

            // The optional third parameter is only validated here, not stored.
            if (parameters.Length == 3)
            {
                Type contextType = parameters[2].ParameterType;
                if (contextType != asyncRunner.ReturnParameter.ParameterType)
                {
                    throw new ArgumentException("The return parameter on the activity starter must be the same type as the third argument - the context type!");
                }
            }

            //
            // Create the call back guy and load up the gather, distribute and run
            // methods. Passing null to the distribute lookup skips its type check.
            //

            MAIAsyncWithArg result = new MAIAsyncWithArg();
            result._gather_callback = GetGatherMethodInfo(argsType);
            result._distribute_callback = GetDistributeMethodInfo(null);
            result._run_async_callback = asyncRunner;

            return result;
        }

        /// <summary>
        /// Builds the callback descriptor for a run method that takes one argument.
        /// The gather method is looked up by the run method's parameter type and the
        /// distribute method by its return type; either lookup may come back null.
        /// </summary>
        /// <param name="mi">The tagged run method; it must have exactly one parameter.</param>
        /// <returns>The descriptor holding the run, gather and distribute methods.</returns>
        private MethodArgumentInterface CreateActivityWithArg(MethodInfo mi)
        {
            Type argumentType = mi.GetParameters()[0].ParameterType;
            Type resultType = mi.ReturnParameter.ParameterType;

            return new MAIWithArg
            {
                _run_callback = mi,
                _gather_callback = GetGatherMethodInfo(argumentType),
                _distribute_callback = GetDistributeMethodInfo(resultType)
            };
        }

        /// <summary>
        /// Looks up the gather method: the first public, parameterless method on this
        /// activity's runtime type that returns <paramref name="ptype"/> and carries the
        /// LongRunningGatherArguments attribute.
        /// </summary>
        /// <param name="ptype">The return type the gather method must have.</param>
        /// <returns>The matching method, or null when there is none.</returns>
        private MethodInfo GetGatherMethodInfo(Type ptype)
        {
            return this.GetType().GetMethods()
                .Where(m => m.GetParameters().Length == 0 && m.ReturnParameter.ParameterType == ptype)
                .Where(m => m.GetCustomAttributes(false).Any(a => a.GetType() == typeof(LongRunningGatherArgumentsAttribute)))
                .FirstOrDefault();
        }

        /// <summary>
        /// Fetch the MethodInfo for the distribute callback: the first public method on
        /// this activity's runtime type that carries the LongRunningDistributeArguments
        /// attribute.
        /// </summary>
        /// <param name="rtype">
        /// The type the distribute method's first parameter must have. Pass
        /// typeof(void) when there is nothing to distribute, or null to accept the
        /// tagged method without checking its parameter type.
        /// </param>
        /// <returns>
        /// The distribute method, or null when <paramref name="rtype"/> is void, no
        /// method is tagged, or the tagged method's first parameter does not match.
        /// </returns>
        private MethodInfo GetDistributeMethodInfo(Type rtype)
        {
            if (rtype == typeof(void))
            {
                return null;
            }

            // FirstOrDefault enumerates the reflection query once instead of twice.
            MethodInfo candidate = (from m in this.GetType().GetMethods()
                                    from a in m.GetCustomAttributes(false)
                                    where a.GetType() == typeof(LongRunningDistributeArgumentsAttribute)
                                    select m).FirstOrDefault();

            if (candidate == null)
            {
                return null;
            }

            if (rtype != null)
            {
                // A tagged method with no parameters cannot accept the result; treat it
                // as "no match" rather than indexing past the end of the array.
                ParameterInfo[] parameters = candidate.GetParameters();
                if (parameters.Length == 0 || parameters[0].ParameterType != rtype)
                {
                    return null;
                }
            }

            return candidate;
        }

        /// <summary>
        /// Builds the callback descriptor for a run method that takes no arguments.
        /// No gather method is needed; the distribute method is looked up by the run
        /// method's return type and may be null.
        /// </summary>
        /// <param name="mi">The tagged, parameterless run method.</param>
        /// <returns>The descriptor holding the run and distribute methods.</returns>
        private MethodArgumentInterface CreateActivitySimple(MethodInfo mi)
        {
            return new MAIWithArg
            {
                _run_callback = mi,
                _distribute_callback = GetDistributeMethodInfo(mi.ReturnParameter.ParameterType)
            };
        }

        /// <summary>
        /// Not referenced anywhere in this part of the partial class.
        /// NOTE(review): presumably a leftover fallback service -- confirm no other
        /// part of the class uses it before removing.
        /// </summary>
        private static LongRunningActivityBaseService _defaultService = null;

        /// <summary>
        /// Starts the long running work. Gathers the call arguments, creates the
        /// workflow queue the result will arrive on, subscribes to it, and asks the
        /// long running service to start the work.
        /// </summary>
        /// <param name="context">The execution context the runtime supplies for this activity.</param>
        /// <returns>
        /// Always ActivityExecutionStatus.Executing; the activity is closed later by
        /// the queue handler once a result has been delivered.
        /// </returns>
        /// <exception cref="LongRunningException">
        /// No tagged run method was found for this activity, or the long running
        /// service was never registered with the workflow runtime.
        /// </exception>
        protected override ActivityExecutionStatus Execute(ActivityExecutionContext context)
        {
            // Nothing to run if the constructor found no tagged method.
            if (this._callback == null)
            {
                throw new LongRunningException("This long running activity has no method to execute the long running task! (public, static, with the LongRunningMethod attribute)");
            }

            // The service must have been registered; there is no fallback.
            LongRunningActivityBaseService lraService = context.GetService<LongRunningActivityBaseService>();
            if (lraService == null)
            {
                throw new LongRunningException("The LongRunningActivityBaseService was not added to the workflow runtime. Please call the register method!");
            }

            // Collect whatever object gets shipped off to the long running method.
            object callArguments = this._callback.PrepForCall(this);

            // The final result is awaited on a queue named after this activity.
            string resultQueueName = this.QualifiedName + "ResultQueue";
            WorkflowQueue resultQueue = context.GetService<WorkflowQueuingService>().CreateWorkflowQueue(resultQueueName, false);
            resultQueue.QueueItemAvailable += q_QueueItemAvailable;

            // Kick off the work; the queue must already exist at this point.
            lraService.StartAnActivity(resultQueueName, this._callback, callArguments, this.TimesToRetry);

            // Stay in the executing state so the WF can go idle waiting for the message.
            return ActivityExecutionStatus.Executing;
        }

        /// <summary>
        /// Remove the item from the queue. This should be the finish state of the
        /// long term work item we submitted in the Execute method. The queue is
        /// unsubscribed from and deleted, the result is handed to the callback, and
        /// the activity is closed.
        /// </summary>
        /// <param name="sender">
        /// Expected to be the ActivityExecutionContext; the call is ignored when it is not.
        /// </param>
        /// <param name="e">Carries the name of the queue that received the item.</param>
        /// <exception cref="Exception">
        /// When the dequeued result is itself an exception, that same exception object is thrown.
        /// </exception>
        void q_QueueItemAvailable(object sender, QueueEventArgs e)
        {
            ActivityExecutionContext context = sender as ActivityExecutionContext;
            if (context == null)
            {
                return;
            }

            //
            // Grab the result and get rid of the queue. Drop our subscription first
            // so no listener is left registered against a queue that is going away.
            //

            WorkflowQueuingService qServ = context.GetService<WorkflowQueuingService>();
            WorkflowQueue q = qServ.GetWorkflowQueue(e.QueueName);

            object result = q.Dequeue();
            q.QueueItemAvailable -= q_QueueItemAvailable;
            qServ.DeleteWorkflowQueue(e.QueueName);

            //
            // A failure is delivered as the exception object itself; surface it
            // unchanged so fault handlers still see the original type.
            //

            Exception failure = result as Exception;
            if (failure != null)
            {
                throw failure;
            }

            //
            // Deliver the arguments to the activity
            //

            _callback.FinishUpCall(this, result);

            //
            // Let the WF engine know that we are now done with this activity and
            // we can move onto the next activity.
            //

            context.CloseActivity();
        }
    }
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Other University of Washington
United States United States
I'm a professor of physics at the University of Washington - my field of research is particle physics. I went into this because of the intersection of physics, hardware, and computers. I've written large experiment data acquisition systems (I've done a lot of multi-thread programming). My hobby is writing tools and other things that tend to be off-shoots of work-related projects.

Comments and Discussions