Click here to Skip to main content
Click here to Skip to main content
Articles » Database » NoSQL » General » Downloads
 
Add your own
alternative version

RavenDB - An Introduction

, , 28 Apr 2010 CPOL
An introduction to RavenDB - a new open source .NET document database using .NET 4.0 and VS 2010
Raven_DB.zip
RavenDB
.gitignore
ayende-open-source.snk
build
default.ps1
psake.ps1
psake_ext.ps1
Raven.Client.Tests
Document
Properties
Shard
Transactions
Raven.Client
Client
Document
Properties
Shard
ShardStrategy
ShardAccess
ShardResolution
ShardSelection
Util
Raven.Database
Data
Defaults
Exceptions
Extensions
Indexing
Json
Linq
PrivateExtensions
Properties
Storage
Tasks
Raven.FileStorage
Properties
Raven.Importer
Properties
Raven.Sample.ShardClient
Properties
Raven.Sample.SimpleClient
Properties
Raven.Scenarios
Properties
Scenarios
AddDocument_AddIndex_QueryIndexForDoc.saz
AfterTooManyErrorsDocumentIndexingForTheIndexWillBeDisabled.saz
AfterTooManyErrorsIndexWillBeDisabled.saz
BatchAndGetDocuments.saz
BatchQueryById.saz
BrowseIndex.saz
CanGenerateIdentityValues.saz
CanGetMetadataWhenLookingAtAllDocs.saz
CanGetMetadataWhenLookingAtAnIndex.saz
CannotDeleteStaticWithoutAccruateETag.saz
CanPutAndDeleteDocumentWithETag.saz
CanUseCollectionNamesInIndexes.saz
DeleteStatic.saz
DeletingDocumentThatDoesNotExistsAlwaysWorks.saz
DeletingWithBadETagWillFail.saz
IndexingErrorsWillShowUpInTheStats.saz
IndexingFunctionWithMultipleResultsPerSourceDocument.saz
PatchDocument.saz
PutAndGetAttachment.saz
PutAndGetDocument.saz
PutAndGetDocumentWithMetadataViaHeaders.saz
PutAndGetIndex.saz
PuttingDocumentUsingTransaction.saz
QueryMapReduceIndex.saz
QueryUsingIndex.saz
RecordHeadersForStatics.saz
SelectFieldsFromIndex.saz
TryingToUpdateAttachmentWithBadETagWillFail.saz
TryingToUpdateDocumentWithBadETagWillFail.saz
TryingToUpdateDocumentWithNonGuidETagWillFail.saz
UpdateAttachemtnUsingETag.saz
UpdatingDocsWillUpdateMapReduceResult.saz
WhenDeletingDocsWillUpdateMapReduceIndex.saz
WillDeleteDocumentFromIndex.saz
WillSupport304.saz
WillSupport304ForAttachments.saz
Raven.Server
Defaults
default.json
PowerShellProvider
Properties
Responders
WebUI
css
smoothness
images
ui-bg_flat_0_aaaaaa_40x100.png
ui-bg_flat_75_ffffff_40x100.png
ui-bg_glass_55_fbf9ee_1x400.png
ui-bg_glass_65_ffffff_1x400.png
ui-bg_glass_75_dadada_1x400.png
ui-bg_glass_75_e6e6e6_1x400.png
ui-bg_glass_95_fef1ec_1x400.png
ui-bg_highlight-soft_75_cccccc_1x100.png
ui-icons_222222_256x240.png
ui-icons_2e83ff_256x240.png
ui-icons_454545_256x240.png
ui-icons_888888_256x240.png
ui-icons_cd0a0a_256x240.png
favicon.ico
images
ajax-loader.gif
bgR.png
c1_i1.png
c2_i2.png
c2_i3.png
c2_i4.png
c2_i5.png
c2_i6.png
c3_i7.png
footer_bg.png
footerContainerBgR.png
header_bg.png
jsonEditor
doc.gif
docNode.gif
docNodeLast.gif
docNodeLastFirst.gif
folder.gif
folderNode.gif
folderNodeFirst.gif
folderNodeLast.gif
folderNodeLastFirst.gif
folderNodeOpen.gif
folderNodeOpenFirst.gif
folderNodeOpenLast.gif
folderNodeOpenLastFirst.gif
folderOpen.gif
vertLine.gif
logo.png
midBox1.png
midBox2Arrow.png
sideBarArrow.png
sideBarListBoxBottom.png
sideBarListBoxTop.png
submit.png
topNavSep.png
js
jstree
themes
default
dot_for_ie.gif
icons.png
throbber.gif
icons
txt.png
rdb.jsonEditor
JSONTemplates
Raven.Tests
Indexes
Linq
Patching
Properties
Storage
Transactions
Views
Raven.Tryouts
Properties
RavenDB.suo
Release
SharedLibs
Esent.Interop.dll
Esent.Interop.pdb
ICSharpCode.NRefactory.dll
ICSharpCode.SharpZipLib.dll
log4net.dll
Lucene.Net.dll
Lucene.Net.pdb
Newtonsoft.Json.dll
Rhino.Mocks.dll
SgmlReaderDll.dll
SgmlReaderDll.pdb
System.Management.Automation.dll
xunit.dll
xunit.extensions.dll
Tools
ILMerge.exe
xUnit
xunit.console.exe
xunit.dll
xunit.extensions.dll
xunit.gui.exe
xunit.installer.exe
XUnit.NAntTasks.dll
xunit.runner.msbuild.dll
xunit.runner.resharper.dll
xunit.runner.tdnet.dll
xunit.runner.utility.dll
zip.exe
Utilities
Binaries
ILMerge.exe
Newtonsoft.Json.dll
Raven.DefaultDatabase.Creator.exe
Raven.DefaultDatabase.Creator.pdb
Raven.Merger.exe
Raven.Merger.pdb
SgmlReaderDll.dll
SgmlReaderDll.pdb
Raven.DefaultDatabase.Creator
Properties
Raven.Merger
Properties
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using log4net;
using Microsoft.Isam.Esent.Interop;
using Newtonsoft.Json.Linq;
using Raven.Database.Data;
using Raven.Database.Exceptions;
using Raven.Database.Extensions;
using Raven.Database.Tasks;

namespace Raven.Database.Storage
{
	[CLSCompliant(false)]
	public class DocumentStorageActions : IDisposable
	{
		protected readonly JET_DBID dbid;
		public Table Documents { get; set; }
		protected readonly IDictionary<string, JET_COLUMNID> documentsColumns;
		public Table Transactions { get; set; }
		protected readonly IDictionary<string, JET_COLUMNID> transactionsColumns;

		public Table DocumentsModifiedByTransactions { get; set; }
		protected readonly IDictionary<string, JET_COLUMNID> documentsModifiedByTransactionsColumns;

		public Table Files { get; set; }
		protected readonly IDictionary<string, JET_COLUMNID> filesColumns;
		public Table IndexesStats { get; private set; }
		private readonly IDictionary<string, JET_COLUMNID> indexesStatsColumns;
		protected readonly ILog logger = LogManager.GetLogger(typeof (DocumentStorageActions));
		public Table MappedResults { get; private set; }
		private readonly IDictionary<string, JET_COLUMNID> mappedResultsColumns;
		protected readonly Session session;
		public Table Tasks { get; set; }
		protected readonly IDictionary<string, JET_COLUMNID> tasksColumns;
		private readonly Transaction transaction;
		private int innerTxCount;
		public Table Identity { get; set; }
		protected readonly IDictionary<string, JET_COLUMNID> identityColumns;
		
		[CLSCompliant(false)]
		[DebuggerHidden, DebuggerNonUserCode, DebuggerStepThrough]
		public DocumentStorageActions(
			JET_INSTANCE instance,
		    string database,
		    IDictionary<string, JET_COLUMNID> documentsColumns,
		    IDictionary<string, JET_COLUMNID> tasksColumns,
		    IDictionary<string, JET_COLUMNID> filesColumns,
		    IDictionary<string, JET_COLUMNID> indexesStatsColumns,
		    IDictionary<string, JET_COLUMNID> mappedResultsColumns, 
            IDictionary<string, JET_COLUMNID> documentsModifiedByTransactionsColumns, 
			IDictionary<string, JET_COLUMNID> transactionsColumns, 
			IDictionary<string, JET_COLUMNID> identityColumns)
		{
			try
			{
				session = new Session(instance);
				transaction = new Transaction(session);
				Api.JetOpenDatabase(session, database, null, out dbid, OpenDatabaseGrbit.None);

				Documents = new Table(session, dbid, "documents", OpenTableGrbit.None);
				Tasks = new Table(session, dbid, "tasks", OpenTableGrbit.None);
				Files = new Table(session, dbid, "files", OpenTableGrbit.None);
				IndexesStats = new Table(session, dbid, "indexes_stats", OpenTableGrbit.None);
				MappedResults = new Table(session, dbid, "mapped_results", OpenTableGrbit.None);
                DocumentsModifiedByTransactions = new Table(session, dbid, "documents_modified_by_transaction", OpenTableGrbit.None);
			    Transactions = new Table(session, dbid, "transactions", OpenTableGrbit.None);
				Identity = new Table(session, dbid, "identity_table", OpenTableGrbit.None);

				this.documentsColumns = documentsColumns;
				this.tasksColumns = tasksColumns;
				this.filesColumns = filesColumns;
				this.indexesStatsColumns = indexesStatsColumns;
				this.mappedResultsColumns = mappedResultsColumns;
                this.documentsModifiedByTransactionsColumns = documentsModifiedByTransactionsColumns;
                this.transactionsColumns = transactionsColumns;
				this.identityColumns = identityColumns;
			}
			catch (Exception)
			{
				Dispose();
				throw;
			}
		}

		public bool CommitCalled { get; set; }

		public IEnumerable<string> DocumentKeys
		{
			get
			{
				Api.MoveBeforeFirst(session, Documents);
				while (Api.TryMoveNext(session, Documents))
				{
					yield return
						Api.RetrieveColumnAsString(session, Documents, documentsColumns["key"], Encoding.Unicode);
				}
			}
		}

		#region IDisposable Members

		[DebuggerHidden, DebuggerNonUserCode, DebuggerStepThrough]
		public void Dispose()
		{
			if (Identity != null)
				Identity.Dispose();
            if(Transactions != null)
                Transactions.Dispose();

            if (DocumentsModifiedByTransactions!=null)
                DocumentsModifiedByTransactions.Dispose();

			if (MappedResults != null)
				MappedResults.Dispose();

			if (IndexesStats != null)
				IndexesStats.Dispose();

			if (Files != null)
				Files.Dispose();

			if (Documents != null)
				Documents.Dispose();

			if (Tasks != null)
				Tasks.Dispose();

			if (Equals(dbid, JET_DBID.Nil) == false && session != null)
				Api.JetCloseDatabase(session.JetSesid, dbid, CloseDatabaseGrbit.None);

			if (transaction != null)
				transaction.Dispose();

			if (session != null)
				session.Dispose();
		}

		#endregion

		public JsonDocument DocumentByKey(string key, TransactionInformation transactionInformation)
		{
		    byte[] data;
		    if (transactionInformation != null)
            {
                Api.JetSetCurrentIndex(session, DocumentsModifiedByTransactions, "by_key");
                Api.MakeKey(session, DocumentsModifiedByTransactions, key, Encoding.Unicode, MakeKeyGrbit.NewKey);
                if(Api.TrySeek(session, DocumentsModifiedByTransactions,SeekGrbit.SeekEQ))
                {
                    var txId = Api.RetrieveColumn(session, DocumentsModifiedByTransactions, documentsModifiedByTransactionsColumns["locked_by_transaction"]);
                    if(new Guid(txId) == transactionInformation.Id)
                    {
                        if (Api.RetrieveColumnAsBoolean(session, DocumentsModifiedByTransactions, documentsModifiedByTransactionsColumns["delete_document"]) == true)
                        {
                            logger.DebugFormat("Document with key '{0}' was deleted in transaction: {1}", key, transactionInformation.Id);
                            return null;
                        }
                        data = Api.RetrieveColumn(session, DocumentsModifiedByTransactions, documentsModifiedByTransactionsColumns["data"]);
                        logger.DebugFormat("Document with key '{0}' was found in transaction: {1}", key, transactionInformation.Id);
                        return new JsonDocument
                        {
                            Data = data,
                            Etag = new Guid(Api.RetrieveColumn(session, DocumentsModifiedByTransactions, documentsModifiedByTransactionsColumns["etag"])),
                            Key = Api.RetrieveColumnAsString(session, DocumentsModifiedByTransactions, documentsModifiedByTransactionsColumns["key"], Encoding.Unicode),
                            Metadata = JObject.Parse(Api.RetrieveColumnAsString(session, DocumentsModifiedByTransactions, documentsModifiedByTransactionsColumns["metadata"]))
                        };
                    }
                }
            }

			Api.JetSetCurrentIndex(session, Documents, "by_key");
			Api.MakeKey(session, Documents, key, Encoding.Unicode, MakeKeyGrbit.NewKey);
			if (Api.TrySeek(session, Documents, SeekGrbit.SeekEQ) == false)
			{
				logger.DebugFormat("Document with key '{0}' was not found", key);
				return null;
			}
			data = Api.RetrieveColumn(session, Documents, documentsColumns["data"]);
			logger.DebugFormat("Document with key '{0}' was found", key);
			return new JsonDocument
			{
				Data = data,
				Etag = new Guid(Api.RetrieveColumn(session, Documents, documentsColumns["etag"])),
				Key = Api.RetrieveColumnAsString(session, Documents, documentsColumns["key"], Encoding.Unicode),
				Metadata = JObject.Parse(Api.RetrieveColumnAsString(session, Documents, documentsColumns["metadata"]))
			};
		}

		public void Commit()
		{
			if (innerTxCount != 0)
				return;

			CommitCalled = true;
			transaction.Commit(CommitTransactionGrbit.None);
		}

		public int GetNextIdentityValue(string name)
		{
			Api.JetSetCurrentIndex(session, Identity, "by_key");
			Api.MakeKey(session, Identity, name, Encoding.Unicode, MakeKeyGrbit.NewKey);
			if (Api.TrySeek(session, Identity, SeekGrbit.SeekEQ) == false)
			{
				using(var update = new Update(session, Identity, JET_prep.Insert))
				{
					Api.SetColumn(session, Identity, identityColumns["key"],name, Encoding.Unicode);
					Api.SetColumn(session, Identity, identityColumns["val"], 1);

					update.Save();
				}
				return 1;
			}

			return Api.EscrowUpdate(session, Identity, identityColumns["val"], 1) + 1;
		}

		public Tuple<int, int> FirstAndLastDocumentIds()
		{
			var item1 = 0;
			var item2 = 0;
			Api.JetSetCurrentIndex(session, Documents, "by_id");
			Api.MoveBeforeFirst(session, Documents);
			if (Api.TryMoveNext(session, Documents))
				item1 = Api.RetrieveColumnAsInt32(session, Documents, documentsColumns["id"]).Value;
			Api.MoveAfterLast(session, Documents);
			if (Api.TryMovePrevious(session, Documents))
				item2 = Api.RetrieveColumnAsInt32(session, Documents, documentsColumns["id"]).Value;
			return new Tuple<int, int>(item1, item2);
		}

		public bool DoesTasksExistsForIndex(string name)
		{
			Api.JetSetCurrentIndex(session, Tasks, "by_index");
			Api.MakeKey(session, Tasks, name, Encoding.Unicode, MakeKeyGrbit.NewKey);
			if (Api.TrySeek(session, Tasks, SeekGrbit.SeekEQ) == false)
			{
				Api.MakeKey(session, Tasks, "*", Encoding.Unicode, MakeKeyGrbit.NewKey);
				return Api.TrySeek(session, Tasks, SeekGrbit.SeekEQ);
			}
			return true;
		}

		public IEnumerable<Tuple<JsonDocument, int>> DocumentsById(Reference<bool> hasMoreWork, int startId, int endId,
		                                                           int limit)
		{
			Api.JetSetCurrentIndex(session, Documents, "by_id");
			Api.MakeKey(session, Documents, startId, MakeKeyGrbit.NewKey);
			// this sholdn't really happen, it means that the doc is missing
			// probably deleted before we can get it?
			if (Api.TrySeek(session, Documents, SeekGrbit.SeekGE) == false)
			{
				logger.DebugFormat("Document with id {0} or higher was not found", startId);
				yield break;
			}
			var count = 0;
			do
			{
				if ((++count) > limit)
				{
					hasMoreWork.Value = true;
					yield break;
				}
				var id = Api.RetrieveColumnAsInt32(session, Documents, documentsColumns["id"],
				                                   RetrieveColumnGrbit.RetrieveFromIndex).Value;
				if (id > endId)
					break;

				var data = Api.RetrieveColumn(session, Documents, documentsColumns["data"]);
				logger.DebugFormat("Document with id '{0}' was found, doc length: {1}", id, data.Length);
				var json = Api.RetrieveColumnAsString(session, Documents, documentsColumns["metadata"],
				                                      Encoding.Unicode);
				var doc = new JsonDocument
				{
					Key = Api.RetrieveColumnAsString(session, Documents, documentsColumns["key"], Encoding.Unicode),
					Data = data,
					Etag = new Guid(Api.RetrieveColumn(session, Documents, documentsColumns["etag"])),
					Metadata = JObject.Parse(json)
				};
				yield return new Tuple<JsonDocument, int>(doc, id);
			} while (Api.TryMoveNext(session, Documents));
			hasMoreWork.Value = false;
		}

		public Guid AddDocument(string key, string data, Guid? etag, string metadata)
		{
			Api.JetSetCurrentIndex(session, Documents, "by_key");
			Api.MakeKey(session, Documents, key, Encoding.Unicode, MakeKeyGrbit.NewKey);
			var isUpdate = Api.TrySeek(session, Documents, SeekGrbit.SeekEQ);
			if (isUpdate)
			{
			    EnsureNotLockedByTransaction(key, null);
			    EnsureDocumentEtagMatch(key, etag, "PUT");
			}
			else
			{
                EnsureDocumentIsNotCreatedInAnotherTransaction(key, Guid.NewGuid());
			}
		    Guid newEtag;
			DocumentDatabase.UuidCreateSequential(out newEtag);

			using (var update = new Update(session, Documents, isUpdate ? JET_prep.Replace : JET_prep.Insert))
			{
				Api.SetColumn(session, Documents, documentsColumns["key"], key, Encoding.Unicode);
				Api.SetColumn(session, Documents, documentsColumns["data"], Encoding.UTF8.GetBytes(data));
				Api.SetColumn(session, Documents, documentsColumns["etag"], newEtag.ToByteArray());
				Api.SetColumn(session, Documents, documentsColumns["metadata"], metadata, Encoding.Unicode);

				update.Save();
			}
			logger.DebugFormat("Inserted a new document with key '{0}', doc length: {1}, update: {2}, ",
			                   key, data.Length, isUpdate);

		    return newEtag;
		}

	    private void EnsureDocumentEtagMatch(string key, Guid? etag, string method)
	    {
	        var existingEtag = new Guid(Api.RetrieveColumn(session, Documents, documentsColumns["etag"]));
	        if (existingEtag != etag && etag != null)
	        {
	            throw new ConcurrencyException(method + " attempted on document '" + key +
	                                           "' using a non current etag")
	            {
	                ActualETag = etag.Value,
	                ExpectedETag = existingEtag
	            };
	        }
	    }

        private void EnsureDocumentEtagMatchInTransaction(string key, Guid? etag)
        {
            Api.JetSetCurrentIndex(session, DocumentsModifiedByTransactions, "by_key");
            Api.MakeKey(session, DocumentsModifiedByTransactions, key, Encoding.Unicode, MakeKeyGrbit.NewKey);
            Guid existingEtag;
            if (Api.TrySeek(session, DocumentsModifiedByTransactions, SeekGrbit.SeekEQ))
            {
                if (Api.RetrieveColumnAsBoolean(session, DocumentsModifiedByTransactions, documentsModifiedByTransactionsColumns["delete_document"]) == true)
                    return; // we ignore etags on deleted documents
                existingEtag = new Guid(Api.RetrieveColumn(session, DocumentsModifiedByTransactions, documentsModifiedByTransactionsColumns["etag"]));
            }
            else
            {
                existingEtag = new Guid(Api.RetrieveColumn(session, Documents, documentsColumns["etag"]));
            }
            if (existingEtag != etag && etag != null)
            {
                throw new ConcurrencyException("PUT attempted on document '" + key +
                                               "' using a non current etag")
                {
                    ActualETag = etag.Value,
                    ExpectedETag = existingEtag
                };
            }
        }

	    public Guid AddDocumentInTransaction(TransactionInformation transactionInformation, string key, string data, Guid? etag, string metadata)
        {
            Api.JetSetCurrentIndex(session, Documents, "by_key");
            Api.MakeKey(session, Documents, key, Encoding.Unicode, MakeKeyGrbit.NewKey);
            var isUpdate = Api.TrySeek(session, Documents, SeekGrbit.SeekEQ);
            if (isUpdate)
            {
                EnsureNotLockedByTransaction(key, transactionInformation.Id);
                EnsureDocumentEtagMatchInTransaction(key, etag);
                using(var update = new Update(session, Documents,JET_prep.Replace))
                {
                    Api.SetColumn(session, Documents, documentsColumns["locked_by_transaction"], transactionInformation.Id.ToByteArray()); 
                    update.Save();
                }
            }
            else
            {
                EnsureDocumentIsNotCreatedInAnotherTransaction(key, transactionInformation.Id);
            }
            EnsureTransactionExists(transactionInformation);
            Guid newEtag;
            DocumentDatabase.UuidCreateSequential(out newEtag);

            Api.JetSetCurrentIndex(session, DocumentsModifiedByTransactions, "by_key");
            Api.MakeKey(session, DocumentsModifiedByTransactions, key, Encoding.Unicode, MakeKeyGrbit.NewKey);
	        var isUpdateInTransaction = Api.TrySeek(session, DocumentsModifiedByTransactions, SeekGrbit.SeekEQ);

            using (var update = new Update(session, DocumentsModifiedByTransactions, isUpdateInTransaction ? JET_prep.Replace : JET_prep.Insert))
            {
                Api.SetColumn(session, DocumentsModifiedByTransactions, documentsModifiedByTransactionsColumns["key"], key, Encoding.Unicode);
                Api.SetColumn(session, DocumentsModifiedByTransactions, documentsModifiedByTransactionsColumns["data"], Encoding.UTF8.GetBytes(data));
                Api.SetColumn(session, DocumentsModifiedByTransactions, documentsModifiedByTransactionsColumns["etag"], newEtag.ToByteArray());
                Api.SetColumn(session, DocumentsModifiedByTransactions, documentsModifiedByTransactionsColumns["metadata"], metadata, Encoding.Unicode);
                Api.SetColumn(session, DocumentsModifiedByTransactions, documentsModifiedByTransactionsColumns["delete_document"], false);
                Api.SetColumn(session, DocumentsModifiedByTransactions, documentsModifiedByTransactionsColumns["locked_by_transaction"], transactionInformation.Id.ToByteArray());

                update.Save();
            }
	        logger.DebugFormat("Inserted a new document with key '{0}', doc length: {1}, update: {2}, in transaction: {3}",
	                           key, data.Length, isUpdate, transactionInformation.Id);

	        return newEtag;
        }

	    private void EnsureDocumentIsNotCreatedInAnotherTransaction(string key, Guid txId)
	    {
	        Api.JetSetCurrentIndex(session, DocumentsModifiedByTransactions,"by_key");
	        Api.MakeKey(session,DocumentsModifiedByTransactions, key, Encoding.Unicode, MakeKeyGrbit.NewKey);
            if (Api.TrySeek(session, DocumentsModifiedByTransactions, SeekGrbit.SeekEQ) == false)
                return;
	        byte[] docTxId = Api.RetrieveColumn(session, DocumentsModifiedByTransactions,documentsModifiedByTransactionsColumns["locked_by_transaction"]);
            if(new Guid(docTxId) != txId)
	        {
	            throw new ConcurrencyException("A document with key: '" + key+"' is currently created in another transaction");
	        }
	    }

	    private void EnsureTransactionExists(TransactionInformation transactionInformation)
	    {
            Api.JetSetCurrentIndex(session, Transactions, "by_tx_id");
            Api.MakeKey(session, Transactions, transactionInformation.Id.ToByteArray(), MakeKeyGrbit.NewKey);
	        var isUpdate = Api.TrySeek(session, Transactions, SeekGrbit.SeekEQ);
            using(var update = new Update(session, Transactions, isUpdate ? JET_prep.Replace : JET_prep.Insert))
            {
                Api.SetColumn(session, Transactions,transactionsColumns["tx_id"], transactionInformation.Id.ToByteArray());
                Api.SetColumn(session, Transactions, transactionsColumns["timeout"],
                              DateTime.UtcNow + transactionInformation.Timeout);
                update.Save();
            }
	    }

	    public void DeleteDocument(string key, Guid? etag)
		{
			Api.JetSetCurrentIndex(session, Documents, "by_key");
			Api.MakeKey(session, Documents, key, Encoding.Unicode, MakeKeyGrbit.NewKey);
			if (Api.TrySeek(session, Documents, SeekGrbit.SeekEQ) == false)
			{
				logger.DebugFormat("Document with key '{0}' was not found, and considered deleted", key);
				return;
			}

			EnsureDocumentEtagMatch(key, etag,"DELETE");
		    EnsureNotLockedByTransaction(key, null);

		    Api.JetDelete(session, Documents);
			logger.DebugFormat("Document with key '{0}' was deleted", key);
		}


        public void DeleteDocumentInTransaction(TransactionInformation transactionInformation, string key, Guid? etag)
        {
            Api.JetSetCurrentIndex(session, Documents, "by_key");
            Api.MakeKey(session, Documents, key, Encoding.Unicode, MakeKeyGrbit.NewKey);
            if (Api.TrySeek(session, Documents, SeekGrbit.SeekEQ) == false)
            {
                logger.DebugFormat("Document with key '{0}' was not found, and considered deleted", key);
                return;
            }

            EnsureNotLockedByTransaction(key, transactionInformation.Id);
            EnsureDocumentEtagMatchInTransaction(key, etag);

            using (var update = new Update(session, Documents, JET_prep.Replace))
            {
                Api.SetColumn(session, Documents, documentsColumns["locked_by_transaction"], transactionInformation.Id.ToByteArray());
                update.Save();
            }
            EnsureTransactionExists(transactionInformation);

            Guid newEtag;
            DocumentDatabase.UuidCreateSequential(out newEtag);

            Api.JetSetCurrentIndex(session, DocumentsModifiedByTransactions, "by_key");
            Api.MakeKey(session, DocumentsModifiedByTransactions, key, Encoding.Unicode, MakeKeyGrbit.NewKey);
            var isUpdateInTransaction = Api.TrySeek(session, DocumentsModifiedByTransactions, SeekGrbit.SeekEQ);

            using (var update = new Update(session, DocumentsModifiedByTransactions, isUpdateInTransaction ? JET_prep.Replace : JET_prep.Insert))
            {
                Api.SetColumn(session, DocumentsModifiedByTransactions, documentsModifiedByTransactionsColumns["key"], key, Encoding.Unicode);
                Api.SetColumn(session, DocumentsModifiedByTransactions, documentsModifiedByTransactionsColumns["data"], 
                    Api.RetrieveColumn(session,Documents, documentsColumns["data"]));
                Api.SetColumn(session, DocumentsModifiedByTransactions, documentsModifiedByTransactionsColumns["etag"], newEtag.ToByteArray());
                Api.SetColumn(session, DocumentsModifiedByTransactions, documentsModifiedByTransactionsColumns["metadata"],
                    Api.RetrieveColumnAsString(session, Documents, documentsColumns["metadata"],Encoding.Unicode), Encoding.Unicode);
                Api.SetColumn(session, DocumentsModifiedByTransactions, documentsModifiedByTransactionsColumns["delete_document"], true);
                Api.SetColumn(session, DocumentsModifiedByTransactions, documentsModifiedByTransactionsColumns["locked_by_transaction"], transactionInformation.Id.ToByteArray());

                update.Save();
            }
        }

	    private void EnsureNotLockedByTransaction(string key, Guid? currentTransaction)
	    {
	        byte[] txId = Api.RetrieveColumn(session, Documents, documentsColumns["locked_by_transaction"]);
	        if (txId == null)
	        {
	            return;
	        }
	        var guid = new Guid(txId);
            if (currentTransaction != null && guid == currentTransaction.Value)
                return;

	        Api.JetSetCurrentIndex(session, Transactions, "by_tx_id");
            Api.MakeKey(session, Transactions, txId,MakeKeyGrbit.NewKey);
            if(Api.TrySeek(session, Transactions, SeekGrbit.SeekEQ)==false)
            {
                //This is a bug, probably... because it means that we have a missing
                // transaction, we are going to reset it
                ResetTransactionOnCurrentDocument();
                return;
            }
	        var timeout = Api.RetrieveColumnAsDateTime(session, Transactions, transactionsColumns["timeout"]);
            if(DateTime.UtcNow > timeout)// the timeout for the transaction has passed
            {
                RollbackTransaction(guid);
                return;
            }
	        throw new ConcurrencyException("Document '" + key + "' is locked by transacton: " + guid);
	    }

	    public void RollbackTransaction(Guid txId)
	    {
	        CompleteTransaction(txId, doc =>
	        {
	            Api.MakeKey(session, Documents, doc.Key, Encoding.Unicode, MakeKeyGrbit.NewKey);
	            if (Api.TrySeek(session, Documents, SeekGrbit.SeekEQ))
	            {
	                ResetTransactionOnCurrentDocument();
	            }
	        });
	    }

	    public void CompleteTransaction(Guid txId, Action<DocumentInTransactionData> perDocumentModified)
	    {
	        Api.JetSetCurrentIndex(session, Transactions, "by_tx_id");
	        Api.MakeKey(session, Transactions, txId, MakeKeyGrbit.NewKey);
	        if (Api.TrySeek(session, Transactions, SeekGrbit.SeekEQ))
	            Api.JetDelete(session, Transactions);
            
	        Api.JetSetCurrentIndex(session, DocumentsModifiedByTransactions, "by_tx");
	        Api.MakeKey(session, DocumentsModifiedByTransactions, txId.ToByteArray(), MakeKeyGrbit.NewKey);
	        if (Api.TrySeek(session, DocumentsModifiedByTransactions, SeekGrbit.SeekEQ) == false)
	            return;
	        Api.MakeKey(session, DocumentsModifiedByTransactions, txId.ToByteArray(), MakeKeyGrbit.NewKey);
	        Api.JetSetIndexRange(session, DocumentsModifiedByTransactions,
	                             SetIndexRangeGrbit.RangeInclusive | SetIndexRangeGrbit.RangeUpperLimit);

	        do
	        {
	            var documentInTransactionData = new DocumentInTransactionData
	            {
	                Data = Encoding.UTF8.GetString(Api.RetrieveColumn(session, DocumentsModifiedByTransactions, documentsModifiedByTransactionsColumns["data"])),
	                Delete = Api.RetrieveColumnAsBoolean(session,DocumentsModifiedByTransactions,documentsModifiedByTransactionsColumns["delete_document"]).Value,
	                Etag = new Guid(Api.RetrieveColumn(session, DocumentsModifiedByTransactions, documentsModifiedByTransactionsColumns["etag"])),
	                Key = Api.RetrieveColumnAsString(session, DocumentsModifiedByTransactions, documentsModifiedByTransactionsColumns["key"],Encoding.Unicode),
	                Metadata = Api.RetrieveColumnAsString(session, DocumentsModifiedByTransactions, documentsModifiedByTransactionsColumns["metadata"], Encoding.Unicode),
	            };
                Api.JetDelete(session, DocumentsModifiedByTransactions);
                perDocumentModified(documentInTransactionData);
	        } while (Api.TryMoveNext(session, DocumentsModifiedByTransactions));
	    }

	    private void ResetTransactionOnCurrentDocument()
	    {
	        using(var update = new Update(session, Documents, JET_prep.Replace))
	        {
	            Api.SetColumn(session, Documents, documentsColumns["locked_by_transaction"], null);
	            update.Save();
	        }
	    }

	    public void AddTask(Task task)
		{
			using (var update = new Update(session, Tasks, JET_prep.Insert))
			{
				Api.SetColumn(session, Tasks, tasksColumns["task"], task.AsString(), Encoding.Unicode);
				Api.SetColumn(session, Tasks, tasksColumns["for_index"], task.Index, Encoding.Unicode);

				update.Save();
			}
			if (logger.IsDebugEnabled)
				logger.DebugFormat("New task '{0}'", task.AsString());
		}

		public void AddAttachment(string key, Guid? etag, byte[] data, string headers)
		{
			Api.JetSetCurrentIndex(session, Files, "by_name");
			Api.MakeKey(session, Files, key, Encoding.Unicode, MakeKeyGrbit.NewKey);
			var isUpdate = Api.TrySeek(session, Files, SeekGrbit.SeekEQ);
			if (isUpdate)
			{
				var existingEtag = new Guid(Api.RetrieveColumn(session, Files, filesColumns["etag"]));
				if (existingEtag != etag && etag != null)
				{
					throw new ConcurrencyException("PUT attempted on attachment '" + key +
						"' using a non current etag")
					{
						ActualETag = etag.Value,
						ExpectedETag = existingEtag
					};
				}
			}

			Guid newETag;
			DocumentDatabase.UuidCreateSequential(out newETag);
			using (var update = new Update(session, Files, isUpdate ? JET_prep.Replace : JET_prep.Insert))
			{
				Api.SetColumn(session, Files, filesColumns["name"], key, Encoding.Unicode);
				Api.SetColumn(session, Files, filesColumns["data"], data);
				Api.SetColumn(session, Files, filesColumns["etag"], newETag.ToByteArray());
				Api.SetColumn(session, Files, filesColumns["metadata"], headers, Encoding.Unicode);

				update.Save();
			}
			logger.DebugFormat("Adding attachment {0}", key);
		}

		public void DeleteAttachment(string key, Guid? etag)
		{
			Api.JetSetCurrentIndex(session, Files, "by_name");
			Api.MakeKey(session, Files, key, Encoding.Unicode, MakeKeyGrbit.NewKey);
			if (Api.TrySeek(session, Files, SeekGrbit.SeekEQ) == false)
			{
				logger.DebugFormat("Attachment with key '{0}' was not found, and considered deleted", key);
				return;
			}
			var fileEtag = new Guid(Api.RetrieveColumn(session, Files, filesColumns["etag"]));
			if (fileEtag != etag && etag != null)
			{
				throw new ConcurrencyException("DELETE attempted on attachment '" + key +
					"' using a non current etag")
				{
					ActualETag = etag.Value,
					ExpectedETag = fileEtag
				};
			}

			Api.JetDelete(session, Files);
			logger.DebugFormat("Attachment with key '{0}' was deleted", key);
		}

		public Attachment GetAttachment(string key)
		{
			Api.JetSetCurrentIndex(session, Files, "by_name");
			Api.MakeKey(session, Files, key, Encoding.Unicode, MakeKeyGrbit.NewKey);
			if (Api.TrySeek(session, Files, SeekGrbit.SeekEQ) == false)
			{
				return null;
			}

			var metadata = Api.RetrieveColumnAsString(session, Files, filesColumns["metadata"], Encoding.Unicode);
			return new Attachment
			{
				Data = Api.RetrieveColumn(session, Files, filesColumns["data"]),
				Etag = new Guid(Api.RetrieveColumn(session, Files, filesColumns["etag"])),
				Metadata = JObject.Parse(metadata)
			};
		}

		public string GetFirstTask()
		{
			Api.MoveBeforeFirst(session, Tasks);
			while (Api.TryMoveNext(session, Tasks))
			{
				try
				{
					Api.JetGetLock(session, Tasks, GetLockGrbit.Write);
				}
				catch (EsentErrorException e)
				{
					if (e.Error != JET_err.WriteConflict)
						throw;
				}
				return Api.RetrieveColumnAsString(session, Tasks, tasksColumns["task"], Encoding.Unicode);
			}
			return null;
		}

		public void CompleteCurrentTask()
		{
			Api.JetDelete(session, Tasks);
		}

		public void PushTx()
		{
			innerTxCount++;
		}

		public void PopTx()
		{
			innerTxCount--;
		}

		public int GetDocumentsCount()
		{
			int val;
			Api.JetIndexRecordCount(session, Documents, out val, 0);
			return val;
		}

		public void SetCurrentIndexStatsTo(string index)
		{
			Api.JetSetCurrentIndex(session, IndexesStats, "by_key");
			Api.MakeKey(session, IndexesStats, index, Encoding.Unicode, MakeKeyGrbit.NewKey);
			if (Api.TrySeek(session, IndexesStats, SeekGrbit.SeekEQ) == false)
				throw new InvalidOperationException("There is no index named: " + index);
		}

		public void IncrementIndexingAttempt()
		{
			Api.EscrowUpdate(session, IndexesStats, indexesStatsColumns["attempts"], 1);
		}

		public void IncrementSuccessIndexing()
		{
			Api.EscrowUpdate(session, IndexesStats, indexesStatsColumns["successes"], 1);
		}

		public void IncrementIndexingFailure()
		{
			Api.EscrowUpdate(session, IndexesStats, indexesStatsColumns["errors"], 1);
		}

		public IEnumerable<IndexStats> GetIndexesStats()
		{
			Api.MoveBeforeFirst(session, IndexesStats);
			while (Api.TryMoveNext(session, IndexesStats))
			{
				yield return new IndexStats
				{
					Name = Api.RetrieveColumnAsString(session, IndexesStats, indexesStatsColumns["key"]),
					IndexingAttempts =
						Api.RetrieveColumnAsInt32(session, IndexesStats, indexesStatsColumns["attempts"]).Value,
					IndexingSuccesses =
						Api.RetrieveColumnAsInt32(session, IndexesStats, indexesStatsColumns["successes"]).Value,
					IndexingErrors =
						Api.RetrieveColumnAsInt32(session, IndexesStats, indexesStatsColumns["errors"]).Value,
				};
			}
		}

		public void AddIndex(string name)
		{
			using (var update = new Update(session, IndexesStats, JET_prep.Insert))
			{
				Api.SetColumn(session, IndexesStats, indexesStatsColumns["key"], name, Encoding.Unicode);

				update.Save();
			}
		}

		public void DeleteIndex(string name)
		{
			Api.JetSetCurrentIndex(session, IndexesStats, "by_key");
			Api.MakeKey(session, IndexesStats, name, Encoding.Unicode, MakeKeyGrbit.NewKey);
			if (Api.TrySeek(session, IndexesStats, SeekGrbit.SeekEQ) == false)
				return;
			Api.JetDelete(session, IndexesStats);
		}

		public void DecrementIndexingAttempt()
		{
			Api.EscrowUpdate(session, IndexesStats, indexesStatsColumns["attempts"], -1);
		}

		public IndexFailureInformation GetFailureRate(string index)
		{
			SetCurrentIndexStatsTo(index);
			return new IndexFailureInformation
			{
				Name = index,
				Attempts = Api.RetrieveColumnAsInt32(session, IndexesStats, indexesStatsColumns["attempts"]).Value,
				Errors = Api.RetrieveColumnAsInt32(session, IndexesStats, indexesStatsColumns["errors"]).Value,
				Successes = Api.RetrieveColumnAsInt32(session, IndexesStats, indexesStatsColumns["successes"]).Value
			};
		}

		public void PutMappedResult(string view, string docId, string reduceKey, string data)
		{
			Api.JetSetCurrentIndex(session, MappedResults, "by_pk");
			Api.MakeKey(session, MappedResults, view, Encoding.Unicode, MakeKeyGrbit.NewKey);
			Api.MakeKey(session, MappedResults, docId, Encoding.Unicode, MakeKeyGrbit.None);
			Api.MakeKey(session, MappedResults, reduceKey, Encoding.Unicode, MakeKeyGrbit.None);
			var isUpdate = Api.TrySeek(session, MappedResults, SeekGrbit.SeekEQ);

			using (var update = new Update(session, MappedResults, isUpdate ? JET_prep.Replace : JET_prep.Insert))
			{
				Api.SetColumn(session, MappedResults, mappedResultsColumns["view"], view, Encoding.Unicode);
				Api.SetColumn(session, MappedResults, mappedResultsColumns["document_key"], docId, Encoding.Unicode);
				Api.SetColumn(session, MappedResults, mappedResultsColumns["reduce_key"], reduceKey, Encoding.Unicode);
				Api.SetColumn(session, MappedResults, mappedResultsColumns["data"], Encoding.UTF8.GetBytes(data));

				update.Save();
			}
		}

		public IEnumerable<string> GetMappedResults(string view, string reduceKey)
		{
			Api.JetSetCurrentIndex(session, MappedResults, "by_view_and_reduce_key");
			Api.MakeKey(session, MappedResults, view, Encoding.Unicode, MakeKeyGrbit.NewKey);
			Api.MakeKey(session, MappedResults, reduceKey, Encoding.Unicode, MakeKeyGrbit.None);
			if (Api.TrySeek(session, MappedResults, SeekGrbit.SeekEQ) == false)
				yield break;

			Api.MakeKey(session, MappedResults, view, Encoding.Unicode, MakeKeyGrbit.NewKey);
			Api.MakeKey(session, MappedResults, reduceKey, Encoding.Unicode, MakeKeyGrbit.None);
			Api.JetSetIndexRange(session, MappedResults, SetIndexRangeGrbit.RangeUpperLimit | SetIndexRangeGrbit.RangeInclusive);

			do
			{
				var bytes = Api.RetrieveColumn(session, MappedResults, mappedResultsColumns["data"]);
				yield return Encoding.UTF8.GetString(bytes);
			} while (Api.TryMoveNext(session, MappedResults));
		}

		public IEnumerable<string> DeleteMappedResultsForDocumentId(string documentId, string view)
		{
			Api.JetSetCurrentIndex(session, MappedResults, "by_view_and_doc_key");
			Api.MakeKey(session, MappedResults, view, Encoding.Unicode, MakeKeyGrbit.NewKey);
			Api.MakeKey(session, MappedResults, documentId, Encoding.Unicode, MakeKeyGrbit.None);
			if (Api.TrySeek(session, MappedResults, SeekGrbit.SeekEQ) == false)
				return new string[0];

			var reduceKeys = new HashSet<string>();
			Api.MakeKey(session, MappedResults, view, Encoding.Unicode, MakeKeyGrbit.NewKey);
			Api.MakeKey(session, MappedResults, documentId, Encoding.Unicode, MakeKeyGrbit.None);
			Api.JetSetIndexRange(session, MappedResults, SetIndexRangeGrbit.RangeUpperLimit | SetIndexRangeGrbit.RangeInclusive);
			do
			{
				var reduceKey = Api.RetrieveColumnAsString(session, MappedResults, mappedResultsColumns["reduce_key"],
				                                           Encoding.Unicode);
				reduceKeys.Add(reduceKey);
				Api.JetDelete(session, MappedResults);
			} while (Api.TryMoveNext(session, MappedResults));
			return reduceKeys;
		}

		public void DeleteMappedResultsForView(string view)
		{
			Api.JetSetCurrentIndex(session, MappedResults, "by_view");
			Api.MakeKey(session, MappedResults, view, Encoding.Unicode, MakeKeyGrbit.NewKey);
			if (Api.TrySeek(session, MappedResults, SeekGrbit.SeekEQ) == false)
				return;
			Api.MakeKey(session, MappedResults, view, Encoding.Unicode, MakeKeyGrbit.NewKey);
			Api.JetSetIndexRange(session, MappedResults, SetIndexRangeGrbit.RangeUpperLimit | SetIndexRangeGrbit.RangeInclusive);

			do
			{
				Api.JetDelete(session, MappedResults);
			} while (Api.TryMoveNext(session, MappedResults));
		}

	}
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Authors

No Biography provided

Paul B.

United States United States
I've been a software developer since 1996 and have enjoyed C# since 2003. I have a Bachelor's degree in Computer Science and for some reason, a Master's degree in Business Administration. I currently do software development contracting/consulting.

| Advertise | Privacy | Mobile
Web03 | 2.8.141022.2 | Last Updated 28 Apr 2010
Article Copyright 2010 by Ayende @ Rahien, Paul B.
Everything else Copyright © CodeProject, 1999-2014
Terms of Service
Layout: fixed | fluid