|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Announcements
Chapters
Services
Feature Zones
|
IntroductionThis article shows the usage of the Concurrency and Coordination Runtime (CCR) to calculate directory size using an asynchronous recursive method. Below are the results of calculating the size of a network drive. The network drive I/O latency emphasizes the advantages of parallel execution:
RecursionCalculating directory size recursively is straightforward: static long SerialRecursiveDirectorySize(string path)
{
return Directory.GetFiles(path).Sum(f => new FileInfo(f).Length) +
Directory.GetDirectories(path).Sum(SerialRecursiveDirectorySize);
}
The LINQ query calculates the size of each file and the size of each sub directory (recursively) and then sums everything together. Parallel RecursionThe Concurrency and Coordination Runtime (CCR) allows the execution of methods in different threads by declaring the relationships between the methods. For more information and links, see the previous article Pipes and Filters concurrent design pattern using the Concurrency and Coordination Runtime. long ParallelDirectorySize(string path)
{
using (Dispatcher dispatcher = new Dispatcher(0, "CCR Threads"))
{
DispatcherQueue queue = new DispatcherQueue
("Pipeline DispatcherQueue", dispatcher);
var outputPort = new Port
The dispatcher is the object that holds a thread pool. The dispatcher queue holds the list of pending delegates that can be executed immediately and are waiting for a thread to become available. The The asynchronous recursive implementation enqueues the following tasks:
void ParallelDirectorySizeRecursive(string directory,
DispatcherQueue queue,
Port
If you are new to anonymous methods (delegates), note that these tasks run on a separate thread, but they still have access to local variables (such as Error HandlingWe would like the recursion to recover from long SerialRecursiveDirectorySizeErrorHandling
(string path,Collection<Exception> errors)
{
long fileSize = 0;
var subDirs = new string[] {};
try
{
fileSize = TotalFileSize(path);
subDirs = Directory.GetDirectories(path);
}
catch (UnauthorizedAccessException ex)
{
errors.Add(ex);
}
return fileSize + subDirs.Sum
(p => SerialRecursiveDirectorySizeErrorHandling(p,errors));
}
Similarly the parallel recursion receives a void ParallelDirectorySizeWithErrorHandlerRecursive(
string directory,
DispatcherQueue queue,
PortSet<long,Exception> outputPort)
{
var subDirectories = new string[] {};
try
{
subDirectories = Directory.GetDirectories(directory);
}
catch (UnauthorizedAccessException ex)
{
outputPort.P0.Post(0);
outputPort.P1.Post(ex);
return;
}
var inputPort = new PortSet<long,Exception>(new Port<long>(), outputPort.P1);
Arbiter.Activate(queue,
Arbiter.FromHandler(
delegate()
{
long size = 0;
try
{
size = TotalFileSize(directory);
}
catch (UnauthorizedAccessException ex)
{
outputPort.P1.Post(ex);
}
finally
{
inputPort.P0.Post(size);
}
}),
Arbiter.MultipleItemReceive(
false,
inputPort.P0,
subDirectories.Length + 1,
delegate(long[] subDirSize)
{
outputPort.P0.Post(subDirSize.Sum());
}));
foreach (string subDir in subDirectories)
{
ParallelDirectorySizeWithErrorHandlerRecursive(subDir, queue, inputPort);
}
}
History
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||