Click here to Skip to main content
15,884,836 members
Articles / Desktop Programming / Windows Forms

Transferring Data from a Single XML Stream into Multiple Tables with One Forward-Only Read

Rate me:
Please Sign up or sign in to vote.
4.75/5 (12 votes)
30 Sep 2011CPOL13 min read 43K   763   32  
Using parallel implementations of SqlBulkCopy to achieve fast data transfer from a single XML source into multiple tables.
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Windows.Forms;
using System.Xml.Linq;
using System.Drawing;
using System.Linq;
using System.IO;
using System;

using Rob.DataConn;
using Rob.Utils;

namespace Rob.XmlDataTransfer
{
    public partial class frmDataTransfer : Form
    {
        private IEnumerable<ListItem> results = Enumerable.Empty<ListItem>();
        private string initialConnString;
        private string workingDatabase;
        private volatile XmlDataReader currentReader;
        private object readerSyncObject = new object();
        private Task currentTask;
        private Action transferAction;
        private int totalExpectedRowCount;
        private int currentRowCount;
        private bool isSetUp = false;

        public frmDataTransfer()
        {
            InitializeComponent();
            transferAction = () => startNewReader();
        }

        private void frmDataTransfer_FormClosing(object sender, FormClosingEventArgs e)
        {
            if (isSetUp)
                if (DialogResult.Yes == MessageBox.Show(string.Format("Do you want to tear down the temporary database {0}?", workingDatabase), "Housekeeping", MessageBoxButtons.YesNo))
                    tearDown();
        }

        private void frmDataTransfer_Load(object sender, EventArgs e)
        {
            listResults.DisplayMember = "Value";
        }

        private void cmdTransfer_Click(object sender, EventArgs e)
        {
            transferAction();
        }

        private void startNewReader()
        {
            if (currentReader == null)
                lock (readerSyncObject)
                    if (currentReader == null)
                    {
                        XElement transferInstructions = getTransferInstructions();
                        // This is where you can pass a stream into the XmlDataReader constructor.  It is read
                        // on a forward-only basis, and multiple tables are populated in a single pass-through.
                        // In this case, the URL that is passed in gets turned into a forward-only stream using
                        // XmlReader.Create().
                        currentReader = new XmlDataReader(transferInstructions, "SampleData\\busTimetable.xml");
                        currentReader.RowsUploaded += new EventHandler<RowsUploadedEventArgs>(currentReader_RowsUploaded);
                        currentReader.TableUploaded += new EventHandler<TableUploadedEventArgs>(currentReader_TableUploaded);
                        currentTask = new Task(() => currentReader.executeBulkCopy(initialConnString, workingDatabase));
                        currentTask.ContinueWith(task =>
                        {
                            cleanUp(task);
                        });
                        currentTask.Start();
                        writeResult("Started the transfer process.");
                        cmdDataTransfer.Text = "CANCEL TRANSFER";
                        cmdDataTransfer.ForeColor = Color.DarkRed;
                        transferAction = () => cancelCurrentReader();
                    }
        }

        private void cancelCurrentReader()
        {
            if (currentReader != null)
                lock (readerSyncObject)
                    if (currentReader != null)
                    {
                        currentReader.cancel();
                        Invoke(new Report(writeResult), "The upload has been cancelled.");
                    }
        }

        private delegate void Report(string table);
        private void tableLoaded(string table)
        {
            writeResult(string.Format("Finished Loading the table {0}", table));
        }

        private delegate void RowReport(int currentRows, int totalRows, double proportionComplete);
        private void rowsUploaded(int currentRows, int totalRows, double proportionComplete)
        {
            writeResult(string.Format("Uploaded {0} out of an expected {1} rows ({2:0.0%})", currentRows, totalRows, proportionComplete));
        }

        private void currentReader_TableUploaded(object sender, TableUploadedEventArgs e)
        {
            Invoke(new Report(tableLoaded), e.TableName);
            updateRowCount(e.RowCount);
        }

        private void currentReader_RowsUploaded(object sender, RowsUploadedEventArgs e)
        {
            updateRowCount(e.RowsUploaded);
        }
        private void updateRowCount(int rowCount)
        {
            currentRowCount += rowCount;
            Invoke(new RowReport(rowsUploaded), currentRowCount, totalExpectedRowCount, 
                Math.Min(1.0, (double)currentRowCount / totalExpectedRowCount));
        }

        private void cleanUp(Task task)
        {
            task.Dispose();
            clearCurrentReader();
            Invoke(new Reset(resetTransferButton));
            transferAction = () => startNewReader();
        }

        private delegate void Reset();
        private void resetTransferButton()
        {
            cmdDataTransfer.Text = "START TRANSFER";
            cmdDataTransfer.ForeColor = Color.DarkGreen;
            cmdDataTransfer.Enabled = false;
        }

        private void clearCurrentReader()
        {
            if (currentReader != null)
                lock (readerSyncObject)
                    if (currentReader != null)
                    {
                        currentReader.Dispose();
                        currentReader = null;
                        currentRowCount = 0;
                        totalExpectedRowCount = 0;
                        currentTask = null;
                    }
        }

        private XElement getTransferInstructions()
        {
            // In this case, I'm reading from a file that was copied from the project directory.
            // This file tells you which table you're going to populate: with a small tweak, it
            // can also tell you where to find the stream you want to load into your database.
            // That means that in theory you can load an entirely new stream into a new set of
            // tables without the need for any recompilation, provided you are happy enough
            // with the meta-language you want to build up.
            XElement transferElement = XElement.Load("TransferInstructions\\txcXmlReader.xml");
            readExpectedRowCount(transferElement);
            return transferElement;
        }

        private void readExpectedRowCount(XElement element)
        {
            string tableName = XmlUtils.readAttribute(element, "DatabaseTableName");
            if (tableName != null)
                totalExpectedRowCount += XmlUtils.readAttribute<int>(element, "expectedRowCount", val => int.Parse(val));
            foreach (XElement childElement in element.Elements())
                readExpectedRowCount(childElement);
        }

        private void setUp()
        {
            initialConnString = txtInitialConnString.Text;
            workingDatabase = txtWorkingDatabase.Text;

            ConnUtils.execute(initialConnString, conn => ConnUtils.executeCommands(conn, string.Format("CREATE DATABASE [{0}]", workingDatabase)));

            string createTablePath = "SampleData\\SetUp-CreateTables.sql";
            if (File.Exists(createTablePath))
                using (FileStream stream = File.OpenRead(createTablePath))
                using (StreamReader reader = new StreamReader(stream))
                    ConnUtils.execute(initialConnString, conn =>
                        {
                            conn.ChangeDatabase(workingDatabase);
                            ConnUtils.executeCommands(conn, reader.ReadToEnd());
                        });

            txtInitialConnString.ReadOnly = true;
            txtWorkingDatabase.ReadOnly = true;
            cmdSetup.Enabled = false;
            cmdDataTransfer.Enabled = true;
            cmdTearDown.Enabled = true;
            writeResult("Setup procedure completed.");
            isSetUp = true;
        }

        private void tearDown()
        {
            string dropTablePath = "SampleData\\TearDown-DropTables.sql";
            if (File.Exists(dropTablePath))
                using (FileStream stream = File.OpenRead(dropTablePath))
                using (StreamReader reader = new StreamReader(stream))
                    ConnUtils.execute(initialConnString, conn =>
                        {
                            conn.ChangeDatabase(workingDatabase);
                            ConnUtils.executeCommands(conn, reader.ReadToEnd());
                        });

            ConnUtils.execute(initialConnString, conn => ConnUtils.executeCommands(conn, string.Format("DROP DATABASE [{0}]", workingDatabase)));

            txtInitialConnString.ReadOnly = false;
            txtWorkingDatabase.ReadOnly = false;
            cmdSetup.Enabled = true;
            cmdDataTransfer.Enabled = false;
            cmdTearDown.Enabled = false;
            writeResult("Teardown procedure completed.");
            isSetUp = false;
        }

        private void writeResult(string result)
        {
            results = addResultToList(result).ToList();
            listResults.DataSource = results;
            listResults.SelectedItem = results.Last();
        }

        private IEnumerable<ListItem> addResultToList(string newValue)
        {
            foreach (ListItem item in results)
                yield return item;
            yield return new ListItem { Value = newValue };
        }

        private void cmdSetup_Click(object sender, EventArgs e)
        {
            setUp();
        }

        private void cmdTearDown_Click(object sender, EventArgs e)
        {
            tearDown();
        }
    }

    public class ListItem { public string Value { get; set; } }
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer (Senior) SpiegelSoft
United Kingdom United Kingdom
This member has not yet provided a Biography. Assume it's interesting and varied, and probably something to do with programming.

Comments and Discussions