“CUDA® is a parallel computing platform and programming model invented by NVIDIA. It enables dramatic increases in computing performance by harnessing the power of the graphics processing unit (GPU).” – Dr. John Owens, UC Davis and Dr. David Luebke, NVIDIA Corp.“
Introduction
In this article, we’ll demonstrate how to use NVIDIA CUDA® multithreaded hardware platform and programming model to develop an efficient parallel code that implements the famous distribution counting algorithm especially designed for solving the complex statistical problem of finding the overall amounts of duplicates for each item in the given dataset of a typically huge size, containing up to 10^6 - 10^9 data items. Since it was initially proposed, the following algorithm remains the only approach for solving a wide-range of computational problems such as collecting statistics of surveys, performing the computation of statistical median for a series of items in an unordered dataset, the value of which in turn is frequently used to reduce the level of noise during the image processing of monochrome raster images as well as in the cluster analysis to increase the distance between clusters produced by k-means clustering algorithm, that itself, provides a better quality of clustering, etc.
In fact, the conventional distribution counting algorithm is mainly based on performing a linear search to find all duplicates for a specific item in the given dataset. In this case, the linear search is performed for each of N data items, which significantly increase the complexity of the computations being performed. Under the normal condition, complexity of the distribution counting algorithm could be estimated as O(Nx(N+k)) >=O(N^2) of trivial comparisons, where N - the amount of data items in the given dataset, k - the number of items in the resulting dataset that contains the occurrence frequencies for each data item from the given dataset. Normally, we consider the value of k as an overhead value for this particular algorithm. Obviously, that, the following value represents the potential size of a resulting frequency table, which, in turn, largely depends on the value of smallest and largest item in the given dataset.
By analyzing the problem being discussed, we might think that we normally can avoid the distribution counting algorithm implementation by using one of the existing sorting algorithm to perform the similar task described above. If we delve into this problem more deeply, we’ll notice that none of the existing sorting algorithms can be used to perform a distribution counting except for the counting sort algorithm that was deemed as an impractical for sorting from one respect, but, from another one, can be efficiently used to solve the problem being discussed.
As we’ve already discussed above, one of the most obvious disadvantages of the conventional distribution counting algorithm is performance of finding the data items duplicates itself, in case, when we use the following algorithm to process the mentioned above typically huge datasets. From the table 1 shown below, we can see how the linear search performed for each particular data item, along with the overhead processing, is capable to affect the overall performance of code that sequentially performs the distribution counting in the datasets with size varying from 10^6 to 10^9 data items:
Table 1: The computational complexity of the sequential distribution counting algorithm
Dataset
N - items
| Frequency Table
k-items
| Computational Complexity p =O(Nx(N+k))
| Sequential Code
Execution Time
(Best-Case Performance)
|
1000000
| 10^6
| 2 x 10^12
| 0h:12m:38s
|
10000000
| 10^7
| 2 x 10^14
| 2h:11m:12s
|
100000000
| 10^8
| 2 x 10^16
| 22h:05m:12s
|
1000000000
| 10^9
| 2 x 10^18
| 232h:34m:23s
|
The empirical statistics that indicates the regular performance of the distribution counting algorithm was estimated by running the specific sequential code on a single computational unit based on the modern CPU Intel Core i7-4790 4.0 GHZ with 32GB of RAM.
By processing the typically huge datasets, the following algorithm is capable to produce the resulting dataset of data items occurrence frequencies for not less than 728 seconds ~ 12m:38s while processing datasets which size is approximately equal or greater than 10^6 data items. Also, in the case when the overall size of the given dataset is increasing, suppose, it becomes 10 times larger, then the performance of that sequentially executed code significantly degrades and has a very long duration compared to other complicated data processing tasks such as games, deep learning AI neural networks, HDD tools and utilities processing the large amounts of clusters in the modern hard drives, and other complex data processing tasks. For example, if you run the following sequential code to find the duplicates of 10^8 - 10^9 data items, it’s capable to accomplish the following processing task for an almost idle time.
This, in turn, actually, means that the problem of processing of the huge datasets, containing typically 10^8-10^9 data items, being discussed, can never be solved without using specific multithreaded libraries and framework that allow to harness the power of parallel computing such as hardware-level parallelism supported by the most of the existing modern multi-core CPUs, the separate hardware platforms and programming models such as NVIDIA CUDA® that allows to use the resources of the most CUDA-enabled GPUs to solve the variety of complex computational problems particularly the problem of increasing the performance of the sequential code intended to process the huge typically sets of data.
Since NVIDIA Corporation first invented and introduced CUDA® technology, - parallel computing GPU hardware platform and programming model implemented as a multithreaded parallelization framework libraries and tools for many programming languages such as C/C++, Fortran, Java, Python, etc. Unlike the other multithreading frameworks, NVIDIA CUDA® technology provides capabilities of performing complex computations unveiling the power of the most of CUDA-enabled GPUs. It serves as an alternative to the conventional CPU-based computing, turning regular average PC into a powerful supercomputer that involves the almost limitless resources of NVIDIA graphics card’s GPU and memory to perform complex computations. The using of NVIDIA CUDA® provides significant performance acceleration gain for those applications that create a huge workload on the CPU and system memory when processing of enormously huge amounts of data.
In the following article, we’ll discuss about the NVIDIA CUDA® scalable programming model architecture as an efficient platform for performing parallel multithreaded computations, and, at the same time, provide a detailed explanation of how to transform sequentially executed code that implements NP-hard poorly scalable conventional distribution counting algorithm to be run in parallel performed by the warps of divergent hardware-level threads executed by an array of GPU’s streaming multiprocessors (SM).
The following, normally, can be accomplished by transforming the sequential loops of the computational process being discussed into CUDA kernels executed by a large number of threads in parallel. CUDA kernels are typically launched from the host code while the fragment of code of each kernel is executed on the device. At the very beginning of the following article we’ll talk about the either host or device code separate execution topic, which is one of the most fundamental aspects of CUDA programming model.
In this article, we’ll also take a closer look at how to properly divide the entire process of finding data items duplicates into the number of independent sub-processes capable to be perform the following task in parallel. To do this, we’ll use the various approaches such as either the concepts of CUDA’s bulk or dynamic parallelism to efficiently parallelize the data-process formulated by distribution counting algorithm leveraging CUDA® multi-core processors hardware platform.
In particular, we’ll discuss the different scenarios of dynamic parallelism such as either linear or recursive parallel threads execution as well as how to efficiently workshare the entire process of finding data items duplicates performed by one or more nested sequential loops across multiple parallel GPU hardware threads, which, in turn, allows to significantly reduce the computational complexity of the algorithm designed to solve the problem being discussed.
During the discussion, we’ll also make a large emphasize on such important code modernization aspects as barrier synchronization of divergent threads blocks that access the data stored into GPU device shared memory as well as specific reduction mechanisms that allow to survive the number of serious well-known issues and side-effects such as data flow dependency and race condition that in many cases obstacle the process of code modernization making it impossible to parallelize many existing algorithms.
Furthermore, during the code modernization, we’ll learn how to use atomic data operations such as atomic addition, increment and compare-and-swap (CAS) to overcome the occurrence of race condition issues in the code being parallelized. Also, we’ll explain how to implement pinned memory, the using of which allows to effectively avoid the race condition occurrence, in cases, when the following issue cannot be survived by using the either barrier synchronization or atomic operations.
Since we’ve found out how transform the sequential loops of the code that implements distribution counting algorithm into kernels executed by the blocks consisting the large number of threads, we’ll learn how to optimize the performance of those kernels launched asynchronously on the host by using CUDA streaming mechanism. Particularly, we’ll discuss how to launch the kernels in either host or device code each one to be executed by a certain streaming multiprocessor in its own stream making the sequential kernel calls synchronous. At the same time, we’ll provide a detailed description of such important aspects as either implicit or explicit streams synchronization and explain the difference between implicit synchronization of a legacy default stream and explicit synchronization of those streams created by using specific CUDA Runtime API calls. During the discussion, we’ll talk about specific event-based mechanism which is the only approach for synchronizing streams.
Also, along with the discussion of mentioned above parallelization techniques, in this article, we’ll provide an overview such important aspects of using CUDA programming model as either host or device memory allocation and management such as specific data transfers between host and device memory during the parallel code execution by using CUDA Runtime API routines.
Besides the CUDA streaming and memory management topics, in this article, we’ll learn how to estimate the code executing time by using events management functions of CUDA Runtime API.
Background
Before we begin, in this section, we’ll explain the conventional distribution counting algorithm used to find the duplicates for each data item in the given dataset and store the statistical data on each item obtained into the array of frequencies. In this section, we’ll thoroughly discuss the operations performed by following algorithm as well as provide detailed recommendations on how to optimize the performance by finding the optimization hotspots in the sequential code that implements distribution counting algorithm.
Distribution Counting Algorithm Overview
Distribution Counting Algorithm (DCA) – one of the most trivial algorithms for solving the statistical problem of finding the duplicates of each data item in the given dataset. As we’ve already discussed, the following algorithm is commonly used for many applications as computing the value of statistical median, gathering the statistics on surveys, as a part of various partitioning and AI algorithms such as k-means clustering algorithm designed to solve the classification problem, etc. In details, the distribution counting algorithm was discussed by Robert Sedgewick in his book “Fundamental Algorithms in C Part 1”.
The following algorithm can be formulated as follows: suppose we’re having a certain unordered dataset data containing N - randomly generated data items of particular data type. To find the overall number of duplicates for each data item in the given dataset we normally need to increment a loop counter variable index from 0 to N and for each data item data[index] perform the linear search in the subset of succeeding data items to find the number of those items having the same value as the current item data[index]. For that purpose, we need to declare the variable count = 1, in which we will accumulate the value of the number of occurrences of each item data[index] as the result of performing the linear search discussed below. To perform the linear search, we need to use another loop counter variable nindex by incrementing its value from index + 1 to N and for each item data[nindex] to perform a comparison of current item’s value with the value of data[index] item. If both values are equal, then we increment the value of the variable count by 1, otherwise proceed the comparison with the next item data[nindex]. Finally, at the end of linear search performed the variable count will hold the actual number of occurrences of item data[index] in the array data. The value of count variable is the total occurrence frequency value for the current item data[index].
For instance, we have an array of values data[N] = { 3, 1, 6, 2, 1, 3, 6 } containing N - data items of integer type and we need to find the occurrence frequency for the first data item with index = 0, which value is equal to 3. At this point, the variable count is assigned to the value of 1 and we’re normally starting to compare each item data[nindex] with indexes nindex from 1 to N with the value of the following item data[index]=data[0]=3. During the comparison, we determine that the item with index nindex = 5 is equal to the current data item with index index = 0. In this case, we increment the value of count variable so that its value becomes equal to count = count + 1 = 2. We’re iterating to the end of array and reveal that there’s no more items which value is equal to the current item’s value data[index] = 3. Since that the value of the variable count = 2 is the value of the occurrence frequency of the first data item data[0] = 3 in the given dataset. Similarly, by iterating through the array of items we proceed with the next items in the array data.
Since we’ve obtained the occurrence frequency value of the current item data[index] in the array, we need to append the following value to the resulting set of frequencies. Note that, each entry to the resulting frequency table is a pair of two values which is either the value of a data item data[index] and the total number of occurrences count. The resulting set of data items frequencies is represented as an array of objects of structure FREQ_T defined in the code snippet below.
Before appending the current entry to the associative array used to store each data item’s frequency value, we need to perform a check if the number of occurrences for the current item data[index] has not previously been computed. To do this we’ll need to declare a boolean variable exists and initialize it with value false. After that, we should iterate loop counter variable item from ft_item - 1 downto 0 until the value of exists variable is not equal to true, or the end of resulting array of frequencies has been reached. By iterating the resulting array of frequencies, for each node of freq_t[item] we get the value of freq_t[item].n_val variable and compare it with the value of the current item data[index]. If the value of freq_t[item].n_val variable for the current node item is equal to the value of data[index], we set the value of exists = true, which means that the number of occurrences for the current item data[index] has been already computed and it’s not necessary to append the frequency data on the current item data[index] to the resulting set. Otherwise, if none of the nodes has the value of freq_t[item].n_val variable equal to the current item’s value data[index], then we assign the variables of freq_t[item].n_val and freq_t[ft_item].n_freq of the node with index ft_item to the values of data[index] and count respectively. Finally, we increment the value of variable ft_item by 1 and proceed with the next item in the array data.
The fragment of code below illustrates the very basic implementation of the distribution counting algorithm:
typedef struct tagFreqT
{
__int64 n_val;
__int64 n_freq;
} FREQ_T;
void dist_count(const __int64* data, FREQ_T* freq_t, const size_t N)
{
__int64 ft_item = 0;
for (__int64 index = 0; index < N; index++) {
__int64 count = 1;
for (__int64 nindex = index + 1; nindex < N; nindex++)
if (data[nindex] == data[index]) count++;
bool exists = false;
for (__int64 item = ft_item - 1; item >= 0 && !exists; item--) if (freq_t[item].n_val == data[index]) exists = true;
if (exists == false)
{
freq_t[ft_item].n_val = data[index];
freq_t[ft_item++].n_freq = count;
}
}
}
The Algorithm’s Multithreaded Parallel Performance Optimization
As you might notice from the code snippet above, according to the algorithm being discussed, the entire process of finding the number of duplicates for each data item mostly relies upon performing the two nested loops. The first outermost loop { 1 } is normally performed to iterate through the array of data items. Another, nested loop basically performs the linear search to find all duplicates of each current item in the subset of succeeding data items and accumulates the number of occurrences obtained into the variable count.
Now, let’s take a closer look to the performance of the nested loop { 2 }. The following loop while being executed performs exactly p = O(N) - iterations to perform the linear search by traversing the dataset to find the number of duplicates for each current item data[index]. Also, at the end of performing the linear search to obtain the number of data item’s occurrences, we normally executing another loop to iterate through the array freq_t to determine if there were no entries for the current data and we’ve not yet previously performed the distribution counting for the current item data[index]. This, in turn, causes a sufficient overhead to the process of finding data items duplicates, since the following loop performs exactly - iterations to check if the current item hasn’t already been indexed into the frequency table freq_t. That’s actually why, the total complexity for this fragment of code implementing the linear search can be estimated as p = O(N+k) iterations.
According to the distribution counting algorithm, the following linear search is repeatedly performed for each of N - items retrieved from the given dataset during each iteration of outermost sequential loop. Thus, the value of complexity of the linear search performed for each item data[index] in dataset is multiplied by the value N - the number of iterations of the following outermost loop and can be estimated as which normally exceeds the square computational complexity O(N^2) of the most sorting and other algorithms performing the complex computations.
Obviously, that, the nested loop that performs the linear search in this case turns to be the parallel multithreaded optimization hotspot. According to the best practices, the performance acceleration gain of the code that implements the distribution counting algorithm can be much higher if we perform the iterations of the nested loop in parallel executed by multiple threads, so that, each iteration is being performed in its own thread. This, in turn, will ensure that the code being discussed is performed much more faster compared to the case when this code is executed sequentially. The nested loop parallel execution provides almost up to 65% of the performance acceleration gain of the entire algorithm.
The outermost loop is another performance optimization hotspot we’re going to discuss. Unlike the other algorithms, especially those ones that perform sorting, the process of finding duplicates for each item in the given dataset, formulated by the distribution counting algorithm, has merely no data flow dependency issues. This actually means that the code that performs the linear search during each iteration of the outermost loop doesn’t normally access the data processed as the result of performing the previous and next iterations. In this particular case, the following code is able to perform the linear search to find the number of duplicates for each item in the given dataset independently. The actual count of duplicates value computed by performing the linear search during each iteration of the outermost loop is not depending on the count values obtained for the other similar items in the given dataset. This, in turn, benefits in the proper and efficient parallelization of the entire algorithm being discussed.
However, there’s another nested loop { 3 } in this code that performs the linear search to determine if the actual number of duplicates value for the current item data[index] has already been indexed into the array representing items frequency table. Unfortunately, the following nested loop cannot be perfectly parallelized due to the data flow dependency that typically might be incurred during the following loop execution. In this case, to ensure that the following code is properly executed, we need to implement the specific thread synchronization mechanism such as critical sections locks to avoid the data flow dependency issue that might persist.
The Code Optimized Using Intel OpenMP Performance Library
Here’s another fragment of code implementing the parallel scalable distribution counting algorithm using Intel OpenMP library. During the parallel multithreaded optimization of the code listed above we’ve exactly checked with the optimization hints discussed in the previous paragraphs of the following section:
void omp_dist_count(const __int64* data, FREQ_T* freq_t, const size_t N, const size_t FT_SIZE)
{
__int64 ft_item = 0; omp_lock_t lock;
int max_threads = omp_get_max_threads();
omp_init_lock(&lock);
#pragma omp parallel default(none) \ // { 1 }
shared(data, freq_t, max_threads, ft_item, lock) num_threads(max_threads)
{
int thread_id = omp_get_thread_num();
__int64 start = thread_id * (N / max_threads);
__int64 end = (thread_id + 1) * (N / max_threads);
for (int index = start; index < end; index++)
{
__int64 count = 0;
#pragma omp parallel for default(none) \ // { 2 }
reduction(+:count) shared(data, index)
for (__int64 nindex = 0; nindex < N; nindex++)
if (data[nindex] == data[index]) count++;
omp_set_lock(&lock);
bool exists = false;
for (__int64 item = FT_SIZE - 1; item >= 0 && !exists; item--)
if (freq_t[item].n_val == data[index]) exists = true;
if (exists == false)
{
freq_t[ft_item].n_freq = count;
freq_t[ft_item++].n_val = data[index];
}
omp_unset_lock(&lock);
}
}
omp_destroy_lock(&lock);
}
As we can see from the code fragment listed above, the very first optimization applied to the algorithm’s sequential code is that we’ve performed a work-sharing of the outermost loop { 1 } used to iterate through the array of data items by splitting up the overall number of iterations to be executed by multiple CPU hardware-level threads, each one is executing its own copy of the parallel region being defined. This, in turn, allows to significantly relax CPU workload by scaling the entire scope of operations being performed across two or more CPU cores.
After we’ve transformed the outermost loop by using parallel workshare construct, we have to provide tight loop parallelization to the nested loop { 2 }. To do that, in this case, we’re using #pragma omp parallel for directive construct to execute each iteration of the nested loop in parallel, which provides the actual speed-up during the parallel code execution.
Note that, in this case, we’re implementing tight-loop parallelization inside the parallel region executed by multiple threads. This normally is called an OpenMP’s nested parallelism. The main goal of using nested parallelism strategy is to provide an even better performance acceleration gain to the code being optimized.
Another optimization aspect that we’ll discuss about is the reduction mechanism used to avoid race condition issues during the nested loop { 2 } parallel execution. As we can see from the code snippet listed above, during each iteration of the nested loop { 2 } executed by its own thread, we’re performing a check if the value of data[nindex] item is equal to the value of the current item data[index] for which we’re estimating the number of duplicates. If so, we’re performing increment of the shared variable count. In this case, to obtain the correct value of the following variable, we’ll perform an explicit reduction of the increment operator by using reduction(+:count) clause.
However, some fragments of code listed above including the nested loop { 3 } still remains not parallelized since the attempts to run the following fragment in parallel normally lead to the data flow dependency issue occurrence. Since the following fragment of code is intended to be run by each thread sequentially, at this point, we’re using an explicit synchronization mechanism such as critical sections locks by invoking OpenMP API routines omp_set_lock(&lock), omp_set_unlock(&lock), that ensure that the following fragment of code is executed sequentially by each thread.
The main disadvantage of the OpenMP parallel code listed above is that he occurrence of data flow dependency issue during the nested loop { 3 } execution normally causes the following code might not to entirely scale across all available CPU cores. For that purpose, we used Intel Amplifier XE to measure and assess the overall performance of the code being parallelized by using OpenMP library. Fig.1. illustrates how the parallel code that performs distribution counting is scaled across multiple CPU cores:
Figure 1: Parallel Code Performance Assessment Using Intel Amplifier XE
As we can see from the fig.1., almost a half of threads are poorly utilized due to the growing value of the performance potential gain and CPU load imbalance which is the result of insufficient parallelization of the fragment of code implementing second nested loop discussed above. Also, using the critical sections locks normally causes sufficient overhead during parallel code execution. From the top hotspots profiling section above we can see that omp_set_lock(&lock), omp_set_unlock(&lock) functions calls has the longest duration compared to specific fork-join calls and implicit barrier synchronization.
In conclusion, as we can see from the either sequential or parallel code fragments, we cannot normally achieve an ideal parallelization of the following code by using Intel OpenMP performance library. One of the reasons why this code cannot be perfectly parallelized is that there’s a lack of implicit synchronization provided by the multithreaded primitives of the OpenMP library itself while executing the following code by scaling it leveraging multiple CPU’s cores.
In the next section of this article we’ll discuss about how to use NVIDIA CUDA®, - parallel multithreading platform that allows to provide more efficient parallelization to the code that implements distribution counting algorithm.
Using the code
The cpdCountChunkKernel kernel while being executed by the multiple divergent threads, retrieves each data item from the array ppdInputVec and performs the linear search to find the actual number of duplicates for each current data item ppdInputVec[threadIdx.x] by dynamically launching another kernel cdpCountIfKernel that performing a trivial count-if operation discussed below:
template<typename cnttype="__uint64" inptype="__int8," typename="">
__global__ void cdpCountChunkKernel(InpType* ppdInputVec, FreqT* ppdCountVec, CntType nSize)
{
__shared__ cudaEvent_t pStreamEvents[CHUNK_SIZE_GRID_0];
__shared__ cudaStream_t pStreamHandles[CHUNK_SIZE_GRID_0];
__shared__ FreqT ppdSHCountVec[CHUNK_SIZE_GRID_0];
if (threadIdx.x < nSize)
{
cudaErrorCheck(cudaStreamCreateWithFlags(&pStreamHandles[threadIdx.x], cudaStreamNonBlocking));
cudaErrorCheck(cudaEventCreateWithFlags(&pStreamEvents[threadIdx.x], cudaEventDisableTiming));
cudaErrorCheck(cudaDeviceSynchronize());
if (threadIdx.x == 0)
{
for (CntType iIndex = 0; iIndex < CHUNK_SIZE_GRID_0; iIndex++)
ppdSHCountVec[iIndex].value = ppdSHCountVec[iIndex].freq = 0;
}
CntType* ppdCountValue = NULL;
InpType nValue = ppdInputVec[threadIdx.x];
cudaErrorCheck(cudaMalloc((void**)&ppdCountValue, sizeof(CntType)));
cdpCountIfKernel<inptype, cnttype=""> << < 1, \
blockDim.x, 0, pStreamHandles[threadIdx.x] >> > (ppdInputVec, ppdCountValue, nValue, nSize);
cudaErrorCheck(cudaEventRecord(pStreamEvents[threadIdx.x], pStreamHandles[threadIdx.x]));
cudaErrorCheck(cudaDeviceSynchronize());
cudaErrorCheck(cudaStreamWaitEvent(pStreamHandles[threadIdx.x], pStreamEvents[threadIdx.x], 0));
__syncthreads(); if (*ppdCountValue > 0 && *ppdCountValue < nSize)
{
__syncthreads(); ppdSHCountVec[threadIdx.x].value = nValue;
ppdSHCountVec[threadIdx.x].freq = *ppdCountValue;
cudaErrorCheck(cudaDeviceSynchronize());
bool bExists = false;
for (int iIndex = 0; iIndex < threadIdx.x && !bExists; iIndex++)
if (ppdSHCountVec[iIndex].value == nValue) bExists = true;
__syncthreads(); if (bExists == false)
{
__syncthreads(); ppdCountVec[threadIdx.x] = ppdSHCountVec[threadIdx.x];
}
}
cudaErrorCheck(cudaFree(ppdCountValue));
cudaErrorCheck(cudaStreamDestroy(pStreamHandles[threadIdx.x]))
}
}
</inptype,></typename>
The following fragment of code implements cdpCountIfKernel that performs the linear search to find the number of duplicates for the current data item in the array ppdInpVec, which value is assigned to nValue variable. It performs the atomic increment operation on locally shared variable nSHLocalCount used to accumulate the number of duplicates value across all threads, each one executing its own instance of cdpCountIfKernel. At the end of each thread execution the value of nSHLocalCount variable is copied to the global context memory which address is assigned to the ppdCountVec global variable.
template<typename cnttype="__uint64" inptype="__int8," typename="">
__global__ void cdpCountIfKernel(const InpType* ppdInputVec, \
CntType* ppdCountVec, const InpType nValue, CntType nSize)
{
__shared__ CntType nSHLocalCount;
if (threadIdx.x < nSize)
{
__syncthreads(); if (threadIdx.x == 0) nSHLocalCount = 0;
if (ppdInputVec[threadIdx.x] == nValue)
{
__syncthreads(); atomicAdd(&nSHLocalCount, 1);
cudaErrorCheck(cudaDeviceSynchronize());
}
if (nSHLocalCount > 0)
{
__syncthreads(); cudaErrorCheck(cudaDeviceSynchronize());
*ppdCountVec = nSHLocalCount;
}
}
}
</typename>
The following fragment of code implements the cdpCountChunkRecursiveKernel kernel that performs the parallel workshare of the distribution couting process between multiple divergent threads each one computing the number of duplicates for a set of items within its own chunk. Each thread dynamically launches the cdpCountChunkKernel to obtain the values of the number of duplicates for each particular item within the current chunk which boundaries are computed at the top of the cdpCountChunkRecursiveKernel execution. After each thread computes the values of the number of duplicates for each item within the current chunk, the following kernel performs the reduction by merging each array of the number of duplicates for each current chunk into a global array ppdCountVec:
template<typename cnttype="__uint64" inptype="__int8," typename="">
__global__ void cdpCountChunkRecursiveKernel(InpType* ppdInputVec, FreqT* ppdCountVec, \
CntType nChunkID, CntType nDepth, CntType nItemsCount, CntType nChunkSize, CntType nSize)
{
__shared__ cudaEvent_t pStreamEvents[CUDA_MAX_DEPTH];
__shared__ cudaStream_t pStreamHandles[CUDA_MAX_DEPTH];
if (nDepth < CUDA_MAX_DEPTH)
{
CntType nBegin = nChunkID * nChunkSize;
CntType nEnd = (nChunkID + 1) * nChunkSize;
cudaErrorCheck(cudaStreamCreateWithFlags(&pStreamHandles[nDepth], cudaStreamNonBlocking));
cudaErrorCheck(cudaEventCreateWithFlags(&pStreamEvents[nDepth], cudaEventDisableTiming));
if (nEnd <= nSize)
{
InpType* ppdChunkPtr = NULL;
cudaErrorCheck(cudaMalloc((void**)&ppdChunkPtr, sizeof(InpType) * nChunkSize));
cudaErrorCheck(cudaMemcpyAsync(ppdChunkPtr, &ppdInputVec[nBegin], \
sizeof(InpType) * nChunkSize, cudaMemcpyDeviceToDevice, pStreamHandles[nDepth]));
FreqT* ppdLocalCountVec = NULL;
cudaErrorCheck(cudaMalloc((void**)&ppdLocalCountVec, sizeof(FreqT) * CHUNK_SIZE_GRID_0));
cudaErrorCheck(cudaMemsetAsync(ppdLocalCountVec, 0x00, sizeof(FreqT) * CHUNK_SIZE_GRID_0, pStreamHandles[nDepth]));
cdpCountChunkKernel <inptype, cnttype=""> << < 1, \
CHUNK_SIZE_GRID_0, 0, pStreamHandles[nDepth] >> > (ppdChunkPtr, ppdLocalCountVec, CHUNK_SIZE_GRID_0);
cudaErrorCheck(cudaEventRecord(pStreamEvents[nDepth], pStreamHandles[nDepth]));
cudaErrorCheck(cudaDeviceSynchronize());
cudaErrorCheck(cudaStreamWaitEvent(pStreamHandles[nDepth], pStreamEvents[nDepth], 0));
cudaErrorCheck(cudaStreamDestroy(pStreamHandles[nDepth]));
cudaErrorCheck(cudaFree(ppdChunkPtr));
CntType nItem = cdpMergeReduction<cnttype>(ppdLocalCountVec, ppdCountVec, \
GridType::LOCAL, 0, 0, nItemsCount, CHUNK_SIZE_GRID_0);
cudaErrorCheck(cudaFree(ppdLocalCountVec));
cudaErrorCheck(cudaStreamCreateWithFlags(&pStreamHandles[nDepth + 1], cudaStreamNonBlocking));
cudaErrorCheck(cudaEventCreateWithFlags(&pStreamEvents[nDepth + 1], cudaEventDisableTiming));
cdpCountChunkRecursiveKernel <inptype, cnttype=""> << < 1, 1, 0, pStreamHandles[nDepth + 1] >> > \
(ppdInputVec, ppdCountVec, nChunkID + 1, nDepth + 1, nItem, nChunkSize, nSize);
cudaErrorCheck(cudaDeviceSynchronize());
cudaErrorCheck(cudaEventRecord(pStreamEvents[nDepth + 1], pStreamHandles[nDepth + 1]));
cudaErrorCheck(cudaStreamWaitEvent(pStreamHandles[nDepth + 1], pStreamEvents[nDepth + 1], 0));
cudaErrorCheck(cudaStreamDestroy(pStreamHandles[nDepth + 1]));
}
}
}
</inptype,></cnttype></inptype,></typename>
The following snippets of code implement cdpCountChunkGrid1, cdpCountChunkGrid2 kernels that are similarly to the previously discussed kernel perform the same parallel workshare of the distribution counting process with only one difference that the following kernels rather than performing recursive workshare, normally perform bulk parallelization to compute the number of duplicates of items separately within each chunk of the array ppdInputVec. The cdpCountChunkGrid1 is a parent grid of threads relatively to the threads that execute cdpCountChunkGrid2 kernel.
template<typename cnttype="__uint64" inptype="__int8," typename="">
__global__ void cdpCountChunkGrid1(InpType* ppdInputVec, FreqT* ppdCountVec, CntType nParentThreadID, CntType nChunkSize, CntType nSize)
{
__shared__ cudaEvent_t pStreamEvents[CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0];
__shared__ cudaStream_t pStreamHandles[CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0];
CntType nBegin = threadIdx.x * nChunkSize;
CntType nEnd = (threadIdx.x + 1) * nChunkSize;
cudaErrorCheck(cudaStreamCreateWithFlags(&pStreamHandles[threadIdx.x], cudaStreamNonBlocking));
cudaErrorCheck(cudaEventCreateWithFlags(&pStreamEvents[threadIdx.x], cudaEventDisableTiming));
if (nEnd <= nSize)
{
InpType* ppdChunkPtr = NULL;
cudaErrorCheck(cudaMalloc((void**)&ppdChunkPtr, sizeof(InpType) * nChunkSize));
cudaErrorCheck(cudaMemcpyAsync(ppdChunkPtr, &ppdInputVec[nBegin], \
sizeof(InpType) * nChunkSize, cudaMemcpyDeviceToDevice, pStreamHandles[threadIdx.x]));
FreqT* ppdLocalCountVec = NULL;
cudaErrorCheck(cudaMalloc((void**)&ppdLocalCountVec, sizeof(FreqT) * CHUNK_SIZE_GRID_1));
cudaErrorCheck(cudaMemsetAsync(ppdLocalCountVec, 0x00, \
sizeof(FreqT) * CHUNK_SIZE_GRID_1, pStreamHandles[threadIdx.x]));
cudaErrorCheck(cudaMemsetAsync(ppdCountVecG1[nParentThreadID][threadIdx.x], 0x00, \
sizeof(FreqT) * CHUNK_SIZE_GRID_1, pStreamHandles[threadIdx.x]));
cdpCountChunkRecursiveKernel <inptype, cnttype=""> << < 1, 1, 0, \
pStreamHandles[threadIdx.x] >> > (ppdChunkPtr, ppdLocalCountVec, 0, 0, 0, CHUNK_SIZE_GRID_0, nChunkSize);
cudaErrorCheck(cudaEventRecord(pStreamEvents[threadIdx.x], pStreamHandles[threadIdx.x]));
cudaErrorCheck(cudaDeviceSynchronize());
for (CntType iIndex = 0; iIndex < CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0; iIndex++)
cudaErrorCheck(cudaStreamWaitEvent(pStreamHandles[iIndex], pStreamEvents[iIndex], 0));
cdpMergeReduction<cnttype>(ppdLocalCountVec, NULL, GridType::GRID_1, \
nParentThreadID, threadIdx.x, 0, CHUNK_SIZE_GRID_1);
cdpMergeAllReduction<cnttype>(ppdCountVec, GridType::GRID_1, nParentThreadID);
cudaErrorCheck(cudaFree(ppdLocalCountVec));
cudaErrorCheck(cudaFree(ppdChunkPtr));
}
}
template<typename cnttype="__uint64" inptype="__int8," typename="">
__global__ void cdpCountChunkGrid2(InpType* ppdInputVec, FreqT* ppdCountVec, CntType nParentThreadID, CntType nChunkSize, CntType nSize)
{
__shared__ cudaStream_t pStreamHandles[CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0];
__shared__ cudaEvent_t pStreamEvents[CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0];
CntType nBegin = threadIdx.x * nChunkSize;
CntType nEnd = (threadIdx.x + 1) * nChunkSize;
cudaErrorCheck(cudaStreamCreateWithFlags(&pStreamHandles[threadIdx.x], cudaStreamNonBlocking));
cudaErrorCheck(cudaEventCreateWithFlags(&pStreamEvents[threadIdx.x], cudaEventDisableTiming));
if (nEnd <= nSize)
{
InpType* ppdChunkPtr = NULL;
cudaErrorCheck(cudaEventRecord(pStreamEvents[threadIdx.x], pStreamHandles[threadIdx.x]));
cudaErrorCheck(cudaMalloc((void**)&ppdChunkPtr, sizeof(InpType) * nChunkSize));
cudaErrorCheck(cudaMemcpyAsync(ppdChunkPtr, &ppdInputVec[nBegin], \
sizeof(InpType) * nChunkSize, cudaMemcpyDeviceToDevice, pStreamHandles[threadIdx.x]));
FreqT* ppdLocalCountVec = NULL;
cudaErrorCheck(cudaMalloc((void**)&ppdLocalCountVec, \
sizeof(FreqT) * CHUNK_SIZE_GRID_2));
cudaErrorCheck(cudaMemsetAsync(ppdLocalCountVec, 0x00, \
sizeof(FreqT) * CHUNK_SIZE_GRID_2, pStreamHandles[threadIdx.x]));
__syncthreads(); cudaErrorCheck(cudaDeviceSynchronize());
for (int iIndex = 0; iIndex < CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0; iIndex++)
cudaErrorCheck(cudaMemsetAsync(ppdCountVecG2[threadIdx.x][iIndex], 0x00, \
sizeof(FreqT) * CHUNK_SIZE_GRID_2, pStreamHandles[threadIdx.x]));
for (int iIndex = 0; iIndex < CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0; iIndex++)
cudaErrorCheck(cudaStreamWaitEvent(pStreamHandles[iIndex], pStreamEvents[iIndex], 0));
cdpCountChunkGrid1 <inptype, cnttype=""> << < 1, CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0, \
0, pStreamHandles[threadIdx.x] >> > (ppdChunkPtr, ppdLocalCountVec, \
threadIdx.x, CHUNK_SIZE_GRID_1, CHUNK_SIZE_GRID_2);
cudaErrorCheck(cudaEventRecord(pStreamEvents[threadIdx.x], pStreamHandles[threadIdx.x]));
cudaErrorCheck(cudaDeviceSynchronize());
for (int i = 0; i < CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0; i++)
cudaErrorCheck(cudaStreamWaitEvent(pStreamHandles[i], pStreamEvents[i], 0));
__syncthreads(); cudaErrorCheck(cudaDeviceSynchronize());
cdpMergeReduction<cnttype>(ppdLocalCountVec, NULL, GridType::GRID_2, nParentThreadID, threadIdx.x, 0, CHUNK_SIZE_GRID_2);
cdpMergeAllReduction<cnttype>(ppdCountVec, GridType::GRID_2, nParentThreadID);
cudaErrorCheck(cudaFree(ppdLocalCountVec));
cudaErrorCheck(cudaFree(ppdChunkPtr));
}
}
</cnttype></cnttype></inptype,></typename></cnttype></cnttype></inptype,></typename>
The following code implements the host-device function cdpMergeReduction. The following function performs the reduction by actually merging the items in each array used to store the number of duplicate values for each data item obtained by each current thread for each particular chunk of the array ppdInputVec. As the result of performing the merge reduction we obtain the global array containing the overall values of the number of duplicates for each item in all chunks of the ppdInputVec array. To provide a better threads synchronization and avoid data flow dependency issue in this case we're using device globally pinned memory such as ppdCountVecG1 and ppdCountVecG2 buffers to store the values of the number of duplicate per each chunk and per each thread.
__device__ FreqT ppdCountVecG1[CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0] \
[CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0][CHUNK_SIZE_GRID_1] = { { { 0 } } };
__device__ FreqT ppdCountVecG2[CHUNK_SIZE_GRID_2 / CHUNK_SIZE_GRID_0] \
[CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0][CHUNK_SIZE_GRID_2] = { { { 0 } } };
typedef enum { GRID_1 = 1, GRID_2 = 2, LOCAL = 3 } GridType;
template<typename cnttype="__uint64">
__device__ __host__ CntType cdpMergeReduction(FreqT* ppdSourceVec, FreqT* ppdCountVec, GridType Grid, \
CntType nParentThreadID, CntType nThreadID, CntType nItemsCount, CntType nSize)
{
CntType nItem = nItemsCount;
for (CntType iIndex = 0; iIndex < nSize; iIndex++)
if (ppdSourceVec[iIndex].freq > 0)
{
CntType nIndex = 0; bool bExists = false;
while (nIndex < nItem && bExists == false)
if (Grid == GridType::GRID_1)
bExists = (ppdCountVecG1[nParentThreadID][nThreadID][nIndex++].value == \
ppdSourceVec[iIndex].value) ? 1 : 0;
else if (Grid == GridType::GRID_2)
bExists = (ppdCountVecG2[nParentThreadID][nThreadID][nIndex++].value == \
ppdSourceVec[iIndex].value) ? 1 : 0;
else if (Grid == GridType::LOCAL)
bExists = (ppdCountVec[nIndex++].value == \
ppdSourceVec[iIndex].value) ? 1 : 0;
if (bExists == true)
{
if (Grid == GridType::GRID_1)
ppdCountVecG1[nParentThreadID][nThreadID][nIndex - 1].freq += \
ppdSourceVec[iIndex].freq;
else if (Grid == GridType::GRID_2)
ppdCountVecG2[nParentThreadID][nThreadID][nIndex - 1].freq += \
ppdSourceVec[iIndex].freq;
else if (Grid == GridType::LOCAL)
ppdCountVec[nIndex - 1].freq += \
ppdSourceVec[iIndex].freq;
}
else
{
if (Grid == GridType::GRID_1)
{
ppdCountVecG1[nParentThreadID][nThreadID][nItem].value = \
ppdSourceVec[iIndex].value;
ppdCountVecG1[nParentThreadID][nThreadID][nItem].freq = \
ppdSourceVec[iIndex].freq;
nItem++;
}
else if (Grid == GridType::GRID_2)
{
ppdCountVecG2[nParentThreadID][nThreadID][nItem].value = \
ppdSourceVec[iIndex].value;
ppdCountVecG2[nParentThreadID][nThreadID][nItem].freq = \
ppdSourceVec[iIndex].freq;
nItem++;
}
else if (Grid == GridType::LOCAL)
{
ppdCountVec[nItem].value = \
ppdSourceVec[iIndex].value;
ppdCountVec[nItem].freq = \
ppdSourceVec[iIndex].freq;
nItem++;
}
}
}
return nItem;</typename>
}
Unlike the cdpMergeReduction function listed above, the cdpMergeAllReduction device function performs the merge reduction of the specific arrays that hold the numbers of duplicates for each chunk per thread across all threads obtaining the number of duplicates for each item in the parent chunk.
template<typename cnttype="__uint64">
__device__ CntType cdpMergeAllReduction(FreqT* ppdCountVec, GridType Grid, CntType nParentThreadID)
{
CntType nThreadID = 0; CntType nItem = 0;
while (nThreadID < CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0)
{
for (CntType iIndex = 0; iIndex < CHUNK_SIZE_GRID_1; iIndex++)
if (((Grid == GridType::GRID_1) ? ppdCountVecG1[nParentThreadID][nThreadID][iIndex].freq : \
(Grid == GridType::GRID_2) ? ppdCountVecG2[nParentThreadID][nThreadID][iIndex].freq : 0) > 0)
{
CntType nIndex = 0; bool bExists = false;
while (nIndex < nItem && bExists == false)
bExists = (ppdCountVec[nIndex++].value == \
((Grid == GridType::GRID_1) ? ppdCountVecG1[nParentThreadID][nThreadID][iIndex].freq : \
(Grid == GridType::GRID_2) ? ppdCountVecG2[nParentThreadID][nThreadID][iIndex].freq : 0)) ? 1 : 0;
if (bExists == true)
{
ppdCountVec[nIndex - 1].freq += \
((Grid == GridType::GRID_1) ? ppdCountVecG1[nParentThreadID][nThreadID][iIndex].freq : \
(Grid == GridType::GRID_2) ? ppdCountVecG2[nParentThreadID][nThreadID][iIndex].freq : 0);
}
else
{
ppdCountVec[nItem].value = \
((Grid == GridType::GRID_1) ? ppdCountVecG1[nParentThreadID][nThreadID][iIndex].value : \
(Grid == GridType::GRID_2) ? ppdCountVecG2[nParentThreadID][nThreadID][iIndex].value : 0);
ppdCountVec[nItem].freq = \
((Grid == GridType::GRID_1) ? ppdCountVecG1[nParentThreadID][nThreadID][iIndex].freq : \
(Grid == GridType::GRID_2) ? ppdCountVecG2[nParentThreadID][nThreadID][iIndex].freq : 0);
nItem++;
}
}
nThreadID++;
}
return nItem;
}
</typename>
The following function is the main host function to perform the distribution count on the host. Similarly to the kernel functions discussed above the following function performs the parallel workshare of the distribution counting process by sequentially performing cdpCountChunkGrid2 kernel using streams to obtain the array of the number of duplicates values for each data item within each chunk of the pData array. The following function perform the reduction of the those arrays by calling the discussed above cdpMergeReduction function:
template<typename InpType = __int8, typename CntType = __uint64>
__host__ CntType cdpDistCountLaunch(InpType* pData, FreqT* phFreqT, CntType nChunks)
{
cudaStream_t* phStreamHandles = (cudaStream_t*)malloc(sizeof(cudaStream_t) * nChunks);
cudaEvent_t* phStreamEvents = (cudaEvent_t*)malloc(sizeof(cudaEvent_t) * nChunks);
for (__uint64 iIndex = 0; iIndex < nChunks; iIndex++)
{
cudaErrorCheckHost(cudaStreamCreateWithFlags(&phStreamHandles[iIndex], cudaStreamNonBlocking));
cudaErrorCheckHost(cudaEventCreateWithFlags(&phStreamEvents[iIndex], cudaEventBlockingSync));
}
__uint64 nItem = 0, nParentChunkID = 0;
for (__uint64 iIndex = 0; iIndex < nChunks; iIndex++)
{
__uint64 nChunkOffset = iIndex * CHUNK_SIZE_HOST;
nParentChunkID = (iIndex % 100 == 0) ? 0 : nParentChunkID;
fprintf(stdout, "ChunkOffset = %llu nParentChunkID = %llu Chunk = %llu of %llu\n", \
nChunkOffset, nParentChunkID, iIndex, nChunks);
__int8* ppdChunkBuf = NULL;
cudaErrorCheckHost(cudaMallocManaged((void**)&ppdChunkBuf, sizeof(__int8) * CHUNK_SIZE_HOST - 1));
cudaErrorCheckHost(cudaMemcpyAsync(ppdChunkBuf, &pData[nChunkOffset], \
sizeof(__int8) * CHUNK_SIZE_HOST - 1, cudaMemcpyHostToDevice, phStreamHandles[iIndex]));
FreqT* ppdCountVec = NULL;
cudaErrorCheckHost(cudaMallocManaged((void**)&ppdCountVec, sizeof(FreqT) * CHUNK_SIZE_HOST));
cudaErrorCheckHost(cudaMemsetAsync((void*)ppdCountVec, 0x00, \
sizeof(FreqT) * CHUNK_SIZE_HOST, phStreamHandles[iIndex]));
cudaErrorCheckHost(cudaEventRecord(phStreamEvents[iIndex], phStreamHandles[iIndex]));
cdpCountChunkGrid2 <__int8, __uint64> << < 1, CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0, 0, \
phStreamHandles[iIndex] >> > (ppdChunkBuf, ppdCountVec, \
nParentChunkID, CHUNK_SIZE_GRID_2, CHUNK_SIZE_HOST);
cudaErrorCheckHost(cudaStreamSynchronize(phStreamHandles[iIndex]));
cudaErrorCheckHost(cudaThreadSynchronize());
cudaErrorCheckHost(cudaDeviceSynchronize());
cudaErrorCheckHost(cudaStreamWaitEvent(phStreamHandles[iIndex], phStreamEvents[iIndex], 0));
FreqT* phCountVec = (FreqT*)malloc(sizeof(FreqT) * CHUNK_SIZE_HOST);
memset((void*)phCountVec, 0x00, sizeof(FreqT) * CHUNK_SIZE_HOST);
cudaErrorCheckHost(cudaMemcpyAsync(phCountVec, ppdCountVec, \
sizeof(FreqT) * CHUNK_SIZE_HOST, cudaMemcpyDeviceToHost, phStreamHandles[iIndex]));
nItem = cdpMergeReduction<__uint64>(phCountVec, phFreqT, GridType::LOCAL, 0, 0, nItem, CHUNK_SIZE_HOST);
nParentChunkID++;
cudaErrorCheckHost(cudaFree(ppdChunkBuf));
cudaErrorCheckHost(cudaFree(ppdCountVec));
free(phCountVec);
}
for (__uint64 iIndex = 0; iIndex < nChunks; iIndex++)
{
cudaErrorCheckHost(cudaEventDestroy(phStreamEvents[iIndex]));
cudaErrorCheckHost(cudaStreamDestroy(phStreamHandles[iIndex]));
}
return nItem;
}
How To Run This Code
To run the code being discussed in this article it's strongly recommended that NVIDIA CUDA 8.0 Toolkit is installed on the host on which you're going to run the CUDA application executable available for download at the top of this article. Also, to avoid the possible interference and conflicts with your PC's graphics card driver it's recommended to set WDDM TDR Delay parameter to 10 seconds. Another limitation for the project intoduced in the following article is that it's recommended to compile and run the following projects by using compute_61, sm_61 compatibility version graphics cards such as NVIDIA GeForce GTX1070 GPU Pascal or higher.
History
- December 10, 2016 - The first version of the article has been published.
I’m software developer, system analyst and network engineer, with over 20 years experience, graduated from L’viv State Polytechnic University and earned my computer science and information technology master’s degree in January 2004. My professional career began as a financial and accounting software developer in EpsilonDev company, located at L’viv, Ukraine. My favorite programming languages - C/C++, C#.NET, Java, ASP.NET, Node.js/JavaScript, PHP, Perl, Python, SQL, HTML5, etc. While developing applications, I basically use various of IDE’s and development tools, including Microsoft Visual Studio/Code, Eclipse IDE for Linux, IntelliJ/IDEA for writing code in Java. My professional interests basically include data processing and analysis algorithms, artificial intelligence and data mining, system analysis, modern high-performance computing (HPC), development of client-server web-applications using various of libraries, frameworks and tools. I’m also interested in cloud-computing, system security audit, IoT, networking architecture design, hardware engineering, technical writing, etc. Besides of software development, I also admire to write and compose technical articles, walkthroughs and reviews about the new IT- technological trends and industrial content. I published my first article at CodeProject in June 2015.