InputSplit indexing on MapReduce

Antoine Amend, 3 May 2013
Create custom indexes to improve MapReduce performance

Introduction

Using the MapReduce framework, Yahoo! Inc. was able to sort 1 TB of data in about a minute (the 1 TB sort benchmark). Unfortunately, not every company can afford to build a thousand-node cluster to achieve pseudo real-time querying. Assuming you have a 10-node cluster, which is already a good start for playing with big data, sorting 1 TB of data should take roughly 1 hour. Given this constraint of limited nodes, you may need to reorganize your data on HDFS (e.g. partition, sort) and/or implement indexes in order to improve overall performance. In this report I describe the implementation of custom indexes based on Hadoop InputSplits, the issues I faced, and finally what the added value was (if any) compared to non-indexed MapReduce jobs.

Background

The reader should understand the concepts of Hadoop, HDFS and MapReduce, and should have experience implementing basic Mapper and Reducer classes using the new Hadoop API. Should you need to refresh your knowledge, Tom White's well-known "Hadoop: The Definitive Guide" will be more than helpful.

Input Split

Once you submit a MapReduce job, Hadoop computes the number of map tasks that need to be executed on your data set. This number depends on the number of input files, their size and the block size. Each file is split into one or several InputSplits, each of which is basically a chunk of one or several blocks. From what I understand, each InputSplit gets its own dedicated mapper instance, so the number of mappers is directly related to the number of InputSplits.
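To make the link between file size, split size and number of map tasks more concrete, here is a minimal sketch in plain Java (no Hadoop dependency). It mirrors, as far as I understand it, the way FileInputFormat carves a splittable file into splits, including the 10% slack it tolerates on the last split. The class and method names are hypothetical, and the 128 MB split size used in main is only an assumption.

```java
// Hypothetical helper, not part of Hadoop: estimates how many InputSplits
// (and therefore map tasks) a single splittable file would produce.
final class SplitEstimator {

    // Assumption: the last split may be up to 10% larger than the split size,
    // which avoids creating a tiny trailing split.
    private static final double SPLIT_SLOP = 1.1;

    static int countSplits(long fileSize, long splitSize) {
        if (fileSize < 0 || splitSize <= 0) {
            throw new IllegalArgumentException("fileSize must be >= 0 and splitSize > 0");
        }
        int splits = 0;
        long remaining = fileSize;
        // Cut full-size splits while more than 1.1 x splitSize is left
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            splits++;
            remaining -= splitSize;
        }
        // Whatever is left (if anything) becomes the last split
        if (remaining > 0) {
            splits++;
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // A 1 GB file with an assumed 128 MB split size
        System.out.println(countSplits(1024 * mb, 128 * mb));
        // A 130 MB file fits in a single, slightly oversized split
        System.out.println(countSplits(130 * mb, 128 * mb));
    }
}
```

Summing this count over all input files gives a rough estimate of the number of map tasks a full scan would need.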

An InputSplit is based on 3 different values:

  1. The file name
  2. The offset (where InputSplit starts)
  3. The length (number of bytes in the InputSplit, so it stops at offset + length)

For a file-based split, the InputSplit toString() method returns the following pattern (path:offset+length):

hdfs://server.domain:8020/path/to/my/file:0+100000

By hashing this value using MD5Hash, you get a practically unique ID identifying each InputSplit:

911c058fbd1e60ee710dcc41fff16b27
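As an illustration of the hashing step, the sketch below computes an MD5 hex digest with the standard java.security API only. I would expect it to give the same 32-character result as Hadoop's MD5Hash for plain ASCII paths, but that equivalence is an assumption I have not checked here. The class name SplitIds and the method md5Hex are hypothetical.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: turns an InputSplit description into a fixed-size ID.
final class SplitIds {

    static String md5Hex(String splitDescription) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(splitDescription.getBytes(StandardCharsets.UTF_8));
            // 16 bytes -> 32 lowercase hex characters, zero-padded on the left
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            // MD5 is a mandatory algorithm on standard JVMs
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    public static void main(String[] args) {
        // A split description following the path:offset+length pattern
        String split = "hdfs://server.domain:8020/path/to/my/file:0+100000";
        System.out.println(md5Hex(split));
    }
}
```

Because the ID depends on the path, the offset and the length, two splits of the same file get different IDs.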

Main assumption: Hadoop will always return the exact same InputSplits as long as the data set (and the split-size configuration) does not change.

Indexing

We can identify 3 different levels: the file, the InputSplit, and the block. In theory, there should be at least 3 different ways of creating indexes: index based on file URI, index based on InputSplit, or index based on block. I must confess I was quite afraid of dealing with blocks manually, so I focused on file and InputSplit indexes only.

Let’s take 2 different examples of data sets.

In this first example, the 2 files of your data set fit in 25 blocks and have been identified as 7 different InputSplits. The target you are looking for (highlighted in grey) is found in file #1 (blocks #2, #8 and #13) and in file #2 (block #17).

  • With file-based indexing, you will end up with 2 files (the full data set here), meaning that your indexed query will be equivalent to a full scan query
  • With InputSplit-based indexing, you will end up with 4 of the 7 available InputSplits. The performance should definitely be better than a full scan query

Let’s take a second example. This time the same data set has been sorted by the column you want to index. The target you are looking for (highlighted in grey) is now found in file #1 only (blocks #1, #2, #3 and #4).

  • With file-based indexing, you will end up with only 1 file from your data set
  • With InputSplit-based indexing, you will end up with 1 of the 7 available InputSplits

For this specific study, I decided to use a custom InputSplit-based index. I believe such an approach is a good balance between the effort it takes to implement, the added value it might bring in terms of performance optimization, and its expected applicability regardless of the data distribution.

Implementation

The process for implementing the index is quite simple and consists of the following 3 steps:

  1. Build index from your full data set
  2. Query index in order to get the InputSplit(s) for the value you are looking for
  3. Execute your actual mapreduce job on indexed InputSplits only

The first step needs to be executed only once, as long as the full data set does not change.

Building index

Building a MapReduce index might take a really long time, as you are outputting each value to index together with its actual InputSplit location. The key parameter to estimate here is the number of reducers to use. With a single reducer, all your index entries are written to a single file, but the time required to copy all data from the mappers to that single reducer will definitely be too long. With thousands of reducers, you will output index entries to thousands of different files, but the job might be significantly faster. The right value is, I believe, a balance between the number of reduce slots available in your cluster and the expected size of your final index. In order to estimate it properly, I took a Simple Random Sample (SRS) of 10% of my data set. If you expect a 100 GB index, you could set the number of reducers to around 50, so that you end up with 50 files of about 2 GB each.
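The arithmetic behind that estimate can be sketched as follows. This is only an illustration under two assumptions: the index size scales linearly with the sampled fraction, and you pick a target size per output file (2 GB here, as in the example above). The class ReducerEstimator and its methods are hypothetical names.

```java
// Hypothetical helper: derives a reducer count from a sampled index size.
final class ReducerEstimator {

    // Extrapolates the full index size from the index built on a sample.
    // Assumption: index size grows linearly with the amount of data.
    static long estimateIndexBytes(long sampleIndexBytes, double sampleFraction) {
        if (sampleFraction <= 0.0 || sampleFraction > 1.0) {
            throw new IllegalArgumentException("sampleFraction must be in (0, 1]");
        }
        return Math.round(sampleIndexBytes / sampleFraction);
    }

    // Number of reducers so that each output file is at most targetBytesPerFile.
    static int reducersFor(long expectedIndexBytes, long targetBytesPerFile) {
        if (targetBytesPerFile <= 0) {
            throw new IllegalArgumentException("targetBytesPerFile must be > 0");
        }
        long reducers = (expectedIndexBytes + targetBytesPerFile - 1) / targetBytesPerFile;
        return (int) Math.max(1L, reducers);
    }

    public static void main(String[] args) {
        long gb = 1L << 30;
        // A 10% sample producing a 10 GB index suggests a 100 GB full index
        long fullIndex = estimateIndexBytes(10 * gb, 0.10);
        // 100 GB split into 2 GB files
        System.out.println(reducersFor(fullIndex, 2 * gb));
    }
}
```

The result is only a starting point; it still has to be weighed against the number of reduce slots actually available in the cluster.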

Mapper Class

Using the Hadoop context, you can retrieve the current InputSplit from the running mapper instance. I extract the value I want to index and output it together with the MD5 hash of its InputSplit.

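import java.io.IOException;

import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;

public class RebuildIndexMapper extends Mapper<Object, Text, Text, Text> {

    // Reused output objects: the split ID is computed once per map task
    private final Text splitId = new Text();
    private final Text indexKey = new Text();

    @Override
    public void setup(Context context) {
        // Retrieve current InputSplit from Context
        InputSplit is = context.getInputSplit();
        splitId.set(MD5Hash.digest(is.toString()).toString());
    }

    @Override
    public void map(Object object, Text line, Context context)
            throws IOException, InterruptedException {
        // Retrieve key to index
        indexKey.set(Utils.getValueToIndex(line.toString()));
        // Output value to index + current split ID
        context.write(indexKey, splitId);
    }
}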

Assuming you are indexing IP addresses, the intermediate key/value pairs will look like the following. From a first mapper,

192.168.0.1     60390c7e429e38e8449519011a24f79d
192.168.0.2     60390c7e429e38e8449519011a24f79d
192.168.0.3     60390c7e429e38e8449519011a24f79d
192.168.0.1     60390c7e429e38e8449519011a24f79d

And from a second one,

192.168.0.1     ccc6decd3d361c3d651807a0c1a665e4
192.168.0.5     ccc6decd3d361c3d651807a0c1a665e4
192.168.0.6     ccc6decd3d361c3d651807a0c1a665e4
192.168.0.3     ccc6decd3d361c3d651807a0c1a665e4

Combiner Class

As you will output a key/value pair for each line of your data set, using a Combiner that removes duplicates might be really useful. The implementation is quite straightforward and is not described here.
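For readers who would still like a starting point, the de-duplication at the heart of such a combiner can be sketched in plain Java, without any Hadoop types. This is not the combiner I used, only an illustration; SplitIdDeduplicator and distinct are hypothetical names. In a real combiner the same logic would run over the values received for one key and write each distinct value back once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: the core of a duplicate-removing combiner.
final class SplitIdDeduplicator {

    // Returns each split ID once, in order of first appearance.
    static List<String> distinct(Iterable<String> splitIds) {
        Set<String> seen = new LinkedHashSet<>();
        for (String id : splitIds) {
            seen.add(id);
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        // Values a combiner could receive for key 192.168.0.1 on the first mapper
        List<String> values = Arrays.asList(
                "60390c7e429e38e8449519011a24f79d",
                "60390c7e429e38e8449519011a24f79d");
        System.out.println(distinct(values));
    }
}
```

Since a map task processes a single InputSplit, all values emitted for a given key by that task carry the same split ID, so the combiner can shrink them to one record per key and per map task.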

Reducer Class 

The goal of the reducer is simply to get the distinct InputSplits for each indexed value and output them on a single line.

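import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Set;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RebuildIndexReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        // Remove duplicated split IDs for the same target
        // (a Set avoids the linear scan of List.contains)
        Set<String> splitIds = new LinkedHashSet<String>();
        for (Text value : values) {
            splitIds.add(value.toString());
        }

        // Concatenate distinct split IDs, comma-separated, without trailing comma
        StringBuilder sb = new StringBuilder();
        for (String splitId : splitIds) {
            if (sb.length() > 0) {
                sb.append(",");
            }
            sb.append(splitId);
        }

        context.write(key, new Text(sb.toString()));
    }
}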

Given the same example as before, the final output will look like the following:

192.168.0.1     ccc6decd3d361c3d651807a0c1a665e4,60390c7e429e38e8449519011a24f79d
192.168.0.2     60390c7e429e38e8449519011a24f79d
192.168.0.3     ccc6decd3d361c3d651807a0c1a665e4,60390c7e429e38e8449519011a24f79d
192.168.0.5     ccc6decd3d361c3d651807a0c1a665e4
192.168.0.6     ccc6decd3d361c3d651807a0c1a665e4

Because the index output is usually quite large, writing it as a SequenceFile (SequenceFileOutputFormat) might be really helpful.

Querying index 

Each time you need to execute a MapReduce job for a given indexed value (an IP address in my example), you first have to query the index created in the previous step in order to retrieve the distinct InputSplits this value belongs to.

Mapper class

The map task is quite simple. For each indexed value that matches the target you are looking for, output all its indexed InputSplits.

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FetchIndexMapper extends Mapper<Text, Text, Text, NullWritable> {

    private String indexLookup;

    @Override
    public void setup(Context context) {
        // Get the value to look up
        indexLookup = context.getConfiguration().get("target.lookup");
    }

    @Override
    public void map(Text indexKey, Text indexValue, Context context)
            throws IOException, InterruptedException {

        // Ignore record if it does not match target
        if (!indexKey.toString().equals(indexLookup)) {
            return;
        }

        // Output each single InputSplit location
        for (String index : indexValue.toString().split(",")) {
            context.write(new Text(index), NullWritable.get());
        }
    }
}

Reducer class

The purpose of the reduce task is simply to remove duplicates. The implementation is quite straightforward, so it is not described here. Given the same example as before,

192.168.0.1     ccc6decd3d361c3d651807a0c1a665e4,60390c7e429e38e8449519011a24f79d
192.168.0.2     60390c7e429e38e8449519011a24f79d
192.168.0.3     ccc6decd3d361c3d651807a0c1a665e4,60390c7e429e38e8449519011a24f79d
192.168.0.5     ccc6decd3d361c3d651807a0c1a665e4
192.168.0.6     ccc6decd3d361c3d651807a0c1a665e4

an index query for IP address 192.168.0.1 will output the following

ccc6decd3d361c3d651807a0c1a665e4
60390c7e429e38e8449519011a24f79d
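The lookup itself can be reproduced on the five sample lines with a few lines of plain Java. This is a minimal sketch that assumes a text index with whitespace between the key and the comma-separated split IDs; IndexLookup and splitsFor are hypothetical names, and the real job of course reads the index from HDFS rather than from an in-memory list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: in-memory equivalent of the index query job.
final class IndexLookup {

    // The five index lines of the running example
    static final List<String> SAMPLE_INDEX = Arrays.asList(
            "192.168.0.1     ccc6decd3d361c3d651807a0c1a665e4,60390c7e429e38e8449519011a24f79d",
            "192.168.0.2     60390c7e429e38e8449519011a24f79d",
            "192.168.0.3     ccc6decd3d361c3d651807a0c1a665e4,60390c7e429e38e8449519011a24f79d",
            "192.168.0.5     ccc6decd3d361c3d651807a0c1a665e4",
            "192.168.0.6     ccc6decd3d361c3d651807a0c1a665e4");

    // Returns the distinct split IDs indexed for the given target.
    static List<String> splitsFor(String target, List<String> indexLines) {
        Set<String> result = new LinkedHashSet<>();
        for (String line : indexLines) {
            // First field is the indexed value, second the split ID list
            String[] fields = line.trim().split("\\s+", 2);
            if (fields.length < 2 || !fields[0].equals(target)) {
                continue;
            }
            for (String id : fields[1].split(",")) {
                if (!id.isEmpty()) {
                    result.add(id);
                }
            }
        }
        return new ArrayList<>(result);
    }

    public static void main(String[] args) {
        for (String id : splitsFor("192.168.0.1", SAMPLE_INDEX)) {
            System.out.println(id);
        }
    }
}
```

A target that is absent from the index simply yields an empty list, which later translates into a job with no map task at all.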

These InputSplit MD5 hashes should be written somewhere in an HDFS temp folder so that they can be read by your actual MapReduce job (next section). If the file is quite large, using SequenceFileOutputFormat can, once again, be really helpful.

Executing your MapReduce job

Now that we have built our index table and retrieved the actual InputSplits for the target we are looking for, it is time to set up the actual MapReduce job.

Custom FileInputFormat

With the default configuration, Hadoop computes the InputSplits to be used through the FileInputFormat class. We will create our own FileInputFormat class that extends the default one and overrides its getSplits() method. You have to read the file created in the previous step, add all your indexed InputSplits to a list, and then compare this list with the one returned by the super class. You then return only the InputSplits that were found in your index.

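import java.io.IOException;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class IndexFileInputFormat extends FileInputFormat<LongWritable, Text> {

.../...

    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {

        // Retrieve all default InputSplits
        List<InputSplit> totalIs = super.getSplits(job);

        // Keep only the InputSplits matching our indexed InputSplits
        List<InputSplit> indexedIs = Utils.removeNonIndexedIS(totalIs);

        // Return list of indexed InputSplits
        return indexedIs;
    }
}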

With, somewhere in a Utils class:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.mapreduce.InputSplit;

public class Utils {

.../...

    public static List<InputSplit> removeNonIndexedIS(List<InputSplit> totalIs) {

        // Read your previous file and list all your indexed MD5 hashes
        // (a Set gives constant-time lookups)
        Set<String> md5Is = new HashSet<String>(readInputSplitsFromFile());

        // Initialize new list of InputSplits
        List<InputSplit> indexedIs = new ArrayList<InputSplit>();

        // Filter out InputSplits that are not found in our indexed list
        for (InputSplit split : totalIs) {
            String str = MD5Hash.digest(split.toString()).toString();
            if (md5Is.contains(str)) {
                indexedIs.add(split);
            }
        }

        // Return list of InputSplits to query
        // (empty list if target does not exist)
        return indexedIs;
    }
}
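The helper readInputSplitsFromFile() is not shown in the listing above. As a rough idea of what it could look like, the sketch below reads one MD5 hash per line from any java.io.Reader. It assumes the index query wrote plain text with one hash per line; SplitIdReader and readSplitIds are hypothetical names, not the helper I actually used.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper: loads the split IDs produced by the index query.
final class SplitIdReader {

    // Reads one MD5 hash per line, ignoring blank lines and surrounding spaces.
    static Set<String> readSplitIds(Reader source) {
        Set<String> ids = new LinkedHashSet<>();
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                String id = line.trim();
                if (!id.isEmpty()) {
                    ids.add(id);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return ids;
    }

    public static void main(String[] args) {
        String sample = "ccc6decd3d361c3d651807a0c1a665e4\n"
                + "60390c7e429e38e8449519011a24f79d\n";
        System.out.println(readSplitIds(new StringReader(sample)));
    }
}
```

On HDFS the Reader would typically wrap the stream returned by FileSystem.open(Path). If the query job wrote a SequenceFile instead of plain text, a SequenceFile reader would be needed in place of this line-based loop.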

Driver code

We now have to use this IndexFileInputFormat class instead of the default one (FileInputFormat) in our driver code. When the job is submitted, Hadoop will use only the InputSplits that match the ones we have specified, and you should end up with fewer map tasks than required for a “full scan query”.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class TestIndex {

    public static void main(String[] args) throws Exception {

        .../...

        // Create a new Job and configuration
        Configuration conf = new Configuration();
        Job job = new Job(conf);

        // Configure your MapReduce job
        job.setJarByClass(MyCustomMapper.class);
        job.setMapperClass(MyCustomMapper.class);
        job.setReducerClass(MyCustomReducer.class);

        // Use our custom IndexFileInputFormat
        job.setInputFormatClass(IndexFileInputFormat.class);

        .../...

        job.waitForCompletion(true);
    }
}

Testing 

Test environment 

For this specific test, I set up a small cluster of 3 virtual nodes (VirtualBox) as described below. The Hadoop cluster was installed from Cloudera Manager 4.5.2 (free edition).

Host

OS: Mac OS X 10.7.5
Processor: Intel(R) Core(TM) i7-2600 CPU @ 3.40GHz
Memory: 16 GB

1 Namenode + JobTracker

OS: Ubuntu server 12.04.2 LTS
Memory: 5 GB
Storage: 50 GB
JDK: Java SDK 1.6.0_31
Hadoop: 2.0.0-cdh4.2.0

2 Datanodes + TaskTrackers

OS: Ubuntu server 12.04.2 LTS
Memory: 2.5 GB
Storage: 50 GB
JDK: Java SDK 1.6.0_31
Hadoop: 2.0.0-cdh4.2.0

Test data

I generated a test data set using a simple Perl script. Even though 30 GB of data is far from what a real big-data environment would hold, I believe such a data set is large enough to reveal any potential improvement from indexing.

30 GB in total
100 bytes per record
1 GB per file

I generated 5’000’000 different targets (IP addresses) that I randomly distributed over these 300’000’000 records (30 GB at 100 bytes per record). The same IP is found 60 times on average.

Results

I ran the exact same indexed and non-indexed jobs several times over subsets of 5, 10, 15, 20 and 25 GB of my data set. Because my indexed values were evenly distributed, I expected most of my figures to follow a roughly linear trend.

I plotted the actual index size against the data set size. As expected, the index size grows linearly, and so does its rebuild time. My index is around 7.5 GB for a 25 GB data set (30%), and was fully rebuilt in around 30 minutes.

I also plotted the execution time of both indexed and non-indexed jobs as a function of the data set size. The result is what everybody expects from indexing: the larger your data set, the faster your indexed query is relative to the non-indexed one. Even though my test environment was really small, I was able to see the great potential that indexes bring to MapReduce performance.

In the previous comparison I did not take into account the time required to rebuild my index. Even though this is done only once (e.g. once a day), this process is really heavy and might take a long time to complete. This must be taken into account when forecasting indexing performance. Assuming you build your index only once a day and then execute 10 MapReduce jobs, the overall execution time for all your indexed queries will be:

total_time = rebuild_time + 10 x indexed_mapreduce_time

This is what I plotted next: the time required to execute X requests a day using both indexed and non-indexed queries, over a 25 GB data set.

For my specific test environment, I need to execute at least 5 MapReduce jobs a day to benefit from custom indexes.
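The break-even point follows from comparing rebuild_time + n x indexed_mapreduce_time with n x non_indexed_mapreduce_time: indexing wins as soon as n is larger than rebuild_time divided by the time saved per job. The sketch below applies that rule. The class and method names are hypothetical, and the per-job timings used in main are placeholders chosen for illustration, not the values measured in my test.

```java
// Hypothetical helper: how many jobs are needed before the index pays off.
final class BreakEven {

    // Smallest n such that rebuild + n * indexed < n * fullScan.
    // Returns -1 when an indexed job is not faster than a full scan.
    static long minJobsToPayOff(double rebuildMinutes,
                                double indexedJobMinutes,
                                double fullScanJobMinutes) {
        double savedPerJob = fullScanJobMinutes - indexedJobMinutes;
        if (savedPerJob <= 0) {
            return -1;
        }
        // Strict inequality: n must exceed rebuild / savedPerJob
        return (long) Math.floor(rebuildMinutes / savedPerJob) + 1;
    }

    public static void main(String[] args) {
        // Placeholder timings (minutes): 30 to rebuild, 4 indexed, 12 full scan
        System.out.println(minJobsToPayOff(30.0, 4.0, 12.0));
    }
}
```

Plugging in timings measured on your own cluster gives the minimum number of daily queries that justifies rebuilding the index.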

Conclusion

MapReduce index performance is strongly dependent on your data distribution, and indexing might be really powerful, especially with large data sets. Should you need to forecast how powerful this implementation might be for your specific use case, I suggest you benchmark it on your production environment using a subset of your production data set. For that purpose, you can run several tests on a Simple Random Sample so that you can extrapolate the results to your entire data set.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

Antoine Amend
Software Developer
France
I graduated in 2008 with a Master's degree in Astrophysics from Strasbourg University (France). In addition to the theoretical physics I learned, I acquired robust mathematical and IT knowledge. I leverage this scientific and academic background in my daily activities, which allows me to think outside the box and take a structured approach to problem solving. With excellent knowledge of shell scripting (ksh, bash) and Perl, I work comfortably in Linux / Unix environments (Redhat, Centos, Ubuntu).
I am interested in expanding my professional horizons by seeking new challenges in the area of big data, where my academic background in data analysis, my proven experience in IT consulting, and my analytic and deductive skills are outstanding assets in such an exciting and emerging technology.

Last Updated 3 May 2013
Article Copyright 2013 by Antoine Amend
Everything else Copyright © CodeProject, 1999-2014