Click here to Skip to main content
15,996,448 members
Articles / High Performance Computing / Parallel Processing

Implementing Parallel Scalable Distribution Counting Algorithm (DCA) with CUDA 8.0 Runtime API

Rate me:
Please Sign up or sign in to vote.
4.97/5 (24 votes)
9 Dec 2016CPOL24 min read 13.6K   350   10  
In this article, we'll demonstrate an approach the allows to increase the performance (up to 600%) of the code that implements the conventional distribution counting algorithm (DCA) using NVIDIA CUDA 8.0 Runtime API

“CUDA® is a parallel computing platform and programming model invented by NVIDIA. It enables dramatic increases in computing performance by harnessing the power of the graphics processing unit (GPU).”Dr. John Owens, UC Davis and Dr. David Luebke, NVIDIA Corp.

Introduction

In this article, we’ll demonstrate how to use NVIDIA CUDA® multithreaded hardware platform and programming model to develop an efficient parallel code that implements the famous distribution counting algorithm especially designed for solving the complex statistical problem of finding the overall amounts of duplicates for each item in the given dataset of a typically huge size, containing up to 10^6 - 10^9 data items. Since it was initially proposed, the following algorithm remains the only approach for solving a wide-range of computational problems such as collecting statistics of surveys, performing the computation of statistical median for a series of items in an unordered dataset, the value of which in turn is frequently used to reduce the level of noise during the image processing of monochrome raster images as well as in the cluster analysis to increase the distance between clusters produced by k-means clustering algorithm, that itself, provides a better quality of clustering, etc.

In fact, the conventional distribution counting algorithm is mainly based on performing a linear search to find all duplicates for a specific item in the given dataset. In this case, the linear search is performed for each of N data items, which significantly increase the complexity of the computations being performed. Under the normal condition, complexity of the distribution counting algorithm could be estimated as O(Nx(N+k)) >=O(N^2) of trivial comparisons, where N - the amount of data items in the given dataset, k - the number of items in the resulting dataset that contains the occurrence frequencies for each data item from the given dataset. Normally, we consider the value of k as an overhead value for this particular algorithm. Obviously, that, the following value represents the potential size of a resulting frequency table, which, in turn, largely depends on the value of smallest and largest item in the given dataset.

By analyzing the problem being discussed, we might think that we normally can avoid the distribution counting algorithm implementation by using one of the existing sorting algorithm to perform the similar task described above. If we delve into this problem more deeply, we’ll notice that none of the existing sorting algorithms can be used to perform a distribution counting except for the counting sort algorithm that was deemed as an impractical for sorting from one respect, but, from another one, can be efficiently used to solve the problem being discussed.

As we’ve already discussed above, one of the most obvious disadvantages of the conventional distribution counting algorithm is performance of finding the data items duplicates itself, in case, when we use the following algorithm to process the mentioned above typically huge datasets. From the table 1 shown below, we can see how the linear search performed for each particular data item, along with the overhead processing, is capable to affect the overall performance of code that sequentially performs the distribution counting in the datasets with size varying from 10^6 to 10^9 data items:

Table 1: The computational complexity of the sequential distribution counting algorithm

Dataset

N - items

Frequency Table

k-items

Computational Complexity p =O(Nx(N+k))

Sequential Code

Execution Time

(Best-Case Performance)

1000000

10^6

2 x 10^12

0h:12m:38s

10000000

10^7

2 x 10^14

2h:11m:12s

100000000

10^8

2 x 10^16

22h:05m:12s

1000000000

10^9

2 x 10^18

232h:34m:23s

The empirical statistics that indicates the regular performance of the distribution counting algorithm was estimated by running the specific sequential code on a single computational unit based on the modern CPU Intel Core i7-4790 4.0 GHZ with 32GB of RAM.

By processing the typically huge datasets, the following algorithm is capable to produce the resulting dataset of data items occurrence frequencies for not less than 728 seconds ~ 12m:38s while processing datasets which size is approximately equal or greater than 10^6 data items. Also, in the case when the overall size of the given dataset is increasing, suppose, it becomes 10 times larger, then the performance of that sequentially executed code significantly degrades and has a very long duration compared to other complicated data processing tasks such as games, deep learning AI neural networks, HDD tools and utilities processing the large amounts of clusters in the modern hard drives, and other complex data processing tasks. For example, if you run the following sequential code to find the duplicates of 10^8 - 10^9 data items, it’s capable to accomplish the following processing task for an almost idle time.

This, in turn, actually, means that the problem of processing of the huge datasets, containing typically 10^8-10^9 data items, being discussed, can never be solved without using specific multithreaded libraries and framework that allow to harness the power of parallel computing such as hardware-level parallelism supported by the most of the existing modern multi-core CPUs, the separate hardware platforms and programming models such as NVIDIA CUDA® that allows to use the resources of the most CUDA-enabled GPUs to solve the variety of complex computational problems particularly the problem of increasing the performance of the sequential code intended to process the huge typically sets of data.

Since NVIDIA Corporation first invented and introduced CUDA® technology, - parallel computing GPU hardware platform and programming model implemented as a multithreaded parallelization framework libraries and tools for many programming languages such as C/C++, Fortran, Java, Python, etc. Unlike the other multithreading frameworks, NVIDIA CUDA® technology provides capabilities of performing complex computations unveiling the power of the most of CUDA-enabled GPUs. It serves as an alternative to the conventional CPU-based computing, turning regular average PC into a powerful supercomputer that involves the almost limitless resources of NVIDIA graphics card’s GPU and memory to perform complex computations. The using of NVIDIA CUDA® provides significant performance acceleration gain for those applications that create a huge workload on the CPU and system memory when processing of enormously huge amounts of data.

In the following article, we’ll discuss about the NVIDIA CUDA® scalable programming model architecture as an efficient platform for performing parallel multithreaded computations, and, at the same time, provide a detailed explanation of how to transform sequentially executed code that implements NP-hard poorly scalable conventional distribution counting algorithm to be run in parallel performed by the warps of divergent hardware-level threads executed by an array of GPU’s streaming multiprocessors (SM).

The following, normally, can be accomplished by transforming the sequential loops of the computational process being discussed into CUDA kernels executed by a large number of threads in parallel. CUDA kernels are typically launched from the host code while the fragment of code of each kernel is executed on the device. At the very beginning of the following article we’ll talk about the either host or device code separate execution topic, which is one of the most fundamental aspects of CUDA programming model.

In this article, we’ll also take a closer look at how to properly divide the entire process of finding data items duplicates into the number of independent sub-processes capable to be perform the following task in parallel. To do this, we’ll use the various approaches such as either the concepts of CUDA’s bulk or dynamic parallelism to efficiently parallelize the data-process formulated by distribution counting algorithm leveraging CUDA® multi-core processors hardware platform.

In particular, we’ll discuss the different scenarios of dynamic parallelism such as either linear or recursive parallel threads execution as well as how to efficiently workshare the entire process of finding data items duplicates performed by one or more nested sequential loops across multiple parallel GPU hardware threads, which, in turn, allows to significantly reduce the computational complexity of the algorithm designed to solve the problem being discussed.

During the discussion, we’ll also make a large emphasize on such important code modernization aspects as barrier synchronization of divergent threads blocks that access the data stored into GPU device shared memory as well as specific reduction mechanisms that allow to survive the number of serious well-known issues and side-effects such as data flow dependency and race condition that in many cases obstacle the process of code modernization making it impossible to parallelize many existing algorithms.

Furthermore, during the code modernization, we’ll learn how to use atomic data operations such as atomic addition, increment and compare-and-swap (CAS) to overcome the occurrence of race condition issues in the code being parallelized. Also, we’ll explain how to implement pinned memory, the using of which allows to effectively avoid the race condition occurrence, in cases, when the following issue cannot be survived by using the either barrier synchronization or atomic operations.

Since we’ve found out how transform the sequential loops of the code that implements distribution counting algorithm into kernels executed by the blocks consisting the large number of threads, we’ll learn how to optimize the performance of those kernels launched asynchronously on the host by using CUDA streaming mechanism. Particularly, we’ll discuss how to launch the kernels in either host or device code each one to be executed by a certain streaming multiprocessor in its own stream making the sequential kernel calls synchronous. At the same time, we’ll provide a detailed description of such important aspects as either implicit or explicit streams synchronization and explain the difference between implicit synchronization of a legacy default stream and explicit synchronization of those streams created by using specific CUDA Runtime API calls. During the discussion, we’ll talk about specific event-based mechanism which is the only approach for synchronizing streams.

Also, along with the discussion of mentioned above parallelization techniques, in this article, we’ll provide an overview such important aspects of using CUDA programming model as either host or device memory allocation and management such as specific data transfers between host and device memory during the parallel code execution by using CUDA Runtime API routines.

Besides the CUDA streaming and memory management topics, in this article, we’ll learn how to estimate the code executing time by using events management functions of CUDA Runtime API.

Background

Before we begin, in this section, we’ll explain the conventional distribution counting algorithm used to find the duplicates for each data item in the given dataset and store the statistical data on each item obtained into the array of frequencies. In this section, we’ll thoroughly discuss the operations performed by following algorithm as well as provide detailed recommendations on how to optimize the performance by finding the optimization hotspots in the sequential code that implements distribution counting algorithm.

Distribution Counting Algorithm Overview

Distribution Counting Algorithm (DCA) – one of the most trivial algorithms for solving the statistical problem of finding the duplicates of each data item in the given dataset. As we’ve already discussed, the following algorithm is commonly used for many applications as computing the value of statistical median, gathering the statistics on surveys, as a part of various partitioning and AI algorithms such as k-means clustering algorithm designed to solve the classification problem, etc. In details, the distribution counting algorithm was discussed by Robert Sedgewick in his book “Fundamental Algorithms in C Part 1”.

The following algorithm can be formulated as follows: suppose we’re having a certain unordered dataset data containing N - randomly generated data items of particular data type. To find the overall number of duplicates for each data item in the given dataset we normally need to increment a loop counter variable index from 0 to N and for each data item data[index] perform the linear search in the subset of succeeding data items to find the number of those items having the same value as the current item data[index]. For that purpose, we need to declare the variable count = 1, in which we will accumulate the value of the number of occurrences of each item data[index] as the result of performing the linear search discussed below. To perform the linear search, we need to use another loop counter variable nindex by incrementing its value from index + 1 to N and for each item data[nindex] to perform a comparison of current item’s value with the value of data[index] item. If both values are equal, then we increment the value of the variable count by 1, otherwise proceed the comparison with the next item data[nindex]. Finally, at the end of linear search performed the variable count will hold the actual number of occurrences of item data[index] in the array data. The value of count variable is the total occurrence frequency value for the current item data[index].

For instance, we have an array of values data[N] = { 3, 1, 6, 2, 1, 3, 6 } containing  N - data items of integer type and we need to find the occurrence frequency for the first data item with index = 0, which value is equal to 3. At this point, the variable count is assigned to the value of 1 and we’re normally starting to compare each item data[nindex] with indexes nindex from 1 to N with the value of the following item data[index]=data[0]=3. During the comparison, we determine that the item with index nindex = 5  is equal to the current data item with index index = 0. In this case, we increment the value of count variable so that its value becomes equal to count = count + 1 = 2. We’re iterating to the end of array and reveal that there’s no more items which value is equal to the current item’s value data[index] = 3. Since that the value of the variable count = 2 is the value of the occurrence frequency of the first data item data[0] = 3 in the given dataset. Similarly, by iterating through the array of items we proceed with the next items in the array data.

Since we’ve obtained the occurrence frequency value of the current item data[index] in the array, we need to append the following value to the resulting set of frequencies. Note that, each entry to the resulting frequency table is a pair of two values which is either the value of a data item data[index] and the total number of occurrences count. The resulting set of data items frequencies is represented as an array of objects of structure FREQ_T defined in the code snippet below.

Before appending the current entry to the associative array used to store each data item’s frequency value, we need to perform a check if the number of occurrences for the current item data[index] has not previously been computed. To do this we’ll need to declare a boolean variable exists and initialize it with value false. After that, we should iterate loop counter variable item from ft_item - 1 downto 0 until the value of exists variable is not equal to true, or the end of resulting array of frequencies has been reached. By iterating the resulting array of frequencies, for each node of freq_t[item] we get the value of freq_t[item].n_val variable and compare it with the value of the current item data[index]. If the value of freq_t[item].n_val variable for the current node item is equal to the value of data[index], we set the value of exists = true, which means that the number of occurrences for the current item data[index] has been already computed and it’s not necessary to append the frequency data on the current item data[index] to the resulting set. Otherwise, if none of the nodes has the value of freq_t[item].n_val variable equal to the current item’s value data[index], then we assign the variables of freq_t[item].n_val and freq_t[ft_item].n_freq of the node with index ft_item to the values of data[index] and count respectively. Finally, we increment the value of variable ft_item by 1 and proceed with the next item in the array data.

The fragment of code below illustrates the very basic implementation of the distribution counting algorithm:

C++
typedef struct tagFreqT

{
	__int64 n_val;  
        __int64 n_freq;
} FREQ_T;


void dist_count(const __int64* data, FREQ_T* freq_t, const size_t N)
{
    __int64 ft_item = 0;
    // Iterating through the array of data items data[]
    for (__int64 index = 0; index < N; index++) // { 1 }  <--- Performance optimization hotspot
    {
        __int64 count = 1;
        // Performing the linear search to find the actual amount of
        // duplicates for the current item data[i]
        // { 2 } <--- Performance optimization hotspot
        for (__int64 nindex = index + 1; nindex < N; nindex++) 
             if (data[nindex] == data[index]) count++;

        bool exists = false;
        // Performing the linear search to determine if the frequency value for the
        // current item data[i] has been already stored into the frequency table.
        for (__int64 item = ft_item - 1; item >= 0 && !exists; item--) // { 3 }
             if (freq_t[item].n_val == data[index]) exists = true;     

        // If not, append the frequency value of the current item data[i] to the frequency table
        if (exists == false)

        {
            freq_t[ft_item].n_val = data[index];
            freq_t[ft_item++].n_freq = count;
        }
    }
}

The Algorithm’s Multithreaded Parallel Performance Optimization

As you might notice from the code snippet above, according to the algorithm being discussed, the entire process of finding the number of duplicates for each data item mostly relies upon performing the two nested loops. The first outermost loop { 1 } is normally performed to iterate through the array of data items. Another, nested loop basically performs the linear search to find all duplicates of each current item in the subset of succeeding data items and accumulates the number of occurrences obtained into the variable count.

Now, let’s take a closer look to the performance of the nested loop { 2 }. The following loop while being executed performs exactly p = O(N) - iterations to perform the linear search by traversing the dataset to find the number of duplicates for each current item data[index]. Also, at the end of performing the linear search to obtain the number of data item’s occurrences, we normally executing another loop to iterate through the array freq_t to determine if there were no entries for the current data and we’ve not yet previously performed the distribution counting for the current item data[index]. This, in turn, causes a sufficient overhead to the process of finding data items duplicates, since the following loop performs exactly Image 1- iterations to check if the current item hasn’t already been indexed into the frequency table freq_t. That’s actually why, the total complexity for this fragment of code implementing the linear search can be estimated as  p = O(N+k) iterations.

According to the distribution counting algorithm, the following linear search is repeatedly performed for each of N - items retrieved from the given dataset during each iteration of outermost sequential loop. Thus, the value of complexity of the linear search performed for each item data[index] in dataset is multiplied by the value  N - the number of iterations of the following outermost loop and can be estimated as  which normally exceeds the square computational complexity O(N^2) of the most sorting and other algorithms performing the complex computations.

Obviously, that, the nested loop that performs the linear search in this case turns to be the parallel multithreaded optimization hotspot. According to the best practices, the performance acceleration gain of the code that implements the distribution counting algorithm can be much higher if we perform the iterations of the nested loop in parallel executed by multiple threads, so that, each iteration is being performed in its own thread. This, in turn, will ensure that the code being discussed is performed much more faster compared to the case when this code is executed sequentially. The nested loop parallel execution provides almost up to 65% of the performance acceleration gain of the entire algorithm.

The outermost loop is another performance optimization hotspot we’re going to discuss. Unlike the other algorithms, especially those ones that perform sorting, the process of finding duplicates for each item in the given dataset, formulated by the distribution counting algorithm, has merely no data flow dependency issues. This actually means that the code that performs the linear search during each iteration of the outermost loop doesn’t normally access the data processed as the result of performing the previous and next iterations. In this particular case, the following code is able to perform the linear search to find the number of duplicates for each item in the given dataset independently. The actual count of duplicates value computed by performing the linear search during each iteration of the outermost loop is not depending on the count values obtained for the other similar items in the given dataset. This, in turn, benefits in the proper and efficient parallelization of the entire algorithm being discussed.

However, there’s another nested loop { 3 } in this code that performs the linear search to determine if the actual number of duplicates value for the current item data[index] has already been indexed into the array representing items frequency table. Unfortunately, the following nested loop cannot be perfectly parallelized due to the data flow dependency that typically might be incurred during the following loop execution. In this case, to ensure that the following code is properly executed, we need to implement the specific thread synchronization mechanism such as critical sections locks to avoid the data flow dependency issue that might persist.

The Code Optimized Using Intel OpenMP Performance Library

Here’s another fragment of code implementing the parallel scalable distribution counting algorithm using Intel OpenMP library. During the parallel multithreaded optimization of the code listed above we’ve exactly checked with the optimization hints discussed in the previous paragraphs of the following section:

C++
void omp_dist_count(const __int64* data, FREQ_T* freq_t, const size_t N, const size_t FT_SIZE)

{
        __int64 ft_item = 0; omp_lock_t lock;
        int max_threads = omp_get_max_threads();

        omp_init_lock(&lock);

        // Worksharing the iterations of the outermost loop accross multiple
        // sections executed in parallel
        #pragma omp parallel default(none) \ // { 1 }
                shared(data, freq_t, max_threads, ft_item, lock) num_threads(max_threads)
        {
                // Obtaing the thread-id value of the current thread being executed
                int thread_id = omp_get_thread_num();

                // Computing the start and end position of the current chunk of the array data
                __int64 start = thread_id * (N / max_threads);
                __int64 end = (thread_id + 1) * (N / max_threads);

                // Performing iteration through the data items within the current array chunk
                for (int index = start; index < end; index++)
                {
                        __int64 count = 0;

                        // Performing the tight loop parallelization of the nested loop that performs
                        // the linear search to find the duplicates of the current data data[index]
                        // Reducing of the incremented value of the current item's duplicates count to
                        // avoid the race condition issue occurrence.
                        #pragma omp parallel for default(none) \ // { 2 }
                                reduction(+:count) shared(data, index)

                        for (__int64 nindex = 0; nindex < N; nindex++)
                                if (data[nindex] == data[index]) count++;

                        // Explicitly synchronizing the fragment of code that performs the check to
                        // avoid duplicates of those items for which the actual counting has already
                        // been performed and the number of duplicates have been indexed into the
                        // frequency table

                        omp_set_lock(&lock);

                        bool exists = false;
                        for (__int64 item = FT_SIZE - 1; item >= 0 && !exists; item--)
                                if (freq_t[item].n_val == data[index]) exists = true;

                        if (exists == false)
                        {
                                // Appending the value of the actual count of duplicates
                                // for the current item data[i] to the frequency table

                                freq_t[ft_item].n_freq = count;
                                freq_t[ft_item++].n_val = data[index];
                        }

                        omp_unset_lock(&lock);
                }
        }

        omp_destroy_lock(&lock);
}

As we can see from the code fragment listed above, the very first optimization applied to the algorithm’s sequential code is that we’ve performed a work-sharing of the outermost loop { 1 } used to iterate through the array of data items by splitting up the overall number of iterations to be executed by multiple CPU hardware-level threads, each one is executing its own copy of the parallel region being defined. This, in turn, allows to significantly relax CPU workload by scaling the entire scope of operations being performed across two or more CPU cores.

After we’ve transformed the outermost loop by using parallel workshare construct, we have to provide tight loop parallelization to the nested loop { 2 }. To do that, in this case, we’re using #pragma omp parallel for directive construct to execute each iteration of the nested loop in parallel, which provides the actual speed-up during the parallel code execution.

Note that, in this case, we’re implementing tight-loop parallelization inside the parallel region executed by multiple threads. This normally is called an OpenMP’s nested parallelism. The main goal of using nested parallelism strategy is to provide an even better performance acceleration gain to the code being optimized.

Another optimization aspect that we’ll discuss about is the reduction mechanism used to avoid race condition issues during the nested loop { 2 } parallel  execution. As we can see from the code snippet listed above, during each iteration of the nested loop { 2 } executed by its own thread, we’re performing a check if the value of data[nindex] item is equal to the value of the current item data[index] for which we’re estimating the number of duplicates.  If so, we’re performing increment of the shared variable count. In this case, to obtain the correct value of the following variable, we’ll perform an explicit reduction of the increment operator by using reduction(+:count) clause.

However, some fragments of code listed above including the nested loop { 3 } still remains not parallelized since the attempts to run the following fragment in parallel normally lead to the data flow dependency issue occurrence. Since the following fragment of code is intended to be run by each thread sequentially, at this point, we’re using an explicit synchronization mechanism such as critical sections locks by invoking OpenMP API routines omp_set_lock(&lock), omp_set_unlock(&lock), that ensure that the following fragment of code is executed sequentially by each thread.

The main disadvantage of the OpenMP parallel code listed above is that he occurrence of data flow dependency issue during the nested loop { 3 } execution normally causes the following code might not to entirely scale across all available CPU cores. For that purpose, we used Intel Amplifier XE to measure and assess the overall performance of the code being parallelized by using OpenMP library. Fig.1. illustrates how the parallel code that performs distribution counting is scaled across multiple CPU cores:

Image 2

Figure 1: Parallel Code Performance Assessment Using Intel Amplifier XE

As we can see from the fig.1., almost a half of threads are poorly utilized due to the growing value of the performance potential gain and CPU load imbalance which is the result of insufficient parallelization of the fragment of code implementing second nested loop discussed above. Also, using the critical sections locks normally causes sufficient overhead during parallel code execution. From the top hotspots profiling section above we can see that omp_set_lock(&lock), omp_set_unlock(&lock) functions calls has the longest duration compared to specific fork-join calls and implicit barrier synchronization.

In conclusion, as we can see from the either sequential or parallel code fragments, we cannot normally achieve an ideal parallelization of the following code by using Intel OpenMP performance library.  One of the reasons why this code cannot be perfectly parallelized is that there’s a lack of implicit synchronization provided by the multithreaded primitives of the OpenMP library itself while executing the following code by scaling it leveraging multiple CPU’s cores.

In the next section of this article we’ll discuss about how to use NVIDIA CUDA®, - parallel multithreading platform that allows to provide more efficient parallelization to the code that implements distribution counting algorithm.

Using the code

The cpdCountChunkKernel kernel while being executed by the multiple divergent threads, retrieves each data item from the array ppdInputVec and performs the linear search to find the actual number of duplicates for each current data item ppdInputVec[threadIdx.x] by dynamically launching another kernel cdpCountIfKernel that performing a trivial count-if operation discussed below:

C++
    template<typename cnttype="__uint64" inptype="__int8," typename="">
__global__ void cdpCountChunkKernel(InpType* ppdInputVec, FreqT* ppdCountVec, CntType nSize)
{
	// Declare a locally shared array of stream events 
	// pStreamEvents that are used to synchronize each stream
	__shared__ cudaEvent_t pStreamEvents[CHUNK_SIZE_GRID_0];
	// Declare a locally shared array of events used to 
	// synchronize each stream in which we'll launch cdpCountIfKernel kernel
	__shared__ cudaStream_t pStreamHandles[CHUNK_SIZE_GRID_0];

	// Declare a locally shared pinned array ppdSHCountVec used to store the number 
	// of duplicates value for each data item retrieved during the cdpCountChunkKernel execution
	__shared__ FreqT ppdSHCountVec[CHUNK_SIZE_GRID_0];

	// Perform a check if the value of threadIdx.x variable 
	// is less than the value of the ppdInputVec array size
	if (threadIdx.x < nSize)
	{
		// Initializing the new non-blocking stream in which we'll execute cdpCountIfKernel
		cudaErrorCheck(cudaStreamCreateWithFlags(&pStreamHandles[threadIdx.x], cudaStreamNonBlocking));
		// Initializing the new event used to synchronize the new non-blocking stream execution
		cudaErrorCheck(cudaEventCreateWithFlags(&pStreamEvents[threadIdx.x], cudaEventDisableTiming));

		// Synchronizing all threads of the entire GPU device
		cudaErrorCheck(cudaDeviceSynchronize());
		// If this kernel is executed by the first thread with thread-id equal to 0, 
		// initiallizing the locally shared array ppdSHCountVec used to store the actual 
		// number of duplicates for each data item from the array ppdInputVec
		if (threadIdx.x == 0)
		{
			for (CntType iIndex = 0; iIndex < CHUNK_SIZE_GRID_0; iIndex++)
				ppdSHCountVec[iIndex].value = ppdSHCountVec[iIndex].freq = 0;
		}

		CntType* ppdCountValue = NULL;
		// Retrieving the current data item with index threadIdx.x from the array 
		// ppdInputVec and assigning its value to the thread-private variable nValue
		InpType nValue = ppdInputVec[threadIdx.x];
		// Allocating GPU device memory to store the actual value of the number of the
		// current item's duplicates and assigning its address to the ppdCountValue pointer variable
		cudaErrorCheck(cudaMalloc((void**)&ppdCountValue, sizeof(CntType)));

		// Launching cdpCountIfKernel to obtain the actual number of duplicates 
		// ppdCountValue for the current data item that has been previously retrieved 
		// from the array ppdInputVec and assigned to nValue variable
		cdpCountIfKernel<inptype, cnttype=""> << < 1, \
			blockDim.x, 0, pStreamHandles[threadIdx.x] >> > (ppdInputVec, ppdCountValue, nValue, nSize);

		// Asynchronously record the stream event prior to synchronizing all threads across GPU device
		cudaErrorCheck(cudaEventRecord(pStreamEvents[threadIdx.x], pStreamHandles[threadIdx.x]));
		// Synchronizing all threads of the entire GPU device
		cudaErrorCheck(cudaDeviceSynchronize());
		// Waiting until the cudaDeviceSynchronize API call will finish its execution
		cudaErrorCheck(cudaStreamWaitEvent(pStreamHandles[threadIdx.x], pStreamEvents[threadIdx.x], 0));

		__syncthreads(); // Perfoming synchronization of all threads in a block
		// Performing a check if the value of the number of duplicates is greater 
		// than zero and not exceeding the value of ppdInputVec array size
		if (*ppdCountValue > 0 && *ppdCountValue < nSize)
		{
			__syncthreads(); // Perfoming synchronization of all threads in a block
			// Assigning the value of the current data item retrieved to the ppdSHCountVec[threadIdx.x].value 
			// variable of the current node with index threadIdx.x in the array ppdSHCountVec
			ppdSHCountVec[threadIdx.x].value = nValue;
			// Assigning the number of duplicates to the ppdSHCountVec[threadIdx.x].freq
			// variable of the current node with index threadIdx.x in the array ppdSHCountVec
			ppdSHCountVec[threadIdx.x].freq = *ppdCountValue;
			// Synchronizing all threads of the entire GPU device
			cudaErrorCheck(cudaDeviceSynchronize());

			bool bExists = false;
			// Iterating through the shared array ppdSHCountVec used to store 
			// the values of the actual number of data items duplicates 
			for (int iIndex = 0; iIndex < threadIdx.x && !bExists; iIndex++)
				// For each node with index iIndex in the array we're performing a check
				// if the value of the variable ppdSHCountVec[iIndex].value is equal to
				// the value nValue variable. If so, the actual number of duplicates for
				// the current item was already obtained.
				if (ppdSHCountVec[iIndex].value == nValue) bExists = true;

			__syncthreads(); // Perfoming synchronization of all threads in a block
			// If the value of bExists variable is equal to false 
			// (the number of duplicates value for the current item has not been already obtained), 
			// copy the current node with index threadIdx.x to the array ppdCountVec declared in
			// global GPU context.
			if (bExists == false)
			{
				__syncthreads(); // Perfoming synchronization of all threads in a block
				// Copy the current node with index threadIdx.x to the array 
				// ppdCountVec declared in global GPU context.
				ppdCountVec[threadIdx.x] = ppdSHCountVec[threadIdx.x];
			}
		}

		// Deallocating the buffer which address is assigned to the ppdCountValue variable
		cudaErrorCheck(cudaFree(ppdCountValue));
		// Destorying a stream in which the cdpCountIfKernel is executed
		cudaErrorCheck(cudaStreamDestroy(pStreamHandles[threadIdx.x]))
	}
}
</inptype,></typename>

The following fragment of code implements cdpCountIfKernel that performs the linear search to find the number of duplicates for the current data item in the array ppdInpVec, which value is assigned to nValue variable. It performs the atomic increment operation on locally shared variable nSHLocalCount used to accumulate the number of duplicates value across all threads, each one executing its own instance of cdpCountIfKernel. At the end of each thread execution the value of nSHLocalCount variable is copied to the global context memory which address is assigned to the ppdCountVec global variable.

C++
   // cdpCountIfKernel is a kernel function performing parallel count-if linear search
// to find the actual number of duplicates in the array ppdInputVec of size nSize for 
// the value of nValue variable passed as an argument to the following kernel function.
// The value of the total amount of duplicates for the value of nValue is returned as the 
// value of the pointer ppdCountVec variable passed as an argument to the following kernel
template<typename cnttype="__uint64" inptype="__int8," typename="">
__global__ void cdpCountIfKernel(const InpType* ppdInputVec, \
	CntType* ppdCountVec, const InpType nValue, CntType nSize)
{
	// Declare a locally shared variable nSHLocalCount used to accumulate the total 
	// amount of occurrences for nValue in the array ppdInputVec The following variable 
	// is shared across all threads executing cdpCountIfKernel. Since this variable
	// value has been modified during the kernel execution, we'll use the atomicAdd(...)
	// routine to perform the atomic reduction while incrementing the value of nSHLocalCount by 1
	__shared__ CntType nSHLocalCount;

	// Perform a check if the value of threadIdx.x variable 
	// is less than the value of the ppdInputVec array size
	if (threadIdx.x < nSize)
	{
		__syncthreads(); // Synchronizing all threads across a block
		// If this kernel is executed by the first thread with thread-id 
		// equal to 0 assigning the initial value of variable nSHLocalCount = 0
		if (threadIdx.x == 0) nSHLocalCount = 0;
		// Performing a check if the current item ppdInputVec[threadIdx.x] in the
		// array ppdInputVec is equal to the value of nValue variable.
		if (ppdInputVec[threadIdx.x] == nValue)
		{
			__syncthreads(); // If so, perfoming synchronization of all threads in a block
			// Incrementing the value of nSHLocalCount variable value by 1 
			// using atomic reduction function atomicAdd(...)
			atomicAdd(&nSHLocalCount, 1);
			// Synchronizing all threads of the entire GPU device
			cudaErrorCheck(cudaDeviceSynchronize());
		}

		// If the value nSHLocalCount shared variable is not equal to 0,
		// duplicating its value by assigning it to the GPU device memory
		// accessed by the ppdCountVec pointer allocated in GPU device memory
		if (nSHLocalCount > 0)
		{
			__syncthreads(); // Perfoming synchronization of all threads in a block
		    // Synchronizing all threads of the entire GPU device
			cudaErrorCheck(cudaDeviceSynchronize());
			// Assigning the value of nSHLocalCount variable to the device memory
			// accessed by the global context pointer variable ppdCountVec
			*ppdCountVec = nSHLocalCount;
		}
	}
} 
</typename>

The following fragment of code implements the cdpCountChunkRecursiveKernel kernel that performs the parallel workshare of the distribution couting process between multiple divergent threads each one computing the number of duplicates for a set of items within its own chunk. Each thread dynamically launches the cdpCountChunkKernel to obtain the values of the number of duplicates for each particular item within the current chunk which boundaries are computed at the top of the cdpCountChunkRecursiveKernel execution. After each thread computes the values of the number of duplicates for each item within the current chunk, the following kernel performs the reduction by merging each array of the number of duplicates for each current chunk into a global array ppdCountVec:

C++
    template<typename cnttype="__uint64" inptype="__int8," typename="">
__global__ void cdpCountChunkRecursiveKernel(InpType* ppdInputVec, FreqT* ppdCountVec, \
	CntType nChunkID, CntType nDepth, CntType nItemsCount, CntType nChunkSize, CntType nSize)
{
	// Declare a locally shared array of stream events 
	// pStreamEvents that are used to synchronize each stream
	__shared__ cudaEvent_t pStreamEvents[CUDA_MAX_DEPTH];
	// Declare a locally shared array of events used to 
	// synchronize each stream in which we'll launch cdpCountIfKernel kernel
	__shared__ cudaStream_t pStreamHandles[CUDA_MAX_DEPTH];

	// Performing a check if the actual recursion depth 
	// is not exceeding the specified threshold CUDA_MAX_DEPTH
	if (nDepth < CUDA_MAX_DEPTH)
	{
		// Computing the begining and ending position of the current chunk in the array ppdInputVec
		CntType nBegin = nChunkID * nChunkSize;
		CntType nEnd = (nChunkID + 1) * nChunkSize;

		// Initializing the new non-blocking stream in which we'll execute cdpCountIfKernel
		cudaErrorCheck(cudaStreamCreateWithFlags(&pStreamHandles[nDepth], cudaStreamNonBlocking));
		// Initializing the new event used to synchronize the new non-blocking stream execution
		cudaErrorCheck(cudaEventCreateWithFlags(&pStreamEvents[nDepth], cudaEventDisableTiming));

		// Performing a check if the ending position of the current
		// chunk doesn't exceed the actual size of the ppdInputVec array
		if (nEnd <= nSize)
		{
			// Declaring a pointer variable that is assigned to the 
			// address of buffer used to store the current chunk in GPU memory
			InpType* ppdChunkPtr = NULL;
			// Allocating buffer ppdChunkPtr used to store the current chunk of the array ppdInputVec into GPU memory
			cudaErrorCheck(cudaMalloc((void**)&ppdChunkPtr, sizeof(InpType) * nChunkSize));
			// Performing the tranfer of the current chunk from ppdInputVec array to the temporary buffer ppdChunkPtr.
			cudaErrorCheck(cudaMemcpyAsync(ppdChunkPtr, &ppdInputVec[nBegin], \
				sizeof(InpType) * nChunkSize, cudaMemcpyDeviceToDevice, pStreamHandles[nDepth]));

			// Declaring the pointer variable that is assigned to the address of a temporary
			// buffer used to store the array of values, each one is the number of 
			// duplicates for each data item within the current chunk
			FreqT* ppdLocalCountVec = NULL;
			// Allocating buffer ppdLocalCountVec used to store the array of values, 
			// each one is the number of duplicates for each data item within the current chunk
			cudaErrorCheck(cudaMalloc((void**)&ppdLocalCountVec, sizeof(FreqT) * CHUNK_SIZE_GRID_0));
			cudaErrorCheck(cudaMemsetAsync(ppdLocalCountVec, 0x00, sizeof(FreqT) * CHUNK_SIZE_GRID_0, pStreamHandles[nDepth]));

			// Dynamically launching the cpdCountChunkKernel to compute the 
			// number of duplicates for each data item in the current chunk transfered to the ppdChunkPtr buffer
			cdpCountChunkKernel <inptype, cnttype=""> << < 1, \
				CHUNK_SIZE_GRID_0, 0, pStreamHandles[nDepth] >> > (ppdChunkPtr, ppdLocalCountVec, CHUNK_SIZE_GRID_0);

			// Asynchronously record the stream event prior to synchronizing all threads across GPU device
			cudaErrorCheck(cudaEventRecord(pStreamEvents[nDepth], pStreamHandles[nDepth]));
			// Synchronizing all threads of the entire GPU device
			cudaErrorCheck(cudaDeviceSynchronize());
			// Waiting until the cudaDeviceSynchronize API call will finish its execution
			cudaErrorCheck(cudaStreamWaitEvent(pStreamHandles[nDepth], pStreamEvents[nDepth], 0));

			// Destorying a stream in which the cdpCountIfKernel is executed
			cudaErrorCheck(cudaStreamDestroy(pStreamHandles[nDepth]));
			// Deallocating the buffer which address is assigned to the ppdChunkPtr variable
			cudaErrorCheck(cudaFree(ppdChunkPtr));

			// Performing reduction by merging the array of the number of duplicates values
			// obtained by each divergent thread into the global array used to store the overall
			// number of duplicates for all items from the array ppdInputVec
			CntType nItem = cdpMergeReduction<cnttype>(ppdLocalCountVec, ppdCountVec, \
				GridType::LOCAL, 0, 0, nItemsCount, CHUNK_SIZE_GRID_0);

			// Deallocating the buffer which address is assigned to the ppdLocalCountVec variable
			cudaErrorCheck(cudaFree(ppdLocalCountVec));

			// Initializing the new non-blocking stream in which we'll execute cdpCountIfKernel
			cudaErrorCheck(cudaStreamCreateWithFlags(&pStreamHandles[nDepth + 1], cudaStreamNonBlocking));
			// Initializing the new event used to synchronize the new non-blocking stream execution
			cudaErrorCheck(cudaEventCreateWithFlags(&pStreamEvents[nDepth + 1], cudaEventDisableTiming));

			// Recursively launching the cdpCountChunkRecursiveKernel to obtain the number of duplicates
			// for the data items in the succeeding chunk of the pInputVec array
			cdpCountChunkRecursiveKernel <inptype, cnttype=""> << < 1, 1, 0, pStreamHandles[nDepth + 1] >> > \
				(ppdInputVec, ppdCountVec, nChunkID + 1, nDepth + 1, nItem, nChunkSize, nSize);
			cudaErrorCheck(cudaDeviceSynchronize());

			// Asynchronously record the stream event prior to synchronizing all threads across GPU device
			cudaErrorCheck(cudaEventRecord(pStreamEvents[nDepth + 1], pStreamHandles[nDepth + 1]));
			// Waiting until the cudaDeviceSynchronize API call will finish its execution
			cudaErrorCheck(cudaStreamWaitEvent(pStreamHandles[nDepth + 1], pStreamEvents[nDepth + 1], 0));
			// Destorying a stream in which the cdpCountChunkRecursiveKernel is executed
			cudaErrorCheck(cudaStreamDestroy(pStreamHandles[nDepth + 1]));
		}
	}
}
</inptype,></cnttype></inptype,></typename>

The following snippets of code implement cdpCountChunkGrid1, cdpCountChunkGrid2 kernels that are similarly to the previously discussed kernel perform the same parallel workshare of the distribution counting process with only one difference that the following kernels rather than performing recursive workshare, normally perform bulk parallelization to compute the number of duplicates of items separately within each chunk of the array ppdInputVec. The cdpCountChunkGrid1 is a parent grid of threads relatively to the threads that execute cdpCountChunkGrid2 kernel.

C++
    template<typename cnttype="__uint64" inptype="__int8," typename="">
__global__ void cdpCountChunkGrid1(InpType* ppdInputVec, FreqT* ppdCountVec, CntType nParentThreadID, CntType nChunkSize, CntType nSize)
{
	// Declare a locally shared array of stream events 
	// pStreamEvents that are used to synchronize each stream
	__shared__ cudaEvent_t pStreamEvents[CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0];
	// Declare a locally shared array of events used to 
	// synchronize each stream in which we'll launch cdpCountIfKernel kernel
	__shared__ cudaStream_t pStreamHandles[CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0];

	// Computing the begining and ending position of the current chunk in the array ppdInputVec
	CntType nBegin = threadIdx.x * nChunkSize;
	CntType nEnd = (threadIdx.x + 1) * nChunkSize;

	// Initializing the new non-blocking stream in which we'll execute cdpCountIfKernel
	cudaErrorCheck(cudaStreamCreateWithFlags(&pStreamHandles[threadIdx.x], cudaStreamNonBlocking));
	// Initializing the new event used to synchronize the new non-blocking stream execution
	cudaErrorCheck(cudaEventCreateWithFlags(&pStreamEvents[threadIdx.x], cudaEventDisableTiming));

	// Performing a check if the ending position of the current
	// chunk doesn't exceed the actual size of the ppdInputVec array
	if (nEnd <= nSize)
	{
		// Declaring a pointer variable that is assigned to the 
		// address of buffer used to store the current chunk in GPU memory
		InpType* ppdChunkPtr = NULL;
		// Allocating buffer ppdChunkPtr used to store the current chunk of the array ppdInputVec into GPU memory
		cudaErrorCheck(cudaMalloc((void**)&ppdChunkPtr, sizeof(InpType) * nChunkSize));
		// Performing the tranfer of the current chunk from ppdInputVec array to the temporary buffer ppdChunkPtr.
		cudaErrorCheck(cudaMemcpyAsync(ppdChunkPtr, &ppdInputVec[nBegin], \
			sizeof(InpType) * nChunkSize, cudaMemcpyDeviceToDevice, pStreamHandles[threadIdx.x]));

		// Declaring the pointer variable that is assigned to the address of a temporary
		// buffer used to store the array of values, each one is the number of 
		// duplicates for each data item within the current chunk
		FreqT* ppdLocalCountVec = NULL;
		// Allocating buffer ppdLocalCountVec used to store the array of values, 
		// each one is the number of duplicates for each data item within the current chunk
		cudaErrorCheck(cudaMalloc((void**)&ppdLocalCountVec, sizeof(FreqT) * CHUNK_SIZE_GRID_1));
		cudaErrorCheck(cudaMemsetAsync(ppdLocalCountVec, 0x00, \
			sizeof(FreqT) * CHUNK_SIZE_GRID_1, pStreamHandles[threadIdx.x]));

		cudaErrorCheck(cudaMemsetAsync(ppdCountVecG1[nParentThreadID][threadIdx.x], 0x00, \
			sizeof(FreqT) * CHUNK_SIZE_GRID_1, pStreamHandles[threadIdx.x]));

		// Dynamically launching the cpdCountChunkRecursiveKernel to compute the 
		// number of duplicates for each data item in the current chunk transfered to the ppdChunkPtr buffer
		cdpCountChunkRecursiveKernel <inptype, cnttype=""> << < 1, 1, 0, \
			pStreamHandles[threadIdx.x] >> > (ppdChunkPtr, ppdLocalCountVec, 0, 0, 0, CHUNK_SIZE_GRID_0, nChunkSize);

		// Asynchronously record the stream event prior to synchronizing all threads across GPU device
		cudaErrorCheck(cudaEventRecord(pStreamEvents[threadIdx.x], pStreamHandles[threadIdx.x]));
		// Synchronizing all threads of the entire GPU device
		cudaErrorCheck(cudaDeviceSynchronize());

		// Iterating through the array of stream handles and for each stream performing an asynchronous
		// wait operation to synchronize all streams in which the cdpCountChunkRecursiveKernel is executed
		for (CntType iIndex = 0; iIndex < CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0; iIndex++)
			cudaErrorCheck(cudaStreamWaitEvent(pStreamHandles[iIndex], pStreamEvents[iIndex], 0));

		// Performing reduction by merging the array of the number of duplicates values
		// obtained by each divergent thread into the locally shared array ppdLocalCountVec
		// used to store the overall number of duplicates for all items from the array ppdInputVec
		cdpMergeReduction<cnttype>(ppdLocalCountVec, NULL, GridType::GRID_1, \
			nParentThreadID, threadIdx.x, 0, CHUNK_SIZE_GRID_1);
		// Performing reduction by merging the array of the number of duplicates values
		// obtained by all divergent threads into the global array ppdCountVec used to store the overall
		// number of duplicates for all items from the array ppdInputVec
		cdpMergeAllReduction<cnttype>(ppdCountVec, GridType::GRID_1, nParentThreadID);

		// Deallocating the buffer which address is assigned to the ppdLocalCountVec variable
		cudaErrorCheck(cudaFree(ppdLocalCountVec));
		// Deallocating the buffer which address is assigned to the ppdChunkPtr variable
		cudaErrorCheck(cudaFree(ppdChunkPtr));
	}
}

template<typename cnttype="__uint64" inptype="__int8," typename="">
__global__ void cdpCountChunkGrid2(InpType* ppdInputVec, FreqT* ppdCountVec, CntType nParentThreadID, CntType nChunkSize, CntType nSize)
{
	// Declare a locally shared array of stream events 
	// pStreamEvents that are used to synchronize each stream
	__shared__ cudaStream_t pStreamHandles[CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0];
	// Declare a locally shared array of events used to 
	// synchronize each stream in which we'll launch cdpCountIfKernel kernel
	__shared__ cudaEvent_t pStreamEvents[CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0];

	// Computing the begining and ending position of the current chunk in the array ppdInputVec
	CntType nBegin = threadIdx.x * nChunkSize;
	CntType nEnd = (threadIdx.x + 1) * nChunkSize;

	// Initializing the new non-blocking stream in which we'll execute cdpCountIfKernel
	cudaErrorCheck(cudaStreamCreateWithFlags(&pStreamHandles[threadIdx.x], cudaStreamNonBlocking));
	// Initializing the new event used to synchronize the new non-blocking stream execution
	cudaErrorCheck(cudaEventCreateWithFlags(&pStreamEvents[threadIdx.x], cudaEventDisableTiming));

	// Performing a check if the ending position of the current
	// chunk doesn't exceed the actual size of the ppdInputVec array
	if (nEnd <= nSize)
	{
		// Declaring a pointer variable that is assigned to the 
		// address of buffer used to store the current chunk in GPU memory
		InpType* ppdChunkPtr = NULL;
		// Asynchronously record the stream event prior to synchronizing all threads across GPU device
		cudaErrorCheck(cudaEventRecord(pStreamEvents[threadIdx.x], pStreamHandles[threadIdx.x]));
		// Allocating buffer ppdLocalCountVec used to store the array of values, 
		// each one is the number of duplicates for each data item within the current chunk
		cudaErrorCheck(cudaMalloc((void**)&ppdChunkPtr, sizeof(InpType) * nChunkSize));
		cudaErrorCheck(cudaMemcpyAsync(ppdChunkPtr, &ppdInputVec[nBegin], \
			sizeof(InpType) * nChunkSize, cudaMemcpyDeviceToDevice, pStreamHandles[threadIdx.x]));

		FreqT* ppdLocalCountVec = NULL;
		// Allocating buffer ppdLocalCountVec used to store the array of values, 
		// each one is the number of duplicates for each data item within the current chunk	
		cudaErrorCheck(cudaMalloc((void**)&ppdLocalCountVec, \
			sizeof(FreqT) * CHUNK_SIZE_GRID_2));
		cudaErrorCheck(cudaMemsetAsync(ppdLocalCountVec, 0x00, \
			sizeof(FreqT) * CHUNK_SIZE_GRID_2, pStreamHandles[threadIdx.x]));

		__syncthreads(); // Perfoming synchronization of all threads in a block
		// Synchronizing all threads of the entire GPU device
		cudaErrorCheck(cudaDeviceSynchronize());

		for (int iIndex = 0; iIndex < CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0; iIndex++)
			cudaErrorCheck(cudaMemsetAsync(ppdCountVecG2[threadIdx.x][iIndex], 0x00, \
				sizeof(FreqT) * CHUNK_SIZE_GRID_2, pStreamHandles[threadIdx.x]));

		// Iterating through the array of stream handles and for each stream performing an asynchronous
		// wait operation to synchronize all streams in which the cdpCountChunkGrid1 is executed
		for (int iIndex = 0; iIndex < CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0; iIndex++)
			cudaErrorCheck(cudaStreamWaitEvent(pStreamHandles[iIndex], pStreamEvents[iIndex], 0));

		// Dynamically launching the cpdCountChunkGrid1 kernel to compute the 
		// number of duplicates for each data item in the current chunk transfered to the ppdChunkPtr buffer
		cdpCountChunkGrid1 <inptype, cnttype=""> << < 1, CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0, \
			0, pStreamHandles[threadIdx.x] >> > (ppdChunkPtr, ppdLocalCountVec, \
				threadIdx.x, CHUNK_SIZE_GRID_1, CHUNK_SIZE_GRID_2);

		// Asynchronously record the stream event prior to synchronizing all threads across GPU device
		cudaErrorCheck(cudaEventRecord(pStreamEvents[threadIdx.x], pStreamHandles[threadIdx.x]));
		// Synchronizing all threads of the entire GPU device
		cudaErrorCheck(cudaDeviceSynchronize());

		// Iterating through the array of stream handles and for each stream performing an asynchronous
		// wait operation to synchronize all streams in which the cdpCountChunkGrid2 is executed
		for (int i = 0; i < CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0; i++)
			cudaErrorCheck(cudaStreamWaitEvent(pStreamHandles[i], pStreamEvents[i], 0));

		__syncthreads(); // Perfoming synchronization of all threads in a block
		 // Synchronizing all threads of the entire GPU device
		cudaErrorCheck(cudaDeviceSynchronize());

		// Performing reduction by merging the array of the number of duplicates values
		// obtained by each divergent thread into the locally shared array ppdLocalCountVec
		// used to store the overall number of duplicates for all items from the array ppdInputVec
		cdpMergeReduction<cnttype>(ppdLocalCountVec, NULL, GridType::GRID_2, nParentThreadID, threadIdx.x, 0, CHUNK_SIZE_GRID_2);
		// Performing reduction by merging the array of the number of duplicates values
		// obtained by all divergent threads into the global array ppdCountVec used to store the overall
		// number of duplicates for all items from the array ppdInputVec
		cdpMergeAllReduction<cnttype>(ppdCountVec, GridType::GRID_2, nParentThreadID);

		// Deallocating the buffer which address is assigned to the ppdLocalCountVec variable
		cudaErrorCheck(cudaFree(ppdLocalCountVec));
		// Deallocating the buffer which address is assigned to the ppdChunkPtr variable
		cudaErrorCheck(cudaFree(ppdChunkPtr));
	}
}
</cnttype></cnttype></inptype,></typename></cnttype></cnttype></inptype,></typename>

The following code implements the host-device function cdpMergeReduction. The following function performs the reduction by actually merging the items in each array used to store the number of duplicate values for each data item obtained by each current thread for each particular chunk of the array ppdInputVec. As the result of performing the merge reduction we obtain the global array containing the overall values of the number of duplicates for each item in all chunks of the ppdInputVec array. To provide a better threads synchronization and avoid data flow dependency issue in this case we're using device globally pinned memory such as ppdCountVecG1 and ppdCountVecG2 buffers to store the values of the number of duplicate per each chunk and per each thread.

C++
__device__ FreqT ppdCountVecG1[CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0] \
[CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0][CHUNK_SIZE_GRID_1] = { { { 0 } } };
__device__ FreqT ppdCountVecG2[CHUNK_SIZE_GRID_2 / CHUNK_SIZE_GRID_0] \
[CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0][CHUNK_SIZE_GRID_2] = { { { 0 } } };

typedef enum { GRID_1 = 1, GRID_2 = 2, LOCAL = 3 } GridType;

template<typename cnttype="__uint64">
__device__ __host__ CntType cdpMergeReduction(FreqT* ppdSourceVec, FreqT* ppdCountVec, GridType Grid, \
	CntType nParentThreadID, CntType nThreadID, CntType nItemsCount, CntType nSize)
{
	CntType nItem = nItemsCount;
	// Iterating through the array ppdSourceVec that used to store the 
	// values of the actual number of data items duplicates for the 
	// current chunk obtained during the current thread execution
	for (CntType iIndex = 0; iIndex < nSize; iIndex++)
		// For each node in the array ppdSourceVec we're performing a check if
		// the value of ppdSourceVec[iIndex].freq > 0 is greater than zero, which means
		// the value of the number of duplicates value for the current data item was already
		// indexes into the array ppdSourceVec
		if (ppdSourceVec[iIndex].freq > 0)
		{
			// Performing a linear search to determine if the current item ppdSourceVec[iIndex] exists in the target array ppdCountVec and
			// obtain the value its index. By performing a linear search we actually iterate through the target array and compare the 
			// value of the variable ppdCountVec[nParentThreadID][nThreadID][nIndex++].value of each node with the value of 
			// ppdSourceVec[iIndex].value variable of the current node in the array ppdSourceVec. 
			CntType nIndex = 0;	bool bExists = false;
			while (nIndex < nItem && bExists == false)
				// If the value of the ppdCountVecG1[nParentThreadID][nThreadID][nIndex++].value variable
				// is equal to the the value of ppdSourceVec[iIndex].value variable of the current node in the array ppdSourceVec,
				// which means that the value of the number of duplicates for the current item in the array ppdInputVec
				// has been already indexed into the array ppdCountVec, we assign the boolean variable bExists = true 
				// and finish the loop execution.
				if (Grid == GridType::GRID_1)
					bExists = (ppdCountVecG1[nParentThreadID][nThreadID][nIndex++].value == \
						ppdSourceVec[iIndex].value) ? 1 : 0;
				else if (Grid == GridType::GRID_2)
					bExists = (ppdCountVecG2[nParentThreadID][nThreadID][nIndex++].value == \
						ppdSourceVec[iIndex].value) ? 1 : 0;
				else if (Grid == GridType::LOCAL)
					bExists = (ppdCountVec[nIndex++].value == \
						ppdSourceVec[iIndex].value) ? 1 : 0;
			
			// If the value of the number of duplicates for the current data item exists in the target array 
			// ppdCountVec, then we're performing and addition of the value of  ppdSourceVec[iIndex].freq variable 
			// with the value of the ppdCountVec[nParentThreadID][nThreadID][nIndex - 1].freq variable
			if (bExists == true)
			{
				if (Grid == GridType::GRID_1)
					ppdCountVecG1[nParentThreadID][nThreadID][nIndex - 1].freq += \
					ppdSourceVec[iIndex].freq;
				else if (Grid == GridType::GRID_2)
					ppdCountVecG2[nParentThreadID][nThreadID][nIndex - 1].freq += \
					ppdSourceVec[iIndex].freq;
				else if (Grid == GridType::LOCAL)
					ppdCountVec[nIndex - 1].freq += \
					ppdSourceVec[iIndex].freq;
			}
			
			// Otherwise, store the number of duplicates value to the target 
			// array ppdCountVec by assigning the value of the ppdSourceVec[iIndex].value 
			// variable to the ppdCountVec[nParentThreadID][nThreadID][nItem].value variable.
			else
			{
				if (Grid == GridType::GRID_1)
				{
					ppdCountVecG1[nParentThreadID][nThreadID][nItem].value = \
						ppdSourceVec[iIndex].value;
					ppdCountVecG1[nParentThreadID][nThreadID][nItem].freq = \
						ppdSourceVec[iIndex].freq;
					nItem++;
				}

				else if (Grid == GridType::GRID_2)
				{
					ppdCountVecG2[nParentThreadID][nThreadID][nItem].value = \
						ppdSourceVec[iIndex].value;
					ppdCountVecG2[nParentThreadID][nThreadID][nItem].freq = \
						ppdSourceVec[iIndex].freq;
					nItem++;
				}

				else if (Grid == GridType::LOCAL)
				{
					ppdCountVec[nItem].value = \
						ppdSourceVec[iIndex].value;
					ppdCountVec[nItem].freq = \
						ppdSourceVec[iIndex].freq;
					nItem++;
				}
			}
		}

	return nItem;</typename>
}

Unlike the cdpMergeReduction function listed above, the cdpMergeAllReduction device function performs the merge reduction of the specific arrays that hold the numbers of duplicates for each chunk per thread across all threads obtaining the number of duplicates for each item in the parent chunk.

C++
template<typename cnttype="__uint64">
__device__ CntType cdpMergeAllReduction(FreqT* ppdCountVec, GridType Grid, CntType nParentThreadID)
{
	CntType nThreadID = 0; CntType nItem = 0;
	while (nThreadID < CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0)
	{
		// Performing the merge reduction for the current thread with thread-id assigned to the
		// nThreadID variable by iterating through the array ppdSourceVec that used to store the 
		// values of the actual number of data items duplicates for the current chunk obtained 
		// during the current thread execution
		for (CntType iIndex = 0; iIndex < CHUNK_SIZE_GRID_1; iIndex++)
			// For each node in the array ppdSourceVec we're performing a check if
			// the value of ppdSourceVec[iIndex].freq > 0 is greater than zero, which means
			// the value of the number of duplicates value for the current data item was already
			// indexes into the array ppdSourceVec
			if (((Grid == GridType::GRID_1) ? ppdCountVecG1[nParentThreadID][nThreadID][iIndex].freq : \
				(Grid == GridType::GRID_2) ? ppdCountVecG2[nParentThreadID][nThreadID][iIndex].freq : 0) > 0)
			{
				// Performing a linear search to determine if the current item ppdSourceVec[iIndex] exists in the target array ppdCountVec and
				// obtain the value its index. By performing a linear search we actually iterate through the target array and compare the 
				// value of the variable ppdCountVec[nParentThreadID][nThreadID][nIndex++].value of each node with the value of 
				// ppdSourceVec[iIndex].value variable of the current node in the array ppdSourceVec. 
				CntType nIndex = 0;	bool bExists = false;
				while (nIndex < nItem && bExists == false)
					bExists = (ppdCountVec[nIndex++].value == \
					((Grid == GridType::GRID_1) ? ppdCountVecG1[nParentThreadID][nThreadID][iIndex].freq : \
						(Grid == GridType::GRID_2) ? ppdCountVecG2[nParentThreadID][nThreadID][iIndex].freq : 0)) ? 1 : 0;

				// If the value of the ppdCountVecG1[nParentThreadID][nThreadID][nIndex++].value variable
				// is equal to the the value of ppdSourceVec[iIndex].value variable of the current node in the array ppdSourceVec,
				// which means that the value of the number of duplicates for the current item in the array ppdInputVec
				// has been already indexed into the array ppdCountVec, we assign the boolean variable bExists = true 
				// and finish the loop execution.
				if (bExists == true)
				{
					ppdCountVec[nIndex - 1].freq += \
						((Grid == GridType::GRID_1) ? ppdCountVecG1[nParentThreadID][nThreadID][iIndex].freq : \
						(Grid == GridType::GRID_2) ? ppdCountVecG2[nParentThreadID][nThreadID][iIndex].freq : 0);
				}

				// Otherwise, store the number of duplicates value to the target 
				// array ppdCountVec by assigning the value of the ppdSourceVec[iIndex].value 
				// variable to the ppdCountVec[nParentThreadID][nThreadID][nItem].value variable.
				else
				{
					ppdCountVec[nItem].value = \
						((Grid == GridType::GRID_1) ? ppdCountVecG1[nParentThreadID][nThreadID][iIndex].value : \
						(Grid == GridType::GRID_2) ? ppdCountVecG2[nParentThreadID][nThreadID][iIndex].value : 0);
					ppdCountVec[nItem].freq = \
						((Grid == GridType::GRID_1) ? ppdCountVecG1[nParentThreadID][nThreadID][iIndex].freq : \
						(Grid == GridType::GRID_2) ? ppdCountVecG2[nParentThreadID][nThreadID][iIndex].freq : 0);

					nItem++;
				}
			}

		nThreadID++;
	}

	return nItem;
}    
</typename>

The following function is the main host function to perform the distribution count on the host. Similarly to the kernel functions discussed above the following function performs the parallel workshare of the distribution counting process by sequentially performing cdpCountChunkGrid2 kernel using streams to obtain the array of the number of duplicates values for each data item within each chunk of the pData array. The following function perform the reduction of the those arrays by calling the discussed above cdpMergeReduction function:

C++
    template<typename InpType = __int8, typename CntType = __uint64>
__host__ CntType cdpDistCountLaunch(InpType* pData, FreqT* phFreqT, CntType nChunks)
{
	// Allocating the array of streams handles
	cudaStream_t* phStreamHandles = (cudaStream_t*)malloc(sizeof(cudaStream_t) * nChunks);
	// Allocating the array of stream events handles
	cudaEvent_t* phStreamEvents = (cudaEvent_t*)malloc(sizeof(cudaEvent_t) * nChunks);

	// Iterating through the arrays of streams and initializing a handle for each 
	// new non-blocking stream in which we'll execute cdpCountIfKernel.
	// Also we're initializing the new event used to synchronize each 
	// new non-blocking stream execution
	for (__uint64 iIndex = 0; iIndex < nChunks; iIndex++)
	{
		// Initializing the new non-blocking stream in which we'll execute cdpCountIfKernel
		cudaErrorCheckHost(cudaStreamCreateWithFlags(&phStreamHandles[iIndex], cudaStreamNonBlocking));
		// Initializing the new event used to synchronize the new non-blocking stream execution
		cudaErrorCheckHost(cudaEventCreateWithFlags(&phStreamEvents[iIndex], cudaEventBlockingSync));
	}

	__uint64 nItem = 0, nParentChunkID = 0;
	for (__uint64 iIndex = 0; iIndex < nChunks; iIndex++)
	{
		// Computing the offset of the current chunk in the array pData
		__uint64 nChunkOffset = iIndex * CHUNK_SIZE_HOST;
		// Computing the parent chunk-id for each chunk being processed
		nParentChunkID = (iIndex % 100 == 0) ? 0 : nParentChunkID;

		fprintf(stdout, "ChunkOffset = %llu nParentChunkID = %llu Chunk = %llu of %llu\n", \
			nChunkOffset, nParentChunkID, iIndex, nChunks);

		__int8* ppdChunkBuf = NULL;
		// Allocating buffer ppdChunkPtr used to store the current chunk of the array ppdInputVec into GPU unified memory
		cudaErrorCheckHost(cudaMallocManaged((void**)&ppdChunkBuf, sizeof(__int8) * CHUNK_SIZE_HOST - 1));
		cudaErrorCheckHost(cudaMemcpyAsync(ppdChunkBuf, &pData[nChunkOffset], \
			sizeof(__int8) * CHUNK_SIZE_HOST - 1, cudaMemcpyHostToDevice, phStreamHandles[iIndex]));

		FreqT* ppdCountVec = NULL;
		// Allocating buffer ppdCountVec used to store the array of values, 
		// each one is the number of duplicates for each data item within the current chunk	
		cudaErrorCheckHost(cudaMallocManaged((void**)&ppdCountVec, sizeof(FreqT) * CHUNK_SIZE_HOST));
		cudaErrorCheckHost(cudaMemsetAsync((void*)ppdCountVec, 0x00, \
			sizeof(FreqT) * CHUNK_SIZE_HOST, phStreamHandles[iIndex]));

		// Asynchronously record the stream event prior to synchronizing all threads across GPU device
		cudaErrorCheckHost(cudaEventRecord(phStreamEvents[iIndex], phStreamHandles[iIndex]));

		// Launching the cpdCountChunkGrid1 kernel to compute the 
		// number of duplicates for each data item in the current chunk transfered to the ppdChunkPtr buffer
		cdpCountChunkGrid2 <__int8, __uint64> << < 1, CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0, 0, \
			phStreamHandles[iIndex] >> > (ppdChunkBuf, ppdCountVec, \
				nParentChunkID, CHUNK_SIZE_GRID_2, CHUNK_SIZE_HOST);

		// Performing a synchronization of all active streams being executed
		cudaErrorCheckHost(cudaStreamSynchronize(phStreamHandles[iIndex]));

		// Performing a synchronization of all threads on the host
		cudaErrorCheckHost(cudaThreadSynchronize());
		// Synchronizing all threads of the entire GPU device
		cudaErrorCheckHost(cudaDeviceSynchronize());
		// Waiting until the cudaDeviceSynchronize API call will finish its execution
		cudaErrorCheckHost(cudaStreamWaitEvent(phStreamHandles[iIndex], phStreamEvents[iIndex], 0));

		// Allocating buffer phCountVec on host to store the number of duplicates 
		// for each item within the current chunk 
		FreqT* phCountVec = (FreqT*)malloc(sizeof(FreqT) * CHUNK_SIZE_HOST);
		memset((void*)phCountVec, 0x00, sizeof(FreqT) * CHUNK_SIZE_HOST);

		// Tranfering the buffer ppdCountVec from the device to host memory
		cudaErrorCheckHost(cudaMemcpyAsync(phCountVec, ppdCountVec, \
			sizeof(FreqT) * CHUNK_SIZE_HOST, cudaMemcpyDeviceToHost, phStreamHandles[iIndex]));

		// Performing reduction by merging the array of the number of duplicates values
		// obtained during the kernel execution into the array phCountVec used to store 
		// the overall number of duplicates for all items from the array ppdChunkPtr
		nItem = cdpMergeReduction<__uint64>(phCountVec, phFreqT, GridType::LOCAL, 0, 0, nItem, CHUNK_SIZE_HOST);

		// Incrementing the value of nParentChunkID by 1
		nParentChunkID++;
		
		// Deallocating the buffer which address is assigned to the ppdChunkBuf variable
		cudaErrorCheckHost(cudaFree(ppdChunkBuf));
		// Deallocating the buffer which address is assigned to the ppdCountVec variable
		cudaErrorCheckHost(cudaFree(ppdCountVec));

		// Deallocating the buffer which address is assigned to the phCountVec variable
		free(phCountVec);
	}

	// Iterating through the array of either streams of events handles, destroying
	// each stream and event in which the cdpCountChunkGrid2 kernel has been executed
	for (__uint64 iIndex = 0; iIndex < nChunks; iIndex++)
	{
		cudaErrorCheckHost(cudaEventDestroy(phStreamEvents[iIndex]));
		cudaErrorCheckHost(cudaStreamDestroy(phStreamHandles[iIndex]));
	}
	
	return nItem;
}

 

How To Run This Code

To run the code being discussed in this article it's strongly recommended that NVIDIA CUDA 8.0 Toolkit is installed on the host on which you're going to run the CUDA application executable available for download at the top of this article. Also, to avoid the possible interference and conflicts with your PC's graphics card driver it's recommended to set WDDM TDR Delay parameter to 10 seconds. Another limitation for the project intoduced in the following article is that it's recommended to compile and run the following projects by using compute_61, sm_61 compatibility version graphics cards such as NVIDIA GeForce GTX1070 GPU Pascal or higher.

History

  • December 10, 2016 - The first version of the article has been published.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer (Senior) EpsilonDev
Ukraine Ukraine
I’m software developer, system analyst and network engineer, with over 20 years experience, graduated from L’viv State Polytechnic University and earned my computer science and information technology master’s degree in January 2004. My professional career began as a financial and accounting software developer in EpsilonDev company, located at L’viv, Ukraine. My favorite programming languages - C/C++, C#.NET, Java, ASP.NET, Node.js/JavaScript, PHP, Perl, Python, SQL, HTML5, etc. While developing applications, I basically use various of IDE’s and development tools, including Microsoft Visual Studio/Code, Eclipse IDE for Linux, IntelliJ/IDEA for writing code in Java. My professional interests basically include data processing and analysis algorithms, artificial intelligence and data mining, system analysis, modern high-performance computing (HPC), development of client-server web-applications using various of libraries, frameworks and tools. I’m also interested in cloud-computing, system security audit, IoT, networking architecture design, hardware engineering, technical writing, etc. Besides of software development, I also admire to write and compose technical articles, walkthroughs and reviews about the new IT- technological trends and industrial content. I published my first article at CodeProject in June 2015.

Comments and Discussions

 
-- There are no messages in this forum --