Click here to Skip to main content
15,861,125 members
Articles / Database Development / NoSQL

RaptorDB - the Key Value Store

Rate me:
Please Sign up or sign in to vote.
4.89/5 (118 votes)
22 Jan 2012 · CPOL · 22 min read · 904.9K · 9.9K · 266
The smallest, fastest embedded NoSQL persisted dictionary, using B+tree or MurmurHash indexing (now with hybrid WAH bitmap indexes).
using System;
using System.Collections.Generic;
using System.Text;
using System.IO;
using System.Threading;
using System.Collections;

namespace RaptorDB
{
    /// <summary>
    /// Selects the on-disk index structure RaptorDB builds for its keys.
    /// </summary>
    public enum INDEXTYPE : int
    {
        /// <summary>Index is backed by a <c>BTree</c> instance.</summary>
        BTREE = 0,

        /// <summary>Index is backed by a <c>Hash</c> instance.</summary>
        HASH = 1,
    }

    /// <summary>
    /// Embedded, file-backed key/value store.
    /// </summary>
    /// <remarks>
    /// Each Set writes the value to the storage file (<c>.mgdat</c>) and records the key/offset
    /// in the current log file (<c>.mglog</c>). When the current log holds more than
    /// <c>Global.MaxItemsBeforeIndexing</c> entries, it is moved to a pending list and a new log
    /// is started. A background thread moves pending logs into the main index (<c>.mgidx</c>,
    /// B+tree or hash).
    ///
    /// Locking: <c>_lock</c> guards the writer path together with <c>_logs</c>,
    /// <c>_currentLOG</c>, <c>_Count</c> and <c>_deleteList</c>. <c>_indexingLock</c> is held for
    /// a whole indexing pass. When both are needed they are always taken in the order
    /// <c>_indexingLock</c>, then <c>_lock</c>.
    /// </remarks>
    public class RaptorDB
    {
        private RaptorDB(string Filename, byte MaxKeysize, bool AllowDuplicateKeys, INDEXTYPE idxtype)
        {
            Initialize(Filename, MaxKeysize, AllowDuplicateKeys, idxtype);
        }

        // Rolled-over log files waiting to be indexed, in ascending log-number order (oldest first).
        private List<LogFile> _logs = new List<LogFile>();
        private string _Path = "";
        private string _FileName = "";
        // Log file currently receiving writes.
        private LogFile _currentLOG;
        private byte _MaxKeySize;
        private StorageFile _archive;
        private IIndex _index;
        private Thread _IndexerThread;
        // volatile: written by Stop() and polled by the indexer thread while it works.
        private volatile bool _shutdown = false;
        private string _logExtension = ".mglog";
        private string _datExtension = ".mgdat";
        private string _idxExtension = ".mgidx";
        // Zero-padded format used for log file number suffixes.
        private string _logString = "000000";
        private INDEXTYPE _idxType = INDEXTYPE.BTREE;
        // Cached item count; -1 means "not computed yet".
        private long _Count = -1;
        private bool _InMemoryIndex = false;

        // Serialises writers and guards _logs, _currentLOG, _Count and _deleteList.
        private readonly object _lock = new object();
        // Held for the whole of an indexing pass. Stop() and SaveIndex() take it to wait for an
        // in-flight pass rather than spinning on a flag.
        private readonly object _indexingLock = new object();

        // NOTE(review): not read anywhere in this class.
        public bool FreeCacheOnCommit = false;

        /// <summary>
        /// The indexer thread ticks once per second and runs a pass once its tick counter
        /// exceeds this value.
        /// </summary>
        public int IndexingTimerSeconds = 1;

        /// <summary>Gets or sets the in-memory flag, forwarding it to the underlying index.</summary>
        public bool InMemoryIndex
        {
            get { return _InMemoryIndex; }
            set { _InMemoryIndex = value; _index.InMemrory = value; }
        }

        /// <summary>
        /// Opens (or creates) a database whose files live next to <paramref name="Filename"/>.
        /// </summary>
        /// <param name="Filename">Database path. Only its directory and extension-less name are used.</param>
        /// <param name="MaxKeysize">Maximum key length in bytes accepted by Set.</param>
        /// <param name="AllowDuplicateKeys">Passed through to the index constructor.</param>
        /// <param name="idxtype">Index structure to use.</param>
        public static RaptorDB Open(string Filename, byte MaxKeysize, bool AllowDuplicateKeys, INDEXTYPE idxtype)
        {
            return new RaptorDB(Filename, MaxKeysize, AllowDuplicateKeys, idxtype);
        }

        /// <summary>
        /// Saves the index. Then deletes the log files whose entries were indexed while the index
        /// was held in memory; their deletion is deferred until the index has been saved.
        /// </summary>
        public void SaveIndex()
        {
            List<string> pending;
            // Holding the indexing lock means no pass mutates the index during the save, and every
            // file already in _deleteList was committed before this save. Taking the list inside the
            // same critical section therefore never deletes a log the save does not cover.
            lock (_indexingLock)
            {
                _index.SaveIndex();
                lock (_lock)
                {
                    pending = _deleteList;
                    _deleteList = new List<string>();
                }
            }
            foreach (string s in pending)
                File.Delete(s);
        }

        /// <summary>
        /// Returns every storage offset recorded for <paramref name="key"/>.
        /// </summary>
        /// <remarks>
        /// Results come from the current log, then the pending logs (oldest first), then the index.
        /// </remarks>
        public List<long> GetDuplicates(byte[] key)
        {
            LogFile current;
            LogFile[] pending = SnapshotLogs(out current);
            List<long> dups = new List<long>();

            // duplicates still held in log files
            dups.AddRange(current.GetDuplicates(key));
            foreach (LogFile l in pending)
                dups.AddRange(l.GetDuplicates(key));

            // duplicates already moved into the index
            dups.AddRange(_index.GetDuplicates(key));

            return dups;
        }

        /// <summary>Reads the value stored at <paramref name="offset"/> in the storage file.</summary>
        public byte[] FetchDuplicate(long offset)
        {
            return _archive.ReadData(offset);
        }

        /// <summary>Enumerates key/value pairs by traversing the storage file.</summary>
        public IEnumerable<KeyValuePair<byte[], byte[]>> EnumerateStorageFile()
        {
            return _archive.Traverse();
        }

        /// <summary>
        /// Returns the item count. The first call computes it from the logs and the index; after
        /// that, the cached value (incremented by every Set) is returned.
        /// </summary>
        /// <remarks>
        /// NOTE(review): Set increments the count on every call, so overwriting an existing key
        /// is counted again — confirm whether that is intended.
        /// </remarks>
        public long Count()
        {
            lock (_lock)
            {
                if (_Count == -1)
                {
                    long c = 0;
                    foreach (LogFile l in _logs)
                        c += l.CurrentCount;
                    c += _currentLOG.CurrentCount;
                    _Count = _index.Count() + c;
                }
                return _Count;
            }
        }

        public bool Get(Guid guid, out byte[] val)
        {
            return Get(guid.ToByteArray(), out val);
        }

        public bool Get(string key, out byte[] val)
        {
            return Get(Helper.GetBytes(key), out val);
        }

        public bool Get(string key, out string val)
        {
            byte[] b;
            val = null;
            bool ok = Get(Helper.GetBytes(key), out b);
            if (ok)
            {
                val = Helper.GetString(b);
            }
            return ok;
        }

        /// <summary>
        /// Looks up <paramref name="key"/> and reads its value from the storage file.
        /// </summary>
        /// <remarks>
        /// Sources are checked from newest to oldest so the latest write wins: the current log,
        /// then the pending logs (newest first), then the index.
        /// </remarks>
        /// <returns>True and the value when found; false and null otherwise.</returns>
        public bool Get(byte[] key, out byte[] val)
        {
            val = null;
            LogFile current;
            LogFile[] pending = SnapshotLogs(out current);

            long off = current.Get(key);
            if (off > -1)
            {
                val = _archive.ReadData(off);
                return true;
            }

            // _logs is oldest-first, so walk it backwards: a key rewritten in a newer pending log
            // must not be answered from an older one.
            for (int i = pending.Length - 1; i >= 0; i--)
            {
                off = pending[i].Get(key);
                if (off > -1)
                {
                    val = _archive.ReadData(off);
                    return true;
                }
            }

            // fall back to the index (B+tree or hash)
            if (_index.Get(key, out off))
            {
                val = _archive.ReadData(off);
                return true;
            }
            return false;
        }

        public bool Set(Guid guid, byte[] data)
        {
            return Set(guid.ToByteArray(), data);
        }

        public bool Set(string key, string val)
        {
            return Set(Helper.GetBytes(key), Helper.GetBytes(val));
        }

        public bool Set(string key, byte[] data)
        {
            return Set(Helper.GetBytes(key), data);
        }

        /// <summary>
        /// Writes <paramref name="data"/> to the storage file and records the key in the current log.
        /// If the current log then exceeds <c>Global.MaxItemsBeforeIndexing</c> entries, it is queued
        /// for indexing and a new log is started.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="key"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="key"/> is longer than the max key size.</exception>
        /// <returns>Always true.</returns>
        public bool Set(byte[] key, byte[] data)
        {
            if (key == null)
                throw new ArgumentNullException("key");
            if (key.Length > _MaxKeySize)
                throw new ArgumentException("key greater than max key size of " + _MaxKeySize, "key");

            lock (_lock)
            {
                // save to storage, then remember where it went in the log
                long offset = _archive.WriteData(key, data);
                _currentLOG.Set(key, offset);
                if (_currentLOG.CurrentCount > Global.MaxItemsBeforeIndexing)
                {
                    // roll over: queue the full log for the indexer and start a new one
                    _logs.Add(_currentLOG);
                    _currentLOG = new LogFile(BuildPath(_logExtension), _currentLOG.Number + 1, _MaxKeySize, _logString);
                }
                _Count++;
            }
            return true;
        }

        #region [            P R I V A T E     M E T H O D S              ]

        // Builds the path of a companion file: <directory>/<name><extension>.
        private string BuildPath(string extension)
        {
            return Path.Combine(_Path, _FileName + extension);
        }

        // Captures the current log and a copy of the pending logs under _lock.
        // Readers therefore never enumerate _logs while it is being mutated, and never miss a log
        // that Set is in the middle of rolling over.
        // NOTE(review): a snapshot may still contain a log the indexer closes right afterwards;
        // confirm LogFile.Get/GetDuplicates remain valid after Close().
        private LogFile[] SnapshotLogs(out LogFile current)
        {
            lock (_lock)
            {
                current = _currentLOG;
                return _logs.ToArray();
            }
        }

        // Shuts down the index, storage file and current log.
        // It first waits, via _indexingLock, for an in-flight indexing pass to notice _shutdown,
        // commit and return.
        // NOTE(review): pending logs in _logs are not shut down or closed here.
        private void Stop()
        {
            _shutdown = true;
            lock (_indexingLock)
            {
                _index.Shutdown();
                lock (_lock)
                {
                    _archive.Shutdown();
                    _currentLOG.Shutdown();
                }
            }
        }

        // Creates the directory, opens the index, storage file and log files, primes the cached
        // count, and starts the background indexer thread.
        private void Initialize(string filename, byte maxkeysize, bool AllowDuplicateKeys, INDEXTYPE idxtype)
        {
            _idxType = idxtype;
            _MaxKeySize = maxkeysize;

            // Resolve to a full path first: for a bare file name GetDirectoryName returns "",
            // which Directory.CreateDirectory rejects.
            _Path = Path.GetDirectoryName(Path.GetFullPath(filename));
            Directory.CreateDirectory(_Path);

            _FileName = Path.GetFileNameWithoutExtension(filename);
            string db = BuildPath(_datExtension);
            string idx = BuildPath(_idxExtension);

            if (_idxType == INDEXTYPE.BTREE)
                _index = new BTree(idx, _MaxKeySize, Global.DEFAULTNODESIZE, AllowDuplicateKeys, Global.BUCKETCOUNT);
            else
                _index = new Hash(idx, _MaxKeySize, Global.DEFAULTNODESIZE, AllowDuplicateKeys, Global.BUCKETCOUNT);

            _archive = new StorageFile(db, _MaxKeySize);

            // load old log files, then compute and cache the item count
            LoadLogFiles();
            Count();

            AppDomain.CurrentDomain.ProcessExit += new EventHandler(CurrentDomain_ProcessExit);
            _IndexerThread = new Thread(new ThreadStart(IndexThreadRunner));
            _IndexerThread.IsBackground = true;
            _IndexerThread.Start();
        }

        void CurrentDomain_ProcessExit(object sender, EventArgs e)
        {
            Stop();
        }

        int _timercounter = 0;

        // Indexer thread loop. Every second it bumps a counter and runs an indexing pass once
        // the counter exceeds IndexingTimerSeconds. It exits when _shutdown is set.
        // NOTE(review): the strict '>' yields a pass every IndexingTimerSeconds + 1 seconds —
        // confirm whether '>=' was intended.
        private void IndexThreadRunner()
        {
            while (_shutdown == false)
            {
                _timercounter++;
                if (_timercounter > IndexingTimerSeconds)
                {
                    DoIndexing();
                    _timercounter = 0;
                }

                Thread.Sleep(1000);
            }
        }

        private double _runningtime = 0;
        // Log files already indexed into an in-memory index; deleted by SaveIndex() once the
        // index has been saved.
        List<string> _deleteList = new List<string>();

        // Moves the oldest pending log into the index, then retires that log file.
        // The pass is abandoned (after a Commit) as soon as _shutdown is observed.
        private void DoIndexing()
        {
            lock (_indexingLock)
            {
                if (_shutdown)
                    return;

                LogFile l;
                lock (_lock)
                {
                    if (_logs.Count == 0)
                        return;
                    // _logs is kept in ascending log-number order, so the first entry is the oldest;
                    // indexing oldest-first lets newer values overwrite older ones in the index.
                    l = _logs[0];
                }

                DateTime dt = FastDateTime.Now;
                int count = 0;
                Console.Write("(");

                #region index duplicates
                if (l._duplicates != null)
                {
                    foreach (KeyValuePair<byte[], List<long>> kv in l._duplicates)
                    {
                        if (kv.Value == null)
                            continue;
                        foreach (long off in kv.Value)
                        {
                            _index.Set(kv.Key, off);
                            if (_shutdown)
                            {
                                _index.Commit();
                                return;
                            }
                        }
                    }
                }
                #endregion

                foreach (KeyValuePair<byte[], long> kv in l._memCache)
                {
                    _index.Set(kv.Key, kv.Value);
                    count++;
                    if (count % 1000 == 0)
                        Console.Write("*");
                    if (_shutdown)
                    {
                        _index.Commit();
                        return;
                    }
                }
                _index.Commit();

                // Unpublish the log before closing it so new readers stop seeing it.
                lock (_lock)
                {
                    _logs.Remove(l);
                }
                l.Close();
                if (_index.InMemrory == false)
                {
                    l.DeleteLog();
                }
                else
                {
                    // Keep the log on disk until SaveIndex() persists the in-memory index.
                    lock (_lock)
                    {
                        _deleteList.Add(l.FileName);
                    }
                }

                double elapsed = FastDateTime.Now.Subtract(dt).TotalSeconds;
                _runningtime += elapsed;
                Console.Write(") time sec = " + elapsed);
                Console.Write(" Total = " + _runningtime);
                Console.WriteLine();
                // NOTE(review): forcing a full collection after each pass is costly; kept from the
                // original, but now only after a log has actually been indexed.
                GC.Collect(2);
            }
        }

        // Renumbers existing log files to start from 0, loads them as read-only pending logs,
        // and opens a fresh current log numbered after them. Called from Initialize before the
        // indexer thread starts.
        private void LoadLogFiles()
        {
            string pattern = _FileName + _logExtension + "*";
            string[] fnames = Directory.GetFiles(_Path, pattern, SearchOption.TopDirectoryOnly);
            // Ordinal sort: the zero-padded suffixes must order by their digits, not by culture rules.
            Array.Sort(fnames, StringComparer.Ordinal);
            for (int i = 0; i < fnames.Length; i++)
            {
                string target = BuildPath(_logExtension + i.ToString(_logString));
                // Skip files that already carry their target name.
                if (!string.Equals(fnames[i], target, StringComparison.Ordinal))
                    File.Move(fnames[i], target);
            }

            fnames = Directory.GetFiles(_Path, pattern, SearchOption.TopDirectoryOnly);
            Array.Sort(fnames, StringComparer.Ordinal);
            int lognum = 0;

            foreach (string fn in fnames)
            {
                if (!File.Exists(fn))
                    continue;

                LogFile l = new LogFile();
                l.FileName = fn;
                l.Readonly = false;
                l.Number = lognum++;

                // load log data, then freeze the log
                l.ReadLogFile(fn);

                l.Readonly = true;
                _logs.Add(l);
            }
            _currentLOG = new LogFile(BuildPath(_logExtension), lognum, _MaxKeySize, _logString);
        }
        #endregion
    }
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Architect -
United Kingdom United Kingdom
Mehdi first started programming when he was 8, on a BBC+128k machine in 6512 processor language. After various hardware and software changes he eventually came across .NET and C#, which he has been using since v1.0.
He is formally educated as a systems analyst and industrial engineer, but his passion for programming continues.

* Mehdi is the 5th person to get 6 out of 7 Platinums on Code-Project (13th Jan '12)
* Mehdi is the 3rd person to get 7 out of 7 Platinums on Code-Project (26th Aug '16)

Comments and Discussions