Click here to Skip to main content
Click here to Skip to main content
Add your own
alternative version

Parallel Recursive Methods using the Concurrency and Coordination Runtime

itaifrenkel, 11 Oct 2008
A multithreaded directory-size implementation using the CCR (Concurrency and Coordination Runtime)
DirectorySize_bin.zip
DirectorySize_bin
Release
Ccr.Core.dll
DirectorySize.exe
DirectorySize.pdb
DirectorySize_src.zip
DirectorySize_src
DirectorySize.suo
DirectorySize
DirectorySize.csproj.user
Properties
using System;
using System.IO;
using System.Linq;
using System.Threading;
using System.Collections.ObjectModel;
using System.Collections.Generic;
using Microsoft.Ccr.Core;

namespace filesize
{
	class Program
	{
			
		/// <summary>
		/// Entry point. Measures the same directory with each of the four
		/// implementations in turn and prints the result of every run.
		/// </summary>
		/// <param name="args">
		/// Optional. <c>args[0]</c> is the directory to measure; when no argument
		/// is supplied the current directory (".") is used.
		/// </param>
		static void Main(string[] args)
		{
			string targetDirectory = ".";
			if (args.Length >= 1)
			{
				targetDirectory = args[0];
			}

			// Method groups are passed directly (not wrapped in lambdas) because the
			// print helpers report the delegate's Method.Name.
			PrintDirectorySize(SerialRecursiveDirectorySize, targetDirectory);
			PrintDirectorySize(ParallelDirectorySize, targetDirectory);

			// Variants that collect access errors instead of aborting on the first one.
			PrintDirectorySizeWithErrorHandling(SerialRecursiveDirectorySizeErrorHandling, targetDirectory);
			PrintDirectorySizeWithErrorHandling(ParallelDirectorySizeWithErrorHandler, targetDirectory);
		}

        /// <summary>
        /// Runs <paramref name="getDirectorySize"/> against <paramref name="path"/>,
        /// timing the call, and writes the measured size and elapsed time to the console.
        /// </summary>
        /// <param name="getDirectorySize">Size function to run; its method name is printed first.</param>
        /// <param name="path">Directory handed to the size function.</param>
        /// <remarks>
        /// An <see cref="UnauthorizedAccessException"/> raised anywhere in the run is
        /// reported by printing its message; other exception types are not caught here.
        /// </remarks>
        static void PrintDirectorySize(Func<string,long> getDirectorySize,string path)
        {
            try
            {
                Console.WriteLine("Running {0}", getDirectorySize.Method.Name);

                var timer = System.Diagnostics.Stopwatch.StartNew();
                long totalBytes = getDirectorySize(path);
                // Snapshot the elapsed time right after the call so printing is not measured.
                TimeSpan elapsed = timer.Elapsed;

                Console.WriteLine("Directory Size = {0:N0} Bytes. TimeSpan = {1:N2} seconds", totalBytes, elapsed.TotalSeconds);
            }
            catch (UnauthorizedAccessException accessDenied)
            {
                Console.WriteLine(accessDenied.Message);
            }
        }
       
        /// <summary>
        /// Single-threaded recursive size calculation: the files directly inside
        /// <paramref name="path"/> plus the size of every sub-directory.
        /// </summary>
        /// <param name="path">Directory to measure.</param>
        /// <returns>Total size in bytes of all files under <paramref name="path"/>.</returns>
        /// <remarks>Nothing is caught here; any exception propagates to the caller.</remarks>
        static long SerialRecursiveDirectorySize(string path)
        {
            long ownFiles = TotalFileSize(path);
            long nested = Directory.GetDirectories(path)
                                   .Sum(child => SerialRecursiveDirectorySize(child));
            return ownFiles + nested;
        }
        
        /// <summary>
        /// Runs an error-collecting size function against <paramref name="path"/>,
        /// prints the measured size and elapsed time, then prints the message of
        /// every exception the function recorded.
        /// </summary>
        /// <param name="getDirectorySize">Size function to run; its method name is printed first.</param>
        /// <param name="path">Directory handed to the size function.</param>
        static void PrintDirectorySizeWithErrorHandling(DirectorySizeWithErrorHandling getDirectorySize,string path)
        {
            Console.WriteLine("Running {0}", getDirectorySize.Method.Name);

            // Fresh collection per run; the size function fills it in.
            var collectedErrors = new Collection<Exception>();

            var timer = System.Diagnostics.Stopwatch.StartNew();
            long totalBytes = getDirectorySize(path, collectedErrors);
            TimeSpan elapsed = timer.Elapsed;

            Console.WriteLine("Directory Size = {0:N0} Bytes. TimeSpan = {1:N2} seconds", totalBytes, elapsed.TotalSeconds);

            foreach (Exception recorded in collectedErrors)
            {
                Console.WriteLine(recorded.Message);
            }
        }

        
        /// <summary>
        /// Signature of a directory-size function that records failures in a
        /// caller-supplied collection in addition to returning a size.
        /// </summary>
        /// <param name="path">Directory to measure.</param>
        /// <param name="errors">Collection the implementation adds encountered exceptions to.</param>
        /// <returns>The computed size as a <see cref="long"/>.</returns>
        delegate long DirectorySizeWithErrorHandling(string path, Collection<Exception> errors);
        
        
        /// <summary>
        /// Single-threaded recursive size calculation that keeps going when a
        /// directory cannot be accessed.
        /// </summary>
        /// <param name="path">Directory to measure.</param>
        /// <param name="errors">Receives every <see cref="UnauthorizedAccessException"/> encountered.</param>
        /// <returns>Total size in bytes of the files that could be read.</returns>
        static long SerialRecursiveDirectorySizeErrorHandling(string path,Collection<Exception> errors)
        {
            long ownFiles = 0;
            string[] children = new string[] {};

            try
            {
                ownFiles = TotalFileSize(path);
                children = Directory.GetDirectories(path);
            }
            catch (UnauthorizedAccessException accessDenied)
            {
                // Record the failure; whatever was assigned before the throw is kept.
                errors.Add(accessDenied);
            }

            long nested = 0;
            foreach (string child in children)
            {
                // checked: keeps the overflow behaviour of Enumerable.Sum.
                nested = checked(nested + SerialRecursiveDirectorySizeErrorHandling(child, errors));
            }

            return ownFiles + nested;
        }

        /// <summary>
        /// Computes the size of <paramref name="path"/> using CCR tasks and blocks
        /// until the total has been posted.
        /// </summary>
        /// <param name="path">Directory to measure.</param>
        /// <returns>The value received on the result port.</returns>
        static long ParallelDirectorySize(string path)
        {
            using (var ccrDispatcher = new Dispatcher(0, "CCR Threads"))
            {
                // Work items for this run are queued to the dispatcher through this queue.
                var workQueue = new DispatcherQueue("Pipeline DispatcherQueue", ccrDispatcher);
                var totalPort = new Port<long>();

                ParallelDirectorySizeRecursive(path, workQueue, totalPort);

                // Block the calling thread until the root total arrives.
                long total = PortExtension.WaitOnPort(workQueue, totalPort);
                return total;
            }
        }

        /// <summary>
        /// Computes the size of <paramref name="directory"/> and everything below it and
        /// posts the total, as a single value, to <paramref name="outputPort"/>.
        /// </summary>
        /// <param name="directory">Directory to measure.</param>
        /// <param name="queue">Queue on which the file-summing task and the aggregating receiver are activated.</param>
        /// <param name="outputPort">Port that receives exactly one total for this directory.</param>
        /// <remarks>
        /// The directory tree itself is walked on the calling thread (the recursion below is a
        /// direct call); only the per-directory file summing and the aggregation are handed to
        /// <paramref name="queue"/>. The call to Directory.GetDirectories is not guarded, so an
        /// exception from it propagates to the caller.
        /// </remarks>
        static void ParallelDirectorySizeRecursive(string directory,
                                                   DispatcherQueue queue,
                                                   Port<long> outputPort)
        {      
            var subDirectories = Directory.GetDirectories(directory);
            // Collects one value for this directory's own files plus one per sub-directory.
            var inputPort = new Port<long>();

            Arbiter.Activate(queue,
                // Task: post the size of the files directly inside this directory.
                // NOTE(review): an exception thrown by TotalFileSize here is not caught in this
                // method; inputPort would then be one item short. How CCR surfaces that is not
                // visible in this file - confirm the caller cannot end up waiting forever.
                Arbiter.FromHandler(            
                            delegate()
                            {
                                inputPort.Post(TotalFileSize(directory));                                
                            }),
                // Receiver: once (sub-directory count + 1) sizes have arrived, post their sum upward.
                // The 'false' argument presumably makes this a one-shot receiver - confirm against CCR docs.
                Arbiter.MultipleItemReceive(
                            false,
                            inputPort,
                            subDirectories.Length + 1,
                            delegate(long[] subDirSize)
                            {
                                outputPort.Post(subDirSize.Sum());
                            }));
            
            // Each child reports its total to this directory's inputPort.
            foreach (string subDir in subDirectories)
            {    
                ParallelDirectorySizeRecursive(subDir,queue, inputPort);
            }             
        }

        /// <summary>
        /// Adds up the lengths of the files located directly in <paramref name="path"/>.
        /// Sub-directories are not descended into.
        /// </summary>
        /// <param name="path">Directory whose immediate files are measured.</param>
        /// <returns>Sum of the file lengths in bytes; 0 when the directory holds no files.</returns>
        static long TotalFileSize(string path)
        {
            long total = 0;
            foreach (string file in Directory.GetFiles(path))
            {
                // checked: keeps the overflow behaviour of Enumerable.Sum.
                total = checked(total + new FileInfo(file).Length);
            }
            return total;
        }

        /// <summary>
        /// Computes the size of <paramref name="path"/> using CCR tasks, blocks until the
        /// total has been posted, and copies any posted exceptions into <paramref name="errors"/>.
        /// </summary>
        /// <param name="path">Directory to measure.</param>
        /// <param name="errors">Receives the exceptions posted during the run.</param>
        /// <returns>The value received on the size port.</returns>
        static long ParallelDirectorySizeWithErrorHandler(string path, Collection<Exception> errors)
        {
            using (var ccrDispatcher = new Dispatcher(0, "CCR Threads"))
            {
                // Work items for this run are queued to the dispatcher through this queue.
                var workQueue = new DispatcherQueue("Pipeline DispatcherQueue", ccrDispatcher);

                // P0 carries the total size, P1 carries exceptions.
                var resultPorts = new PortSet<long,Exception>();

                ParallelDirectorySizeWithErrorHandlerRecursive(path, workQueue, resultPorts);

                long total = PortExtension.WaitOnPortWithErrorHandler(workQueue, resultPorts, errors);
                return total;
            }
        }

        /// <summary>
        /// Computes the size of <paramref name="directory"/> and everything below it, posting the
        /// total to <c>outputPort.P0</c> and any <see cref="UnauthorizedAccessException"/> to
        /// <c>outputPort.P1</c>.
        /// </summary>
        /// <param name="directory">Directory to measure.</param>
        /// <param name="queue">Queue on which the file-summing task and the aggregating receiver are activated.</param>
        /// <param name="outputPort">P0 receives exactly one size for this directory; P1 receives exceptions.</param>
        /// <remarks>
        /// The directory tree is walked on the calling thread (the recursion below is a direct
        /// call); only the per-directory file summing and the aggregation are handed to
        /// <paramref name="queue"/>.
        /// </remarks>
        static void ParallelDirectorySizeWithErrorHandlerRecursive(string directory,
                                                   DispatcherQueue queue,
                                                   PortSet<long, Exception> outputPort)
        {
            var subDirectories = new string[] {};
            try
            {
                subDirectories = Directory.GetDirectories(directory);
            }
            catch (UnauthorizedAccessException ex)
            {
                // Still report a size (0) so the parent's receiver gets the item it is counting
                // on for this directory, then record the error and stop descending.
                outputPort.P0.Post(0);
                outputPort.P1.Post(ex);                
                return;
            }
            // Fresh size port for this directory's children, but the SAME exception port as the
            // caller's: every error in the tree therefore lands on the one P1 handed down from above.
            var inputPort = new PortSet<long, Exception>(new Port<long>(), outputPort.P1);
            Arbiter.Activate(queue,
                // Task: post the size of the files directly inside this directory.
                Arbiter.FromHandler(
                            delegate()
                            {
                                long size = 0;
                                try
                                {
                                    size = TotalFileSize(directory);
                                }
                                catch (UnauthorizedAccessException ex)
                                {
                                    // Error is posted before the size below.
                                    outputPort.P1.Post(ex);
                                }
                                finally
                                {
                                    // Always post a size (0 on failure) so the receiver's count is met.
                                    inputPort.P0.Post(size);
                                }
                            }),
                // Receiver: once (sub-directory count + 1) sizes have arrived, post their sum upward.
                // The 'false' argument presumably makes this a one-shot receiver - confirm against CCR docs.
                Arbiter.MultipleItemReceive(
                            false,
                            inputPort.P0,
                            subDirectories.Length + 1,
                            delegate(long[] subDirSize)
                            {
                                outputPort.P0.Post(subDirSize.Sum());
                            }));

            // Each child reports its size to this directory's inputPort.P0 and its errors to the shared P1.
            foreach (string subDir in subDirectories)
            {
                ParallelDirectorySizeWithErrorHandlerRecursive(subDir, queue, inputPort);
            }
        }


	}

    /// <summary>
    /// Extension methods that let a thread block on CCR ports.
    /// </summary>
    public static class PortExtension
    {
        /// <summary>
        /// Blocks the calling thread until one item has been received from
        /// <paramref name="port"/> and returns that item.
        /// </summary>
        /// <typeparam name="T">Type of the items carried by the port.</typeparam>
        /// <param name="queue">Queue the receiver is executed on.</param>
        /// <param name="port">Port to receive a single item from.</param>
        /// <returns>The received item.</returns>
        public static T WaitOnPort<T>(this DispatcherQueue queue , Port<T> port)
        {
            T result = default(T);

            // ManualResetEvent owns an OS wait handle, so release it deterministically
            // instead of leaving it to the finalizer on every call.
            using (ManualResetEvent complete = new ManualResetEvent(false))
            {
                // The receive handler stores the item and then signals the waiting thread.
                Arbiter.ExecuteToCompletion(queue, Arbiter.Receive<T>(false, port, v => { result = v; complete.Set(); }));
                complete.WaitOne();
            }

            return result;
        }

        /// <summary>
        /// Blocks until one item has been received from <c>port.P0</c>, then moves every
        /// exception currently available on <c>port.P1</c> into <paramref name="errors"/>.
        /// </summary>
        /// <typeparam name="T">Type of the result carried by <c>port.P0</c>.</typeparam>
        /// <param name="queue">Queue the receiver is executed on.</param>
        /// <param name="port">P0 carries the result; P1 carries exceptions.</param>
        /// <param name="errors">Collection the drained exceptions are added to.</param>
        /// <returns>The item received from <c>port.P0</c>.</returns>
        /// <remarks>
        /// The exception port is only drained after the result has arrived, so exceptions
        /// posted after that point are not collected.
        /// </remarks>
        public static T WaitOnPortWithErrorHandler<T>(this DispatcherQueue queue, PortSet<T,Exception> port,Collection<Exception> errors)
        {
            T result = WaitOnPort(queue, port.P0);

            // Keep testing the exception port until it yields nothing (null).
            Exception error = null;
            do
            {
                port.P1.Test(out error);
                if (error != null)
                {
                    errors.Add(error);
                }
            } while (error != null);

            return result;
        }
    }
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Microsoft Public License (Ms-PL)

About the Author

itaifrenkel
Software Developer
Israel
No Biography provided

| Advertise | Privacy | Mobile
Web04 | 2.8.140721.1 | Last Updated 11 Oct 2008
Article Copyright 2008 by itaifrenkel
Everything else Copyright © CodeProject, 1999-2014
Terms of Service
Layout: fixed | fluid