I am running my tasks with Parallel.ForEach and a MaxDegreeOfParallelism setting. If one task takes too long to complete, all the others have to wait for it before the next iteration can start.

The scenario: my Windows service processes multiple file orders. In the first phase I fetch the file orders from the database, do some manipulation on those records, and then pass the list of file orders to Parallel.ForEach with MaxDegreeOfParallelism set.

Some file orders contain many documents, 100 or 200, and those documents in turn have many pages. Processing therefore takes too long overall: a task that has finished a small order with few documents or pages has to wait for the tasks that are still working through large ones.

What I want is for a task that has finished and is free to fetch the next file order record from the database and start processing it in parallel right away.

Two constraints: the next file order fetched must not be one that another parallel task has already picked up, and the number of tasks running in parallel must never exceed the given max degree of parallelism (the maximum number of scheduled tasks this service may use to process all file orders).

Please suggest a better solution for this.

What I have tried:

C#
/// <summary>
        /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
        /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
        /// </summary>
        /// <param name="tasksToRun">The tasks to run.</param>
        /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
        /// <param name="cancellationToken">The cancellation token.</param>
        public static void StartAndWaitAllThrottled(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
        {
            StartAndWaitAllThrottled(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
        }

        /// <summary>
        /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
        /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
        /// </summary>
        /// <param name="tasksToRun">The tasks to run.</param>
        /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
        /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
        /// <param name="cancellationToken">The cancellation token.</param>
        public static void StartAndWaitAllThrottled(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
        {
            // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
            var tasks = tasksToRun.ToList();

            // Start with every slot free. An initial count of 1 would let only one task run at a time,
            // because each completed task releases just the single slot it took.
            using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel, maxTasksToRunInParallel))
            {
                var postTaskTasks = new List<Task>();

                // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
                tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));

                // Start running each task.
                foreach (var task in tasks)
                {
                    // Increment the number of tasks currently running and wait if too many are running.
                    throttler.Wait(timeoutInMilliseconds, cancellationToken);
                    
                    cancellationToken.ThrowIfCancellationRequested();
                    task.Start();
                }

                // Wait for all of the provided tasks to complete.
                // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
                Task.WaitAll(postTaskTasks.ToArray(), cancellationToken);
            }
        }
Updated 6-Jul-17 23:02pm

1 solution

When the work is mostly I/O, parallel processing is often not a good idea: most of the time it slows things down, especially when a hard disk is being used.
With newer SSDs or RAID it might work, but I would not recommend it.
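
If measurements show that the orders do benefit from running in parallel, the behaviour asked for in the question (a free worker immediately takes the next order, no order is taken twice, and the worker count never exceeds the limit) can be sketched with a shared queue and a fixed number of workers. This is only a minimal sketch under assumptions: `FileOrderPump`, `ProcessAll` and `processOrder` are hypothetical names, and the orders are assumed to be already loaded into memory. If each worker has to fetch its next record from the database instead, the dequeue step would need to be replaced by a query that claims a row atomically, which is not shown here.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original code.
public static class FileOrderPump
{
    // Runs at most maxWorkers workers. Each worker takes the next unclaimed
    // order from a shared queue as soon as it finishes its current one, so a
    // worker that handled a small order never idles while large ones run.
    public static void ProcessAll<TOrder>(
        IEnumerable<TOrder> orders,
        int maxWorkers,
        Action<TOrder> processOrder,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        // ConcurrentQueue hands each item to exactly one caller of TryDequeue,
        // so two workers can never pick up the same order.
        var queue = new ConcurrentQueue<TOrder>(orders);

        var workers = Enumerable.Range(0, maxWorkers)
            .Select(_ => Task.Factory.StartNew(() =>
            {
                TOrder order;
                // Stop when the queue is empty or cancellation is requested.
                while (!cancellationToken.IsCancellationRequested
                       && queue.TryDequeue(out order))
                {
                    processOrder(order);
                }
            }, TaskCreationOptions.LongRunning))
            .ToArray();

        // Blocks until every worker has drained its share of the queue.
        // Exceptions thrown by processOrder surface here as an AggregateException.
        Task.WaitAll(workers);
    }
}
```

It could be called like `FileOrderPump.ProcessAll(fileOrders, maxDegreeOfParallelism, order => Process(order));`, where `fileOrders` and `Process` stand in for the service's own list and processing method. Whether this is actually faster than sequential processing still depends on the disk, so it is worth timing both.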
 

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


