Click here to Skip to main content
15,504,830 members
Please Sign up or sign in to vote.
0.00/5 (No votes)
See more: , +
Hi
So I am new to programming using the .Net TPL Framework and multi-threading in general. I have googled up some tutorials and articles and put together the solution below.

It is a simple multithreaded queue and a scheduler that polls the queue and processes each item as a separate task.

It would be great if you all could comment on the solution and point out areas where I am doing things wrong or where it could be done more efficiently.

C#
//Console app that starts the Queue and Scheduler
class Program
    {
        /// <summary>
        /// Starts the job queue producer and the job runner, waits for Ctrl-C,
        /// then asks both to stop.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            Console.WriteLine("Job Scheduler Started. Press Ctrl-C to end");

            // Register the Ctrl-C handler BEFORE starting any work. The event is
            // intentionally kept alive (and the handler subscribed) for the whole
            // process lifetime so that repeated Ctrl-C presses stay harmless.
            var shutdownRequested = new AutoResetEvent(false);
            Console.CancelKeyPress += (sender, eventArgs) =>
            {
                // Suppress the default process termination; shut down cooperatively.
                eventArgs.Cancel = true;
                shutdownRequested.Set();
            };

            // Populate() loops until the queue is cancelled, so it must not run on
            // the main thread (Parallel.Invoke would block here until it returned,
            // which made the shutdown code below unreachable).
            Task populateTask = Task.Run(() => clJobQueue.Instance.Populate());
            populateTask.ContinueWith(
                t => Console.WriteLine("Queue population failed: {0}", t.Exception.GetBaseException().Message),
                TaskContinuationOptions.OnlyOnFaulted);

            // Start() schedules its own background task and returns immediately.
            clJobRunner.Instance.Start();

            shutdownRequested.WaitOne();

            Stop();
            Console.WriteLine("Job Scheduler Shutting Down");
            Console.ReadLine();
        }

        /// <summary>
        /// Closes the job queue and stops the job runner; the two calls are
        /// issued in parallel and this method returns once both have completed.
        /// </summary>
        private static void Stop()
        {
            Parallel.Invoke(
                () => clJobQueue.Instance.CloseQueue(),
                () => clJobRunner.Instance.Stop());
        }
    }


C#
//Singleton pattern Queue using a BlockingCollection.
//Contains a Populate method that polls a db and retrieves new items
/// <summary>
/// Process-wide bounded job queue backed by a <see cref="BlockingCollection{T}"/>.
/// Producers call <see cref="Add"/> (directly or via <see cref="Populate"/>),
/// consumers call <see cref="GetNextJob"/>, and <see cref="CloseQueue"/> cancels
/// both sides and drains whatever is left.
/// </summary>
public sealed class clJobQueue
{
	private const int QueueCapacity = 10;
	private static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(60);

	// Static readonly initialisers run once, in textual order, before first use.
	// This replaces the hand-rolled lazy initialisation, which published
	// `instance` before `queue` and `cts` had been assigned.
	private static readonly BlockingCollection<clJob> queue = new BlockingCollection<clJob>(QueueCapacity);
	private static readonly CancellationTokenSource cts = new CancellationTokenSource();
	private static readonly clJobQueue instance = new clJobQueue();

	// Explicit static constructor: keeps type initialisation from being
	// performed earlier than the first member access.
	static clJobQueue() { }
	private clJobQueue() { }

	/// <summary>Gets the single shared queue instance.</summary>
	public static clJobQueue Instance
	{
		get { return instance; }
	}

	/// <summary>
	/// Marks the job as queued and adds it to the queue, blocking while the
	/// queue is full.
	/// </summary>
	/// <param name="jobItem">The job to enqueue.</param>
	/// <returns>
	/// <c>true</c> if the job was added; <c>false</c> if the queue was cancelled
	/// before the job could be added.
	/// </returns>
	public bool Add(clJob jobItem)
	{
		// Set owner and status BEFORE publishing the job: once it is in the
		// collection a consumer may take it immediately, and its Scheduled
		// status must not be overwritten by a late Queued.
		jobItem.JobQueue = this;
		jobItem.setJobStatus(JobStatus.Queued);

		try
		{
			queue.Add(jobItem, cts.Token);
			return true;
		}
		catch (OperationCanceledException)
		{
			// NOTE(review): the job keeps the Queued status although it never
			// entered the queue - confirm whether it should be reset here.
			return false;
		}
	}

	/// <summary>
	/// Blocks until a job is available, marks it as scheduled and returns it.
	/// </summary>
	/// <returns>
	/// The next job, or <c>null</c> if the queue was cancelled or retrieving
	/// the job failed.
	/// </returns>
	public clJob GetNextJob()
	{
		try
		{
			clJob nextJob = queue.Take(cts.Token);
			nextJob.setJobStatus(JobStatus.Scheduled);
			return nextJob;
		}
		catch (OperationCanceledException)
		{
			Console.WriteLine("Cancelling Queue");
		}
		catch (Exception ex)
		{
			// Keep the "null on failure" contract, but report the real cause
			// instead of mislabelling every error as a cancellation.
			Console.WriteLine("Failed to get next job: {0}", ex.Message);
		}
		return null;
	}

	/// <summary>
	/// Cancels pending <see cref="Add"/> / <see cref="GetNextJob"/> calls and
	/// removes every job still waiting in the queue.
	/// </summary>
	public void CloseQueue()
	{
		cts.Cancel();
		Console.WriteLine("Closing Queue");

		// TryTake never blocks, so a concurrent consumer removing the last item
		// between an emptiness check and the take cannot hang the shutdown.
		clJob job;
		while (queue.TryTake(out job))
		{
			// NOTE(review): drained jobs are marked Scheduled exactly as before;
			// confirm that this is the intended status for jobs that never ran.
			job.setJobStatus(JobStatus.Scheduled);
		}
	}

	/// <summary>Gets the number of jobs currently waiting in the queue.</summary>
	public int Count
	{
		get { return queue.Count; }
	}

	/// <summary>Gets a value indicating whether no jobs are waiting.</summary>
	public bool IsEmpty
	{
		get { return Count <= 0; }
	}

	/// <summary>
	/// Polls the X and Y job sources in parallel and enqueues every job found.
	/// Blocks until the queue is cancelled via <see cref="CloseQueue"/>.
	/// </summary>
	public void Populate()
	{
		Parallel.Invoke(
			() => PollForJobs("X", () => clXJob.getJob()),
			() => PollForJobs("Y", () => clYJob.getJob()));
	}

	// Repeatedly fetches jobs from one source and enqueues them until cancelled.
	private void PollForJobs(string jobKind, Func<clJob> fetchJob)
	{
		while (!cts.IsCancellationRequested)
		{
			Console.WriteLine("Looking for {0} Job", jobKind);
			clJob job = fetchJob();

			if (job == null || job.JobID == 0)
			{
				Console.WriteLine("No {0} Jobs in DB, Sleeping for {1}s", jobKind, PollInterval.TotalSeconds);

				// Wait on the token's handle instead of Thread.Sleep so that a
				// cancellation ends the wait immediately (WaitOne returns true).
				if (cts.Token.WaitHandle.WaitOne(PollInterval))
				{
					break;
				}
				continue;
			}

			Console.WriteLine("Adding {0} Job to Queue - {1} {2}", jobKind, job.JobID, job.JobName);
			if (!Add(job))
			{
				// Queue was cancelled while waiting for free capacity.
				break;
			}
		}
	}
}


C#
//Polls the queue for items and initiates the items Run() method as a new Task
/// <summary>
/// Process-wide runner that takes jobs from <see cref="clJobQueue"/> and
/// starts each job's Run() method as its own task.
/// </summary>
public sealed class clJobRunner
    {
        private static readonly TimeSpan IdleInterval = TimeSpan.FromSeconds(30);

        // Static readonly initialisers give thread-safe one-time construction;
        // the previous unsynchronised null check could create two instances and
        // two token sources when Instance was first read from several threads.
        private static readonly CancellationTokenSource cts = new CancellationTokenSource();
        private static readonly clJobRunner instance = new clJobRunner();

        static clJobRunner() { }
        private clJobRunner() { }

        /// <summary>Gets the single shared runner instance.</summary>
        public static clJobRunner Instance
        {
            get { return instance; }
        }

        /// <summary>
        /// Starts the background polling loop and returns immediately. The loop
        /// runs until <see cref="Stop"/> is called.
        /// </summary>
        public void Start()
        {
            Task.Run(() =>
            {
                while (!cts.IsCancellationRequested)
                {
                    clJob job = clJobQueue.Instance.GetNextJob();
                    if (job == null)
                    {
                        Console.WriteLine("Queue Empty, Sleeping for 30s");

                        // Wait on the token's handle rather than Thread.Sleep so
                        // that Stop() ends the wait immediately.
                        cts.Token.WaitHandle.WaitOne(IdleInterval);
                        continue;
                    }

                    Console.WriteLine("Running Job - {0} {1}", job.JobID, job.JobName);

                    // Observe failures of the job task; otherwise an exception
                    // thrown by Run() would go unnoticed.
                    Task jobTask = Task.Run(() => job.Run());
                    jobTask.ContinueWith(
                        t => Console.WriteLine("Job {0} {1} failed: {2}", job.JobID, job.JobName, t.Exception.GetBaseException().Message),
                        TaskContinuationOptions.OnlyOnFaulted);
                }
                Console.WriteLine("CancellationRequested!");
            }, cts.Token);
        }

        /// <summary>
        /// Requests the polling loop to stop. Jobs that are already running are
        /// not interrupted by this call.
        /// </summary>
        public void Stop()
        {
            cts.Cancel();
            Console.WriteLine("Stopping Job Runner");
        }
    }


Thanks
Mobin
Posted
Comments
[no name] 7-Oct-14 12:19pm    
We are not clear what your question is.

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)



CodeProject, 20 Bay Street, 11th Floor Toronto, Ontario, Canada M5J 2N8 +1 (416) 849-8900