|
using System;
using System.ComponentModel;
using System.Data.Entity;
using System.Diagnostics;
using System.Linq;
using System.Runtime.Remoting.Messaging;
using System.Threading;
namespace CodeProject
{
public sealed class EntityPoller<T>
where T : class
{
private readonly DbContext _context;
private readonly Func<T, bool> _query;
private readonly object _sync = new object();
private bool _polling;
public EntityPoller(DbContext context, Func<T, bool> query)
{
_context = context;
_query = query;
IdleCycle = 10000;
ActiveCycle = 1000;
RetryAttempts = 5;
RetryInitialInterval = 10000;
}
public int IdleCycle { get; set; }
public int ActiveCycle { get; set; }
public int RetryAttempts { get; set; }
public int RetryInitialInterval { get; set; }
private T Poll()
{
var set = _context.Set<T>();
T entity = null;
try
{
while (true)
{
entity = Retry(() => set.FirstOrDefault(_query),RetryAttempts,RetryInitialInterval);
if (entity != null) break;
Thread.Sleep(IdleCycle);
}
Thread.Sleep(ActiveCycle);
}
catch (Exception ex)
{
Stop();
if (Error != null)
Error.Invoke(ex);
}
return entity;
}
private void PollComplete(IAsyncResult asyncResult)
{
var worker = (AsyncWorker) ((AsyncResult) asyncResult).AsyncDelegate;
var entity = worker.EndInvoke(asyncResult);
var operation = (AsyncOperation) asyncResult.AsyncState;
lock (_sync)
{
_polling = false;
}
if (entity != null)
((AsyncComplete)operation.UserSuppliedState).Invoke(entity);
}
private delegate T AsyncWorker();
private delegate void AsyncComplete(T entity);
public delegate void EntityReceivedArgs(T entity);
public event EntityReceivedArgs EntityReceived;
public delegate void PollerErrorArgs(Exception ex);
public event PollerErrorArgs Error;
public void Start()
{
var worker = new AsyncWorker(Poll);
var completed = new AsyncCallback(PollComplete);
var onCompleted = new AsyncComplete(OnEntity);
lock (_sync)
{
if (_polling)
throw new InvalidOperationException("Already polling");
var operation = AsyncOperationManager.CreateOperation(onCompleted);
worker.BeginInvoke(completed, operation);
_polling = true;
}
}
public void Stop()
{
lock (_sync)
{
_polling = false;
}
}
private void OnEntity(T entity)
{
if (EntityReceived != null)
EntityReceived(entity);
Start();
}
private static T Retry(Func<T> action, int attempts = 5, int initialInterval = 10000)
{
if (action == null)
throw new ArgumentNullException("action");
for (int i = 1; i <= attempts; i++)
{
try
{
T result = action.Invoke();
return result;
}
catch (Exception)
{
if (i >= attempts) throw;
Thread.Sleep(initialInterval);
}
initialInterval *= 5;
}
return null;
}
}
}
|
By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.
If a file you wish to view isn't highlighted, and is a text file (not binary), please
let us know and we'll add colourisation support for it.
This member has not yet provided a Biography. Assume it's interesting and varied, and probably something to do with programming.