Click here to Skip to main content
15,861,125 members
Articles / Database Development / NoSQL

RaptorDB - the Key Value Store

Rate me:
Please Sign up or sign in to vote.
4.89/5 (118 votes)
22 Jan 2012CPOL22 min read 904.9K   9.9K   266  
Smallest, fastest embedded NoSQL persisted dictionary using B+tree or Murmur hash indexing (now with hybrid WAH bitmap indexes).
using System;
using System.Collections.Generic;
using System.Text;
using System.IO;
using System.Threading;
using System.Collections;

namespace RaptorDB
{
    /// <summary>
    /// Selects which index implementation a RaptorDB instance creates in Initialize.
    /// </summary>
    public enum INDEXTYPE
    {
        // Initialize builds a BTree index when this value is passed
        BTREE = 0,
        // any value other than BTREE makes Initialize build a Hash index
        HASH = 1
    }

    public class RaptorDB
    {
        /// <summary>
        /// Private constructor; the static Open method is the public way to get an instance.
        /// All arguments are forwarded unchanged to Initialize.
        /// </summary>
        private RaptorDB(string Filename, byte MaxKeysize, bool AllowDuplicateKeys, INDEXTYPE idxtype)
        {
            this.Initialize(Filename, MaxKeysize, AllowDuplicateKeys, idxtype);
        }

        // Logs that no longer receive writes: NewLog and LoadLogFiles append here,
        // DoIndexing indexes and removes them starting from position 0.
        private List<LogFile> _logs = new List<LogFile>();
        // Directory and extension-less file name of the database; both are set by Initialize.
        private string _Path = "";
        private string _FileName = "";
        // Log that Set currently writes to.
        private LogFile _currentLOG;
        private byte _MaxKeySize;
        private StorageFile _archive;
        private IIndex _index;
        // Background thread started by Initialize; it runs IndexThreadRunner.
        private Thread _IndexerThread;
        // Never assigned by the active code: its start-up lines in Initialize are commented out.
        private Thread _ThrottleThread;
        // Set by Shutdown and polled by the indexer thread. Declared volatile (like the
        // other cross-thread flags below) so a polling loop always observes the latest write.
        private volatile bool _shutdown = false;
        private string _logExtension = ".mglog";
        private string _datExtension = ".mgdat";
        private string _idxExtension = ".mgidx";
        //private string _chkExtension = ".mgchk";
        // Numeric format string used to build the numbered suffix of log file names.
        private string _logString = "000000";
        private INDEXTYPE _idxType = INDEXTYPE.BTREE;
        // Cached item count; -1 means "not computed yet" (see Count).
        private long _Count = -1;
        private bool _InMemoryIndex = false;
        // Raised by Get while it reads; DoIndexing sleeps while it is true.
        private volatile bool _PauseIndex = false;
        // Raised by SaveIndex; the public operations wait until it is cleared.
        private volatile bool _isInternalOPRunning = false;
        // Set by DoIndexing when too many logs are queued; Set sleeps 30 ms per call while it is true.
        private volatile bool _throttleInput = false;

        // Compared against a once-per-second tick counter in IndexThreadRunner.
        public int IndexingTimerSeconds = 1;

        /// <summary>
        /// Pass-through to the shared Global.FreeMemoryOnCommit setting.
        /// </summary>
        public bool FreeCacheOnCommit
        {
            get
            {
                return Global.FreeMemoryOnCommit;
            }
            set
            {
                Global.FreeMemoryOnCommit = value;
            }
        }

        /// <summary>
        /// Gets the locally remembered in-memory flag; setting it stores the value
        /// locally first and then forwards it to the index's InMemory property.
        /// </summary>
        public bool InMemoryIndex
        {
            get
            {
                return _InMemoryIndex;
            }
            set
            {
                _InMemoryIndex = value;
                _index.InMemory = value;
            }
        }

        /// <summary>
        /// Factory method: creates a RaptorDB instance through the private constructor.
        /// </summary>
        /// <param name="Filename">Path of the database; directory and base name are derived from it.</param>
        /// <param name="MaxKeysize">Largest key length, in bytes, that Set accepts.</param>
        /// <param name="AllowDuplicateKeys">Passed through to the index constructor.</param>
        /// <param name="idxtype">Index implementation to create.</param>
        /// <returns>The newly constructed instance.</returns>
        public static RaptorDB Open(string Filename, byte MaxKeysize, bool AllowDuplicateKeys, INDEXTYPE idxtype)
        {
            RaptorDB db = new RaptorDB(Filename, MaxKeysize, AllowDuplicateKeys, idxtype);
            return db;
        }

        //public void CompactFiles()
        //{
        //    if (_isInternalOPRunning)
        //        while (_isInternalOPRunning) Thread.Sleep(10);
        //
        //    _isInternalOPRunning = true;
        //    // FIX : add compact logic here
        //
        //
        //    _isInternalOPRunning = false;
        //}

        /// <summary>
        /// Saves the index without first flushing the in-memory logs into it.
        /// </summary>
        public void SaveIndex()
        {
            const bool flushMemoryToDisk = false;
            SaveIndex(flushMemoryToDisk);
        }

        /// <summary>
        /// Saves the index and deletes the log files queued for deletion.
        /// </summary>
        /// <param name="flushMemoryToDisk">
        /// When true, the current log is rotated and every pending log is indexed
        /// before the index is saved.
        /// </param>
        public void SaveIndex(bool flushMemoryToDisk)
        {
            // wait for any other internal operation to finish
            while (_isInternalOPRunning) Thread.Sleep(10);

            _isInternalOPRunning = true;
            try
            {
                // wait for indexing to stop
                while (_indexing) Thread.Sleep(10);

                if (flushMemoryToDisk)
                {
                    // flush current memory log
                    NewLog();
                    // flush everything to indexes
                    DoIndexing(true);
                }
                _index.SaveIndex();

                // delete log files on disk; swap the list first so new entries go to a fresh list
                List<string> pending = _deleteList;
                _deleteList = new List<string>();
                foreach (string s in pending)
                    File.Delete(s);
            }
            finally
            {
                // Always clear the flag: every public operation spins while it is set,
                // so leaving it raised after an exception would block them forever.
                _isInternalOPRunning = false;
            }
        }

        /// <summary>
        /// Collects the duplicate entries recorded for a key from the index,
        /// the current log and the older logs, in that order.
        /// </summary>
        /// <param name="key">Key to look up.</param>
        /// <returns>All values found; each can be passed to FetchDuplicate.</returns>
        public IEnumerable<int> GetDuplicates(byte[] key)
        {
            // wait while an internal operation is running
            while (_isInternalOPRunning)
                Thread.Sleep(10);

            // start with the duplicates known to the index
            List<int> found = new List<int>(_index.GetDuplicates(key));

            // then add the ones still held in memory
            found.AddRange(_currentLOG.GetDuplicates(key));
            foreach (LogFile olderLog in _logs)
            {
                found.AddRange(olderLog.GetDuplicates(key));
            }

            return found;
        }

        /// <summary>
        /// Reads the data stored in the storage file at the given offset.
        /// </summary>
        /// <param name="offset">Value passed to the storage file's ReadData.</param>
        /// <returns>The bytes returned by the storage file.</returns>
        public byte[] FetchDuplicate(int offset)
        {
            // wait while an internal operation is running
            while (_isInternalOPRunning)
                Thread.Sleep(10);

            byte[] data = _archive.ReadData(offset);
            return data;
        }

        /// <summary>
        /// Returns the storage file's own traversal of its key/value pairs.
        /// </summary>
        public IEnumerable<KeyValuePair<byte[], byte[]>> EnumerateStorageFile()
        {
            // wait while an internal operation is running
            while (_isInternalOPRunning)
                Thread.Sleep(10);

            IEnumerable<KeyValuePair<byte[], byte[]>> pairs = _archive.Traverse();
            return pairs;
        }

        //public void RemoveKey(byte[] key)
        //{
        //    if (_isInternalOPRunning)
        //        while (_isInternalOPRunning) Thread.Sleep(10);
        //    // FIX : add remove logic here
        //}

        // TODO : add enumerate keys

        //public IEnumerator Enumerate(byte[] fromkey, int start, int count)
        //{
        //    // TODO : generate a list from the start key using forward only pages
        //    List<long> l = _index.Enumerate(fromkey, start, count);

        //    return null;
        //}

        /// <summary>
        /// Returns the cached item count, computing it on first use as the index
        /// count plus the counts of all pending logs and the current log.
        /// </summary>
        public long Count()
        {
            // wait while an internal operation is running
            while (_isInternalOPRunning)
                Thread.Sleep(10);

            // -1 marks the cache as not yet computed
            if (_Count != -1)
                return _Count;

            long inLogs = 0;
            foreach (LogFile pendingLog in _logs)
            {
                inLogs += pendingLog.CurrentCount;
            }
            inLogs += _currentLOG.CurrentCount;

            _Count = _index.Count() + inLogs;
            return _Count;
        }

        /// <summary>
        /// Looks up a value using the 16-byte array form of a Guid as the key.
        /// </summary>
        public bool Get(Guid guid, out byte[] val)
        {
            byte[] keyBytes = guid.ToByteArray();
            return Get(keyBytes, out val);
        }

        /// <summary>
        /// Looks up a value using the bytes Helper.GetBytes produces for the string key.
        /// </summary>
        public bool Get(string key, out byte[] val)
        {
            byte[] keyBytes = Helper.GetBytes(key);
            return Get(keyBytes, out val);
        }

        /// <summary>
        /// Looks up a string key and converts the stored bytes back to a string
        /// with Helper.GetString.
        /// </summary>
        /// <param name="key">Key to look up.</param>
        /// <param name="val">The decoded value, or null when the key is not found.</param>
        /// <returns>True when the key was found.</returns>
        public bool Get(string key, out string val)
        {
            val = null;

            byte[] raw;
            if (!Get(Helper.GetBytes(key), out raw))
                return false;

            val = Helper.GetString(raw);
            return true;
        }

        /// <summary>
        /// Looks up a key: first in the current log, then in the older logs,
        /// and finally in the index. The first hit wins.
        /// </summary>
        /// <param name="key">Key to look up.</param>
        /// <param name="val">The stored data when found; otherwise null.</param>
        /// <returns>True when the key was found.</returns>
        public bool Get(byte[] key, out byte[] val)
        {
            // wait while an internal operation is running
            while (_isInternalOPRunning) Thread.Sleep(10);

            val = null;
            // ask the indexer to hold off while we read
            _PauseIndex = true;
            try
            {
                // check in current log
                int off = _currentLOG.Get(key);
                if (off > -1)
                {
                    val = _archive.ReadData(off);
                    return true;
                }

                // check in older log files
                foreach (LogFile l in _logs)
                {
                    off = l.Get(key);
                    if (off > -1)
                    {
                        val = _archive.ReadData(off);
                        return true;
                    }
                }

                // search index here
                if (_index.Get(key, out off))
                {
                    val = _archive.ReadData(off);
                    return true;
                }

                return false;
            }
            finally
            {
                // Reset on every exit path, including exceptions; DoIndexing sleeps in a
                // loop while this flag is set and would otherwise never resume.
                _PauseIndex = false;
            }
        }

        /// <summary>
        /// Stores data using the 16-byte array form of a Guid as the key.
        /// </summary>
        public bool Set(Guid guid, byte[] data)
        {
            byte[] keyBytes = guid.ToByteArray();
            return Set(keyBytes, data);
        }

        /// <summary>
        /// Stores a string value under a string key; both are converted with Helper.GetBytes.
        /// </summary>
        public bool Set(string key, string val)
        {
            byte[] keyBytes = Helper.GetBytes(key);
            byte[] valueBytes = Helper.GetBytes(val);
            return Set(keyBytes, valueBytes);
        }

        /// <summary>
        /// Stores data under a string key converted with Helper.GetBytes.
        /// </summary>
        public bool Set(string key, byte[] data)
        {
            byte[] keyBytes = Helper.GetBytes(key);
            return Set(keyBytes, data);
        }

        // serializes the write section of Set
        private object _lock = new object();

        /// <summary>
        /// Writes a key/data pair to the storage file and records the returned
        /// record number in the current log, rotating the log once it holds more
        /// than Global.MaxItemsBeforeIndexing items.
        /// </summary>
        /// <param name="key">Key bytes; must not be longer than the configured maximum key size.</param>
        /// <param name="data">Data to store.</param>
        /// <returns>Always true.</returns>
        /// <exception cref="Exception">The key is longer than the maximum key size.</exception>
        public bool Set(byte[] key, byte[] data)
        {
            // wait while an internal operation is running
            while (_isInternalOPRunning)
                Thread.Sleep(10);

            if (key.Length > _MaxKeySize)
                throw new Exception("key greater than max key size of " + _MaxKeySize);

            // slow writers down while the indexer is behind
            if (_throttleInput)
                Thread.Sleep(30);

            lock (_lock)
            {
                // save to storage, then remember the record number in the log
                int recno = _archive.WriteData(key, data);
                _currentLOG.Set(key, recno);

                if (_currentLOG.CurrentCount > Global.MaxItemsBeforeIndexing)
                {
                    NewLog();
                }

                _Count++;
            }

            return true;
        }

        /// <summary>
        /// Signals the indexer to stop, waits for an in-progress indexing pass to
        /// finish, then shuts down the index, the storage file and the current log.
        /// Calling it again afterwards is harmless because the references are null-checked.
        /// </summary>
        public void Shutdown()
        {
            _shutdown = true;

            // Initialize subscribed this instance to ProcessExit; remove the handler so the
            // AppDomain no longer keeps the instance alive after shutdown. Removing a handler
            // that is not subscribed is a no-op, so repeated calls are safe.
            AppDomain.CurrentDomain.ProcessExit -= new EventHandler(CurrentDomain_ProcessExit);

            // wait for the indexer to notice the shutdown flag
            while (_indexing)
                Thread.Sleep(50);

            if (_index != null)
                _index.Shutdown();
            if (_archive != null)
                _archive.Shutdown();
            if (_currentLOG != null)
                _currentLOG.Shutdown();

            _index = null;
            _archive = null;
            _currentLOG = null;
        }

        #region [            P R I V A T E     M E T H O D S              ]

        /// <summary>
        /// Retires the current log into the pending list and starts a new log
        /// whose number is one higher.
        /// </summary>
        private void NewLog()
        {
            LogFile retiring = _currentLOG;
            _logs.Add(retiring);

            string logBase = _Path + "\\" + _FileName + _logExtension;
            _currentLOG = new LogFile(logBase, retiring.Number + 1, _MaxKeySize, _logString);
        }

        /// <summary>
        /// Sets up the database: resolves the directory and base file name, creates the
        /// index and the storage file, loads existing log files, primes the item count,
        /// hooks process exit and starts the background indexer thread.
        /// </summary>
        /// <param name="filename">Database path; its directory and extension-less name are used.</param>
        /// <param name="maxkeysize">Maximum key length in bytes.</param>
        /// <param name="AllowDuplicateKeys">Passed to the index constructor.</param>
        /// <param name="idxtype">BTREE creates a BTree index; anything else creates a Hash index.</param>
        private void Initialize(string filename, byte maxkeysize, bool AllowDuplicateKeys, INDEXTYPE idxtype)
        {
            _idxType = idxtype;
            _MaxKeySize = maxkeysize;

            _Path = Path.GetDirectoryName(filename);
            // A bare file name (no directory part) yields an empty string, which
            // Directory.CreateDirectory rejects; use the current directory in that case.
            if (_Path != null && _Path.Length == 0)
                _Path = Directory.GetCurrentDirectory();
            Directory.CreateDirectory(_Path);

            _FileName = Path.GetFileNameWithoutExtension(filename);
            string db = _Path + "\\" + _FileName + _datExtension;
            string idx = _Path + "\\" + _FileName + _idxExtension;

            if (_idxType == INDEXTYPE.BTREE)
                // setup database or load database
                _index = new BTree(idx, _MaxKeySize, Global.DEFAULTNODESIZE, AllowDuplicateKeys, Global.BUCKETCOUNT);
            else
                // hash index
                _index = new Hash(idx, _MaxKeySize, Global.DEFAULTNODESIZE, AllowDuplicateKeys, Global.BUCKETCOUNT);

            _archive = new StorageFile(db, _MaxKeySize);

            _archive.SkipDateTime = true;

            // load old log files
            LoadLogFiles();
            // compute and cache the item count now that index and logs are loaded
            Count();

            AppDomain.CurrentDomain.ProcessExit += new EventHandler(CurrentDomain_ProcessExit);
            // create indexing thread
            _IndexerThread = new Thread(new ThreadStart(IndexThreadRunner));
            _IndexerThread.IsBackground = true;
            _IndexerThread.Start();

            //_ThrottleThread = new Thread(new ThreadStart(Throttle));
            //_ThrottleThread.IsBackground = true;
            //_ThrottleThread.Start();
        }

        /// <summary>
        /// ProcessExit handler registered in Initialize; shuts the database down.
        /// </summary>
        private void CurrentDomain_ProcessExit(object sender, EventArgs e)
        {
            this.Shutdown();
        }

        //private int _throttlecount = 0;
        //private void Throttle()
        //{
        //    while (_shutdown == false)
        //    {
        //        _throttlecount++;
        //        if (_throttlecount > 20)
        //        {
        //            _throttlecount = 0;
        //            if (Helper.GetFreeMemory() < Global.ThrottleWhenFreemMemoryLessThan)
        //                _throttleInput = true;
        //            else
        //                _throttleInput = false;
        //        }
        //    }
        //}

        // ticks counted since the last indexing pass; incremented once per loop iteration
        private int _timercounter = 0;

        /// <summary>
        /// Body of the background indexer thread. Once per second it increments the
        /// tick counter and, when the counter exceeds IndexingTimerSeconds, runs a
        /// non-flush indexing pass and resets the counter. Ends when shutdown is flagged.
        /// </summary>
        private void IndexThreadRunner()
        {
            while (!_shutdown)
            {
                _timercounter += 1;

                bool due = _timercounter > IndexingTimerSeconds;
                if (due)
                {
                    DoIndexing(false);
                    _timercounter = 0;
                }

                Thread.Sleep(1000);
            }
        }

        // True while DoIndexing is processing a log. SaveIndex and Shutdown poll it from
        // other threads, so it is volatile to make each poll observe the latest write.
        private volatile bool _indexing = false;
        // Log file names queued by DoIndexing when the index is in memory; SaveIndex deletes them.
        private List<string> _deleteList = new List<string>();

        /// <summary>
        /// Moves the contents of pending logs into the index, oldest log first.
        /// Called with false from the indexer thread and with true from SaveIndex.
        /// </summary>
        /// <param name="flushMode">
        /// When true the pass does not wait for internal operations, does not honor
        /// the pause flag inside the copy loops and keeps going until no logs remain.
        /// When false it checks for shutdown and pause requests after every entry.
        /// </param>
        private void DoIndexing(bool flushMode)
        {
            while (_logs.Count > 0)
            {
                // stop immediately when a shutdown was requested
                if (_shutdown)
                {
                    _indexing = false;
                    return;
                }
                // in normal mode, yield to an internal operation (e.g. SaveIndex) until it finishes
                if (flushMode == false)
                    if (_isInternalOPRunning)
                        while (_isInternalOPRunning && _shutdown == false) Thread.Sleep(10);

                // re-check after the wait above; the list may have been emptied meanwhile
                if (_logs.Count == 0)
                    return;
                _indexing = true;
                // always work on the oldest pending log
                LogFile l = _logs[0];

                // tell Set to slow down while more logs are queued than the configured limit
                if (_logs.Count > Global.ThrottleInputWhenLogCount)
                    _throttleInput = true;
                else
                    _throttleInput = false;

                if (l != null)
                {
                    // save duplicates
                    if (l._duplicates != null)
                    {
                        #region index memory duplicates
                        foreach (KeyValuePair<byte[], List<int>> kv in l._duplicates)
                        {
                            if (kv.Value != null)
                            {
                                foreach (int off in kv.Value)
                                {
                                    _index.Set(kv.Key, off);
                                    
                                    if (flushMode == false)
                                    {
                                        // on shutdown, commit what was indexed so far and leave;
                                        // the log stays in _logs and its file is not deleted
                                        if (_shutdown)
                                        {
                                            _index.Commit();
                                            _indexing = false;
                                            return;
                                        }
                                        // Get raises _PauseIndex while it reads
                                        while (_PauseIndex)
                                            Thread.Sleep(10);
                                    }
                                }
                            }
                        }
                        #endregion
                    }
                    foreach (KeyValuePair<byte[], int> kv in l._memCache)
                    {
                        #region index data in cache
                        _index.Set(kv.Key, kv.Value);
                        
                        if (flushMode == false)
                        {
                            // same shutdown / pause handling as in the duplicates loop above
                            if (_shutdown)
                            {
                                _index.Commit();
                                _indexing = false;
                                return;
                            }
                            while (_PauseIndex)
                                Thread.Sleep(10);
                        }
                        #endregion
                    }
                    // the whole log is indexed: commit, close it and drop it from the pending list
                    _index.Commit();
                    l.Close();
                    _logs.Remove(l);
                    // delete the log file now unless the index is in memory; in that case
                    // queue the file name so SaveIndex deletes it after saving the index
                    if (_index.InMemory == false)
                        l.DeleteLog();
                    else
                        _deleteList.Add(l.FileName);
                    l = null;
                }
                //GC.Collect(2);
                _indexing = false;
                if (flushMode == false)
                {
                    // NOTE(review): a non-flush pass stops when exactly one log is left,
                    // leaving it for a later call - intent is not visible here, confirm
                    if (_logs.Count == 1)
                        return;
                }
            }
        }

        /// <summary>
        /// Finds the log files left in the database directory, renames them (in sorted
        /// order) so their numeric suffixes start from 0, loads each one into the pending
        /// list as a read-only log, and finally creates a new current log numbered after them.
        /// </summary>
        private void LoadLogFiles()
        {
            string pattern = _FileName + _logExtension + "*";
            string logBase = _Path + "\\" + _FileName + _logExtension;

            // rename log file to start from 0
            string[] existing = Directory.GetFiles(_Path, pattern, SearchOption.TopDirectoryOnly);
            Array.Sort(existing);
            for (int seq = 0; seq < existing.Length; seq++)
            {
                File.Move(existing[seq], logBase + seq.ToString(_logString));
            }

            // list the files again under their new names
            string[] renamed = Directory.GetFiles(_Path, pattern, SearchOption.TopDirectoryOnly);
            Array.Sort(renamed);

            int lognum = 0;
            foreach (string path in renamed)
            {
                if (!File.Exists(path))
                    continue;

                LogFile loaded = new LogFile();
                loaded.FileName = path;
                loaded.Readonly = false;
                loaded.Number = lognum++;

                // load log data
                loaded.ReadLogFile(path);

                loaded.Readonly = true;
                _logs.Add(loaded);
            }

            _currentLOG = new LogFile(logBase, lognum, _MaxKeySize, _logString);
        }
        #endregion
    }
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Architect -
United Kingdom United Kingdom
Mehdi first started programming when he was 8, on a BBC+128k machine in 6512 processor language. After various hardware and software changes, he eventually came across .NET and C#, which he has been using since v1.0.
He is formally educated as a system analyst/industrial engineer, but his programming passion continues.

* Mehdi is the 5th person to get 6 out of 7 Platinums on Code-Project (13th Jan'12)
* Mehdi is the 3rd person to get 7 out of 7 Platinums on Code-Project (26th Aug'16)

Comments and Discussions