
Async database polling with EntityFramework 5

Tom Cook (UK), 30 Oct 2012
How to asynchronously poll a database for an Entity with EntityFramework 5

Introduction 

I tasked myself with developing a generic, Entity Framework based, asynchronous poller with the following requirements:

  • Must poll for a specific entity type which has been configured within a DbContext
  • Must accept a lambda expression in the form of a poll query
  • Must implement an exponential back-off retry pattern for connection failures
  • Must run asynchronously and be event based

Background  

Many of us have had to use some kind of polling mechanism in our code as it's generally required to listen for changes/new entries and do something when they arrive. Pollers generally range from the basic while() loop with a Thread.Sleep to elaborate complex async Observer patterns and SqlDependency code... 

I've been working a lot with the Entity Framework lately and I had a requirement to poll the database for records within a table (Entity) that have a specific StatusId. I decided to write a generic poller which is very easy to use and allows a developer to specify a lambda expression to asynchronously query the database.

Using the code 

To use the attached EntityPoller<T> simply write the following code:  

var poller = new EntityPoller<Notification>(context, q => q.StatusId == (int) Status.New);

poller.ActiveCycle = 200; //milliseconds
poller.IdleCycle = 10000; //10 seconds
poller.RetryAttempts = 5;
poller.RetryInitialInterval = 10000; //10 seconds

poller.EntityReceived += OnNotification;
poller.Error += OnError;

poller.Start();
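
The snippet above subscribes OnNotification and OnError without showing them. The following is a minimal sketch of what such handlers might look like. It assumes that EntityReceived passes the matched entity and that Error passes the Exception (as the Error.Invoke(ex) call in the Poll method further down suggests). The Notification class, the Status enum values and the "Processed" status are hypothetical stand-ins, not part of the attached source.

```csharp
using System;

// Hypothetical status values; only Status.New appears in the article.
public enum Status { New = 1, Processed = 2 }

// Hypothetical entity with the StatusId column the poll query filters on.
public class Notification
{
    public int Id { get; set; }
    public int StatusId { get; set; }
}

public static class HandlerSketch
{
    // Assumed signature: the poller hands over the entity that matched the query.
    public static void OnNotification(Notification entity)
    {
        Console.WriteLine("Received notification " + entity.Id);

        // Move the entity out of the polled status so the same row does not
        // match the query again. Real code would also save this change
        // through the DbContext.
        entity.StatusId = (int)Status.Processed;
    }

    // The poller passes the exception that made it give up.
    public static void OnError(Exception ex)
    {
        Console.Error.WriteLine("Poller stopped: " + ex.Message);
    }

    public static void Main()
    {
        var notification = new Notification { Id = 7, StatusId = (int)Status.New };
        OnNotification(notification);                 // prints "Received notification 7"
        OnError(new InvalidOperationException("connection lost"));
    }
}
```

Because the poll query is re-run on every cycle, a handler that leaves the entity in its original status would probably see the same entity again on the next poll.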

Code Breakdown  

The poll

First of all, you can see below that the constructor accepts two parameters: the first is your DbContext, which must contain an entity set (DbSet<>) of type T; the second is your lambda expression, which returns bool.

public sealed class EntityPoller<T>
        where T : class
{
    private readonly DbContext _context;
    private readonly Func<T, bool> _query;
    private readonly object _sync = new object();
    private bool _polling;

    public EntityPoller(DbContext context, Func<T, bool> query)
    {
        _context = context;
        _query = query;

        IdleCycle = 10000;
        ActiveCycle = 1000;
        RetryAttempts = 5;
        RetryInitialInterval = 10000;
    }

    public int IdleCycle { get; set; }
    public int ActiveCycle { get; set; }
    public int RetryAttempts { get; set; }
    public int RetryInitialInterval { get; set; }   
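
One thing worth knowing about the Func<T, bool> parameter: when a compiled delegate is passed to a LINQ operator on an IQueryable source such as DbSet<T>, the compiler binds to the System.Linq.Enumerable overload, so the rows are pulled from the database and filtered in memory. Declaring the parameter as Expression<Func<T, bool>> binds to the System.Linq.Queryable overload instead, which lets the provider translate the filter into SQL. The sketch below shows only the binding difference, using an in-memory list as a stand-in for a DbSet; the class and method names are illustrative and not part of the attached source.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;

public static class PredicateBindingSketch
{
    // A delegate binds to Enumerable.Where: the result is a plain
    // in-memory sequence and is no longer an IQueryable<T>.
    public static bool WithDelegate<T>(IQueryable<T> source, Func<T, bool> predicate)
    {
        IEnumerable<T> filtered = source.Where(predicate);
        return filtered is IQueryable<T>;
    }

    // An expression tree binds to Queryable.Where: the result is still
    // an IQueryable<T> that a LINQ provider can translate.
    public static bool WithExpression<T>(IQueryable<T> source, Expression<Func<T, bool>> predicate)
    {
        IEnumerable<T> filtered = source.Where(predicate);
        return filtered is IQueryable<T>;
    }

    public static void Main()
    {
        // Stand-in for a DbSet<T>; any IQueryable<T> behaves the same way here.
        IQueryable<int> numbers = new List<int> { 1, 2, 3 }.AsQueryable();

        Func<int, bool> asDelegate = n => n > 1;
        Expression<Func<int, bool>> asExpression = n => n > 1;

        Console.WriteLine(WithDelegate(numbers, asDelegate));     // False
        Console.WriteLine(WithExpression(numbers, asExpression)); // True
    }
}
```

If large tables are polled, changing the _query field and the constructor parameter to Expression<Func<T, bool>> is one option to consider; the calling code with a lambda stays the same.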

The poll consists of a main loop that runs until the query returns an entity or the poller is stopped. While the query returns nothing, the loop pauses (Thread.Sleep) for IdleCycle milliseconds between attempts. Once an entity is found, the loop exits and the method pauses for ActiveCycle milliseconds before returning it. This is by design, to allow entity notifications to be evented out of the object in a controlled manner. If you want new entities to be evented out as fast as possible then set ActiveCycle to 0.

private T Poll()
{
    var set = _context.Set<T>();
    T entity = null;

    try
    {

        while (_polling)
        {
            entity = Retry(() => set.FirstOrDefault(_query),RetryAttempts,RetryInitialInterval);
            if (entity != null) break;

            Thread.Sleep(IdleCycle);
        }

        Thread.Sleep(ActiveCycle);

    }
    catch (Exception ex)
    {
        Stop();

        if (Error != null)
            Error.Invoke(ex);
    }

    return entity;
}

The poll method is invoked by using an AsyncWorker delegate and an AsyncOperation. I won't cover the async code in this article but the code is all included in the source!
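
For readers who want a feel for the asynchronous part without opening the download, here is a rough sketch of one way to run a blocking poll loop in the background and raise events from it. It deliberately swaps in Task.Run and a CancellationToken instead of the AsyncWorker delegate and AsyncOperation used in the attached source, so it is an alternative technique and not the article's implementation. TaskPollerSketch, its fetch delegate and the demo class are all hypothetical names. Unlike AsyncOperation, this sketch raises its events on a thread-pool thread.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical Task-based poller, not the code from the download.
public sealed class TaskPollerSketch<T> where T : class
{
    private readonly Func<T> _fetch;   // assumed to return null when nothing matches
    private readonly int _idleCycle;   // milliseconds to wait after an empty poll
    private readonly object _sync = new object();
    private CancellationTokenSource _cts;

    public event Action<T> EntityReceived;
    public event Action<Exception> Error;

    public TaskPollerSketch(Func<T> fetch, int idleCycle)
    {
        if (fetch == null) throw new ArgumentNullException("fetch");
        _fetch = fetch;
        _idleCycle = idleCycle;
    }

    // Starts the loop on a thread-pool thread; the task completes when the loop ends.
    public Task Start()
    {
        lock (_sync)
        {
            if (_cts != null) throw new InvalidOperationException("Already polling");
            _cts = new CancellationTokenSource();
            CancellationToken token = _cts.Token;
            return Task.Run(() => Loop(token));
        }
    }

    public void Stop()
    {
        lock (_sync)
        {
            if (_cts != null) _cts.Cancel();
        }
    }

    private void Loop(CancellationToken token)
    {
        try
        {
            while (!token.IsCancellationRequested)
            {
                T entity = _fetch();
                if (entity != null)
                {
                    Action<T> handler = EntityReceived;
                    if (handler != null) handler(entity);
                }
                else
                {
                    // Idle wait that ends early if Stop() is called.
                    token.WaitHandle.WaitOne(_idleCycle);
                }
            }
        }
        catch (Exception ex)
        {
            Action<Exception> handler = Error;
            if (handler != null) handler(ex);
        }
    }
}

public static class TaskPollerDemo
{
    public static void Main()
    {
        // An in-memory queue stands in for the database query.
        var pending = new ConcurrentQueue<string>(new[] { "first", "second" });
        var received = new List<string>();

        var poller = new TaskPollerSketch<string>(() =>
        {
            string item;
            return pending.TryDequeue(out item) ? item : null;
        }, 50);

        poller.EntityReceived += item =>
        {
            received.Add(item);
            if (received.Count == 2) poller.Stop();
        };

        poller.Start().Wait();
        Console.WriteLine(string.Join(", ", received)); // first, second
    }
}
```

The sketch cannot be restarted after Stop(); a fuller version would reset its state once the loop has ended.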

Retry Pattern  

As you may have noticed in the above code snippet, the query (Func<T, bool>) is wrapped within a Retry method. Retry runs the query up to RetryAttempts times. After a failed attempt it waits, starting with RetryInitialInterval milliseconds, and the wait is multiplied by 5 after each attempt so that it grows exponentially. Any exception triggers a retry, and if the final attempt also fails the exception is rethrown. This is just what I use as a default but you may wish to alter this logic to suit your needs. The Retry function is as follows:

private static T Retry(Func<T> action, int attempts = 5, int initialInterval = 10000)
{
    if (action == null)
        throw new ArgumentNullException("action");


    for (int i = 1; i <= attempts; i++)
    {
        try
        {
            T result = action.Invoke();
            return result;
        }
        catch (Exception)
        {
            if (i >= attempts) throw;
            Thread.Sleep(initialInterval);
        }

        initialInterval *= 5;
    }

    return null;
}
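
To make the back-off concrete, the small sketch below reproduces only the interval arithmetic of the Retry loop above: one wait after every failed attempt except the last, with the wait multiplied by a fixed factor each time. The class and method names are illustrative, and the factor is a parameter here although the Retry method hard-codes 5.

```csharp
using System;
using System.Collections.Generic;

public static class BackoffScheduleSketch
{
    // Returns the waits in milliseconds that a Retry-style loop would perform
    // if every attempt failed: attempts - 1 waits, each 'factor' times the previous.
    public static List<long> Delays(int attempts, long initialInterval, int factor)
    {
        var delays = new List<long>();
        long interval = initialInterval;

        for (int i = 1; i < attempts; i++)
        {
            delays.Add(interval);
            interval *= factor;
        }

        return delays;
    }

    public static void Main()
    {
        // Defaults from the article: 5 attempts, 10000 ms initial interval, factor 5.
        List<long> delays = Delays(5, 10000, 5);
        Console.WriteLine(string.Join(", ", delays)); // 10000, 50000, 250000, 1250000
    }
}
```

With the default settings the four waits add up to 1,560,000 ms (26 minutes) before the final failure is rethrown, so a smaller factor or initial interval may suit cases where errors need to surface quickly.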

And that's it! It works like a dream for me. If anyone uses this and would like to comment or ask questions/make suggestions then please feel free. 

Thanks for reading.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

Tom Cook (UK)
Software Developer (Senior), Aesha Systems Ltd
United Kingdom

Comments and Discussions

 
Question: Using this code in a project, Member 10915002, 24-Jul-14 5:07
Suggestion: Little enhancement, Wonde Tadesse, 11-Nov-12 10:30
First of all, have a 5 for good work. Here are a couple of suggestions.
1. The entity polling process runs indefinitely; I believe it should stop after a certain number of attempts.
2. Since you provide a query expression for the poller, it would be better to be able to return a generic collection of type T to the user, because some queries may match more than one record.
 
public sealed class EntitiesPoller<T>
             where T : class
    {
        private readonly DbContext _context;
        private readonly Func<T, bool> _query;
        private readonly object _sync = new object();
        private bool _polling;
        private static int reAttemptCompleteIndicator = 0;
 
        public EntitiesPoller(DbContext context, Func<T, bool> query)
        {
            _context = context;
            _query = query;
 
            IdleCycle = 10000;
            ActiveCycle = 1000;
            RetryAttempts = 5;
            RetryInitialInterval = 10000;
        }
 
        public int IdleCycle { get; set; }
        public int ActiveCycle { get; set; }
        public int RetryAttempts { get; set; }
        public int RetryInitialInterval { get; set; }
        
        private List<T> Poll()
        {
            var set = _context.Set<T>();
            List<T> entities = null;
 
            try
            {
 
                while (true)
                {
                    if (reAttemptCompleteIndicator == RetryAttempts)
                    {
                        OnReAttempt("ReAttempt completed!");
                        break;
                    }
                    entities = Retry(() => set.Where(_query).ToList(), RetryAttempts, RetryInitialInterval);
                    if (entities != null) break;
 
                    Thread.Sleep(IdleCycle);
                }
                Thread.Sleep(ActiveCycle);
 
            }
            catch (Exception ex)
            {
                Stop();
 
                if (Error != null)
                    Error.Invoke(ex);
            }
 
            return entities;
        }
 
        private void PollComplete(IAsyncResult asyncResult)
        {
            var worker = (AsyncWorker)((AsyncResult)asyncResult).AsyncDelegate;
            var entities = worker.EndInvoke(asyncResult);
            var operation = (AsyncOperation)asyncResult.AsyncState;
 
            lock (_sync)
            {
                _polling = false;
            }
 
            if (entities != null)
                ((AsyncComplete)operation.UserSuppliedState).Invoke(entities);
        }
 
        private delegate List<T> AsyncWorker();
        private delegate void AsyncComplete(List<T> entities);
 
        public delegate void EntitiesReceivedArgs(List<T> entities);
        public event EntitiesReceivedArgs EntitiesReceived;
 
        public delegate void ReAttemptCompletedArgs(string message);
        public event ReAttemptCompletedArgs ReAttemptCompleted;
 
        public delegate void PollerErrorArgs(Exception ex);
        public event PollerErrorArgs Error;
 
        public void Start()
        {
            var worker = new AsyncWorker(Poll);
            var completed = new AsyncCallback(PollComplete);
            var onCompleted = new AsyncComplete(OnEntities);
 
            lock (_sync)
            {
                if (_polling)
                    throw new InvalidOperationException("Already polling");
 
                var operation = AsyncOperationManager.CreateOperation(onCompleted);
                worker.BeginInvoke(completed, operation);
                _polling = true;
            }
        }
 
        public void Stop()
        {
            lock (_sync)
            {
                _polling = false;
            }
        }
 
        private void OnEntities(List<T> entities)
        {
            if (EntitiesReceived != null)
                EntitiesReceived(entities);
 
            Start();
        }
 
        private void OnReAttempt(string message)
        {
            if (ReAttemptCompleted != null)
                ReAttemptCompleted(message);
 
        }
 

        private static List<T> Retry(Func<List<T>> action, int attempts = 5, int initialInterval = 10000)
        {
            if (action == null)
                throw new ArgumentNullException("action");
            reAttemptCompleteIndicator++;
            for (int i = 1; i <= attempts; i++)
            {
                try
                {
                    List<T> result = action.Invoke();
                    return result;
                }
                catch (Exception)
                {
                    if (i >= attempts) throw;
                    Thread.Sleep(initialInterval);
                }
 
                initialInterval *= 5;
            }
 

            return null;
        }
    }
 
The example below is based on the Northwind database.
 
 class Program
    {
        static void Main()
        {
            entitiesPoller();
 
            Console.ReadLine();
        }
 
        private static void entitiesPoller()
        {
            var cus = new EntitiesPoller<Customer>(new NorthwindEntities(), c => c.Country.Equals("USA"));
            cus.EntitiesReceived += cus_EntitiesReceived;
            cus.ReAttemptCompleted += cus_ReAttemptCompleted;
            cus.Error += cus_Error;
            cus.RetryAttempts = 2;
            cus.Start();
        }
 
        static void cus_EntitiesReceived(List<Customer> entities)
        {
            int index = 1;
            foreach (var entity in entities)
            {
                Console.WriteLine(string.Format("{0}. CustomerID - {1}  CustomerName - {2}", index.ToString(), entity.CustomerID, entity.ContactName));
                index++;
                Thread.Sleep(1000);
            }
            Console.WriteLine();
        }
 
        static void cus_ReAttemptCompleted(string message)
        {
            Console.WriteLine(message);
        }
 
        static void cus_Error(Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
}
Wonde Tadesse

General: Re: Little enhancement, Tom Cook (UK), 19-Nov-12 1:27
Question: Missing code, CheloXL, 30-Oct-12 5:22
Answer: Re: Missing code, Tom Cook (UK), 30-Oct-12 5:37
General: Re: Missing code, CheloXL, 30-Oct-12 7:09
General: Re: Missing code, Tom Cook (UK), 30-Oct-12 9:57

Last Updated 31 Oct 2012
Article Copyright 2012 by Tom Cook (UK)