Parallel Recursive Methods using the Concurrency and Coordination Runtime

11 Oct 2008
A multithreaded directory size implementation using the CCR

Introduction

This article shows how to use the Concurrency and Coordination Runtime (CCR) to calculate the size of a directory with an asynchronous recursive method.

Below are the results of calculating the size of a network drive. The network drive I/O latency emphasizes the advantages of parallel execution:

[Figure: directorysize_networkdrive_results.png]

Recursion

Calculating directory size recursively is straightforward:

static long SerialRecursiveDirectorySize(string path)
{
   return Directory.GetFiles(path).Sum(f => new FileInfo(f).Length) +
          Directory.GetDirectories(path).Sum(SerialRecursiveDirectorySize);
}

The LINQ expression sums the sizes of the files in the directory, then adds the size of each subdirectory, which is computed recursively.

Parallel Recursion

The Concurrency and Coordination Runtime (CCR) allows methods to execute on different threads by declaring the relationships between them. For more information and links, see the previous article, "Pipes and Filters concurrent design pattern using the Concurrency and Coordination Runtime".

long ParallelDirectorySize(string path)
{
   // A thread count of 0 lets the CCR choose its default number of threads.
   using (Dispatcher dispatcher = new Dispatcher(0, "CCR Threads"))
   {
      DispatcherQueue queue = new DispatcherQueue
                ("Pipeline DispatcherQueue", dispatcher);

      var outputPort = new Port<long>();
      ParallelDirectorySizeRecursive(path, queue, outputPort);

      // Block until the total size is posted to outputPort.
      return queue.WaitOnPort(outputPort);
   }
}

The dispatcher is the object that holds a thread pool. The dispatcher queue holds the list of pending delegates that can be executed immediately and are waiting for a thread to become available.

The ParallelDirectorySizeRecursive method calculates the size of the directory by enqueuing tasks in the dispatcher queue. When the computation is complete, the result is posted to outputPort.
A port is essentially two queues. The first queue holds (data) messages and the second queue holds the methods interested in processing these messages. WaitOnPort is an extension method (not part of the CCR) that blocks until a message is received and returns the received value.
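
The article does not list WaitOnPort, so the following is only a minimal sketch of how such an extension method might look, assuming the signature implied by the call queue.WaitOnPort(outputPort). The class name DispatcherQueueExtensions and the use of a ManualResetEvent are assumptions, not the author's code:

```csharp
using System.Threading;
using Microsoft.Ccr.Core;

static class DispatcherQueueExtensions
{
    // Hypothetical reconstruction: blocks the calling thread until one
    // message arrives on the port, then returns that message.
    public static T WaitOnPort<T>(this DispatcherQueue queue, Port<T> port)
    {
        T result = default(T);
        using (var done = new ManualResetEvent(false))
        {
            // One-time (non-persistent) receiver that stores the message
            // and wakes up the waiting caller.
            Arbiter.Activate(queue,
                Arbiter.Receive<T>(false, port, delegate(T value)
                {
                    result = value;
                    done.Set();
                }));

            done.WaitOne();
        }
        return result;
    }
}
```

A method like this is meant to be called from a thread that does not belong to the dispatcher. Blocking one of the dispatcher's own threads would take it away from the tasks that are supposed to produce the awaited message.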

The asynchronous recursive implementation enqueues the following tasks:

  • A task that sums the sizes of the files in the directory.
    This task is scheduled for immediate execution. Once it completes, it posts the result to inputPort.
  • One recursive call per subdirectory (subDirectories.Length calls in total). Each call enqueues tasks that eventually post that subdirectory's size to inputPort.
  • A task that sums all of the results in inputPort and posts the total to outputPort.
    This task is scheduled for execution once inputPort holds subDirectories.Length + 1 values, which means the file size and every subdirectory size have been computed.

void ParallelDirectorySizeRecursive(string directory,
                                    DispatcherQueue queue,
                                    Port<long> outputPort)
{
    var subDirectories = Directory.GetDirectories(directory);
    var inputPort = new Port<long>();

    Arbiter.Activate(queue,
        // Runs as soon as a thread is available:
        // posts the total size of the files in this directory.
        Arbiter.FromHandler(
            delegate()
            {
                inputPort.Post(TotalFileSize(directory));
            }),
        // Runs once inputPort holds one value per subdirectory
        // plus the file total.
        Arbiter.MultipleItemReceive(
            false,
            inputPort,
            subDirectories.Length + 1,
            delegate(long[] subDirSize)
            {
                outputPort.Post(subDirSize.Sum());
            }));

    // Each recursive call eventually posts one subdirectory size to inputPort.
    foreach (string subDir in subDirectories)
    {
        ParallelDirectorySizeRecursive(subDir, queue, inputPort);
    }
}

long TotalFileSize(string path)
{
      return Directory.GetFiles(path).Sum(f => new FileInfo(f).Length);
}

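If you are new to anonymous methods (delegates), note that these tasks run on a separate thread, yet they still have access to the local variables and parameters of the enclosing method (such as directory, inputPort and outputPort). What actually happens is that the compiler moves each captured variable into a hidden closure object that the method and the delegate share, so the delegate works with the variable itself rather than with a copy of its value. This makes the code much shorter.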
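
Capture semantics can be checked with a short program that uses only the base class library. The names ClosureDemo and counter are illustrative and are not part of the article's code. The sketch shows that an anonymous method running on another thread operates on the captured variable itself rather than on a snapshot of its value:

```csharp
using System;
using System.Threading;

static class ClosureDemo
{
    static void Main()
    {
        int counter = 0;

        // The anonymous method captures the variable 'counter',
        // not the value it had when the delegate was created.
        ThreadStart increment = delegate()
        {
            Interlocked.Increment(ref counter);
        };

        var worker = new Thread(increment);
        worker.Start();
        worker.Join();

        // The change made on the worker thread is visible here.
        Console.WriteLine(counter); // prints 1
    }
}
```

Because the variable is shared, a delegate that outlives the enclosing call, as the CCR tasks do, should only capture variables that are not modified afterwards, or it should synchronize access to them.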

Error Handling

We would like the recursion to recover from UnauthorizedAccessException(s). The simple recursive implementation receives a Collection<Exception> in which it collects the exceptions it catches. All recursive calls receive the same collection object:

long SerialRecursiveDirectorySizeErrorHandling
        (string path, Collection<Exception> errors)
{
   long fileSize = 0;
   var subDirs = new string[] {};
   try
   {
      fileSize = TotalFileSize(path);
      subDirs = Directory.GetDirectories(path);
   }
   catch (UnauthorizedAccessException ex)
   {
      errors.Add(ex);
   }

   return fileSize + subDirs.Sum
        (p => SerialRecursiveDirectorySizeErrorHandling(p, errors));
}

Similarly, the parallel recursion receives a Port<Exception> object (see outputPort.P1 below). The same exception port object is passed to all recursive calls.
One caveat: the method has to post a zero result to outputPort (see outputPort.P0 below) even when an error occurs. The "Sum" task does not listen on the exception port, so it keeps waiting for exactly subDirectories.Length + 1 results.

void ParallelDirectorySizeWithErrorHandlerRecursive(
           string directory,
           DispatcherQueue queue,
           PortSet<long,Exception> outputPort)
{
   var subDirectories = new string[] {};
   try
   {
      subDirectories = Directory.GetDirectories(directory);
   }
   catch (UnauthorizedAccessException ex)
   {
      // The parent's "Sum" task still expects one result from this directory.
      outputPort.P0.Post(0);
      outputPort.P1.Post(ex);
      return;
   }

   // New port for sizes, shared port for exceptions.
   var inputPort = new PortSet<long,Exception>(new Port<long>(), outputPort.P1);
   Arbiter.Activate(queue,
         Arbiter.FromHandler(
               delegate()
               {
                  long size = 0;
                  try
                  {
                     size = TotalFileSize(directory);
                  }
                  catch (UnauthorizedAccessException ex)
                  {
                     outputPort.P1.Post(ex);
                  }
                  finally
                  {
                     // Always post a size (zero on failure) so the "Sum" task can complete.
                     inputPort.P0.Post(size);
                  }
               }),
         Arbiter.MultipleItemReceive(
               false,
               inputPort.P0,
               subDirectories.Length + 1,
               delegate(long[] subDirSize)
               {
                  outputPort.P0.Post(subDirSize.Sum());
               }));

   foreach (string subDir in subDirectories)
   {
      ParallelDirectorySizeWithErrorHandlerRecursive(subDir, queue, inputPort);
   }
}
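
The article does not show the top-level method that starts the error-handling recursion. The fragment below is a minimal sketch of one possible wrapper, meant to sit in the same class as ParallelDirectorySizeWithErrorHandlerRecursive. The method name ParallelDirectorySizeWithErrorHandler and the errors parameter are assumptions, and WaitOnPort is the article's own extension method, assumed here to accept a Port<long>:

```csharp
using System;
using System.Collections.ObjectModel;
using Microsoft.Ccr.Core;

// Hypothetical wrapper: returns the total size and copies every
// collected exception into the caller's collection.
long ParallelDirectorySizeWithErrorHandler(string path,
                                           Collection<Exception> errors)
{
    using (Dispatcher dispatcher = new Dispatcher(0, "CCR Threads"))
    {
        DispatcherQueue queue = new DispatcherQueue
                ("DirectorySize DispatcherQueue", dispatcher);

        var outputPort = new PortSet<long, Exception>();
        ParallelDirectorySizeWithErrorHandlerRecursive(path, queue, outputPort);

        // Block until the total size arrives on the result port.
        long total = queue.WaitOnPort(outputPort.P0);

        // Drain the exceptions that have been posted to the shared port.
        Exception ex;
        while (outputPort.P1.Test(out ex))
        {
            errors.Add(ex);
        }
        return total;
    }
}
```

Draining the exception port only after the total arrives relies on the order of operations in the recursive method: directory enumeration runs synchronously on the calling thread, and each file-size task posts its exception before it posts its size.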

History

  • 11th October, 2008: Initial version

License

This article, along with any associated source code and files, is licensed under The Microsoft Public License (Ms-PL).

About the Author

itaifrenkel
Software Developer
Israel

Article Copyright 2008 by itaifrenkel