
A Generic Class for Wrapping Asynchronous Begin/End Operations, Using Reactive Extensions for .NET (Rx)

3 Feb 2010, CPOL, 4 min read
This article presents a reusable class and technique for easily doing Asynchronous Programming using the Begin/End Pattern and the new Reactive Extensions for .NET (Rx) library.

Introduction

One of the patterns you will come across as a .NET programmer is the Begin/End pattern for making asynchronous function calls. The purpose of the pattern is to allow long-running operations to execute on a different thread than the calling thread, leaving the calling thread free (non-blocked) to continue execution. This is an important technique for building responsive GUIs, as well as for making remote calls effectively (whether you are calling a WCF service, using .NET remoting, accessing some REST-based Web Service, etc.). In case you haven't seen it before, it looks like this:

C#
IAsyncResult BeginOperation(/* ...some number of input parameters..., */
                            AsyncCallback callback, object state);
SomeResult EndOperation(IAsyncResult asyncResult);

While the Begin/End pattern is powerful, it is awkward and unintuitive to work with, especially when the EndOperation returns a value. Looking at the pattern above, you can see that the Begin method takes a callback delegate and an object "state", and returns an IAsyncResult. It is not obvious what you are supposed to pass in as the callback or the state, nor what you are supposed to do with the IAsyncResult you get back from the call. Most importantly, it is not apparent how to get hold of the result of the EndOperation.
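To make the awkwardness concrete, here is a minimal sketch of calling a Begin/End pair by hand, without any wrapper. It uses Stream.BeginRead/EndRead on a MemoryStream purely as a convenient stand-in for a long-running operation; the class name and the ManualResetEvent used to keep the console program alive are choices made for this illustration only.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading;

class RawBeginEndSketch
{
    static void Main()
    {
        var stream = new MemoryStream(Encoding.ASCII.GetBytes("hello"));
        var buffer = new byte[16];
        var finished = new ManualResetEvent(false);

        // The last two arguments are the callback and the "state" object.
        // Here the stream itself is passed as state so the callback can find it again.
        stream.BeginRead(buffer, 0, buffer.Length, asyncResult =>
        {
            // The state object comes back on IAsyncResult.AsyncState.
            var source = (Stream)asyncResult.AsyncState;

            // Calling the End method with the IAsyncResult yields the actual result.
            int bytesRead = source.EndRead(asyncResult);
            Console.WriteLine(Encoding.ASCII.GetString(buffer, 0, bytesRead));

            finished.Set();
        }, stream);

        // Keep the process alive until the callback has run.
        finished.WaitOne();
    }
}
```

Even in this small case, the result only becomes available inside the callback, and the caller has to arrange its own plumbing to get it anywhere else.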

To make it easier to use the Begin/End pattern, Microsoft recommends an approach called the Event-based Asynchronous Pattern. While this particular pattern is an improvement, we can go one step further using the new Reactive Extensions for .NET (Rx), or Reactive LINQ, library.

Using the code

To use the wrapper, create a new instance of the generic type AsyncPatternWrapper. You must supply at least one generic type parameter: the last one is the type returned by the EndOperation. If the BeginOperation takes additional parameters (besides the callback and state parameters), supply their types as additional generic parameters ahead of the result type.

For example, if the BeginOperation takes in a string, and the EndOperation returns an int, then you would declare a new instance of AsyncPatternWrapper<string, int>(BeginOperation, EndOperation).

The constructor for any of the AsyncPatternWrapper classes takes in two parameters - the BeginOperation, and its corresponding EndOperation.

The AsyncPatternWrapper classes implement the interface IObservable<TResult>, where TResult is the result of the EndOperation call. IObservable is part of the new Reactive Extensions for .NET library.

Aside from implementing IObservable, the AsyncPatternWrapper has only one method - Invoke(). All it does is call the BeginOperation. When the operation completes, the wrapper calls the EndOperation and pushes its result onto the IObservable stream. As long as you have subscribed to the stream (via the "Subscribe" function), you will automatically process each result as its operation completes.
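As a sketch of the one-argument form described above, the snippet below wraps Dns.BeginGetHostEntry/EndGetHostEntry, where the Begin method takes a string and the End method returns an IPHostEntry. It assumes the AsyncPatternWrapper<T, TResult> class listed later in this article is compiled into the project and that the Rx assembly providing the lambda-based Subscribe extension is referenced; the class name and the choice of "localhost" are illustrative only.

```csharp
using System;
using System.Net;

class OneArgumentWrapperSketch
{
    static void Main()
    {
        // First generic parameter: the Begin method's input (string).
        // Last generic parameter: the End method's return type (IPHostEntry).
        var wrapper = new AsyncPatternWrapper<string, IPHostEntry>(
            Dns.BeginGetHostEntry, Dns.EndGetHostEntry);

        // Subscribe first, so that no result is missed.
        wrapper.Subscribe(entry => Console.WriteLine(entry.HostName));

        // Each call starts one asynchronous lookup.
        wrapper.Invoke("localhost");

        // Keep the console process alive while the lookup completes.
        Console.ReadLine();
    }
}
```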

Example 1 (Consuming a REST Web Service)

C#
// Create is a static method declared on WebRequest, so call it there.
WebRequest request = WebRequest.Create(
  @"http://services.digg.com/containers?appkey=http%3A%2F%2Fapidoc.digg.com");
var wrapper = new AsyncPatternWrapper<WebResponse>(
  request.BeginGetResponse, request.EndGetResponse);

// Subscribe before invoking, so the result is not missed.
wrapper.Subscribe(webResponse => Console.WriteLine(webResponse.ContentLength));
wrapper.Invoke();

The cool thing about the new Reactive Extensions library (which used to be called Reactive LINQ) is that you can use the standard LINQ query operators over the stream of events. So in Example 1 above, instead of printing the ContentLength of every WebResponse that comes in, you can print only the WebResponses whose ContentLength is greater than 3000, for example:

C#
wrapper
    .Where(webResponse => webResponse.ContentLength > 3000)
    .Subscribe(webResponse => Console.WriteLine(webResponse.ContentLength));

Example 2 (Consuming a WCF Service)

C#
// Some WCF service interface
[ServiceContract]
public interface IService
{
    [OperationContract(AsyncPattern = true)]
    IAsyncResult BeginGetCustomers(AsyncCallback callback, object state);

    List<Customer> EndGetCustomers(IAsyncResult result);
}

// Code snippet - illustrates consumption of the WCF service on the client-side
            
var cf = new ChannelFactory<IService>(new BasicHttpBinding(), 
         new EndpointAddress(@"http://localhost:8085"));
var service = cf.CreateChannel();

var wrapper = new AsyncPatternWrapper<List<Customer>>(
                  service.BeginGetCustomers, service.EndGetCustomers);
wrapper.ObserveOnDispatcher().Subscribe(customers => UpdateUI(customers));

// refreshButton is some button on the UI - every time you
// click it, you will asynchronously refresh the list of customers:
refreshButton.Click += (s,e) => wrapper.Invoke();

Notice that in Example 2 above, you don't have to switch back to the UI thread yourself via the Dispatcher in order to update the UI: ObserveOnDispatcher() delivers each result on the dispatcher's thread for you. As a result, your code ends up being much cleaner and more readable.

The other takeaway from this is that instead of having to rely on the WCF proxy generator, you can very easily roll your own proxy by using the AsyncPatternWrapper class. This is invaluable for times when the WCF service doesn't expose a .svc file, or in cases where there can't possibly be one, such as WCF REST.

AsyncPatternWrapper

This is the source code for AsyncPatternWrapper for no arguments in the Begin operation, and for one argument in the Begin operation. See the attached zipped file for AsyncPatternWrapper classes all the way up to four arguments in the Begin operation.

C#
using System;
using System.Linq;

namespace System
{
    #region No arguments

    public class AsyncPatternWrapper<TResult> : IObservable<TResult>
    {
        Func<AsyncCallback, object, IAsyncResult> beginOp;
        Func<IAsyncResult, TResult> endOp;
        IObservable<TResult> observable;
        event EventHandler<EventArgs<TResult>> done;

        public AsyncPatternWrapper(
            Func<AsyncCallback, object, IAsyncResult> beginOp,
            Func<IAsyncResult, TResult> endOp)
        {
            this.beginOp = beginOp;
            this.endOp = endOp;

            observable = Observable.FromEvent<EventArgs<TResult>>(
                e => this.done += e, e => this.done -= e)
                .Select(s => s.EventArgs.Item);
        }

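        public void Invoke()
        {
            // Use the IAsyncResult handed to the callback: the value returned by
            // beginOp may not have been assigned yet if the operation completes quickly.
            beginOp(
                new AsyncCallback(asyncResult =>
                {
                    try
                    {
                        var res = endOp(asyncResult);
                        var handler = done; // copy, so the null check and the call see the same delegate
                        if (handler != null)
                            handler(null, new EventArgs<TResult> { Item = res });
                    }
                    catch (Exception)
                    {
                        // Don't raise an event if there was an exception
                    }
                }),
                null);
        }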

        #region IObservable<TResult> Members

        public IDisposable Subscribe(IObserver<TResult> observer)
        {
            return observable.Subscribe(observer);
        }

        #endregion
    }

    #endregion


    #region One argument

    public class AsyncPatternWrapper<T, TResult> : IObservable<TResult>
    {
        Func<T, AsyncCallback, object, IAsyncResult> beginOp;
        Func<IAsyncResult, TResult> endOp;
        IObservable<TResult> observable;
        event EventHandler<EventArgs<TResult>> done;

        public AsyncPatternWrapper(
            Func<T, AsyncCallback, object, IAsyncResult> beginOp,
            Func<IAsyncResult, TResult> endOp)
        {
            this.beginOp = beginOp;
            this.endOp = endOp;

            observable = Observable.FromEvent<EventArgs<TResult>>(
                e => this.done += e, e => this.done -= e)
                .Select(s => s.EventArgs.Item);
        }

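        public void Invoke(T param)
        {
            // Use the IAsyncResult handed to the callback: the value returned by
            // beginOp may not have been assigned yet if the operation completes quickly.
            beginOp(
                param,
                new AsyncCallback(asyncResult =>
                {
                    try
                    {
                        var res = endOp(asyncResult);
                        var handler = done; // copy, so the null check and the call see the same delegate
                        if (handler != null)
                            handler(null, new EventArgs<TResult> { Item = res });
                    }
                    catch (Exception)
                    {
                        // Don't raise an event if there was an exception
                    }
                }),
                null);
        }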

        public IDisposable Subscribe(IObserver<TResult> observer)
        {
            return observable.Subscribe(observer);
        }
    }

    #endregion

    internal class EventArgs<T> : EventArgs
    {
        public T Item { get; set; }
    }
}

Note that in the code above, exceptions thrown by the EndOperation are simply discarded. If you find it more useful to re-throw the exception, or alternatively to expose another IObservable stream dedicated solely to exceptions, it's an easy enough change.
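One possible shape for the "separate stream of exceptions" variant is sketched below for the no-argument case. This is not part of the attached source: the names AsyncPatternWrapperWithErrors, Errors, and Broadcaster are hypothetical, and Broadcaster is a small hand-rolled stand-in so that the sketch compiles against the base class library alone (IObservable<T> and IObserver<T> live in the System namespace). In a project that already references Rx, a subject type from the library could presumably take its place.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical variant of the no-argument wrapper: results go to the main
// stream, exceptions from the End call go to a separate Errors stream.
public class AsyncPatternWrapperWithErrors<TResult> : IObservable<TResult>
{
    readonly Func<AsyncCallback, object, IAsyncResult> beginOp;
    readonly Func<IAsyncResult, TResult> endOp;
    readonly Broadcaster<TResult> results = new Broadcaster<TResult>();
    readonly Broadcaster<Exception> errors = new Broadcaster<Exception>();

    public AsyncPatternWrapperWithErrors(
        Func<AsyncCallback, object, IAsyncResult> beginOp,
        Func<IAsyncResult, TResult> endOp)
    {
        this.beginOp = beginOp;
        this.endOp = endOp;
    }

    // Stream dedicated solely to exceptions.
    public IObservable<Exception> Errors { get { return errors; } }

    public void Invoke()
    {
        beginOp(asyncResult =>
        {
            TResult value;
            try
            {
                value = endOp(asyncResult);
            }
            catch (Exception ex)
            {
                // Report the failure instead of swallowing it.
                errors.Publish(ex);
                return;
            }
            results.Publish(value);
        }, null);
    }

    public IDisposable Subscribe(IObserver<TResult> observer)
    {
        return results.Subscribe(observer);
    }
}

// Minimal stand-in for a subject: forwards each published value to all
// current subscribers. Illustration only; it never completes or errors.
sealed class Broadcaster<T> : IObservable<T>
{
    readonly object gate = new object();
    readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (gate) observers.Add(observer);
        return new Unsubscriber(() => { lock (gate) observers.Remove(observer); });
    }

    public void Publish(T value)
    {
        IObserver<T>[] snapshot;
        lock (gate) snapshot = observers.ToArray();
        foreach (var observer in snapshot) observer.OnNext(value);
    }

    sealed class Unsubscriber : IDisposable
    {
        readonly Action onDispose;
        public Unsubscriber(Action onDispose) { this.onDispose = onDispose; }
        public void Dispose() { onDispose(); }
    }
}
```

Keeping errors on their own stream, rather than calling OnError on the result stream, means a single failed call does not terminate the result stream, so later Invoke() calls can still deliver values.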

I hope you found this article useful, and enjoyed reading it!

History

  • February 2, 2010 - Version 1.0.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


