
DotNet Programming using Cassandra

GanesanSenthilvel, 12 Apr 2014
An article on DotNet programming using Cassandra storage

BigData

What does it mean?

Companies and industries are seeking to make the best use of information to improve their business capabilities. Big Data is a lot of data produced quickly in many different forms, such as production databases, transactional histories, web traffic logs, online videos, social media interactions and so forth. In 2001, Doug Laney introduced the concept of the "Three Vs": volume, velocity and variety.

  • Volume - sheer amount of data
  • Velocity - speed of data processing
  • Variety - number of types of data

[Figure: 3Vs]

Why do we need it?

In the last decade, IT data growth has been exponential. We live in the data age. It's not easy to measure the total volume of data stored electronically, but an IDC estimate put the size of the "digital universe" at 0.18 zettabytes in 2006 and forecast a tenfold growth to 1.8 zettabytes by 2011. Industry benchmarks depict the same:

[Figure: growth]

As the data grows rapidly, the cost of disk storage (per 1 GB) is moving in the opposite direction at a similar rate. The chart below shows industry data since 1980: the cost started at about 2 million and is now less than 10 cents.

[Figure: storecost]

Big Data helps the application development group to accommodate modern technology concepts seamlessly:

  • High Availability
  • Scale out Architecture
  • Failover Recovery
  • Highly Distributed

On top of the technology revolution, business is more focused on demand versus supply in its use cases. With data demand growing from 8 to 8K exabytes and the supply-side cost of hard drive storage falling from 2 million to 2 cents, business is driven towards implementing Big Data technology.

How is it classified?

Four different types of data models have been implemented:
1. Column Families or wide column store
2. Document Store
3. Key/Value Store
4. Graph Databases

1. Column Families or wide column store

Examples: Cassandra, HBase
Typical usages: Distributed data storage

Column families basically extend the typical key/value pair storage and provide two levels of nesting. The nested key/value pair is called a column in column family storage. Columns can in turn be grouped under a key, which provides super column functionality as well. The typical use case is an application with extensive read/write operations.

2. Document Store

Examples: CouchDB, MongoDB
Typical usages: Web applications

The document-based data model is used to store and retrieve document-oriented or semi-structured information, which is common in web-based applications. Typically, modern document stores follow the XML, JSON or BSON model, which is easy to map to the application's data model. It also lets APIs interact with the database easily, as most modern languages support these formats out of the box.

3. Key/Value Store

Examples: Membase, Redis

Typical usages: Distributed hash table, caching

A key/value store functions as a typical hash table, storing a value against a key. This enables schema-less storage. It works best when read operations far outnumber write operations. For example, if we have to display the latest forum post refreshed every 2 minutes, it makes sense to run a background job every 2 minutes, store the result as a key/value pair and read the data from there. Many content-intensive websites use an in-memory key/value storage system.
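The forum example can be sketched in a few lines of C#. This is only an illustration of the read-heavy pattern using an in-process dictionary, not a distributed store such as Redis; the class name LatestPostCache, its methods and the key format are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical illustration: an in-memory key/value store that a background
// job refreshes, while page requests only read the stored value.
public static class LatestPostCache
{
    // ConcurrentDictionary plays the role of the schema-less key/value store.
    private static readonly ConcurrentDictionary<string, string> Store =
        new ConcurrentDictionary<string, string>();

    // Called by the background job (for example every 2 minutes).
    // loadLatestPost stands in for the expensive database query.
    public static void Refresh(string forumKey, Func<string> loadLatestPost)
    {
        Store[forumKey] = loadLatestPost();
    }

    // Called on every page view: a cheap lookup with no database access.
    public static bool TryGetLatest(string forumKey, out string post)
    {
        return Store.TryGetValue(forumKey, out post);
    }
}
```

A timer or scheduled task would call Refresh, and every page view would call TryGetLatest, so the expensive query runs once per interval regardless of how many readers there are.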

4. Graph Databases

Examples: Neo4J, InfoGrid

Typical usages: Social networking, Recommendations

The graph data model typically follows graph algorithms to store data as nodes and edges. Graph databases are best suited where we face a recursive data model. The model has the following elements: Node, Property and Relationship (Edge).

[Figure: type]

Mission

Objective

A problem statement is used to explain how to build a .NET application using Big Data Cassandra storage. Here, we take a simple profile calculation using two transactional systems (Debit, Credit) and two reference systems (Customer, ForexRate).

Scope

Traditional

For the given problem, the application would traditionally be developed using a relational data store. The relational database model was first proposed by Edgar F. Codd in 1969. The fundamental assumption of the relational data model is that data is represented as a mathematical n-ary relation, which is a subset of a Cartesian product. The data is accessed by means of relational algebra based on set theory, and consistency is achieved by applying constraints such as primary and foreign keys.

In the current data lake world, the traditional data store approach won't scale as the industry data grows.

Target State

This is where this paper comes in: it discusses how to build a .NET application using Big Data Cassandra backend storage instead of the traditional approach. It addresses the problem all the way from architecture design to the implementation of the .NET code.

Architecture

Infrastructure

As it is laid out as a 3-tier architecture, the infrastructure needs Presentation, Business and Storage (Cassandra) layers. To benefit from Cassandra's highly available peer-to-peer cluster model, the Cassandra layer is built as a 2-node cluster.

[Figure: Infra]

The Business and Storage layers are connected using a Cassandra connector called CassandraSharp. You can get more information about CassandraSharp on its GitHub page.

Logical

The Logical Architecture defines the Processes with the activities and functions that are required to provide the business requirements. The Logical Architecture is independent of technologies and implementations.

[Figure: Logic]

In our mission, the functions are split into six categories. Console landing is the input layer and Result is the output. The global data container is the data holder that cuts across the application. Key functionality is covered in the remaining three areas: Loader, BusinessEngine and DataAccess.

The Loader module loads the transaction, reference and business rule data during the initial (AppInit) process. BusinessEngine is the business layer, which selects the calculation rule based on the input. DataAccess is the data connectivity layer that loads and stores information in Cassandra storage.

Data

The given problem statement deals with two transaction data sets (Debit, Credit) and two reference data sets (Customer, ForexRate). They are depicted in the diagram below:

[Figure: Data]

Implementation

For the given problem statement, we have covered the objectives and the infrastructure, logical and data architecture. Let us get into the actual implementation of this requirement using .NET programming with Cassandra as the data storage.

Cassandra Query

As we are taking Cassandra as the storage, the Cassandra tables (column families) are created up front as per our data model. The actual CQL (Cassandra Query Language) statements to create the Cassandra tables (column families) are listed below:

CREATE TABLE Debit (
Trans_id int PRIMARY KEY,
Customer_id int,
Trans_amount varchar,
Trans_date varchar
);

CREATE TABLE Customer (
Customer_id int PRIMARY KEY,
Name varchar,
Location varchar
);

Cassandra Connector

Traditionally, we know that ADO.NET provides consistent access to data sources such as SQL Server and XML, and to data sources exposed through OLE DB and ODBC. So, ADO.NET separates data access from data manipulation into discrete components that can be used separately or in tandem. ADO.NET includes .NET Framework data providers for connecting to a database, executing commands, and retrieving results.

In a similar way, Cassandra storage can be accessed programmatically using an open-source tool called CassandraSharp, a high-performance .NET driver for Apache Cassandra. The CassandraSharp namespace contains ClusterManager, TransportConfig, ClusterConfig, BehaviorConfig, etc. The GitHub reference page is available at https://github.com/pchalamet/cassandra-sharp

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data;
using Apache.Cassandra;
using CassandraSharp;
using CassandraSharp.Config;
using Common; // ConfigEntries (shown later in this article) lives in the Common namespace

namespace DataAccess
{
    public abstract class BaseDataAccess : IDisposable
    {
        private string[] myClusters;
        private int myPort;

        public BaseDataAccess(string[] clusters, int port)
        {
            myClusters = clusters;
            myPort = port;
        }

        protected ICluster GetCluster()
        {
            CassandraSharpConfig config = new CassandraSharpConfig();
            ClusterConfig clusterConfig = new ClusterConfig();
            TransportConfig transConfig = new TransportConfig();
            clusterConfig.Name = "TestCassandra";
            // The transport is attached to the cluster config further down, once it is configured.
            transConfig.Port = myPort;

            EndpointsConfig endPointConfig = new EndpointsConfig();
            endPointConfig.Servers = myClusters;
            endPointConfig.Snitch = SnitchType.Simple;
            endPointConfig.Strategy = EndpointStrategy.Nearest;

            BehaviorConfig behaveConfig = new BehaviorConfig();
            behaveConfig.KeySpace = ConfigEntries.DefaultDatabase;
            if (!String.IsNullOrWhiteSpace(ConfigEntries.UserName)) behaveConfig.User = ConfigEntries.UserName;
            if (!String.IsNullOrWhiteSpace(ConfigEntries.Password)) behaveConfig.Password = ConfigEntries.Password;
            behaveConfig.ReadConsistencyLevel = Apache.Cassandra.ConsistencyLevel.ONE;
            behaveConfig.WriteConsistencyLevel = Apache.Cassandra.ConsistencyLevel.ONE;

            clusterConfig.Transport = transConfig;
            clusterConfig.Endpoints = endPointConfig;
            clusterConfig.BehaviorConfig = behaveConfig;

            config.Clusters = new ClusterConfig[] { clusterConfig };
            
            //We need to ensure that the connection is not initialized before configuring...
            ClusterManager.Shutdown();
            
            ClusterManager.Configure(config);

            ICluster cluster = ClusterManager.GetCluster("TestCassandra");
            return cluster;
        }

        protected DataTable ConvertCqlResultToDataTable(CqlResult result, string tableName)
        {
            DataCommon common = new DataCommon();
            DataTable store = common.GetSchema(result, tableName);
            return PopulateData(result, common, store);
        }

        private DataTable PopulateData(CqlResult result, DataCommon common, DataTable store)
        {
            string columnName = string.Empty;
            foreach (CqlRow row in result.Rows)
            {
                DataRow dataRow = store.NewRow();
                foreach (Column column in row.Columns)
                {
                    columnName = common.GetValue<string>(column.Name);
                    dataRow[columnName] = common.GetValue(store.Columns[columnName], column.Value);
                }
                store.Rows.Add(dataRow);
            }
            return store;
        }

        public void Dispose()
        {
            ClusterManager.Shutdown();
        }
    }
}

In our DataAccess object, the GetCluster method builds the Cassandra cluster configuration from the application configuration file. It covers the complete details of the cluster, such as server addresses, user credentials, consistency level, endpoint strategy, etc.

We need a generic method to fetch the result in the form of a DataTable for a given Cassandra table name. The ConvertCqlResultToDataTable method satisfies this requirement.

PopulateData is an internal helper of the previous method. It reads each row and column of the Cassandra result using the metadata, then returns the result as a DataTable.
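The schema-then-rows approach described above can be tried without a Cassandra cluster. The following is a minimal sketch under that assumption: the TableBuilder class and its dictionary-based inputs are hypothetical stand-ins for CqlResult and its metadata, and only the standard System.Data types are used.

```csharp
using System;
using System.Collections.Generic;
using System.Data;

// Hypothetical stand-in for the CqlResult-to-DataTable conversion:
// the schema and the rows are plain dictionaries instead of Thrift types.
public static class TableBuilder
{
    public static DataTable Build(
        string tableName,
        IDictionary<string, Type> schema,
        IEnumerable<IDictionary<string, object>> rows)
    {
        DataTable table = new DataTable(tableName);

        // Step 1: create one typed DataColumn per column in the schema.
        foreach (KeyValuePair<string, Type> column in schema)
        {
            table.Columns.Add(column.Key, column.Value);
        }

        // Step 2: copy every row, cell by cell, matching cells to columns by name.
        foreach (IDictionary<string, object> row in rows)
        {
            DataRow dataRow = table.NewRow();
            foreach (KeyValuePair<string, object> cell in row)
            {
                dataRow[cell.Key] = cell.Value;
            }
            table.Rows.Add(dataRow);
        }
        return table;
    }
}
```

Because cells are matched by column name, the order of the cells within a row does not matter, which mirrors how PopulateData looks up each column by its decoded name.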

Data Type Access

The .NET Framework and Cassandra storage represent data types differently. This section is about keeping the data types of the two technologies in sync. Cassandra stores everything in columns, which are internally made up of three properties:

Property     Type
Name         CompareWith type
Value        binary
Timestamp    64-bit integer

The CompareWith type of the Name is set in the configuration and can be ASCII, UTF8, LexicalUUID, TimeUUID, Long, or Bytes. In the .NET world, these correspond to string, Guid, DateTime, long, or byte[]. The Value can only be of the Bytes (byte[]) type. The Timestamp is used for synchronization between Cassandra servers and shouldn't be directly controlled. The screenshot below depicts what happens to the Value property of a column when it is set and saved.

[Figure: DataType]

From the moment you set a property to your chosen type to the moment it is saved in Cassandra, the value goes through two steps that you probably aren't aware of. First, the type is serialized and stored in Fluent Cassandra's flexible BytesType, which is intelligent enough to serialize common runtime types into binary, so that you as the developer don't have to interact with the Cassandra database at a low level. This type system is also the major driver behind the ASCII, UTF8, LexicalUUID, TimeUUID, Long, and Bytes types, which help serialize the Name property of the column correctly.
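To make the binary Value concrete, here is a minimal sketch of decoding such bytes by hand. Cassandra serializes integers and doubles in big-endian (network) byte order and text as UTF-8, while most .NET machines are little-endian. The BigEndianDecoder class and its method names are hypothetical; the sketch uses plain bit shifts so that it leaves the input array untouched and behaves the same on any platform.

```csharp
using System;
using System.Text;

// Hypothetical helper: decodes big-endian column values without modifying
// the input array and without depending on the machine's byte order.
public static class BigEndianDecoder
{
    public static int ToInt32(byte[] value)
    {
        if (value == null || value.Length != 4)
            throw new ArgumentException("An int needs exactly 4 bytes", "value");
        // The most significant byte comes first in big-endian order.
        return (value[0] << 24) | (value[1] << 16) | (value[2] << 8) | value[3];
    }

    public static long ToInt64(byte[] value)
    {
        if (value == null || value.Length != 8)
            throw new ArgumentException("A long needs exactly 8 bytes", "value");
        long result = 0;
        foreach (byte b in value)
        {
            result = (result << 8) | b; // shift in one byte at a time
        }
        return result;
    }

    public static double ToDouble(byte[] value)
    {
        // A double is transmitted as its 64-bit IEEE 754 pattern.
        return BitConverter.Int64BitsToDouble(ToInt64(value));
    }

    public static string ToText(byte[] value)
    {
        return Encoding.UTF8.GetString(value);
    }
}
```

For instance, BigEndianDecoder.ToInt32(new byte[] { 0, 0, 0, 42 }) yields 42. A helper that calls Array.Reverse on the incoming array reaches the same value on a little-endian machine, but it reorders the caller's buffer as a side effect.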

With these concepts, the DataCommon class below is created to handle all data types between .NET and Cassandra storage:

namespace DataAccess
{
    internal class DataCommon
    {
        internal DataTable GetSchema(CqlResult result, string tableName)
        {
            if (result != null && result.Type == CqlResultType.ROWS)
            {
                return BuildTable(result.Schema, tableName);
            }
            else throw new ArgumentNullException("result", "'result' parameter must not be empty and it should contain at least one row");
        }

        internal DateTime GetDate(byte[] value)
        {
            if (BitConverter.IsLittleEndian) Array.Reverse(value);
            return GetDateTimeFromLong(BitConverter.ToInt64(value, 0));
        }

        internal string GetName(byte[] value)
        {
            return GetValue<string>(value);
        }

        // Maps a lower-cased CLR type name (DataColumn.DataType.Name) to a converter
        // that turns the raw Cassandra bytes into a value of that type.
        static IDictionary<string, Func<byte[], object>> dataProcessors;
        private IDictionary<string, Func<byte[], object>> GetDataProcessors()
        {
            if (dataProcessors == null)
            {
                //TODO: More data type processors need to be added.
                dataProcessors = new Dictionary<string, Func<byte[], object>>();
                // Keys must be CLR type names: typeof(int).Name is "Int32", not "int".
                dataProcessors["string"] = (byteValue) => GetValue<string>(byteValue);
                dataProcessors["decimal"] = (byteValue) => GetIntValue(byteValue);
                dataProcessors["double"] = (byteValue) => GetValue(byteValue);
                dataProcessors["boolean"] = (byteValue) => GetValue<bool>(byteValue);
                dataProcessors["int32"] = (byteValue) => GetIntValue(byteValue);
                dataProcessors["int64"] = (byteValue) => GetValue<long>(byteValue);
                dataProcessors["datetime"] = (byteValue) => GetDate(byteValue);
            }
            return dataProcessors;
        }

        internal object GetValue(DataColumn column, byte[] value)
        {
            return GetDataProcessors()[column.DataType.Name.ToLower()](value);
        }


        internal decimal GetDecimalValue(byte[] value)
        {
            //check that it is even possible to convert the array
            if (value.Count() != 16)
                throw new Exception("A decimal must be created from exactly 16 bytes");
            //make an array to convert back to int32
            Int32[] bits = new Int32[4];
            for (int i = 0; i <= 15; i += 4)
            {
                //convert every 4 bytes into an int32
                bits[i / 4] = BitConverter.ToInt32(value, i);
            }
            return new decimal(bits);
        }

        internal double GetValue(byte[] value)
        {
            if (BitConverter.IsLittleEndian)
                Array.Reverse(value); //need the bytes in the reverse order
            return BitConverter.ToDouble(value, 0);
        }

        internal int GetIntValue(byte[] value)
        {
            if (BitConverter.IsLittleEndian)
                Array.Reverse(value); //need the bytes in the reverse order
            return BitConverter.ToInt32(value, 0);
        }

        internal T GetValue<T>(byte[] value)
        {
            // Cassandra text values are UTF-8 encoded (ASCII is a subset of UTF-8).
            return (T)Convert.ChangeType(Encoding.UTF8.GetString(value), typeof(T));
        }

        internal long GetDateTimeInLong(DateTime value)
        {
            DateTime Epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
            TimeSpan elapsedTime = value - Epoch;
            return (long)elapsedTime.TotalSeconds;
        }

        internal DateTime GetDateTimeFromLong(long value)
        {
            return new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc).AddSeconds(Math.Round(value * 1.0));
        }

        private DataTable BuildTable(CqlMetadata metadata, string tableName)
        {
            // Name the table so callers can identify which Cassandra table it came from.
            DataTable dataStore = new DataTable(tableName);

            foreach (KeyValuePair<byte[], string> column in metadata.Value_types)
            {
                DataColumn dataColumn = new DataColumn();
                dataColumn.ColumnName = GetValue<string>(column.Key);
                dataColumn.DataType = GetColumnType(column.Value);
                dataStore.Columns.Add(dataColumn);
            }
            return dataStore;
        }

        // Maps a Cassandra type name to the .NET type used for the DataColumn.
        static IDictionary<string, Type> typeProvider;
        private IDictionary<string, Type> GetCqlToDotNetTypeProviders()
        {
            if (typeProvider == null)
            {
                typeProvider = new Dictionary<string, Type>();
                typeProvider["AsciiType"] = typeof(string);
                typeProvider["BytesType"] = typeof(byte[]);
                typeProvider["BooleanType"] = typeof(bool);
                typeProvider["CounterColumnType"] = typeof(int);
                typeProvider["DateType"] = typeof(DateTime);
                typeProvider["DecimalType"] = typeof(decimal);
                typeProvider["DoubleType"] = typeof(double);
                typeProvider["DynamicCompositeType"] = typeof(string);
                typeProvider["FloatType"] = typeof(decimal);
                typeProvider["IntegerType"] = typeof(int);
                typeProvider["LexicalUUIDType"] = typeof(Guid);
                typeProvider["LongType"] = typeof(long);
                typeProvider["TimeUUIDType"] = typeof(DateTime);
                typeProvider["UTF8Type"] = typeof(string);
                typeProvider["UUIDType"] = typeof(Guid);
            }
            return typeProvider;
        }

        private Type GetColumnType(string cqlType)
        {
            return GetCqlToDotNetTypeProviders()[cqlType];
        }
    }
}

The act of casting is enough to tell the BytesType object how the binary data should be deserialized into a runtime type that is understood by .NET. This is all done through a lot of operator magic, but the result is the same: you get the type you entered into the database back out of the database.
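One detail worth checking when dates are involved: the GetDateTimeInLong and GetDateTimeFromLong helpers above count whole seconds since 1 January 1970, whereas Cassandra's timestamp type is stored as a 64-bit count of milliseconds since the same epoch. If a column holds a real Cassandra timestamp, a millisecond-based pair along the following lines may be needed. This is a sketch only; EpochConverter and its method names are hypothetical.

```csharp
using System;

// Hypothetical millisecond-based counterpart of the seconds-based helpers.
public static class EpochConverter
{
    private static readonly DateTime Epoch =
        new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);

    // DateTime -> milliseconds since the Unix epoch (UTC).
    public static long ToMilliseconds(DateTime value)
    {
        // Integer tick arithmetic avoids floating-point rounding.
        return (value.ToUniversalTime() - Epoch).Ticks / TimeSpan.TicksPerMillisecond;
    }

    // Milliseconds since the Unix epoch -> UTC DateTime.
    public static DateTime FromMilliseconds(long milliseconds)
    {
        return Epoch.AddMilliseconds(milliseconds);
    }
}
```

With these helpers a value survives a round trip at millisecond precision, which the seconds-based version cannot guarantee.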

Data Access Object

In terms of the business layer implementation, the DAO (Data Access Object) is the key to connecting to and processing the data. In our exercise, two transactional and two reference DAOs are created, following the pattern below:

namespace DataAccess
{
    public class CreditDAO : BaseDataAccess, ISelectAllData, ISelectData
    {
        public CreditDAO()
            : base(ConfigEntries.Clusters, ConfigEntries.Port)
        { }

        DataTable ISelectData.GetSpecificData(string query, object[] parameters)
        {
            CqlResult result = base.GetCluster().ExecuteCql(string.Format(query, parameters));
            return ConvertCqlResultToDataTable(result, "Credit");
        }

        DataTable ISelectAllData.GetData()
        {
            CqlResult result = base.GetCluster().ExecuteCql(DbConstants.SelectCreditData);
            return ConvertCqlResultToDataTable(result, "Credit");
        }
    }
}

Each DAO extends the BaseDataAccess object. The ISelectData interface retrieves specific data based on the given parameters, whereas the ISelectAllData interface fetches the complete data for the specific DAO.
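The two interfaces are not listed in this article, so the definitions below are inferred from the way CreditDAO implements them and should be read as an assumption. The InMemoryCustomerDAO class is a hypothetical stand-in that needs no cluster, which can be handy for unit-testing the business layer; here the query string is a DataTable filter expression rather than CQL.

```csharp
using System;
using System.Data;

// Signatures inferred from CreditDAO's explicit interface implementations.
public interface ISelectAllData
{
    DataTable GetData();
}

public interface ISelectData
{
    DataTable GetSpecificData(string query, object[] parameters);
}

// Hypothetical in-memory DAO with the same shape as CreditDAO.
public class InMemoryCustomerDAO : ISelectAllData, ISelectData
{
    private readonly DataTable customers;

    public InMemoryCustomerDAO()
    {
        customers = new DataTable("Customer");
        customers.Columns.Add("Customer_id", typeof(int));
        customers.Columns.Add("Name", typeof(string));
        customers.Rows.Add(1, "Alice"); // sample data for illustration only
        customers.Rows.Add(2, "Bob");
    }

    DataTable ISelectAllData.GetData()
    {
        return customers.Copy(); // return a copy so callers cannot alter the source
    }

    DataTable ISelectData.GetSpecificData(string query, object[] parameters)
    {
        // 'query' is a DataTable filter such as "Customer_id = {0}", not CQL.
        DataRow[] matches = customers.Select(string.Format(query, parameters));
        DataTable result = customers.Clone(); // same schema, no rows
        foreach (DataRow row in matches)
        {
            result.ImportRow(row);
        }
        return result;
    }
}
```

Because the methods are implemented explicitly, as in CreditDAO, callers reach them through the interface type, for example ISelectData dao = new InMemoryCustomerDAO(); followed by dao.GetSpecificData("Customer_id = {0}", new object[] { 2 }).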

Data Common

For the common/global config entries used across the application, a ConfigEntries object is created under the Common namespace. Our ConfigEntries class holds the common attributes: cluster servers, port, default database and user credentials.

using System;
using System.Configuration; // ConfigurationManager

namespace Common
{
    public class ConfigEntries
    {
        public static string[] Clusters = ConfigurationManager.AppSettings["Clusters"].Split(new string[] { "|" }, StringSplitOptions.RemoveEmptyEntries);
        public static int Port = Convert.ToInt32(ConfigurationManager.AppSettings["Port"]);
        public static string DefaultDatabase = ConfigurationManager.AppSettings["DefaultDatabase"];
        public static string UserName = ConfigurationManager.AppSettings["UserName"];
        public static string Password = ConfigurationManager.AppSettings["Password"];
    }
}
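ConfigEntries expects the Clusters setting to be a pipe-separated list of server addresses. Note that ConfigurationManager.AppSettings returns null for a missing key, so a missing Clusters entry makes the static initializer above fail, and a missing Port silently becomes 0. A more defensive parsing step might look like the sketch below; ClusterSettingParser, its method names and the sample values are hypothetical.

```csharp
using System;

// Hypothetical helper: tolerant parsing of the raw appSettings strings.
public static class ClusterSettingParser
{
    // "10.0.0.1|10.0.0.2" -> { "10.0.0.1", "10.0.0.2" }; null or empty -> empty array.
    public static string[] ParseClusters(string raw)
    {
        return (raw ?? string.Empty).Split(
            new string[] { "|" }, StringSplitOptions.RemoveEmptyEntries);
    }

    // Falls back to the supplied default when the setting is missing or not a number.
    public static int ParsePort(string raw, int fallback)
    {
        int port;
        return int.TryParse(raw, out port) ? port : fallback;
    }
}
```

ConfigEntries could then call ClusterSettingParser.ParseClusters(ConfigurationManager.AppSettings["Clusters"]) and pass a sensible default port, such as 9160 for Cassandra's Thrift interface, to ParsePort.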

With all these core modules written, our objective is achieved as per the architecture design. Ultimately, it guides us in building an end-to-end application using .NET technology with Cassandra storage.

Points of Interest

Hopefully it is interesting to learn the Big Data (Cassandra) and .NET programming concepts along with the store connectivity ideas, instead of the traditional code-level implementation.

History

  • Version 1.0 - Initial Version.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

GanesanSenthilvel
Architect
India
Currently working as an IT Solution Architect for Financial Services applications. Of 19+ years of experience, a few career years were spent in the USA, Europe and APAC. Comes from the C, C++, VC++, C# family, extending to emerging Big Data and Cloud technology. Weekly tech inking at http://ganesansenthilvel.blogspot.com/ Loves driving, spending time with friends and family, blogging, and grooming the next generation.

Last Updated 13 Apr 2014
Article Copyright 2014 by GanesanSenthilvel
Everything else Copyright © CodeProject, 1999-2014