Introduction
In sorting, there are several well-known algorithms with good running times. Algorithms such as quick sort and merge sort have an average running time of O(N log N) and are widely used for sorting data [1]. However, these algorithms do not take advantage of any order already present in the data to be sorted. Runsort first scans the input to find the ordered sequences that already exist in the data and then merges those sequences to produce a single ordered sequence. A journal article [2] surveys various similar techniques under the name "adaptive sorting". Additionally, article [3] discusses a similar technique and its implementation in Prolog. The version presented here, however, was designed and developed independently. The purpose of this article is to provide a practical working implementation in C++ available under the GNU General Public License (GPL).
In runsort, there can be two types of sequences, ascending and descending. For the purpose of this algorithm, and without loss of generality, an ascending order is defined as a non-decreasing order, with duplicates permitted. Thus, such a sequence may be:
x_0 <= x_1 <= ... <= x_{n-1}
The other type of sequence is descending, which is defined as strictly decreasing:
x_0 > x_1 > ... > x_{n-1}
The algorithm can then be described as:
- Scan the input to find these sequences, also known as runs. For each run, record the position at which it starts, its length, and whether or not it is in descending order. The runs are stored in an array of structures containing these three fields, indexed from 0 to n-1 in the order in which the runs were discovered. It follows that the starting positions are ordered: run[0].pos < run[1].pos < ... < run[n-1].pos. Additionally, the runs do not overlap and do not contain elements from previous runs. This process is known as "run discovery".
- Reverse each descending run in place, so that the example sequence
x_0 > x_1 > ... > x_{n-1}
becomes
x_{n-1} < ... < x_1 < x_0
- Let the runs be numbered from 0 to n-1 and let the initial run be run 0. For each run i = 1 to n-1, merge run 0 with run i, producing a larger ordered sequence which becomes the new run 0. Continue accumulating runs into run 0 until a final ordered sequence is produced.
- Output the results.
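The run discovery and reversal steps above can be sketched in C as follows. This is a minimal illustration rather than the article's implementation: the names run_info, find_runs and reverse_run are hypothetical, and the caller is assumed to supply a runs table with room for n entries.

```c
#include <stddef.h>

/* Hypothetical bookkeeping record: start, length and direction of one run. */
typedef struct {
    size_t pos;
    size_t length;
    int descending;
} run_info;

/* Scan a[0..n) once and record maximal runs. Ascending means non-decreasing
 * (<=); descending means strictly decreasing (>). Returns the run count. */
size_t find_runs(const unsigned *a, size_t n, run_info *runs)
{
    size_t count = 0;
    size_t i = 0;

    while (i < n) {
        size_t start = i;
        int descending = (i + 1 < n && a[i] > a[i + 1]);

        i++;
        if (descending) {
            while (i < n && a[i - 1] > a[i])
                i++;
        } else {
            while (i < n && a[i - 1] <= a[i])
                i++;
        }
        runs[count].pos = start;
        runs[count].length = i - start;
        runs[count].descending = descending;
        count++;
    }
    return count;
}

/* Reverse a descending run in place so that it becomes ascending. */
void reverse_run(unsigned *a, size_t length)
{
    size_t lo = 0;
    size_t hi = length;

    while (lo + 1 < hi) {
        unsigned tmp = a[lo];
        hi--;
        a[lo] = a[hi];
        a[hi] = tmp;
        lo++;
    }
}
```

Because each run starts where the previous one ended, the recorded positions are strictly increasing and the runs never overlap, which is the ordering property stated in the first step.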
Implementation Notes
The algorithm starts by processing the data in order to discover runs within the input. It is not strictly necessary to always scan the entire input, because a portion of the data may already be sorted. Perhaps a significant amount of the data has not changed and only a smaller amount needs to be sorted. Such a scheme can also be implemented: once runsort finishes sorting its input, the result can be merged with a larger set of data that is already sorted, which may be stored in a different memory location or even on disk.
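One way to picture this scheme is a plain two-way merge of the freshly sorted batch with the larger, already sorted data set. The sketch below assumes both inputs are in memory and ascending; the function name merge_sorted is hypothetical and is not part of the article's code.

```c
#include <stddef.h>

/* Hypothetical helper: merge two ascending arrays into dst, which must
 * hold na + nb elements and must not overlap either input. */
void merge_sorted(const unsigned *a, size_t na,
                  const unsigned *b, size_t nb, unsigned *dst)
{
    size_t i = 0, j = 0, k = 0;

    while (i < na && j < nb) {
        /* <= keeps equal elements from a ahead of those from b */
        if (a[i] <= b[j])
            dst[k++] = a[i++];
        else
            dst[k++] = b[j++];
    }
    while (i < na)
        dst[k++] = a[i++];
    while (j < nb)
        dst[k++] = b[j++];
}
```

Only the changed portion has to go through run discovery; the unchanged, sorted portion is read once during this final merge.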
To illustrate what happens in the merge step, consider the following input:
index: 0 1 2 3 4 5 6 7
       ---------------
data:  0 1 2 3 4 3 2 1
Here, the algorithm finds two runs, one ascending and one descending:
run 0       run 1
0 1 2 3 4   3 2 1
with the following bookkeeping information recorded:
run[0].pos = 0          run[1].pos = 5
run[0].length = 5       run[1].length = 3
run[0].decending = 0    run[1].decending = 1
The algorithm first reverses the order of the descending run, modifying the input array to be the following:
run 0       run 1
0 1 2 3 4   1 2 3
Now the algorithm is ready to merge the two runs to produce the final sorted output. Run 0 is { 0 1 2 3 4 } and is merged with run 1, { 1 2 3 }, which produces:
run 0
0 1 1 2 2 3 3 4
Run 0 now has its length increased to run[0].length + run[1].length. If there were additional runs, the merge would be called again; this time, however, run 0 would look as above with length 8 (5 + 3), and the merge would consider the next run, which necessarily starts at position 8. This is guaranteed by run discovery in the first step of the algorithm. Note that a run of length 8 starting at position 0 occupies indexes 0 1 ... 6 7, going up to but not including 8; thus the next run starts at position 8, or more generally:
run[i+2].pos = run[i].pos + run[i].length + run[i+1].length
where in the above example i would be 0, and the next run to be merged (not illustrated) would be run 2.
Lastly, to iterate from the start of one run to the end of the next run, we may use logic such as the following:
for(j = run[i].pos; j < (run[i+1].pos + run[i+1].length); j++)
Notice that (run[i+1].pos + run[i+1].length) yields the index of the element that follows run i+1, that is, run[i+2].pos, and that we iterate up to but not including this element.
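The accumulator merge walked through in this section can be sketched as below. It assumes every run is already ascending and that the runs are contiguous; the names span and accumulate_runs are hypothetical, and the scratch buffer is assumed to be as large as the array.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical record for one ascending run. */
typedef struct {
    size_t pos;
    size_t length;
} span;

/* Merge every run into run 0, left to right. scratch must hold as many
 * elements as the whole array. */
void accumulate_runs(unsigned *a, span *runs, size_t count, unsigned *scratch)
{
    size_t r;

    for (r = 1; r < count; r++) {
        size_t i = runs[0].pos;
        size_t end1 = runs[0].pos + runs[0].length;  /* equals runs[r].pos */
        size_t j = runs[r].pos;
        size_t end2 = runs[r].pos + runs[r].length;
        size_t k = 0;

        while (i < end1 && j < end2)
            scratch[k++] = (a[i] <= a[j]) ? a[i++] : a[j++];
        while (i < end1)
            scratch[k++] = a[i++];
        while (j < end2)
            scratch[k++] = a[j++];

        /* copy the merged block back; run 0 now covers both runs */
        memcpy(a + runs[0].pos, scratch, k * sizeof *a);
        runs[0].length += runs[r].length;
    }
}
```

Applied to the array 0 1 2 3 4 1 2 3 with runs at positions 0 (length 5) and 5 (length 3), this leaves 0 1 1 2 2 3 3 4 in the array and run 0 with length 8.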
Optimization
Once the runs are discovered, they can be merged using the accumulator approach described above; however, the accumulator approach is much slower than a binary merge. The data should instead be merged using a binary merge, i.e., by first merging adjacent runs, coalescing them into larger runs, and then repeating until there is one final run. For example, if we have four runs, 0 1 2 3, we should first merge runs 0 and 1, producing a larger run 0, and runs 2 and 3, producing a larger run 2. Thereafter, we merge run 0 with run 2, which produces the final sorted array in run 0. This approach is similar to how merging is performed in merge sort; however, it need not be implemented recursively. It can be done iteratively, since the runs are recorded in a table during run discovery. Further, because the runs depend on the input data, they do not necessarily divide the array into even segments as in merge sort, which divides the array into halves recursively. Taking this into consideration, the merge step should be implemented using the following logic:
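...
/* pad to an even number of runs; the spare table entry is a zero-length run */
if(rcount % 2) {
    rcount++;
}
k = 1;      /* distance between the two runs merged together */
kk = 2;     /* distance between successive pairs */
trcount = rcount;
do {
    for(i = 0, ii = k; i < trcount; i += kk, ii += kk ) {
        tmp = merge(array, runs[i].pos, runs[i].pos + runs[i].length,
                    runs[ii].pos, runs[ii].pos + runs[ii].length, len);
        runs[i].length += runs[ii].length;
        /* copy the merged result back over runs i and ii */
        memcpy(&array[runs[i].pos], tmp, sizeof(unsigned) * runs[i].length);
    }
    rcount >>= 1;
    k <<= 1;
    kk <<= 1;
} while(rcount > 0);
...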
The merge operation is the well-known merge operation from merge sort, except that in this algorithm it does not need to be applied recursively; rather, it may be applied iteratively, culminating in a single ordered sequence. It does not matter that the runs are uneven; for example, run 0 can have 10 elements while run 1 has two. It does matter that the total number of runs is even, and hence the code increments an odd number of runs by one. When the runs table is first generated, it is created with zeroed memory and with one more entry than the number of elements to be sorted. This ensures that the code always has an extra zero-length run to merge if needed and helps simplify the merging step.
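To see which table entries the loop pairs up in each pass, the loop control can be lifted into a small helper that records the (i, ii) pairs instead of merging. The function merge_schedule and its pairs parameter are hypothetical names introduced only for this illustration; the arithmetic on k, kk, rcount and trcount follows the snippet above.

```c
#include <stddef.h>

/* Hypothetical helper: record the (i, ii) pairs visited by the merge loop
 * for a given run count. Returns the number of pairs; at most max_pairs
 * of them are stored in pairs[]. */
size_t merge_schedule(unsigned rcount, unsigned pairs[][2], size_t max_pairs)
{
    unsigned k = 1, kk = 2, i, ii, trcount;
    size_t n = 0;

    if (rcount % 2)
        rcount++;               /* pad to an even count, as in the snippet */
    trcount = rcount;
    do {
        for (i = 0, ii = k; i < trcount; i += kk, ii += kk) {
            if (n < max_pairs) {
                pairs[n][0] = i;
                pairs[n][1] = ii;
            }
            n++;
        }
        rcount >>= 1;
        k <<= 1;
        kk <<= 1;
    } while (rcount > 0);
    return n;
}
```

For four runs this yields (0,1), (2,3), (0,2) and finally (0,4). Entry 4 is one of the zeroed padding entries, so the last merge only copies run 0 onto itself; this is why the table must be zero-initialized and larger than the number of discovered runs.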
Analysis
The algorithm first discovers and categorizes all elements in the input array of size N into runs, which are sequences of elements in either ascending or descending order. Then the runs that are in descending order are reversed. Thereafter, the runs are merged, creating a single sequence of length N in ascending order. The algorithm therefore produces a sorted sequence for any input. The critical logic occurs in the merging step, where adjacent runs must be merged first and the resulting runs then merged together until all runs are combined into a single run.
The discovery and categorization of runs takes O(N) time, as the entire input array is scanned. Reversing the descending runs, if required, takes O(N) time as well. A single merge takes O(N) time, and in the worst case the number of runs to merge grows in proportion to N. Given an accumulator design, the worst case is therefore on the order of N merges of O(N) time each, or O(N^2), when merging sequences into the first run. However, the algorithm can also be implemented using the aforementioned binary merge, the divide-and-conquer design of merge sort, where each run is merged into its adjacent run at each step, reducing the number of runs by one half at each step for a worst-case running time of O(N log N). For example, suppose we have four runs, 0 1 2 3. The first pass merges runs (0 1) and runs (2 3), putting the results into run 0 and run 2, respectively. The number of runs is reduced by a factor of two. In the next pass, the code only needs to merge runs (0 2) and place the result into run 0. Again, the number of runs is reduced by a factor of two.
Furthermore, it is anticipated that in the average case there will not be N runs in the data with each run being one element in length. It is expected that the runs will contain several elements, with the best case being a single run containing all N elements of the input, i.e., the data is already sorted. This gives the algorithm a potential advantage over quick sort, as it does not suffer from the same worst case when dealing with already sorted data, and an advantage over merge sort, in that the base case of the runs can be larger than the one-element partitions of merge sort's base case.
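The gap between the two merge strategies can be made concrete by counting how many elements each one writes for a given list of run lengths. The sketch below is a simplified cost model, not a measurement: it counts only the elements produced by each merge, and the names accumulator_cost and pairwise_cost are hypothetical.

```c
#include <stddef.h>

/* Elements written when runs are folded into run 0 one at a time. */
unsigned long accumulator_cost(const size_t *len, size_t count)
{
    unsigned long total = 0, acc;
    size_t r;

    if (count == 0)
        return 0;
    acc = len[0];
    for (r = 1; r < count; r++) {
        acc += len[r];          /* size of the merged run 0 */
        total += acc;
    }
    return total;
}

/* Elements written when adjacent runs are merged pairwise, pass by pass.
 * work[] is caller-provided scratch with room for count entries. */
unsigned long pairwise_cost(const size_t *len, size_t count, size_t *work)
{
    unsigned long total = 0;
    size_t r, n = count;

    for (r = 0; r < count; r++)
        work[r] = len[r];
    while (n > 1) {
        size_t out = 0;

        for (r = 0; r + 1 < n; r += 2) {
            work[out] = work[r] + work[r + 1];
            total += work[out];
            out++;
        }
        if (r < n)              /* odd run out: carried to the next pass */
            work[out++] = work[r];
        n = out;
    }
    return total;
}
```

For eight runs of one element each, the accumulator writes 2 + 3 + ... + 8 = 35 elements, while pairwise merging writes 8 elements in each of three passes, 24 in total. The first figure grows quadratically with the number of runs and the second as N log N.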
To test the performance of the algorithm in practice, a study was conducted in which it was compared against both quick sort and merge sort. The data to be sorted were integers, generated using three test cases. The first case, in case.c, generates the input at random using the standard rand(). The second case, in quadcase.c, generates the input at random as well, except that the data is sorted such that there are runs at least four elements in length. Finally, the third case, in halfsortedcase.c, generates the input at random as well, except that half of the data is sorted. The tests generated data with 200K, 500K, 1M, and 10M elements. The results were:
random     | 200,000  | 500,000  | 1,000,000 | 10,000,000
quick sort | 0.036438 | 0.099808 | 0.205733  | 2.294918
merge sort | 0.033868 | 0.093098 | 0.195854  | 2.152524
runsort    | 0.031721 | 0.086102 | 0.176740  | 1.930010

quadsort   | 200,000  | 500,000  | 1,000,000 | 10,000,000
quick sort | 0.036348 | 0.098176 | 0.201880  | 2.216219
merge sort | 0.033514 | 0.091412 | 0.191589  | 2.088849
runsort    | 0.027340 | 0.076412 | 0.156922  | 1.713937

halfsorted | 200,000  | 500,000  | 1,000,000 | 10,000,000
quick sort | 0.025682 | 0.067421 | 0.140791  | 1.533914
merge sort | 0.025011 | 0.067496 | 0.144097  | 1.630212
runsort    | 0.020595 | 0.061310 | 0.137598  | 1.461708
where runsort was the fastest of the three algorithms in all cases, and all three algorithms were of the same order in practice. The complete implementation and tests can be found in [4].
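As an illustration of the second test case, input with runs of at least four elements could be produced along the following lines. This is only a guess at the idea and not the contents of quadcase.c; the names fill_with_runs and cmp_unsigned are hypothetical.

```c
#include <stddef.h>
#include <stdlib.h>

/* Comparison callback for qsort() on unsigned values. */
static int cmp_unsigned(const void *x, const void *y)
{
    unsigned a = *(const unsigned *)x;
    unsigned b = *(const unsigned *)y;

    return (a > b) - (a < b);
}

/* Hypothetical generator: fill a[0..n) with rand() values, then sort each
 * consecutive block of `block` elements so that every ascending run spans
 * at least one full block (the last block may be shorter).
 * block must be nonzero. */
void fill_with_runs(unsigned *a, size_t n, size_t block)
{
    size_t i;

    for (i = 0; i < n; i++)
        a[i] = (unsigned)rand();
    for (i = 0; i < n; i += block) {
        size_t len = (n - i < block) ? n - i : block;

        qsort(a + i, len, sizeof *a, cmp_unsigned);
    }
}
```

Calling fill_with_runs(a, n, 4) gives data in which a descent can only occur at a block boundary, so run discovery finds ascending runs of four or more elements.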
Sample Implementation
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <ctype.h>
#include <sys/types.h>
#include <sys/stat.h>
#include "mergesort.h"
#include "timer.h"
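/* scratch buffer used by merge(); allocated and freed in runsort() */
static unsigned *tarray = NULL;

/* reverse the first len elements of array in place */
void reverse(unsigned *array, int len)
{
    int i = 0;
    unsigned swap = 0;

    for(i = 0; i < --len; ++i) {
        swap = array[i];
        array[i]=array[len];
        array[len]=swap;
    }
}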

/* merge array[start..end) with array[start2..end2) into tarray and return it;
   len is accepted for symmetry with the caller but is not used */
unsigned *merge(unsigned *array, unsigned start, unsigned end,
                unsigned start2, unsigned end2, unsigned len)
{
    unsigned first1 = start;
    unsigned last1 = end;
    unsigned first2 = start2;
    unsigned last2 = end2;
    unsigned i = 0;

    (void)len;
    for(; (first1 < last1) && (first2 < last2); i++) {
        if(array[first1] < array[first2]) {
            tarray[i] = array[first1];
            first1++;
        } else {
            tarray[i] = array[first2];
            first2++;
        }
    }
    /* copy whatever remains of either run */
    for(; first1 < last1; first1++, i++) {
        tarray[i] = array[first1];
    }
    for(; first2 < last2; first2++, i++) {
        tarray[i] = array[first2];
    }
    return tarray;
}

typedef struct run {
    unsigned pos, length, decending;
} run_t;

void
runsort(unsigned len, unsigned *array, unsigned *sarray)
{
    run_t *runs = NULL;
    unsigned i = 0;
    unsigned rcount = 0, trcount = 0;
    unsigned tlen = 0;
    unsigned *tmp = 0;
    unsigned k = 1;
    unsigned kk = 2;
    unsigned ii = 0;
#ifdef _DEBUG
    unsigned elem = 0;
#endif

    /* fewer than two elements: already sorted, and len - 1 below would wrap */
    if(len < 2) {
        if(len == 1) {
            sarray[0] = array[0];
        }
        return;
    }
    tlen = len - 1;
    /* the runs table is zeroed and has one spare entry used as padding */
    runs = (run_t *)calloc(len + 1, sizeof(run_t));
    tarray = (unsigned *)calloc(len + 1, sizeof(unsigned));
    if(runs == NULL || tarray == NULL) {
        fprintf(stderr,"runsort: out of memory\n");
        exit(1);
    }
    /* run discovery: never read past array[len - 1] */
    for(; ; ) {
        runs[rcount].pos = i;
        runs[rcount].length = 1;
        if(i < tlen && array[i] > array[i+1]) {
            /* strictly decreasing run */
            runs[rcount].decending = 1;
            for(; i < tlen && array[i] > array[i+1]; i++) {
                runs[rcount].length++;
            }
        } else {
            /* non-decreasing run; also covers a single trailing element */
            for(; i < tlen && array[i] <= array[i+1]; i++) {
                runs[rcount].length++;
            }
        }
        i++;    /* first element after the run */
        rcount++;
        if(i >= len) {
            break;
        }
    }
#ifdef _DEBUG
    printf("runs counted: %u\n",rcount);
    for(i = 0; i < rcount; i++) {
        printf("pos: %u length: %u decending: %u\n",runs[i].pos, runs[i].length, runs[i].decending);
        elem += runs[i].length;
    }
    printf("discovered: %u elements\n",elem);
#endif
    /* reverse descending runs so that every run is ascending */
    for(i = 0; i < rcount; i++) {
        if(runs[i].decending) {
#ifdef _DEBUG
            printf("run: %u\n",i);
#endif
            reverse(&array[runs[i].pos],runs[i].length);
        }
    }
#ifdef _DEBUG
    printf("rcount = %u\n", rcount);
#endif
    /* binary merge of adjacent runs; an odd count is padded with a zero run */
    if(rcount % 2) {
        rcount++;
    }
    k = 1;
    kk = 2;
    trcount = rcount;
    do {
        for(i = 0, ii = k; i < trcount; i += kk, ii += kk ) {
#ifdef _DEBUG
            printf("i: %u ii: %u rcount: %u\n",i,ii,trcount);
#endif
            tmp = merge(array,runs[i].pos, runs[i].pos + runs[i].length,
                        runs[ii].pos,runs[ii].pos + runs[ii].length,len);
            runs[i].length += runs[ii].length;
            memcpy(&array[runs[i].pos], tmp, sizeof(unsigned) * runs[i].length);
#ifdef _DEBUG
            printf("i: %u runs[i].length: %u\n",i,runs[i].length);
#endif
        }
        rcount >>= 1;
        k <<= 1;
        kk <<= 1;
    } while(rcount > 0);
    memcpy(sarray,array,sizeof(unsigned) * len);
    free((void *)runs);
    free((void *)tarray);
    tarray = NULL;
}

/* print usage and exit with a failure status */
void
syntax(void)
{
    fprintf(stderr,"runsort nr_elements integer_array\n");
    exit(EXIT_FAILURE);
}

int
main(int argc, char **argv)
{
    static const int NR_ELEMENTS = 1;
    static const int INTEGER_ARRAY = 2;
    unsigned nr_elements = 0;
    unsigned *array = 0;
    unsigned *sarray = 0;
    unsigned i = 0;
    char *str = 0;
    char *end = 0;

    if(argc == 3) {
        nr_elements = (unsigned)strtol(argv[NR_ELEMENTS],NULL,10);
        array = (unsigned *)malloc(sizeof(unsigned) * nr_elements);
        sarray = (unsigned *)malloc(sizeof(unsigned) * nr_elements);
        if(nr_elements > 0 && (array == NULL || sarray == NULL)) {
            fprintf(stderr,"runsort: out of memory\n");
            return 1;
        }
        /* parse whitespace-separated integers from the second argument */
        for(i = 0, str = argv[INTEGER_ARRAY]; i < nr_elements; i++) {
            array[i] = (unsigned)strtoul(str,&end,10);
            if(end == str) {
                /* fewer integers supplied than nr_elements */
                syntax();
            }
            str = end;
        }
#ifdef _DEBUG
        for(i = 0; i < nr_elements; i++) {
            printf("%u ",array[i]);
        }
        printf("\n");
#endif
        runsort(nr_elements,array,sarray);
        for(i = 0; i < nr_elements; i++) {
            printf("%u ",sarray[i]);
        }
        printf("\n");
        free((void *)array);
        free((void *)sarray);
    } else {
        syntax();
    }
    return 0;
}
References
[1] Cormen, T. H., Leiserson, C. E., Rivest, R. L., & Stein, C. (2009). Introduction to Algorithms. Cambridge, Massachusetts: The MIT Press.
[2] Estivill-Castro, V., & Wood, D. (1992). A survey of adaptive sorting algorithms. ACM Computing Surveys, 24(4).
[3] Brady, M. H. (2005). Runsort: An Adaptive Mergesort for Prolog. TCD Computer Science Department.
[4] Doss, R. (2012). runsort. Retrieved May 28, 2012, from http://www.rdoss.com/papers/runsort/runsort.tar.gz