Click here to Skip to main content
15,896,359 members
Articles / Desktop Programming / Windows Forms

Transferring Data from a Single XML Stream into Multiple Tables with One Forward-Only Read

Rate me:
Please Sign up or sign in to vote.
4.75/5 (12 votes)
30 Sep 2011 | CPOL | 13 min read | 43.2K   763   32
Using parallel implementations of SqlBulkCopy to achieve fast data transfer from a single XML source into multiple tables.
using System.Collections.Generic;
using System.Xml.Linq;
using System.Linq;
using System.Text;
using System.Data;
using System.Xml;
using System;

using Rob.Utils;
using System.Globalization;
using System.Diagnostics;
using System.Data.SqlClient;

namespace Rob.DataConn
{
    /// <summary>
    /// Exposes the rows of one XML "table" element as a forward-only <see cref="IDataReader"/>
    /// so that they can be streamed into a database table with <see cref="SqlBulkCopy"/>.
    /// The writer is configured from a mapping element (attributes <c>XmlTableName</c>,
    /// <c>DatabaseTableName</c>, <c>notifyAfter</c>, child <c>Column</c> elements and nested
    /// <c>Table</c> elements) and co-operates with an <see cref="XmlDataReader"/> through the
    /// two public reset events.
    /// </summary>
    public class DatabaseTableWriter : IDataReader, IDisposable
    {
        #region Nested Classes
        /// <summary>Mapping of one destination column, as read from a <c>Column</c> element.</summary>
        private class DatabaseColumn
        {
            /// <summary>Name of the destination column (attribute <c>DatabaseColumnName</c>).</summary>
            public string DatabaseColumnName { get; set; }
            /// <summary>Source element name (attribute <c>XmlColumnName</c>); null for formula columns.</summary>
            public XName XmlColumnName { get; set; }
            /// <summary>Formula path (attribute <c>XmlFormula</c>); null for plain XML columns.</summary>
            public string FormulaName { get; set; }
            /// <summary>The <c>XmlColumnName</c> attribute split on '.', outermost element first.</summary>
            public IEnumerable<string> XmlColumnNames { get; set; }
            /// <summary>When set, the value is read from this attribute rather than the element content.</summary>
            public string ValueAttribute { get; set; }
            /// <summary>CLR type passed to <c>XmlReader.ReadElementContentAs</c>.</summary>
            public Type ColumnType { get; set; }
            /// <summary>Zero-based ordinal of the column within the row buffers.</summary>
            public int ColumnIndex { get; set; }
            /// <summary>Value of the <c>Level</c> attribute (not used inside this class).</summary>
            public int Level { get; set; }
            /// <summary>Keys into the static converter table, applied in order to each value read.</summary>
            public IEnumerable<string> Converters { get; set; }
        }
        #endregion

        #region Events
        /// <summary>Raised from the bulk copy's <c>SqlRowsCopied</c> notification.</summary>
        public event EventHandler<RowsUploadedEventArgs> RowsUploaded;
        #endregion

        #region Constructors
        /// <summary>
        /// Builds a writer from its mapping element.
        /// </summary>
        /// <param name="tableElement">Mapping element describing the table and its columns.</param>
        /// <param name="dataReader">The XML reader that feeds this writer.</param>
        /// <param name="parentWriter">Writer of the enclosing table, or null for a root table.</param>
        public DatabaseTableWriter(XElement tableElement, XmlDataReader dataReader, DatabaseTableWriter parentWriter = null)
        {
            this.parentTableWriter = parentWriter;
            this.dataReader = dataReader;
            // Configuration values are machine-readable, so they are parsed culture-independently.
            NotifyAfter = XmlUtils.readAttribute<int>(tableElement, "notifyAfter", val => int.Parse(val, CultureInfo.InvariantCulture));
            TableElementName = XmlUtils.readAttribute(tableElement, "XmlTableName");
            XElement[] columnElements = XmlUtils.readChildren(tableElement, "Column").ToArray();
            tableViewName = XmlUtils.readAttribute(tableElement, "DatabaseTableName");
            TableName = tableViewName;

            // createColumn() increments fieldCount, so the projection must be fully
            // materialised before the row buffers are sized.
            DatabaseColumn[] columns = columnElements.Select(columnElement => createColumn(columnElement)).ToArray();
            XmlColumns = columns.Where(column => column.XmlColumnName != null).ToDictionary(column => column.XmlColumnName.LocalName);
            FormulaColumns = columns.Where(column => column.FormulaName != null).ToDictionary(column => column.FormulaName);
            rowFields = new string[fieldCount];
            rowValues = new object[fieldCount];
            foreach (DatabaseColumn column in columns)
                rowFields[column.ColumnIndex] = column.DatabaseColumnName;

            // Still a lazy sequence at this point; it is materialised exactly once on
            // first use (see getChildTableWriters) so child writers are never re-created.
            childTableWriters = readChildren(tableElement, dataReader, this);
        }
        #endregion

        #region Private Fields
        private int fieldCount = 0;
        private volatile int rowCount = 0;
        // Written by cancel() and polled by Read(); volatile so the update is observed
        // when those calls happen on different threads.
        protected volatile bool cancelled = false;
        private XmlDataReader dataReader;
        private string[] rowFields;
        private object[] rowValues;
        private string tableViewName;
        private DatabaseTableWriter parentTableWriter;
        private Dictionary<string, DatabaseColumn> XmlColumns;
        private IEnumerable<DatabaseTableWriter> childTableWriters;
        private Dictionary<string, DatabaseColumn> FormulaColumns;
        private static readonly Dictionary<string, Func<object, object>> valueConverter = new Dictionary<string, Func<object, object>>
        {
            // Empty input is passed through instead of failing on the [0] index.
            { "FirstLetter", val => { string text = toInvariantString(val); return text.Length == 0 ? text : text.Substring(0, 1); } },
            // Data destined for the database must not depend on the machine's culture.
            { "UpperCase", val => toInvariantString(val).ToUpperInvariant() }
        };
        #endregion

        #region Public Readonly Fields
        /// <summary>Value of the <c>XmlTableName</c> attribute of the mapping element.</summary>
        public readonly string TableElementName;
        /// <summary>Set by <see cref="Read"/> before it waits on <see cref="ReaderResetEvent"/>.</summary>
        public readonly ResetEventWrapper WriterResetEvent = new ResetEventWrapper(true);
        /// <summary>Waited on by <see cref="Read"/>; presumably signalled by the XML reader.</summary>
        public readonly ResetEventWrapper ReaderResetEvent = new ResetEventWrapper(true);
        #endregion

        #region Readonly Properties
        /// <summary>Destination table name (attribute <c>DatabaseTableName</c>); may be null.</summary>
        public string TableName { get; private set; }
        /// <summary>Rows returned by <see cref="Read"/> since the last bulk-copy notification.</summary>
        public int RowCount { get { return rowCount; } }
        #endregion

        #region Public Properties
        /// <summary>Current value of the column with the given XML or database column name.</summary>
        public object this[string name] { get { return this[GetOrdinal(name)]; } }
        public bool IsDisposed { get; private set; }
        /// <summary>True once the underlying XML reader reports end of file.</summary>
        public bool IsClosed { get { return dataReader.Eof; } }
        public int RecordsAffected { get { return -1; } }
        public int FieldCount { get { return fieldCount; } }
        /// <summary>Current value of the column at ordinal <paramref name="i"/>.</summary>
        public object this[int i] { get { return rowValues[i]; } }
        public int Depth { get { return 0; } }
        /// <summary>Row interval between bulk-copy notifications (attribute <c>notifyAfter</c>).</summary>
        public int NotifyAfter { get; protected set; }
        #endregion

        #region Public Methods
        /// <summary>
        /// Advances to the next row. Resets the XML column values to <see cref="DBNull"/>,
        /// then repeatedly sets <see cref="WriterResetEvent"/>, waits on
        /// <see cref="ReaderResetEvent"/> and asks the XML reader whether a row is available.
        /// </summary>
        /// <returns>
        /// True when a row is available; false when the reader is at end of file, the writer
        /// was cancelled, or either reset event reports failure.
        /// </returns>
        public bool Read()
        {
            // The Trace.WriteLine() statements below are deliberately left in, commented out,
            // as a quick diagnostic aid. If you uncomment them, also uncomment the
            // corresponding statements in XmlDataReader.cs.
            clearRowValues();
            while (!IsClosed && !cancelled)
            {
                if (!WriterResetEvent.set())
                    return false;
                //Trace.WriteLine(string.Format("TableWriter {0} is SET.", TableElementName));
                //Trace.WriteLine(string.Format("TableWriter {0} is waiting for signal from Reader.", TableElementName));
                if (!ReaderResetEvent.wait())
                    return false;
                //Trace.WriteLine(string.Format("TableWriter {0} received signal from Reader.", TableElementName));
                if (!cancelled && dataReader.canRead(this))
                {
                    ++rowCount;
                    return true;
                }
            }
            return false;
        }

        /// <summary>Marks the writer as cancelled and disposes both reset events.</summary>
        public void cancel()
        {
            cancelled = true;
            WriterResetEvent.Dispose();
            ReaderResetEvent.Dispose();
        }

        /// <summary>
        /// Returns the ordinal of a column. The XML column name is tried first; if it is not
        /// known, the database column names (as returned by <see cref="GetName"/>) are searched,
        /// case-sensitively and then case-insensitively.
        /// </summary>
        /// <exception cref="KeyNotFoundException">No column has the given name.</exception>
        public int GetOrdinal(string name)
        {
            DatabaseColumn column;
            if (XmlColumns.TryGetValue(name, out column))
                return column.ColumnIndex;

            int index = Array.IndexOf(rowFields, name);
            if (index < 0)
                index = Array.FindIndex(rowFields, field => string.Equals(field, name, StringComparison.OrdinalIgnoreCase));
            if (index >= 0)
                return index;

            throw new KeyNotFoundException(string.Format("Column [{0}] not found in table [{1}].", name, TableName));
        }

        /// <summary>
        /// True when the reader is on an element node and the top of
        /// <paramref name="columnNameStack"/> matches the path of one of the XML columns.
        /// </summary>
        public bool readerIsAtTheStartOfAColumnElement(XmlReader xmlReader, Stack<string> columnNameStack)
        {
            if (xmlReader.NodeType != XmlNodeType.Element)
                return false;

            return XmlColumns.Keys.Any(columnName => columnNameMatchesStack(columnName, columnNameStack));
        }

        /// <summary>
        /// Reads the value of the column identified by <paramref name="columnNameStack"/>,
        /// either from the configured attribute or from the element content, and stores it
        /// in the current row.
        /// </summary>
        /// <returns>The converted value, or null when the configured attribute is absent.</returns>
        /// <exception cref="KeyNotFoundException">The stack does not identify an XML column.</exception>
        public object readColumnValue(XmlReader xmlReader, Stack<string> columnNameStack)
        {
            DatabaseColumn column = findXmlColumn(columnNameStack);
            return column.ValueAttribute != null
                ? readColumnValueAsAttribute(xmlReader, column)
                : readColumnValueAsElementContent(xmlReader, columnNameStack, column);
        }

        /// <summary>Appends this writer and then all of its descendants (depth first) to the list.</summary>
        public void addTreeStructure(List<DatabaseTableWriter> tableWriters)
        {
            tableWriters.Add(this);
            foreach (DatabaseTableWriter childTableWriter in getChildTableWriters())
                childTableWriter.addTreeStructure(tableWriters);
        }

        /// <summary>Evaluates every formula column and stores the result in the current row.</summary>
        public void evaluateFormulaColumns()
        {
            foreach (DatabaseColumn formulaColumn in FormulaColumns.Values)
                rowValues[formulaColumn.ColumnIndex] = evaluateFormula(formulaColumn);
        }

        /// <summary>True when a destination table name was configured.</summary>
        public bool hasDatabaseTable()
        {
            return TableName != null;
        }

        /// <summary>
        /// Copies the current row into <paramref name="values"/>. At most
        /// <c>values.Length</c> items are copied, so a short array no longer throws.
        /// </summary>
        /// <returns>The number of values copied.</returns>
        public int GetValues(object[] values)
        {
            if (values == null)
                throw new ArgumentNullException("values");

            int count = Math.Min(values.Length, rowValues.Length);
            Array.Copy(rowValues, values, count);
            return count;
        }

        /// <summary>Database column name at ordinal <paramref name="i"/>.</summary>
        public string GetName(int i)
        {
            return rowFields[i];
        }

        public bool NextResult()
        {
            return false;
        }

        public DataTable GetSchemaTable()
        {
            return null;
        }

        // The typed getters convert through an invariant-culture string so that the
        // ToString()/Parse() round trip does not depend on the machine's regional settings.

        public bool GetBoolean(int i)
        {
            return bool.Parse(fieldText(i));
        }

        public byte GetByte(int i)
        {
            return byte.Parse(fieldText(i), CultureInfo.InvariantCulture);
        }

        public char GetChar(int i)
        {
            return char.Parse(fieldText(i));
        }

        public IDataReader GetData(int i)
        {
            return i == 0 ? this : null;
        }

        public string GetDataTypeName(int i)
        {
            return typeof(string).FullName;
        }

        public DateTime GetDateTime(int i)
        {
            return DateTime.Parse(fieldText(i), CultureInfo.InvariantCulture);
        }

        public decimal GetDecimal(int i)
        {
            return decimal.Parse(fieldText(i), CultureInfo.InvariantCulture);
        }

        public double GetDouble(int i)
        {
            return double.Parse(fieldText(i), CultureInfo.InvariantCulture);
        }

        public Type GetFieldType(int i)
        {
            return typeof(string);
        }

        public float GetFloat(int i)
        {
            return float.Parse(fieldText(i), CultureInfo.InvariantCulture);
        }

        public Guid GetGuid(int i)
        {
            return Guid.Parse(fieldText(i));
        }

        public short GetInt16(int i)
        {
            return Int16.Parse(fieldText(i), CultureInfo.InvariantCulture);
        }

        public int GetInt32(int i)
        {
            return Int32.Parse(fieldText(i), CultureInfo.InvariantCulture);
        }

        public long GetInt64(int i)
        {
            return Int64.Parse(fieldText(i), CultureInfo.InvariantCulture);
        }

        public virtual string GetString(int i)
        {
            return fieldText(i);
        }

        public virtual object GetValue(int i)
        {
            return this[i];
        }

        /// <summary>True when the cell holds <see cref="DBNull"/> or has no value at all.</summary>
        public virtual bool IsDBNull(int i)
        {
            object value = this[i];
            return value == null || value is DBNull;
        }

        /// <summary>
        /// Copies up to <paramref name="length"/> characters of the field's text, converted to
        /// bytes, into <paramref name="buffer"/>. Returns the number of bytes copied, or the
        /// field length when <paramref name="buffer"/> is null.
        /// </summary>
        public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferOffset, int length)
        {
            return copyFieldToArray(i, fieldOffset, buffer, bufferOffset, length);
        }

        /// <summary>
        /// Copies up to <paramref name="length"/> characters of the field's text into
        /// <paramref name="buffer"/>. Returns the number of characters copied, or the field
        /// length when <paramref name="buffer"/> is null.
        /// </summary>
        public long GetChars(int i, long fieldOffset, char[] buffer, int bufferOffset, int length)
        {
            return copyFieldToArray(i, fieldOffset, buffer, bufferOffset, length);
        }

        /// <summary>
        /// Disposes both reset events and closes the XML reader if it is not yet at end of
        /// file. Calls made after a completed dispose are ignored.
        /// </summary>
        public void Dispose()
        {
            if (IsDisposed)
                return;

            WriterResetEvent.Dispose();
            ReaderResetEvent.Dispose();
            if (!IsClosed)
                Close();
            IsDisposed = true;
        }

        /// <summary>Closes the underlying XML reader.</summary>
        public void Close()
        {
            dataReader.close();
        }

        /// <summary>
        /// Streams this writer's rows into <c>[dbo].[TableName]</c> of
        /// <paramref name="workingDatabase"/> using <see cref="SqlBulkCopy"/>. Does nothing
        /// beyond switching database when no destination table is configured.
        /// On failure the XML reader is aborted and the original exception is rethrown
        /// with its stack trace intact.
        /// </summary>
        public void executeBulkCopy(string connString, string workingDatabase)
        {
            ConnUtils.execute(connString, conn =>
                {
                    conn.ChangeDatabase(workingDatabase);
                    if (TableName != null)
                    {
                        try
                        {
                            using (SqlBulkCopy bulkCopy = new SqlBulkCopy(conn))
                            {
                                SqlRowsCopiedEventHandler rowsCopiedHandler = SqlRowsCopied;
                                bulkCopy.NotifyAfter = NotifyAfter;
                                bulkCopy.SqlRowsCopied += rowsCopiedHandler;
                                try
                                {
                                    // A ']' inside a bracketed identifier must be doubled.
                                    bulkCopy.DestinationTableName = string.Format("[dbo].[{0}]", TableName.Replace("]", "]]"));
                                    bulkCopy.BulkCopyTimeout = 0;
                                    bulkCopy.WriteToServer(this);
                                }
                                finally
                                {
                                    // Unsubscribe on the failure path as well.
                                    bulkCopy.SqlRowsCopied -= rowsCopiedHandler;
                                }
                            }
                            fireRemainingEvents();
                        }
                        catch (Exception)
                        {
                            // This stops the xmlReader in its tracks, and aborts all related threads.
                            dataReader.abort();
                            // "throw;" (not "throw ex;") keeps the original stack trace.
                            throw;
                        }
                        finally
                        {
                            // I am removing the connection from the pool so that I can tear
                            // the database down when I have finished with it.  You shouldn't
                            // have to do this: closing the connection will be enough.
                            SqlConnection.ClearPool(conn);
                            conn.Close();
                        }
                    }
                });
        }
        #endregion

        #region Private Methods
        /// <summary>Culture-independent text of a value; empty for null and DBNull.</summary>
        private static string toInvariantString(object value)
        {
            return Convert.ToString(value, CultureInfo.InvariantCulture);
        }

        /// <summary>Culture-independent text of the cell at ordinal <paramref name="i"/>.</summary>
        private string fieldText(int i)
        {
            return toInvariantString(this[i]);
        }

        /// <summary>Sets the writer reset event once the bulk copy has completed.</summary>
        private void fireRemainingEvents()
        {
            WriterResetEvent.set();
        }

        /// <summary>
        /// Lazily yields one child writer per nested <c>Table</c> element. Every enumeration
        /// of the returned sequence creates new writers, so callers must materialise it once.
        /// </summary>
        private static IEnumerable<DatabaseTableWriter> readChildren(XElement tableElement, XmlDataReader dataReader, DatabaseTableWriter parentWriter)
        {
            foreach (XElement childElement in XmlUtils.readChildren(tableElement, "Table"))
                yield return dataReader.createDatabaseTableWriter(childElement, parentWriter);
        }

        /// <summary>
        /// Returns the child writers, creating them on the first call and reusing the same
        /// instances afterwards.
        /// </summary>
        private List<DatabaseTableWriter> getChildTableWriters()
        {
            List<DatabaseTableWriter> children = childTableWriters as List<DatabaseTableWriter>;
            if (children == null)
            {
                children = childTableWriters.ToList();
                childTableWriters = children;
            }
            return children;
        }

        /// <summary>
        /// Evaluates a formula path of the form <c>../../ColumnName</c>: each <c>..</c> moves
        /// to the parent writer, and the first other component is looked up as an XML column
        /// of the writer reached; its current row value is returned.
        /// </summary>
        /// <returns>The referenced value, or null when the path names no column.</returns>
        /// <exception cref="InvalidOperationException">The path climbs above the root writer.</exception>
        private object evaluateFormula(DatabaseColumn databaseColumn)
        {
            string path = databaseColumn.FormulaName;
            DatabaseTableWriter currentTableWriter = this;
            foreach (string pathComponent in StringUtils.splitAndTrim(path, '/').ToList())
            {
                if (!pathComponent.Equals(".."))
                    return currentTableWriter.rowValues[currentTableWriter.XmlColumns[pathComponent].ColumnIndex];

                if (currentTableWriter.parentTableWriter == null)
                    throw new InvalidOperationException(string.Format(
                        "Formula [{0}] of table [{1}] refers to a parent above the root table.", path, TableElementName));
                currentTableWriter = currentTableWriter.parentTableWriter;
            }
            return null;
        }

        /// <summary>Creates the column mapping for one <c>Column</c> element and assigns its ordinal.</summary>
        private DatabaseColumn createColumn(XElement columnElement)
        {
            string xmlColumnName = XmlUtils.readAttribute(columnElement, "XmlColumnName");
            return new DatabaseColumn()
            {
                DatabaseColumnName = XmlUtils.readAttribute(columnElement, "DatabaseColumnName"),
                XmlColumnName = xmlColumnName,
                XmlColumnNames = StringUtils.splitAndTrim(xmlColumnName, '.'),
                FormulaName = XmlUtils.readAttribute(columnElement, "XmlFormula"),
                ValueAttribute = XmlUtils.readAttribute(columnElement, "ValueAttribute"),
                ColumnType = Type.GetType(XmlUtils.readAttribute(columnElement, "ColumnType")),
                Level = XmlUtils.readAttribute<int>(columnElement, "Level", val => int.Parse(val, CultureInfo.InvariantCulture)),
                Converters = StringUtils.splitAndTrim(XmlUtils.readAttribute(columnElement, "Converters")),
                // Kept last so the ordinal is only consumed when every attribute was read.
                ColumnIndex = fieldCount++
            };
        }

        /// <summary>Resets the value of every XML column to <see cref="DBNull"/>.</summary>
        private void clearRowValues()
        {
            foreach (DatabaseColumn column in XmlColumns.Values)
                rowValues[column.ColumnIndex] = DBNull.Value;
        }

        /// <summary>
        /// True when the reader is at a start element named after one of the XML columns.
        /// Not referenced from within this class.
        /// </summary>
        private bool xmlReaderIsAtTheStartOfAColumnElement(XmlReader xmlReader)
        {
            if (xmlReader.NodeType != XmlNodeType.Element)
                return false;

            foreach (string columnElement in XmlColumns.Keys)
                if (xmlReader.IsStartElement(columnElement))
                    return true;

            return false;
        }

        /// <summary>
        /// Compares the column's element path, innermost name first, with the stack from the
        /// top downwards. A stack that is shallower than the path cannot match.
        /// </summary>
        private bool columnNameMatchesStack(string columnName, Stack<string> columnNameStack)
        {
            List<string> pathNames = XmlColumns[columnName].XmlColumnNames.ToList();
            if (pathNames.Count > columnNameStack.Count)
                return false;

            int pathIndex = pathNames.Count - 1;
            // Stack<T> enumerates from the top (most recently pushed) downwards.
            foreach (string stackName in columnNameStack)
            {
                if (pathIndex < 0)
                    break;
                if (!pathNames[pathIndex].Equals(stackName))
                    return false;
                --pathIndex;
            }
            return true;
        }

        /// <summary>
        /// Moves to the configured attribute, reads it, applies the converters and stores the
        /// result in the current row. Returns null (storing nothing) when the attribute is absent.
        /// </summary>
        private object readColumnValueAsAttribute(XmlReader xmlReader, DatabaseColumn column)
        {
            if (!xmlReader.MoveToAttribute(column.ValueAttribute))
                return null;

            xmlReader.ReadAttributeValue();
            object converted = applyConverters(xmlReader.Value, column);
            rowValues[column.ColumnIndex] = converted;
            return converted;
        }

        /// <summary>
        /// Reads the element content as the column's type, applies the converters and stores
        /// the result in the current row.
        /// </summary>
        private object readColumnValueAsElementContent(XmlReader xmlReader, Stack<string> columnNameStack, DatabaseColumn column)
        {
            // ReadElementContentAs moves the reader past the end tag, so the element's
            // name is popped here. NOTE(review): presumably the caller would otherwise pop
            // it on the end element - confirm against XmlDataReader.
            columnNameStack.Pop();
            object converted = applyConverters(xmlReader.ReadElementContentAs(column.ColumnType, null), column);
            rowValues[column.ColumnIndex] = converted;
            return converted;
        }

        /// <summary>Applies the column's converters, in order, to <paramref name="val"/>.</summary>
        private object applyConverters(object val, DatabaseColumn column)
        {
            foreach (string converterKey in column.Converters.ToList())
                val = valueConverter[converterKey](val);
            return val;
        }

        /// <summary>
        /// Finds the XML column for the current element stack. The innermost element name is
        /// tried first, then progressively longer dotted paths (<c>Parent.Child</c>, ...).
        /// </summary>
        /// <exception cref="KeyNotFoundException">No column matches any suffix of the stack.</exception>
        private DatabaseColumn findXmlColumn(Stack<string> columnNameStack)
        {
            // Snapshot from the top of the stack downwards; avoids repeated Skip(i).First().
            string[] names = columnNameStack.ToArray();
            string key = null;
            DatabaseColumn column;
            for (int i = 0; i < names.Length; ++i)
            {
                key = i == 0 ? names[0] : string.Format("{0}.{1}", names[i], key);
                if (XmlColumns.TryGetValue(key, out column))
                    return column;
            }
            throw new KeyNotFoundException(string.Format("Xml Column not found for stack [{0}].",
                string.Join("].[", names)));
        }

        /// <summary>
        /// Shared implementation of <see cref="GetBytes"/> and <see cref="GetChars"/>: copies a
        /// slice of the field's text into a <c>char[]</c> or (one byte per character) a
        /// <c>byte[]</c>. The slice is clamped to the end of the field.
        /// </summary>
        /// <returns>
        /// The number of items copied, or the length of the field's text when
        /// <paramref name="destinationArray"/> is null.
        /// </returns>
        private long copyFieldToArray(int field, long fieldOffset, Array destinationArray, int destinationOffset, int length)
        {
            if (length == 0)
                return 0;

            string value = fieldText(field);

            // A null buffer is the conventional way to ask for the field length.
            if (destinationArray == null)
                return value.Length;

            if (fieldOffset < 0)
                throw new ArgumentOutOfRangeException("fieldOffset");
            if (length < 0)
                throw new ArgumentOutOfRangeException("length");
            if (fieldOffset >= value.Length)
                return 0;

            Debug.Assert(destinationArray.GetType() == typeof(char[]) || destinationArray.GetType() == typeof(byte[]));

            int start = (int)fieldOffset;
            int count = Math.Min(length, value.Length - start);
            char[] chars = value.ToCharArray(start, count);

            if (destinationArray.GetType() == typeof(char[]))
            {
                Array.Copy(chars, 0, destinationArray, destinationOffset, count);
            }
            else
            {
                byte[] source = new byte[count];
                for (int i = 0; i < count; i++)
                    source[i] = Convert.ToByte(chars[i]);

                Array.Copy(source, 0, destinationArray, destinationOffset, count);
            }

            return count;
        }

        /// <summary>
        /// Bulk-copy notification: requests an abort when cancelled, raises
        /// <see cref="RowsUploaded"/> with the rows read since the last notification and
        /// resets that counter.
        /// </summary>
        private void SqlRowsCopied(object sender, SqlRowsCopiedEventArgs e)
        {
            if (cancelled)
                e.Abort = true;

            // Local copy so a concurrent unsubscribe cannot null the delegate mid-call.
            EventHandler<RowsUploadedEventArgs> handler = RowsUploaded;
            if (handler != null)
                handler(this, new RowsUploadedEventArgs(TableName, rowCount));
            rowCount = 0;
        }
        #endregion
    }
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer (Senior) SpiegelSoft
United Kingdom
This member has not yet provided a Biography. Assume it's interesting and varied, and probably something to do with programming.

Comments and Discussions