
Runsort: Sorting Data Utilizing Existing Order

31 Jul 2012 | GPLv3 | 9 min read

Introduction

In sorting, there are several well-known algorithms with good running times. Quick sort and merge sort both have an average running time of O(N log N) and are widely used for sorting data [1]. However, these algorithms do not take into account any order that already exists in the data to be sorted. Runsort first scans the input to find the ordered sequences that already exist in it, and then merges those sequences to produce a single ordered sequence. A journal article [2] surveys similar techniques under the name "adaptive sorting", and [3] describes a similar technique with an implementation in Prolog. The version presented here was, however, designed and developed independently. The purpose of this article is to provide a practical working implementation in C++ available under the GNU General Public License (GPL).

In runsort, there can be two types of sequences, ascending and descending. For the purpose of this algorithm, and without loss of generality, an ascending order is defined as a non-decreasing order, with duplicates permitted. Thus, such a sequence may be:

x0 <= x1 <= ... <= xn-1

The other type of sequence is descending, which is defined as follows:

x0 > x1 > ... > xn-1

The algorithm can then be described as:

  • Scan the input set to determine these sequences, also known as runs, in the data. Record the position at which each run starts, its length, and whether or not it is in descending order. The runs are stored in an array of structures containing these three fields. The entries of this array are numbered from 0 to n-1 (where n here denotes the number of runs) in the order in which the runs were discovered. It follows that the starting positions are ordered as well: run[0].pos < run[1].pos < ... < run[n-1].pos. Additionally, the runs do not overlap and do not contain elements from previous runs. This process is known as "run discovery".
  • For each descending run, reverse the order of its data in place, so that the example sequence

    x0 > x1 > ... > xn-1

    becomes

    xn-1 < ... < x1 < x0
  • Let the runs be numbered from 0 to n-1, and let run 0 be the initial accumulator. For each run i = 1 to n-1, merge run 0 with run i, producing a larger ordered sequence that becomes the new run 0. Continue accumulating runs into run 0 until a final ordered sequence is produced.

  • Output the results.
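
The first two steps above (run discovery and reversal) can be sketched as follows. This is a minimal illustration rather than the article's implementation: the `Run` struct and the function names `discover_runs` and `reverse_descending` are hypothetical, and it uses `std::vector` instead of the raw arrays used in the sample implementation later in the article.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical bookkeeping record: start position, length, and direction.
struct Run {
    std::size_t pos;
    std::size_t length;
    bool descending;
};

// Scan `data` once and record maximal runs in the order they are found.
// A run is ascending (non-decreasing) unless its first two elements are
// strictly decreasing, in which case it is a strictly descending run.
std::vector<Run> discover_runs(const std::vector<unsigned>& data) {
    std::vector<Run> runs;
    const std::size_t n = data.size();
    std::size_t i = 0;
    while (i < n) {
        const std::size_t start = i;
        const bool descending = (i + 1 < n) && (data[i] > data[i + 1]);
        ++i;
        // Extend the run while the next element keeps the same direction.
        while (i < n && (descending ? data[i - 1] > data[i]
                                    : data[i - 1] <= data[i])) {
            ++i;
        }
        runs.push_back({start, i - start, descending});
    }
    return runs;
}

// Reverse every descending run in place so that all runs are ascending.
void reverse_descending(std::vector<unsigned>& data, std::vector<Run>& runs) {
    for (Run& r : runs) {
        assert(r.pos + r.length <= data.size());
        if (r.descending) {
            std::reverse(data.begin() + r.pos, data.begin() + r.pos + r.length);
            r.descending = false;
        }
    }
}
```

Because each run ends exactly where the next one begins, the recorded positions are strictly increasing and the runs never overlap, which is the invariant the merge steps rely on.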

Implementation Notes

The algorithm starts by scanning the data to discover the runs within the input. It is not strictly necessary to scan the entire input every time. A portion of the data may already be known to be sorted, for example because most of it has not changed since the last sort, so that only a smaller, modified portion needs sorting. Such a scheme can also be implemented: once runsort finishes sorting its input, the result can be merged with the larger, already sorted set, which may be stored in a different memory location or even on disk.
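
As a rough sketch of that idea, the freshly sorted output can be combined with an already sorted set in a single linear pass. The helper name `merge_with_sorted` is hypothetical and both inputs are assumed to be held in memory; merging with data on disk would need a streaming variant that is not shown here.

```cpp
#include <algorithm>
#include <cassert>
#include <iterator>
#include <vector>

// Hypothetical helper: combine freshly sorted data with a larger set
// that is already known to be sorted, without re-sorting the larger set.
std::vector<unsigned> merge_with_sorted(const std::vector<unsigned>& fresh,
                                        const std::vector<unsigned>& existing) {
    assert(std::is_sorted(fresh.begin(), fresh.end()));
    assert(std::is_sorted(existing.begin(), existing.end()));
    std::vector<unsigned> out;
    out.reserve(fresh.size() + existing.size());
    // std::merge walks both inputs once and writes them out in order.
    std::merge(fresh.begin(), fresh.end(), existing.begin(), existing.end(),
               std::back_inserter(out));
    return out;
}
```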

To illustrate what happens in the merge step, consider the following input:

index: 0 1 2 3 4 5 6 7
       ---------------
data:  0 1 2 3 4 3 2 1

Here, the algorithm finds two runs, one ascending and one descending:

run 0          run 1
0 1 2 3 4      3 2 1

with the following bookkeeping information recorded:

run[0].pos        = 0      run[1].pos       = 5
run[0].length     = 5      run[1].length    = 3
run[0].decending  = 0      run[1].decending = 1

The algorithm first reverses the order of the descending run by modifying the array input to be the following:

run 0         run 1
0 1 2 3 4     1 2 3

Now, the algorithm is ready to merge the two runs to produce the final sorted output. Run 0, { 0 1 2 3 4 }, is merged with run 1, { 1 2 3 }, which produces:

run 0
0 1 1 2 2 3 3 4

Run 0 now has its length increased to run[0].length + run[1].length. If there were additional runs, the merge would be called again; this time, run 0 would be as shown above with length 8 (5 + 3), and the merge would consider the next run, which necessarily starts at position 8. This invariant is established during run discovery. Note that a run of length 8 starting at position 0 occupies indices 0 1 ... 6 7, up to but not including 8, so the next run starts at position 8. More generally, consecutive runs are adjacent:

run[i+1].pos = run[i].pos + run[i].length

In the example above, run[2].pos = run[1].pos + run[1].length = 5 + 3 = 8, so the next run to be merged (not illustrated) would be run 2, starting at position 8.

Lastly, for iterating from the start of one run, to the end of the next run, we may use logic such as the following:

C++
for(j = run[i].pos; j < (run[i+1].pos + run[i+1].length); j++)

Notice that (run[i+1].pos + run[i+1].length) yields the index of the next element or run[i+2].pos, and that we iterate up to but not including this element.
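
The accumulator merge described in this section can be sketched as below. This is an illustrative variant, not the article's code: it assumes all runs are already ascending and adjacent, uses a hypothetical `Run` struct and function name `accumulate_merge`, and substitutes the standard library's `std::inplace_merge` for a hand-written merge into a temporary array.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical bookkeeping record, mirroring the three fields in the text.
struct Run {
    std::size_t pos;
    std::size_t length;
    bool descending;
};

// Fold every run into run 0, one at a time (the accumulator approach).
// Assumes the runs are ascending, adjacent, and listed in position order.
void accumulate_merge(std::vector<unsigned>& data, std::vector<Run>& runs) {
    for (std::size_t i = 1; i < runs.size(); ++i) {
        // Run i must begin exactly where the accumulated run 0 ends.
        assert(runs[i].pos == runs[0].pos + runs[0].length);
        auto first = data.begin() + runs[0].pos;
        auto mid = data.begin() + runs[i].pos;
        auto last = mid + runs[i].length;
        std::inplace_merge(first, mid, last);
        runs[0].length += runs[i].length;
    }
}
```

Each call re-reads everything accumulated so far, which is why this approach degrades when there are many runs.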

Optimization

Once the runs are discovered, they can be merged using the accumulator approach described above; however, the accumulator approach is much slower than a binary merge. The data should instead be merged using a binary merge, i.e., first merging adjacent runs, coalescing them into larger runs, and then repeating until there is one final run. For example, given four runs, 0 1 2 3, we first merge runs 0 and 1 to produce a larger run 0, and runs 2 and 3 to produce a larger run 2. Thereafter, we merge run 0 with run 2, which produces the final sorted array in run 0. This approach is similar to how merging is performed in merge sort; however, it need not be implemented recursively. It can be done iteratively, since the runs are recorded in a table during run discovery. Further, because the runs depend on the input data, they do not necessarily form evenly divided segments as they do in merge sort, which divides the array in halves recursively. Taking this into consideration, the merge step should be implemented using the following logic:

C++
...
if(rcount % 2) {
    rcount++;
}

k  = 1;
kk = 2;

trcount = rcount;

do {
    for(i = 0, ii = k; i < trcount; i += kk, ii += kk ) {
        tmp = merge(array,runs[i].pos, runs[i].pos  + runs[i].length,
                          runs[ii].pos,runs[ii].pos + runs[ii].length,len);
        runs[i].length += runs[ii].length;
        memcpy(&array[runs[i].pos], tmp, sizeof(unsigned) * runs[i].length);
    }

    rcount >>= 1;
    k  <<= 1;
    kk <<= 1;

} while(rcount > 0);
...

The merge operation is the well-known merge operation from merge sort, except that in this algorithm it does not need to be applied recursively. Instead, it is applied iteratively, culminating in a single ordered sequence. It does not matter that the runs are uneven in length; for example, run 0 can have 10 elements while run 1 has two. It does matter that the total number of runs is even, and hence the code increments an odd run count by one. When the runs table is created, it is allocated as zeroed memory with one more entry than the number of elements to be sorted. This ensures that the code always has an extra empty (zero-length) run to merge if needed, which simplifies the merging step.
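
For comparison, here is a sketch of the same iterative binary merge written with bounds checks instead of a padded runs table. It is an alternative formulation under stated assumptions, not the article's code: the `Run` struct and the name `binary_merge` are hypothetical, the runs are assumed to be ascending and adjacent, and `std::inplace_merge` stands in for the article's `merge` plus `memcpy`.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical bookkeeping record, mirroring the three fields in the text.
struct Run {
    std::size_t pos;
    std::size_t length;
    bool descending;
};

// Iteratively merge neighbouring runs: (0 1) (2 3) ..., then (0 2) ...,
// until run 0 covers the whole input. `step` is the distance between the
// two table entries merged in the current pass.
void binary_merge(std::vector<unsigned>& data, std::vector<Run>& runs) {
    for (std::size_t step = 1; step < runs.size(); step *= 2) {
        // A run without a partner in this pass is simply carried forward,
        // so an odd run count needs no empty padding run here.
        for (std::size_t i = 0; i + step < runs.size(); i += 2 * step) {
            Run& left = runs[i];
            const Run& right = runs[i + step];
            assert(right.pos == left.pos + left.length);
            auto first = data.begin() + left.pos;
            std::inplace_merge(first, first + left.length,
                               first + left.length + right.length);
            left.length += right.length;
        }
    }
}
```

With five runs, for instance, the passes merge (0 1) and (2 3), then (0 2), then (0 4), so the unpaired last run is only touched in the final pass.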

Analysis

The algorithm first scans the input array of size N and partitions all of its elements into runs, which are sequences of elements in either ascending or descending order. Runs in descending order are then reversed. Thereafter, the runs are merged to create a single sequence of length N in ascending order, so the algorithm produces a sorted sequence for any input. The critical logic is in the merging step, where adjacent runs must be merged first, and the resulting runs then merged together until all runs have been combined into a single run.

The discovery and categorization of runs takes O(N) time, as the entire input array is scanned once. Reversing the descending runs, where required, takes O(N) time as well. A single merge takes O(N) time in the worst case, and in the worst case there can be on the order of N runs to merge. With the accumulator design, this means up to N merges of O(N) each, or O(N^2) in the worst case of merging every run into the first run. However, the algorithm can also be implemented using the aforementioned binary merge, the divide and conquer design of merge sort, where each run is merged into its adjacent run on each pass. This halves the number of runs on each pass, for a worst case running time of O(N log N). For example, with four runs, 0 1 2 3, the first pass merges runs (0 1) and runs (2 3), putting the results into run 0 and run 2, respectively, which reduces the number of runs from four to two. The next pass only needs to merge runs (0 2) and place the result into run 0. Again, the number of runs is reduced by a factor of 2.

Furthermore, it is anticipated that in the average case there will not be N runs of one element each. The runs are expected to contain several elements, with the best case being a single run containing all N elements, i.e., data that is already sorted. This gives the algorithm a potential advantage over quick sort, as it does not suffer from the same worst case when dealing with already sorted data, and an advantage over merge sort, in that the base case of the runs can be larger than the one-element partitions of merge sort's base case.
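
As a worked restatement of the cost argument above, let R denote the number of runs found during discovery and let \ell_j be the length of run j. Both symbols are introduced here for illustration and are not used elsewhere in the article. Assuming that merging two runs costs time proportional to their combined length, the two merge strategies compare roughly as follows:

```latex
\begin{aligned}
T_{\text{accumulator}}(N, R) &= O(N) + \sum_{i=1}^{R-1} O\!\Big(\sum_{j=0}^{i} \ell_j\Big) = O(N R), \\
T_{\text{binary}}(N, R) &= O(N) + \lceil \log_2 R \rceil \cdot O(N) = O(N + N \log R).
\end{aligned}
```

The leading O(N) term covers run discovery and reversal. With R = 1 (already sorted input) both expressions reduce to O(N); with R on the order of N they give the O(N^2) and O(N log N) bounds stated above.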

To test the performance of the algorithm in practice, a study was conducted in which it was compared against both quick sort and merge sort. The data to be sorted consisted of integers and was generated using three test cases. The first case, in case.c, generates the input at random using the standard rand(). The second case, in quadcase.c, also generates the input at random, except that the data is arranged such that there are runs at least four elements in length. Finally, the third case, in halfsortedcase.c, also generates the input at random, except that half of the data is sorted. The tests generated data with 200,000, 500,000, 1,000,000, and 10,000,000 elements. The results were:

random      200,000    500,000    1,000,000  10,000,000
quick sort  0.036438   0.099808   0.205733   2.294918
merge sort  0.033868   0.093098   0.195854   2.152524
runsort     0.031721   0.086102   0.176740   1.930010

quadsort    200,000    500,000    1,000,000  10,000,000
quick sort  0.036348   0.098176   0.201880   2.216219
merge sort  0.033514   0.091412   0.191589   2.088849
runsort     0.027340   0.076412   0.156922   1.713937

halfsorted  200,000    500,000    1,000,000  10,000,000
quick sort  0.025682   0.067421   0.140791   1.533914
merge sort  0.025011   0.067496   0.144097   1.630212
runsort     0.020595   0.061310   0.137598   1.461708

Runsort was the fastest of the three algorithms in every case, and all three algorithms were of the same order in practice. The complete implementation and tests can be found in [4].

Sample Implementation

C++
////////////////////////////////////////////////////////////////
// Copyright (C) Roger G. Doss, Ph.D.  All Rights Reserved.
// Permission is granted for the use and distribution of this work under
// the terms and conditions of either version 2 or 3 of the GNU Public License (GPL).
// For commercial use, please contact the author at OpenSource@rdoss.com
////////////////////////////////////////////////////////////////
//
// @file:
//  runsort.c
//
// @description:
//  Sort using statistics about the input.
//
// @author:
//  Roger G. Doss
//
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <ctype.h>

#include <sys/types.h>
#include <sys/stat.h>

#include "mergesort.h"
#include "timer.h"

static unsigned *tarray = NULL;

//
// reverse:
//
// Given an array, reverse its contents.
//
void reverse(unsigned *array, int len)
{
    int i = 0;
    unsigned swap = 0;

    for(i = 0; i < --len; ++i) {
        swap = array[i];
        array[i]=array[len];
        array[len]=swap;
    }

}// reverse

//
// merge:
//
// Merge two arrays into one array in ordered form.
//
unsigned *merge(unsigned *array, unsigned start,  unsigned end, 
                                unsigned start2, unsigned end2, unsigned len)

{
    register unsigned first1 = start;
    register unsigned last1  = end;
    register unsigned first2 = start2;
    register unsigned last2  = end2;

    register unsigned i = 0;

    for(; (first1 < last1) && (first2 < last2); i++) {
        if(array[first1] < array[first2]) {
            tarray[i] = array[first1];
            first1++;
        } else {
            tarray[i] = array[first2];
            first2++;
        }
    }

    for(; first1 < last1; first1++, i++) {
        tarray[i] = array[first1];
    }

    for(; first2 < last2; first2++, i++) {
        tarray[i] = array[first2];
    }
    return tarray;
}// merge

typedef struct run {
    unsigned pos,           // Position in the array that run occurs.
             length,        // Length of the run.
             decending;     // 1 if decending x1 > x2 > ... > xn, 0 if otherwise.
} run_t;

//
// runsort:
//
// Given an unordered array, determine the runs
// within the array, and then feed them into a
// merge sort merge like process to produce
// a final sorted array.
//
void
runsort(unsigned len, unsigned *array, unsigned *sarray)
{
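    register run_t *runs     = NULL;
    register unsigned i      = 0;
    register unsigned j      = 0;
    unsigned rcount = 0, trcount = 0;
    unsigned tlen            = len - 1;
    unsigned elem            = 0;

    register unsigned *tmp   = 0;
    register unsigned    k   = 1;
    register unsigned    kk  = 2;
    unsigned             m   = 2;
    register unsigned   ii   = 0;

    // Zero or one element is already sorted, and the run scan below
    // compares array[i] with array[i+1], so it needs at least two elements.
    if(len < 2) {
        if(len == 1) {
            sarray[0] = array[0];
        }
        return;
    }

    // The runs table is zeroed and one entry larger than the input so that
    // an empty run is always available when the run count is odd.
    runs = (run_t *)calloc(1, sizeof(run_t) * (len + 1));

    // allocate tarray once and use throughout the calculation.
    tarray = (unsigned *)calloc(1, sizeof(unsigned) * (len + 1));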

    // run discovery
    // pre-process the runs in the array.
    for(; ; ) {
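        // A lone last element (i >= tlen) is recorded as an ascending run
        // without reading past the end of the array.
        if(i >= tlen || array[i] <= array[i+1]) {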
            // Found an ascending run.
            runs[rcount].pos = i;
            runs[rcount].length++;
            for(; i < tlen; i++) {
                if(array[i] <= array[i+1]) {
                    runs[rcount].length++;
                } else {
                    i++;
                    break;
                }
            }
            if((runs[rcount].pos + runs[rcount].length) == len) {
                rcount++;
                break;
            }
            rcount++;
        }
        // Only compare with the next element when there is one.
        if(i < tlen && array[i] > array[i+1]) {
            // Found a decending run.
            runs[rcount].pos = i;
            runs[rcount].length++;
            for(; i < tlen; i++) {
                if(array[i] > array[i+1]) {
                    runs[rcount].length++;
                } else {
                    i++;
                    break;
                }
            }
            runs[rcount].decending = 1;
            if((runs[rcount].pos + runs[rcount].length) == len) {
                rcount++;
                break;
            }
            rcount++;
        }
    }


#ifdef _DEBUG
    printf("runs counted: %u\n",rcount);
    for(i = 0; i < rcount; i++) {
        printf("pos: %u length: %u decending: %u\n",runs[i].pos, runs[i].length, runs[i].decending);
        elem += runs[i].length;
    }
    printf("discovered: %u elements\n",elem);
#endif

    // First, reverse all decending order runs.
    for(i = 0; i < rcount; i++) {
        if(runs[i].decending) {
#ifdef _DEBUG
            printf("run: %u\n",i);
#endif
            reverse(&array[runs[i].pos],runs[i].length);
        }
    }

    // Now, using a binary merge design, merge all
    // the runs by combining neighbors into larger
    // runs.  So, if we have runs 0 1 2 3, we combine
    // (0 1) (2 3), then
    // (0 2), then
    //  0 is a single sorted run.
#ifdef _DEBUG
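    printf("rcount = %u\n", rcount);
#endif
    // The pairwise merge below needs an even number of runs; the zeroed
    // extra entry in the runs table serves as an empty partner run.
    if(rcount % 2) {
        rcount++;
    }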

    k  = 1;
    kk = 2;
    trcount = rcount;
    do {
        for(i = 0, ii = k; i < trcount; i += kk, ii += kk ) {
            // We require that the number of runs is even; however,
            // if this is not the case, we increment the number of runs by one
            // and this should have the desired effect of combining
            // a run with an empty run.
#ifdef _DEBUG
            printf("i: %u ii: %u rcount: %u\n",i,ii,trcount);
#endif
            tmp = merge(array,runs[i].pos, runs[i].pos  + runs[i].length,
                              runs[ii].pos,runs[ii].pos + runs[ii].length,len);
            runs[i].length += runs[ii].length;
            memcpy(&array[runs[i].pos], tmp, sizeof(unsigned) * runs[i].length);
#ifdef _DEBUG
            printf("i: %u runs[i].length: %u\n",i,runs[i].length);
#endif
        }
        rcount >>= 1;
        k  <<= 1;
        kk <<= 1;
    } while(rcount > 0);

    // Now copy back...
    memcpy(sarray,array,sizeof(unsigned) * len);

    // Done.
    free((void *)runs);
    free((void *)tarray);


}// runsort


//
// syntax:
//
// Output the usage of this module.
//
void
syntax()
{
    fprintf(stderr,"runsort nr_elements integer_array\n");
    exit(EXIT_FAILURE);
}// syntax

//
// main:
//
// Sample driver of 'runsort' sorting algorithm.
//
// accepts the following arguments from the command line:
//      runsort nr_elements "element_list"
// where nr_elements is an integer length of the input array
// and element_list is an integer list in quotes.
//
int
main(int argc, char **argv)
{
    static const int NR_ELEMENTS   = 1;
    static const int INTEGER_ARRAY = 2;

    unsigned nr_elements = 0;
    unsigned *array      = 0;
    unsigned *sarray     = 0;
    unsigned i           = 0;
    unsigned j           = 0;
    unsigned len         = 0;
    char *str            = 0;
    if(argc == 3) {
        nr_elements = (unsigned)strtol(argv[NR_ELEMENTS],NULL,10);
        array = (unsigned *)malloc(sizeof(unsigned) * nr_elements);
        for(i = 0, str = argv[INTEGER_ARRAY]; i < nr_elements; i++) {
            // strtoul skips leading whitespace and reports where the
            // number ended, so no manual scanning is needed.
            char *end = NULL;
            array[i] = (unsigned)strtoul(str, &end, 10);
            if(end == str) {
                // Fewer numbers than nr_elements were supplied.
                free((void *)array);
                syntax();
            }
            str = end;
        }
#ifdef _DEBUG
        for(i = 0; i < nr_elements; i++) {
            printf("%u ",array[i]);
        }
        printf("\n");
#endif
        sarray = (unsigned *)malloc(sizeof(unsigned) * nr_elements);
        runsort(nr_elements,array,sarray);
        for(i = 0; i < nr_elements; i++) {
            printf("%u ",sarray[i]);
        }
        printf("\n");
        free((void *)array);
        free((void *)sarray);
    } else {
        syntax();
    }
    return 0;
}// main
//
// EOF

References

[1] Cormen, T. H., Leiserson, C. E., Rivest, R. L., & Stein, C. (2009). Introduction to algorithms (3rd ed.). Cambridge, Massachusetts: The MIT Press.

[2] Estivill-Castro, V. & Wood, D. (1992). A survey of adaptive sorting algorithms. ACM Computing Surveys, 24(4).

[3] Brady, M. H. (2005). Runsort: An Adaptive Mergesort for Prolog. TCD Computer Science Department.

[4] Doss, R. (2012). runsort. Retrieved May 28, 2012 from http://www.rdoss.com/papers/runsort/runsort.tar.gz

License

This article, along with any associated source code and files, is licensed under The GNU General Public License (GPLv3)