Posted 3 Sep 2015

Apache HBase Example Using Java

A simple Java program that demonstrates HBase table creation, data import, and queries.

Introduction

This project demonstrates the use of Apache HBase (http://hbase.apache.org/) with the Java API. It is intended as a starting point for exploring the capabilities of HBase and to give developers new to HBase an overview on getting started. Updated code for this project can be found at GitHub (https://github.com/ggraham-412/HBaseJavaExample, commit b1fdec) and in the accompanying zip file.

Background

Apache HBase is a column-oriented, key-value NoSQL database modeled after Google's BigTable (http://research.google.com/archive/bigtable.html). HBase is designed to work with the Hadoop Distributed File System (HDFS), and it is designed from the outset for scalability on clusters of commodity hardware. As with other NoSQL database projects, HBase delivers on its scalability promise by giving up some of the features of a traditional RDBMS, such as transactional integrity, referential integrity, and ACID (https://en.wikipedia.org/wiki/ACID) guarantees. HBase preserves only some of these guarantees, and only under certain conditions.

HBase implements a horizontally partitioned key value map. Every item in HBase is addressable by a row key, a column family, and a column name within the family. Furthermore, every item is versioned by timestamp. HBase will store up to N versions of data with N being settable on the column family. When querying HBase, if the version is not given, then the most recent data is returned.

{row key, column family:column, version} -> {data item}
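
One way to picture this addressing scheme is as nested sorted maps. The sketch below is a plain-JDK analogy, not HBase code: the class VersionedCellStore and its methods are hypothetical names, strings stand in for byte arrays, and a single version cap stands in for the per-column-family setting.

```java
import java.util.Comparator;
import java.util.TreeMap;

/** In-memory analogy of {row key, family:column, version} -> value. Not HBase code. */
class VersionedCellStore {
    private final int maxVersions;
    // row key -> "family:column" -> version (newest first) -> value
    private final TreeMap<String, TreeMap<String, TreeMap<Long, String>>> rows = new TreeMap<>();

    VersionedCellStore(int maxVersions) {
        if (maxVersions < 1) throw new IllegalArgumentException("maxVersions must be >= 1");
        this.maxVersions = maxVersions;
    }

    void put(String row, String familyAndColumn, long version, String value) {
        TreeMap<Long, String> versions = rows
            .computeIfAbsent(row, k -> new TreeMap<>())
            .computeIfAbsent(familyAndColumn,
                k -> new TreeMap<Long, String>(Comparator.<Long>reverseOrder()));
        versions.put(version, value);
        // Keep only the newest maxVersions entries; the last entry is the oldest
        while (versions.size() > maxVersions) {
            versions.pollLastEntry();
        }
    }

    /** Returns the most recent value, or null if the cell does not exist. */
    String get(String row, String familyAndColumn) {
        TreeMap<Long, String> versions = lookup(row, familyAndColumn);
        return versions == null ? null : versions.firstEntry().getValue();
    }

    /** Returns the value stored at exactly this version, or null. */
    String get(String row, String familyAndColumn, long version) {
        TreeMap<Long, String> versions = lookup(row, familyAndColumn);
        return versions == null ? null : versions.get(version);
    }

    private TreeMap<Long, String> lookup(String row, String familyAndColumn) {
        TreeMap<String, TreeMap<Long, String>> cells = rows.get(row);
        return cells == null ? null : cells.get(familyAndColumn);
    }
}
```

With a cap of two versions, writing versions 1, 2, and 3 to the same cell leaves versions 3 and 2 readable, and a read without a version returns the value written at version 3.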

The row key, column family, and column are represented as byte arrays. Although strings are commonly used for all three, only the column family has the restriction of using printable characters. The version must be a long integer.

Records are clustered lexicographically by row key. It is the only sortable key, and so it is common practice to make it a munged compound key. Care is required in the design of the row key, and the choice of munging must reflect the expected nature of the queries to be supported. In a fully distributed HBase system, data will be housed on "region servers" based on regions of the row key space.
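
To see why the key layout matters, the following sketch uses a java.util.TreeMap as a stand-in for a table sorted by row key. It is an analogy only: the compoundKey helper, the underscore separator, and the date format are assumptions made for illustration and are not taken from the example project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Shows how a sorted compound key turns "start key plus limit" into a range read. */
class RowKeyOrderSketch {
    /** Hypothetical compound key: symbol, separator, ISO date. */
    static String compoundKey(String symbol, String date) {
        return symbol + "_" + date;
    }

    /** Returns up to limit keys in sorted order, beginning at startKey (inclusive). */
    static List<String> keysFrom(TreeMap<String, String> table, String startKey, int limit) {
        List<String> out = new ArrayList<>();
        for (String key : table.tailMap(startKey, true).keySet()) {
            if (out.size() >= limit) break;
            out.add(key);
        }
        return out;
    }
}
```

Because the symbol comes first, all rows for one symbol sit next to each other in date order. A read that asks for more rows than the symbol has simply runs on into the next symbol's rows, which is exactly how a sorted key space behaves.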

Using the Code

Installation and Deployment

This project contains example code for accessing HBase from Java. The example code will import daily stock price data from Google Finance into HBase and run simple queries against it. The example was developed with HBase 1.0.1.1 or compatible, Java 8 JDK update 60, and Fedora 22 Linux (4.1.6-200.fc22.x86_64). It should also run on Windows using Cygwin (http://hbase.apache.org/cygwin.html), but I have not tested this.

Unpack the HBase archive and edit the configuration scripts if desired. HBase should start up running against the /tmp folder by default, and not using HDFS. To change the folder HBase uses for its store, edit the configuration file conf/hbase-site.xml as follows:

<configuration>
  <property>
    <name>hbase.rootdir</name>
    <value>file:///data/hbase</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/data/zookeeper</value>
  </property>
</configuration>

The foregoing configuration will cause HBase to use the /data folder on the local host. Note that it is not necessary to create the /data/hbase and the /data/zookeeper folders; HBase will do that for you. However, the /data folder should be writable by whatever user is running the HBase daemon.

To start HBase, issue the command:

bin/start-hbase.sh

The example code contained in this archive was compiled under Java version 8 update 60 from http://www.java.com.

Compilation

The Java code must be compiled against a rather large number of jar files that come with HBase. I do not know if all of the jar files are really needed, but including them all works. There is a small shell script called makeCPATH.sh to help with this. The script must be sourced with the location of the lib folder as its first argument.

. ./makeCPATH.sh /path/to/hbase/lib
echo $CPATH

Afterwards, the variable CPATH should contain a list of all of the jar files in the HBase lib folder. On Windows, you'll need to write an equivalent .bat file to do the same thing. Alternatively, you can import this code into an IDE such as Eclipse and add all of the jar files to the project build path through its dialog box interface.

To compile the Java code, change to the folder containing the Java source code for this example (e.g., TestHBase.java). Execute the command:

javac -cp $CPATH *.java

Running the Example

The location of the configuration folder of HBase should be set in the environment variable HBASE_CONF_DIR. This allows the Java code to find and read the HBase configuration. (The file hbase-site.xml should be in this folder.)

In addition, the Java environment variable JAVA_HOME should be set to the folder containing the partial path "bin/java" for your Java installation. (NOTE: Make sure this is the installation folder and not a folder containing a symbolic link. For example, it should look like "/usr/java/jdk1.8.0_60/jre".)
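
A quick way to confirm both variables before running the example is a small preflight check. This is a minimal sketch using only the JDK. The class EnvCheckSketch is hypothetical and not part of the example project, and it looks for a launcher named "java", so on Windows the file name would differ.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Checks that HBASE_CONF_DIR and JAVA_HOME point at plausible folders. */
class EnvCheckSketch {
    /** True if the folder contains hbase-site.xml. */
    static boolean hasSiteFile(Path confDir) {
        return Files.isRegularFile(confDir.resolve("hbase-site.xml"));
    }

    /** True if the folder contains bin/java. */
    static boolean hasJavaLauncher(Path javaHome) {
        return Files.isRegularFile(javaHome.resolve("bin").resolve("java"));
    }

    public static void main(String[] args) {
        String conf = System.getenv("HBASE_CONF_DIR");
        String home = System.getenv("JAVA_HOME");
        System.out.println("HBASE_CONF_DIR ok: "
            + (conf != null && hasSiteFile(Paths.get(conf))));
        System.out.println("JAVA_HOME ok: "
            + (home != null && hasJavaLauncher(Paths.get(home))));
    }
}
```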

The example code comes with four stock price datasets from Google Finance obtained through http://www.quandl.com for the symbols ABT, BMY, MRK, and PFE. These datasets are contained in the folder FinData. The TestHBase class is defined outside of a package, so you can run it by just:

java -cp $CPATH:. TestHBase

The code will connect to the HBase instance defined in the conf/hbase-site.xml configuration file. Then, it will drop the table (if it already exists from a previous run), (re)create the table, load the four example stock datasets into the table, and run some example queries.

The name of the table is BarData. It will contain daily "candlestick" bars of stock price movements: opening, high, low, and closing prices, and the daily volume. This table can also be inspected separately with the HBase shell. (See https://learnhbase.wordpress.com/2013/03/02/hbase-shell-commands/ for more information on the HBase shell.)

Points of Interest

The center of the design is a DAO-inspired class called BarDatabase. The schema is specified by byte array constants in the class to avoid the unnecessary overhead of repeatedly converting table, row, and column string names to byte arrays for the Java HBase API. The class avoids the use of intermediate data objects, and instead delegates responsibility to specialized interfaces for reading data from a data source and for processing data returned from queries. More can be done here to use raw streams and to avoid row-oriented lines of text.

Creating/Deleting a Table

try (Connection connection = ConnectionFactory.createConnection(config);
     Admin admin = connection.getAdmin()) {

    HTableDescriptor table =
        new HTableDescriptor(TableName.valueOf(TABLE_NAME));
    table.addFamily(new HColumnDescriptor(COLUMN_FAMILY));

    if (!admin.tableExists(table.getTableName())) {
        System.out.print("Creating table. ");
        admin.createTable(table);
        System.out.println(" Done.");
    }
}

Every operation in HBase takes place in the context of a connection. We use Java's try-with-resources statement (Connection and Admin are both AutoCloseable) to guarantee that the connection is closed at the end of the try block. A table is then created (or dropped) using an HTableDescriptor and the HBase Admin interface. The only significant difference between creating a new table and deleting an existing table is that an existing table must be disabled before it can be deleted.

if (admin.tableExists(table.getTableName())) {
    System.out.print("Dropping table. ");
    // a table must be disabled before it can be dropped
    admin.disableTable(table.getTableName());
    admin.deleteTable(table.getTableName());
    System.out.println(" Done.");
}

Column families are administrative scopes for the columns they logically contain. The limit on the number of versions to keep for columns is an example of an administrative parameter that is defined at the column family level.

Importing Data

Data can be imported using the Put object. This can be done on a row-by-row basis, or a List of Put objects can be imported at once. (I am not sure if this is a true bulk operation, but on the local system it loads data about twice as fast as submitting each Put right after its row record is read.)

In the example code, since the data is resident in CSV files, this is achieved by a class called LineImporter, which is an inner class of BarDatabase that implements a callback interface. After each line is read by a text file LineReader instance, the LineImporter creates a Put object and saves it in a list called currentImport. When the file stream is closed, the LineImporter instance bulk loads the data into HBase.
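
Stripped of the HBase calls, the buffer-then-flush pattern looks roughly like the sketch below. The names LineHandler and BatchingImporter are hypothetical stand-ins, not the article's actual interfaces. A Consumer takes the place of the HBase table, and parsed String arrays take the place of Put objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Callback invoked once per input line; closing it signals the end of the file. */
interface LineHandler extends AutoCloseable {
    void processLine(String line);

    @Override
    void close();   // narrowed so callers need not handle checked exceptions
}

/** Buffers one parsed record per line and hands the whole batch to a sink on close. */
class BatchingImporter implements LineHandler {
    private final List<String[]> pending = new ArrayList<>();
    private final Consumer<List<String[]>> sink;   // stand-in for the table's put(List)

    BatchingImporter(Consumer<List<String[]>> sink) {
        this.sink = sink;
    }

    @Override
    public void processLine(String line) {
        if (line.trim().isEmpty()) return;   // skip blank lines
        pending.add(line.split(","));        // naive CSV split, enough for a sketch
    }

    @Override
    public void close() {
        if (pending.isEmpty()) return;
        try {
            sink.accept(new ArrayList<>(pending));   // one call for the whole batch
        } finally {
            pending.clear();                         // never resend the same rows
        }
    }
}
```

Used in a try-with-resources block, the importer makes exactly one call to the sink when the block exits, no matter how many lines were read.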

/**
 *    Imports bulk data into HBase table
 */
@Override
public void close() throws Exception {
    if ( currentImport.isEmpty() ) return;
    // try-with-resources closes the table and the connection,
    // even if put() throws
    try (Connection conn = ConnectionFactory.createConnection(config);
         Table table = conn.getTable(TableName.valueOf(TABLE_NAME))) {
        // Send the whole list of Put objects in one call
        table.put(currentImport);
    }
    finally {
        currentImport.clear();
    }
}

Querying Data

There are two query methods in general: Get and Scan. Get is intended to find single rows (or single cells of single rows), whereas Scan is intended to return row sets. Get is parameterized by a row key and, optionally, a column family, a column within the family, and a version (timestamp). Here is an example query that retrieves a single cell, such as the closing price of a given stock on a given date.

/**
 *    Gets a single cell given the date and stock symbol and column ID
 */
public String GetCell(String date, String symbol, byte[] column)
        throws IOException {
    // try-with-resources closes the table and then the connection
    try (Connection conn = ConnectionFactory.createConnection(config);
         Table table = conn.getTable(TableName.valueOf(TABLE_NAME))) {
        // Construct a "getter" with the rowkey.
        Get get = new Get(makeKey(date, symbol));
        // Further refine the "get" with a column specification
        get.addColumn(COLUMN_FAMILY, column);
        // Get the result by passing the getter to the table
        Result r = table.get(get);
        // return the results
        if ( r.isEmpty() ) return null;
        // Gets the value of the first (and only) column
        return new String(r.value());
    }
}

In this case, both a column family and column within the family are specified. If we had specified only the family with an addFamily invocation, then all columns in that family would be returned.

For a scan example, consider the following code. Instead of specifying a particular row key, you specify a starting row key and a limit. This may seem somewhat limited (no pun intended) in terms of the query power, but you can also specify row filters via the setFilter() method that execute server-side.

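However, a Scan remains true to its name: it walks the row keys in order from the starting row. The row limit is enforced in the client-side code. (It is also implemented client side in the official HBase shell interactive version of scan.) You can, however, set the scanner caching (the number of rows fetched per round trip) and add a PageFilter so that the servers do not churn through every row when you only want a few. Because a PageFilter is applied separately on each region server, a scan can still return more rows in total than the page size, so the client-side check remains necessary.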

/**
 *    Specifies a range of rows to retrieve based on a starting row key
 *    and retrieves up to limit rows.  Each row is passed to the supplied
 *    DataScanner.
 */
public void ScanRows(String startDate, String symbol,
         int limit, DataScanner scanner) throws IOException {
    // Create the scan
    Scan scan = new Scan();
    // start at a specific rowkey.
    scan.setStartRow(makeKey(startDate, symbol));
    // Fetch at most limit rows per round trip
    // since we won't need more
    scan.setCaching(limit);
    // Can also set a server side filter
    scan.setFilter(new PageFilter(limit));
    // try-with-resources closes the scanner, the table and the
    // connection, in that order
    try (Connection conn = ConnectionFactory.createConnection(config);
         Table table = conn.getTable(TableName.valueOf(TABLE_NAME));
         ResultScanner results = table.getScanner(scan)) {
        // Iterate over the scan results and stop after limit rows
        int count = 0;
        for ( Result r : results ) {
            if ( count++ >= limit ) break;
            scanner.ProcessRow(r);
        }
    }
}

Also in the above code, we see that the query results are not processed in situ, but are sent directly to a callback interface. Again, this is done to avoid the creation of intermediate objects. In the example case, the DataScanner provided simply dumps the output to stdout.
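
The callback-with-limit idea can be tried without an HBase cluster. In this sketch, RowCallback is a hypothetical stand-in for DataScanner and any Iterable plays the role of the ResultScanner. The limit is checked before each row is handed over, so exactly limit rows are delivered (and none when the limit is zero).

```java
/** Hypothetical stand-in for the article's DataScanner callback. */
interface RowCallback<R> {
    void processRow(R row);
}

/** Streams rows to a callback and stops after a client-side limit. */
class LimitedScanSketch {
    /** Passes at most limit rows to the callback and returns how many were delivered. */
    static <R> int scan(Iterable<R> rows, int limit, RowCallback<R> callback) {
        int count = 0;
        for (R row : rows) {
            if (count >= limit) break;   // check first, so no extra row slips through
            callback.processRow(row);
            count++;
        }
        return count;
    }
}
```

No list of results is built. Each row is consumed as it arrives, which is the point of routing results through a callback.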

The Row Key

We mentioned above the importance of designing a good row key. In this example, the row key was chosen to be a munge of stock name followed by date. In a fully distributed system, this means that rows are going to be allocated to servers first based on their stock symbol and then based on their date. This has important benefits. Queries against historical data are likely going to be focused mainly on yesterday's data, and following that in popularity will be the day before yesterday, etc. Analysts are rarely going to look at the stock price of, say, IBM in 1987. So if the row key had been designed the other way with the date first, days would tend to be clustered together, and real query traffic would tend to bombard a small fraction of the HBase region servers in your cluster!
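
The clustering effect can be shown with a sorted set of keys. The sketch below builds the same rows under both layouts and reports where one day's rows fall in sorted order. The key format (underscore separator, ISO dates) is an assumption for illustration and may differ from what the project's makeKey produces.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Compares where one day's rows land under symbol-first and date-first keys. */
class KeyLayoutSketch {
    /** Builds one key per (symbol, date) pair, sorted lexicographically. */
    static TreeSet<String> buildKeys(String[] symbols, String[] dates, boolean symbolFirst) {
        TreeSet<String> keys = new TreeSet<>();
        for (String s : symbols) {
            for (String d : dates) {
                keys.add(symbolFirst ? s + "_" + d : d + "_" + s);
            }
        }
        return keys;
    }

    /** Returns the positions, in sorted order, of every key containing the date. */
    static List<Integer> positionsOfDate(TreeSet<String> sortedKeys, String date) {
        List<Integer> positions = new ArrayList<>();
        int index = 0;
        for (String key : sortedKeys) {
            if (key.contains(date)) positions.add(index);
            index++;
        }
        return positions;
    }
}
```

For the four symbols in this example and three consecutive dates, the most recent day's rows sit at positions 2, 5, 8, and 11 with symbol-first keys, but at the adjacent positions 8 through 11 with date-first keys. Since regions are contiguous ranges of the key space, the date-first layout would send all of that traffic to the same range.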

Conclusion

I did not cover ZooKeeper, a distributed coordination service that synchronizes configuration and coordinates distributed services, because this was intended to be a simple example running on a single node.

NoSQL databases offer improved scalability for hosting large data stores, in the range of billions of rows and millions of columns. These database systems deliver on that promise by relaxing various constraints imposed by the relational model, and the choice of which constraint to relax is driven by particular use cases. I think the plethora of NoSQL databases owes to the many different use cases and the many different ways and degrees of relaxing the relational model (see http://nosql-database.org/).

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

ggraham412
Web Developer
United States

Last Updated 3 Sep 2015
Article Copyright 2015 by ggraham412