Click here to Skip to main content
15,885,546 members
Please Sign up or sign in to vote.
1.00/5 (1 vote)
See more: , +
I've reviewed the QueuedTaskScheduler and I'm very confused.
http://blogs.msdn.com/b/pfxteam/archive/2010/04/09/9990424.aspx[^]

So I wrote this class to do what I wanted.
What I wanted is a class that will queue up tasks in the background and start them in order with a lower priority. Also managing a maximum concurrency.

But also I wanted to allow for the the provided Action/Task to be executed synchronously (or not) when a maximum queue limit is reached.

Can someone tell me how this might be bad or why I should simply use a TaskScheduler?

C#
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Utilities
{

	/// <summary>
	/// Acts as a producer-consumer for tasks...
	/// </summary>
	public class TaskQueue
	{
		readonly object _syncRoot = new Object();
		readonly Queue<Action> _actions = new Queue<Action>();
		readonly List<Thread> _tasks = new List<Thread>();

		int _maxConcurrency;
		public int MaxConcurrency
		{
			get
			{
				return _maxConcurrency;
			}
			set
			{
				Contract.Requires(value > 0);
				lock (_tasks)
					_maxConcurrency = value;
			}
		}

		int _maxTasks;
		public int MaxTasks
		{
			get
			{
				return _maxTasks;
			}
			set
			{
				Contract.Requires(value > 0);
				lock (_tasks)
					_maxTasks = value;
			}
		}

		public int Count {
			get {
				return _tasks.Count + _actions.Count;
			}
		}
		// Default limits to 4.
		public TaskQueue(int maxConcurrency = 4, int maxTasks = 24)
		{
			MaxConcurrency = maxConcurrency;
			MaxTasks = Math.Max(maxConcurrency,maxTasks);
		}

		void Cycle()
		{
			if (_tasks.Count < _maxConcurrency)
			{
				lock (_syncRoot)
				{
					if (_tasks.Count < _maxConcurrency && _actions.Any())
					{
						var task = new Thread(new ThreadStart(_actions.Dequeue()));
						_tasks.Add(task);
						task.IsBackground = true;
						task.Priority = ThreadPriority.Lowest;

						Task.Factory.StartNew(() =>
						{
							task.Start();
							task.Join();
							lock (_syncRoot)
								_tasks.Remove(task);
							Cycle();
						});
					}
				}
			}
		}

		public bool Queue(Action action, bool executeSynchronousIfLimitreached = true)
		{
			if(Count < MaxTasks) {
				lock (_syncRoot)
				{
					if (Count < MaxTasks)
					{
						_actions.Enqueue(action);
						Cycle();
						return true;
					}
				}
			}

			Debug.Print("Task limit reached: " + MaxTasks);
				
			if(executeSynchronousIfLimitreached)
				action();

			return false;
		}

	}
}
Posted

1 solution

Hi,

I like what you have done however personally I feel the mix of custom thread management plus mangeing TPL tasks etc very problematic in this instance. TPL has many great features that do not require all the custom management.

Whilst I have not tried executing tasks in order of thread priority I would like to mention a couple of ideas that may help.

Using ParallelOptions you can manage the concurrency of your tasks. You can also manage the priority and other thread aspects purely from the TPL wrappers.

a good link is below:
http://aviadezra.blogspot.co.uk/2009/10/how-many-threads-tpl-concurrency.html[^]

Secondly, the idea of executing something synchronously if the max threads has been reached will not help too much. If the all cores are busy on jobs, your task is going to wait anyway until something is free. I am not sure why you would want to do this but then I might be missing something.

I would also look into the System.Collections.Concurrency stuff. Using the ConcurrentBag will be better for thread safety as this is all done for you. Also take a look at the Concurrent Stack and queue. These could help.
System.Collections.Concurrency[^]

Lastly, I must recommend the BlockingCollection. Basically what you can do here is add tasks / actions / stuff to the collection. You can then enumerate over it in parrallel. It will be thread safe and only give you the next item in the collection. This can then be parsed off for processing. The beauty of sticking with TPL is the ability to safely cancel actions that are rather problematic if relying on threads. You can also create continuations that can handle faults and clean up.

http://msdn.microsoft.com/en-us/library/dd267312.aspx[^]

So, whilst not a direct solution, I hope some of this is useful. Best of luck!
 
Share this answer
 
Comments
db7uk 5-Feb-13 16:50pm    
oh, and yes the QueuedTaskScheduler is very good and I would try to stick with that.
essence 5-Feb-13 19:21pm    
I used QueuedTaskScheduler and it looks like it's very good.
But it seemed strange to me. Also, when implemented, I saw that it reserved threads as if it was its own thread pool. Then reguardless of running or not, the "Current Queue Length" was permanently raised when reviewing Performance Monitor. :(
I will try again using it. But the next thing I wanted was if the incoming flow of tasks reaches a limit, put the queuing thread on hold until there is enough slots to allow it. Otherwise you could progressively run out of memory depending on what your are trying to do.

Thank you very much for your response.
essence 5-Feb-13 19:42pm    
So I just confirmed that when using QueuedTaskScheduler the value of "Current Queue Length" in Performance Monitor shows permanently higher than zero.
In this case, I used a maximum concurrency of 8 and the "Current Queue Length" never dropped below 24. Curiously enough, in my test, I statically included the class, but I never actually queued any tasks!

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)



CodeProject, 20 Bay Street, 11th Floor Toronto, Ontario, Canada M5J 2N8 +1 (416) 849-8900