Click here to Skip to main content
15,885,767 members
Please Sign up or sign in to vote.
0.00/5 (No votes)
Hi All,

I have a question regarding cancellation of tasks. Please read below for the explanation. Any suggestion would help me solve the problem.

I have 3 tasks. Lets call them task1, task2, & task3. Each of them are independent except for the data. i.e. task1 puts data into a queue1 and task2 reads it. task2 puts some other processed data into queue2 and task3 reads it. task3 does some final work. This chain should continue till task1 completes successfully. Upon completion of task1 and after processing their respective queues task2 and task3 should run into successful completion. If for any reason a task stops in-between (say due to an exception in the method the task is running), then the other two tasks should also stop immediately.

For successful completion I am using an integer variable. I increment it when task1 completes. The other 2 tasks read it and exit smoothly after emptying their own queues.

For erroneous exits I can use the same integer variable. But I am looking at a better solution so that code looks cleaner.

See the code below:

I have simulated error by throwing exception in Method2. After this error occurs, task2 stops, task1 runs to completion and task3 infinitely waits for more data in the queue thus blocking the application to finish.

Is there a way to notify other tasks when a task has faulted/completed?
E.g.
1. Notify task2 and task3 when task1 faults/completes
2. Notify task1 and task3 when task2 faults/completes
3. Notify task1 and task2 when task3 faults/completes

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Drawing;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
using System.Xml.Linq;
using System.Xml.XPath;
using System.Diagnostics;

namespace Designer.UpdateManager.Dependency
{
	public class StringDependencyCalculator
	{
		#region Fields
        private ConcurrentQueue<int> queue1 = new ConcurrentQueue<int>();
		private ConcurrentQueue<string> queue2 = new ConcurrentQueue<string>();
		private int foundAllDependencies = 0;
		private int convertedAllDependencies = 0;
		private int maxLimit;
		#endregion

        #region Constructor

        public static void Main(string[] args)
		{
			if(args.Length != 1)
			{
				Console.WriteLine("Invalid no. of arguments");
				Console.WriteLine("Usage: TaskParallelization.exe <maxLimit>");
			}

			int maxLimit = 0;
			if(int.TryParse(args[0], out maxLimit))
			{
				StringDependencyCalculator sdc = new StringDependencyCalculator();
				sdc.CalculateDependency(maxLimit);
			}
			else
			{
				Console.WriteLine("Invalid no. of arguments");
				Console.WriteLine("Usage: TaskParallelization.exe <maxLimit>");
			}
		}

		#endregion

        #region Methods

        public void CalculateDependency(int maxLimit)
		{
			this.maxLimit = maxLimit;
			Stopwatch stopWatch = new Stopwatch();
			stopWatch.Start();

			try
			{
				Task[] tasks = new Task[]
				{
					Task.Factory.StartNew(() => Method1()),
                    Task.Factory.StartNew(() => Method2()),
                    Task.Factory.StartNew(() => Method3())
				};

				Task.WaitAll(tasks);
			}
			catch (AggregateException ex)
			{
				StringBuilder messageStringBuilder = new StringBuilder();
				foreach (Exception exception in ex.InnerExceptions)
				{
					messageStringBuilder.AppendLine(exception.Message);
				}

				Console.WriteLine(messageStringBuilder.ToString());
			}

			finally
			{
				stopWatch.Stop();
				Console.WriteLine("Elapsed time - " + (stopWatch.ElapsedMilliseconds / 1000) + "s");
				stopWatch = null;
			}
		}

		private void Method1()
		{
			Random r = new Random();

			for (int i = 0; i < maxLimit; i++)
			{
				queue1.Enqueue(i * r.Next(100));
			}

			Interlocked.Increment(ref foundAllDependencies);
		}

		private void Method2()
		{
			for (int i = 0; i < maxLimit; i++)
			{
				int dequedItem = 0;
				while (!queue1.TryDequeue(out dequedItem))
				{
					// Spin to find any element.
					if (foundAllDependencies > 0 && !queue1.Any())
						break;
				}

				queue2.Enqueue(dequedItem.ToString());

				// Simulate erroneous condition
				if(i == 10)
					throw new InvalidDataException("10 is not a valid value");
			}

			Interlocked.Increment(ref convertedAllDependencies);
		}

		private void Method3()
		{
			for (int i = 0; i < maxLimit; i++)
			{
				string dequedItem = string.Empty;
				while (!queue2.TryDequeue(out dequedItem))
				{
					// Spin to find any element.
					if (foundAllDependencies > 0 && convertedAllDependencies > 0 && !queue2.Any())
						break;
				}

				if(string.IsNullOrEmpty(dequedItem))
					continue;

				Console.WriteLine(dequedItem);
			}
		}
		
		#endregion
    }
}
Posted

1 solution

Hi,

What you are trying to do can be done. But it cannot be answered in this quick answers section. It is a pretty vast topic. Here are some articles that help you understand the concept and implement a solution.

Process task based on the outcome of the previous task
http://blogs.msdn.com/b/pfxteam/archive/2012/08/02/processing-tasks-as-they-complete.aspx[^]

Chain Multiple Tasks with Continuations
https://msdn.microsoft.com/en-us/library/dd537612(v=vs.110).aspx[^]

Task cancellation
http://blogs.msdn.com/b/csharpfaq/archive/2010/07/19/parallel-programming-task-cancellation.aspx[^]
 
Share this answer
 
Comments
vinayvraman 26-May-15 0:07am    
Hi Mathi Mani, thank you for the links. I have implemented the task cancellation by using the ContinueWith and a CancellationTokenSource.

<pre lang="c#">
private static void CheckIfOtherTasksNeedToBeCancelled(Task task, CancellationTokenSource cancellationTokenSource)
{
if (task.IsFaulted)
{
cancellationTokenSource.Cancel();
throw task.Exception.InnerException;
}
}

// The task initialization has changed to
Task[] tasks = new Task[]
{
Task.Factory.StartNew(() => Method1(), taskCancellationSource.Token)
.ContinueWith(t => CheckIfOtherTasksNeedToBeCancelled(t, taskCancellationSource)),
Task.Factory.StartNew(() => Method2(), taskCancellationSource.Token)
.ContinueWith(t => CheckIfOtherTasksNeedToBeCancelled(t, taskCancellationSource)),
Task.Factory.StartNew(() => Method3(), taskCancellationSource.Token)
.ContinueWith(t => CheckIfOtherTasksNeedToBeCancelled(t, taskCancellationSource))
};
</pre>

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)



CodeProject, 20 Bay Street, 11th Floor Toronto, Ontario, Canada M5J 2N8 +1 (416) 849-8900