using System.Collections.Generic;
using System.Xml.Linq;
using System.Linq;
using System.Text;
using System.Data;
using System.Xml;
using System;
using Rob.Utils;
using System.Globalization;
using System.Diagnostics;
using System.Data.SqlClient;
namespace Rob.DataConn
{
public class DatabaseTableWriter : IDataReader, IDisposable
{
#region Nested Classes
/// <summary>
/// Describes one column of the destination table and where its value comes from
/// (an XML element/attribute or a formula). Instances are built by createColumn().
/// </summary>
private class DatabaseColumn
{
    // ---- destination side ----

    /// <summary>Value of the "DatabaseColumnName" attribute; reported by GetName().</summary>
    public string DatabaseColumnName { get; set; }

    /// <summary>Slot of this column in the writer's rowFields/rowValues arrays.</summary>
    public int ColumnIndex { get; set; }

    /// <summary>Type passed to XmlReader.ReadElementContentAs when the value is element content.</summary>
    public Type ColumnType { get; set; }

    // ---- XML source side ----

    /// <summary>Value of the "XmlColumnName" attribute; its LocalName keys the XmlColumns dictionary.</summary>
    public XName XmlColumnName { get; set; }

    /// <summary>The "XmlColumnName" attribute split on '.', outermost component first.</summary>
    public IEnumerable<string> XmlColumnNames { get; set; }

    /// <summary>When not null, the value is read from this attribute instead of the element content.</summary>
    public string ValueAttribute { get; set; }

    /// <summary>Value of the "Level" attribute.</summary>
    public int Level { get; set; }

    // ---- derived values ----

    /// <summary>Value of the "XmlFormula" attribute: a '/'-separated path resolved by evaluateFormula().</summary>
    public string FormulaName { get; set; }

    /// <summary>Keys of the value converters applied, in order, to every value read for this column.</summary>
    public IEnumerable<string> Converters { get; set; }
}
#endregion
#region Events
/// <summary>
/// Raised from the SqlBulkCopy SqlRowsCopied notification with this writer's
/// TableName and the number of rows read since the previous notification.
/// </summary>
public event EventHandler<RowsUploadedEventArgs> RowsUploaded;
#endregion
#region Constructors
/// <summary>
/// Registers the built-in value converters. The keys are the names that a column's
/// Converters list refers to; applyConverters() looks them up in valueConverter.
/// </summary>
static DatabaseTableWriter()
{
    // "FirstLetter": first character of the value's string form. An empty (or null)
    // string has no first character, so it maps to the empty string instead of
    // failing with IndexOutOfRangeException.
    valueConverter.Add("FirstLetter", val =>
    {
        string text = val.ToString();
        return string.IsNullOrEmpty(text) ? string.Empty : text[0].ToString();
    });

    // "UpperCase": upper-cases the value's string form (current-culture rules).
    valueConverter.Add("UpperCase", val => val.ToString().ToUpper());
}
/// <summary>
/// Builds a table writer from its configuration element.
/// </summary>
/// <param name="tableElement">
/// Configuration element. The attributes "notifyAfter", "XmlTableName" and
/// "DatabaseTableName" are read from it, along with its "Column" and "Table" children.
/// </param>
/// <param name="dataReader">The XML data reader that feeds this writer and creates its child writers.</param>
/// <param name="parentWriter">The writer of the enclosing table, or null for a root table.</param>
public DatabaseTableWriter(XElement tableElement, XmlDataReader dataReader, DatabaseTableWriter parentWriter = null)
{
    this.parentTableWriter = parentWriter;
    this.dataReader = dataReader;
    NotifyAfter = XmlUtils.readAttribute<int>(tableElement, "notifyAfter", val => int.Parse(val));
    TableElementName = XmlUtils.readAttribute(tableElement, "XmlTableName");
    XElement[] columnElements = XmlUtils.readChildren(tableElement, "Column").ToArray();
    tableViewName = XmlUtils.readAttribute(tableElement, "DatabaseTableName");
    TableName = tableViewName;

    // createColumn() increments fieldCount once per column, so after this line
    // fieldCount equals the number of declared columns.
    DatabaseColumn[] columns = columnElements.Select(columnElement => createColumn(columnElement)).ToArray();
    XmlColumns = columns.Where(column => column.XmlColumnName != null).ToDictionary(column => column.XmlColumnName.LocalName);
    FormulaColumns = columns.Where(column => column.FormulaName != null).ToDictionary(column => column.FormulaName);

    rowFields = new string[fieldCount];
    rowValues = new object[fieldCount];
    foreach (DatabaseColumn column in columns)
        rowFields[column.ColumnIndex] = column.DatabaseColumnName;

    // readChildren() is a lazy iterator: left unmaterialized, every enumeration of
    // childTableWriters would call createDatabaseTableWriter() again and hand out a
    // fresh set of child instances. Materialize once so the tree is stable.
    childTableWriters = readChildren(tableElement, dataReader, this).ToList();
}
#endregion
#region Private Fields
// Number of columns created so far; createColumn() hands out column indices by post-incrementing it.
private int fieldCount = 0;
// Rows returned by Read() since SqlRowsCopied() last reset the counter to zero.
private volatile int rowCount = 0;
// Set by cancel(); checked by Read() and SqlRowsCopied().
protected bool cancelled = false;
// Source of the XML rows; also consulted for Eof, canRead(), close() and abort().
private readonly XmlDataReader dataReader;
// Database column name per column index (returned by GetName()).
private readonly string[] rowFields;
// Current row's value per column index.
private readonly object[] rowValues;
// Value of the "DatabaseTableName" attribute; also copied to TableName.
private readonly string tableViewName;
// Writer of the enclosing table, or null for a root table; followed by ".." in formulas.
private readonly DatabaseTableWriter parentTableWriter;
// Columns that have an XmlColumnName, keyed by that name's LocalName.
private readonly Dictionary<string, DatabaseColumn> XmlColumns;
// Writers for the nested "Table" elements.
private readonly IEnumerable<DatabaseTableWriter> childTableWriters;
// Columns that have an XmlFormula, keyed by the formula text.
private readonly Dictionary<string, DatabaseColumn> FormulaColumns;
// Converter functions by name; populated in the static constructor.
private static readonly Dictionary<string, Func<object, object>> valueConverter = new Dictionary<string, Func<object, object>>();
#endregion
#region Public Readonly Fields
/// <summary>Value of the "XmlTableName" attribute of the configuration element.</summary>
public readonly string TableElementName;
/// <summary>Event that Read() sets before waiting on <see cref="ReaderResetEvent"/>.</summary>
public readonly ResetEventWrapper WriterResetEvent = new ResetEventWrapper(true);
/// <summary>Event that Read() waits on before asking the data reader whether a row is available.</summary>
public readonly ResetEventWrapper ReaderResetEvent = new ResetEventWrapper(true);
#endregion
#region Readonly Properties
/// <summary>Destination table name ("DatabaseTableName" attribute); null means there is no table to load.</summary>
public string TableName { get; private set; }
/// <summary>Rows returned by Read() since the counter was last reset in SqlRowsCopied().</summary>
public int RowCount { get { return rowCount; } }
#endregion
#region Public Properties
/// <summary>Current row value of the column with the given XML column name (key of XmlColumns).</summary>
public object this[string name] { get { return this[XmlColumns[name].ColumnIndex]; } }
/// <summary>True once Dispose() has run to completion.</summary>
public bool IsDisposed { get; private set; }
/// <summary>Reports the underlying data reader's Eof flag.</summary>
public bool IsClosed { get { return dataReader.Eof; } }
/// <summary>Always -1.</summary>
public int RecordsAffected { get { return -1; } }
/// <summary>Number of columns declared for this table.</summary>
public int FieldCount { get { return fieldCount; } }
/// <summary>Current row value at column index <c>i</c>.</summary>
public object this[int i] { get { return rowValues[i]; } }
/// <summary>Always 0.</summary>
public int Depth { get { return 0; } }
/// <summary>Value of the "notifyAfter" attribute; passed to SqlBulkCopy.NotifyAfter.</summary>
public int NotifyAfter { get; protected set; }
#endregion
#region Public Methods
/// <summary>
/// Advances to the next row. Resets the XML column values to DBNull, then repeatedly
/// sets <see cref="WriterResetEvent"/> and waits on <see cref="ReaderResetEvent"/> until
/// the data reader reports a row for this writer, the data reader reaches Eof, or the
/// writer is cancelled.
/// </summary>
/// <returns>
/// true when dataReader.canRead(this) succeeded (rowCount is incremented); false when the
/// loop ends or when either reset-event call returns false.
/// </returns>
public bool Read()
{
// You shouldn't need these Trace.WriteLine() statements (I've done all that
// testing so you don't have to), but I strongly suggest keeping them here
// just in case something goes wrong. Uncommenting them is a very quick and
// easy diagnostic tool. If you do uncomment them, don't forget to uncomment the
// corresponding statements in XmlDataReader.cs.
clearRowValues();
while (!IsClosed && !cancelled)
{
// NOTE(review): set()/wait() presumably return false once the events have been
// disposed (see cancel()) — confirm in ResetEventWrapper.
if (!WriterResetEvent.set())
return false;
//Trace.WriteLine(string.Format("TableWriter {0} is SET.", TableElementName));
//Trace.WriteLine(string.Format("TableWriter {0} is waiting for signal from Reader.", TableElementName));
if (!ReaderResetEvent.wait())
return false;
//Trace.WriteLine(string.Format("TableWriter {0} received signal from Reader.", TableElementName));
// Re-check cancellation after the wait; only count the row if the reader has one for us.
if (!cancelled && dataReader.canRead(this))
{
++rowCount;
return true;
}
}
return false;
}
/// <summary>
/// Marks the writer as cancelled and disposes both reset events.
/// </summary>
public void cancel()
{
// Set the flag first so Read() and SqlRowsCopied() observe it.
cancelled = true;
// NOTE(review): presumably disposing releases a Read() blocked in wait() — confirm in ResetEventWrapper.
WriterResetEvent.Dispose();
ReaderResetEvent.Dispose();
}
/// <summary>
/// Returns the column index for a column name.
/// </summary>
/// <param name="name">
/// An XML column name (key of XmlColumns) or, failing that, a database column
/// name as returned by <see cref="GetName"/>.
/// </param>
/// <returns>The zero-based column index.</returns>
/// <exception cref="KeyNotFoundException">No column matches <paramref name="name"/>.</exception>
public int GetOrdinal(string name)
{
    DatabaseColumn column;
    if (XmlColumns.TryGetValue(name, out column))
        return column.ColumnIndex;

    // GetName() reports database column names, so accept those as well; otherwise
    // GetOrdinal(GetName(i)) fails for every column whose XML and database names
    // differ, and for every formula column.
    int ordinal = Array.IndexOf(rowFields, name);
    if (ordinal >= 0)
        return ordinal;

    throw new KeyNotFoundException(string.Format("No column named [{0}] in table [{1}].", name, TableName));
}
/// <summary>
/// Tells whether the reader sits on an element node whose position, as described by
/// the element-name stack, matches one of this table's XML columns.
/// </summary>
/// <param name="xmlReader">Reader whose current node type is inspected.</param>
/// <param name="columnNameStack">Element names, innermost on top.</param>
/// <returns>true when the node is an element and some XML column matches the stack.</returns>
public bool readerIsAtTheStartOfAColumnElement(XmlReader xmlReader, Stack<string> columnNameStack)
{
    bool onElement = xmlReader.NodeType == XmlNodeType.Element;
    return onElement
        && XmlColumns.Keys.Any(candidate => columnNameMatchesStack(candidate, columnNameStack));
}
/// <summary>
/// Reads the value for the column identified by the element-name stack, either from
/// the column's configured attribute or from the element content.
/// </summary>
/// <param name="xmlReader">Reader positioned on the column element.</param>
/// <param name="columnNameStack">Element names, innermost on top.</param>
/// <returns>The converted value, as returned by the attribute or element-content reader.</returns>
public object readColumnValue(XmlReader xmlReader, Stack<string> columnNameStack)
{
    DatabaseColumn target = findXmlColumn(columnNameStack);
    bool valueLivesInAttribute = target.ValueAttribute != null;
    return valueLivesInAttribute
        ? readColumnValueAsAttribute(xmlReader, target)
        : readColumnValueAsElementContent(xmlReader, columnNameStack, target);
}
/// <summary>
/// Appends this writer and then, depth-first, every descendant writer to the list.
/// </summary>
/// <param name="tableWriters">List that receives the writers in pre-order.</param>
public void addTreeStructure(List<DatabaseTableWriter> tableWriters)
{
    tableWriters.Add(this);
    using (IEnumerator<DatabaseTableWriter> children = childTableWriters.GetEnumerator())
    {
        while (children.MoveNext())
        {
            children.Current.addTreeStructure(tableWriters);
        }
    }
}
/// <summary>
/// Recomputes every formula column and stores the result in that column's slot of the current row.
/// </summary>
public void evaluateFormulaColumns()
{
    foreach (KeyValuePair<string, DatabaseColumn> entry in FormulaColumns)
    {
        DatabaseColumn formulaColumn = entry.Value;
        rowValues[formulaColumn.ColumnIndex] = evaluateFormula(formulaColumn);
    }
}
/// <summary>Tells whether a destination table name is configured for this writer.</summary>
/// <returns>true when TableName is not null.</returns>
public bool hasDatabaseTable()
{
    bool tableConfigured = TableName != null;
    return tableConfigured;
}
/// <summary>
/// Copies the current row's values into the supplied array.
/// </summary>
/// <param name="values">Destination array; it may be shorter or longer than the row.</param>
/// <returns>The number of values actually copied.</returns>
/// <exception cref="ArgumentNullException"><paramref name="values"/> is null.</exception>
public int GetValues(object[] values)
{
    if (values == null)
        throw new ArgumentNullException("values");

    // Copy only what fits: a destination shorter than the row used to make
    // Array.Copy throw, and the method reported fieldCount regardless.
    int copied = Math.Min(values.Length, rowValues.Length);
    Array.Copy(rowValues, values, copied);
    return copied;
}
/// <summary>Returns the database column name stored for the given column index.</summary>
/// <param name="i">Zero-based column index.</param>
public string GetName(int i)
{
    string databaseColumnName = rowFields[i];
    return databaseColumnName;
}
/// <summary>Always returns false: this reader never advances to another result set.</summary>
public bool NextResult()
{
return false;
}
/// <summary>Always returns null: no schema table is provided.</summary>
public DataTable GetSchemaTable()
{
return null;
}
/// <summary>Parses the string form of the value at column <paramref name="i"/> as a Boolean.</summary>
public bool GetBoolean(int i)
{
    string text = this[i].ToString();
    return bool.Parse(text);
}
/// <summary>Parses the string form of the value at column <paramref name="i"/> as a byte, using the current culture.</summary>
public byte GetByte(int i)
{
    string text = this[i].ToString();
    IFormatProvider culture = CultureInfo.CurrentCulture;
    return byte.Parse(text, culture);
}
/// <summary>Parses the string form of the value at column <paramref name="i"/> as a single character.</summary>
public char GetChar(int i)
{
    string text = this[i].ToString();
    return char.Parse(text);
}
/// <summary>Returns this reader for index 0 and null for every other index.</summary>
public IDataReader GetData(int i)
{
    return i == 0 ? this : null;
}
/// <summary>Reports the full name of <see cref="string"/> for every column, regardless of <paramref name="i"/>.</summary>
public string GetDataTypeName(int i)
{
    Type reportedType = typeof(string);
    return reportedType.FullName;
}
/// <summary>Parses the string form of the value at column <paramref name="i"/> as a DateTime.</summary>
public DateTime GetDateTime(int i)
{
    string text = this[i].ToString();
    return DateTime.Parse(text);
}
/// <summary>Parses the string form of the value at column <paramref name="i"/> as a decimal.</summary>
public decimal GetDecimal(int i)
{
    string text = this[i].ToString();
    return decimal.Parse(text);
}
/// <summary>Parses the string form of the value at column <paramref name="i"/> as a double.</summary>
public double GetDouble(int i)
{
    string text = this[i].ToString();
    return double.Parse(text);
}
/// <summary>Reports <see cref="string"/> as the field type of every column, regardless of <paramref name="i"/>.</summary>
public Type GetFieldType(int i)
{
    Type reportedType = typeof(string);
    return reportedType;
}
/// <summary>Parses the string form of the value at column <paramref name="i"/> as a float.</summary>
public float GetFloat(int i)
{
    string text = this[i].ToString();
    return float.Parse(text);
}
/// <summary>Parses the string form of the value at column <paramref name="i"/> as a Guid.</summary>
public Guid GetGuid(int i)
{
    string text = this[i].ToString();
    return Guid.Parse(text);
}
/// <summary>Parses the string form of the value at column <paramref name="i"/> as a 16-bit integer.</summary>
public short GetInt16(int i)
{
    string text = this[i].ToString();
    return short.Parse(text);
}
/// <summary>Parses the string form of the value at column <paramref name="i"/> as a 32-bit integer.</summary>
public int GetInt32(int i)
{
    string text = this[i].ToString();
    return int.Parse(text);
}
/// <summary>Parses the string form of the value at column <paramref name="i"/> as a 64-bit integer.</summary>
public long GetInt64(int i)
{
    string text = this[i].ToString();
    return long.Parse(text);
}
/// <summary>Returns the string form of the value at column <paramref name="i"/>.</summary>
public virtual string GetString(int i)
{
    object cell = this[i];
    return cell.ToString();
}
/// <summary>Returns the current row's value at column <paramref name="i"/> as stored.</summary>
public virtual object GetValue(int i)
{
    object cell = this[i];
    return cell;
}
/// <summary>Tells whether the value at column <paramref name="i"/> is a <see cref="DBNull"/> instance.</summary>
public virtual bool IsDBNull(int i)
{
    object cell = this[i];
    return cell is DBNull;
}
/// <summary>
/// Copies part of the string form of column <paramref name="i"/> into a byte buffer
/// by delegating to copyFieldToArray().
/// </summary>
/// <returns>The value returned by copyFieldToArray().</returns>
public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferOffset, int length)
{
    long copied = copyFieldToArray(i, fieldOffset, buffer, bufferOffset, length);
    return copied;
}
/// <summary>
/// Copies part of the string form of column <paramref name="i"/> into a char buffer
/// by delegating to copyFieldToArray().
/// </summary>
/// <returns>The value returned by copyFieldToArray().</returns>
public long GetChars(int i, long fieldOffset, char[] buffer, int bufferOffset, int length)
{
    long copied = copyFieldToArray(i, fieldOffset, buffer, bufferOffset, length);
    return copied;
}
/// <summary>
/// Disposes both reset events and closes the data reader if it has not reached Eof.
/// Calling it again after it has completed does nothing.
/// </summary>
public void Dispose()
{
    // Dispose must tolerate repeated calls; without this guard a second call
    // disposes the reset events again and may close the reader again.
    if (IsDisposed)
        return;

    WriterResetEvent.Dispose();
    ReaderResetEvent.Dispose();
    if (!IsClosed)
        Close();
    IsDisposed = true;
}
/// <summary>Closes the underlying XML data reader.</summary>
public void Close()
{
dataReader.close();
}
#endregion
#region Protected Methods
/// <summary>
/// Sets <see cref="WriterResetEvent"/> once more after a bulk copy has finished;
/// the result of set() is ignored.
/// </summary>
private void fireRemainingEvents()
{
WriterResetEvent.set();
}
/// <summary>
/// Aborts the XML data reader and rethrows the given exception to the caller.
/// </summary>
/// <param name="ex">The exception caught during the bulk copy.</param>
private void propagateException(Exception ex)
{
    // This stops the xmlReader in its tracks, and aborts all related threads.
    dataReader.abort();

    // "throw ex;" would overwrite the stack trace with this frame and hide where the
    // failure happened. ExceptionDispatchInfo (.NET 4.5+) rethrows the same exception
    // object with its original trace preserved.
    System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(ex).Throw();
}
#endregion
#region Private Methods
/// <summary>
/// Lazily yields one child writer per nested "Table" element, created through
/// dataReader.createDatabaseTableWriter().
/// </summary>
/// <remarks>
/// This is an iterator: nothing runs until the result is enumerated, and each
/// enumeration calls createDatabaseTableWriter() again for every child element.
/// </remarks>
private static IEnumerable<DatabaseTableWriter> readChildren(XElement tableElement, XmlDataReader dataReader, DatabaseTableWriter parentWriter)
{
foreach (XElement childElement in XmlUtils.readChildren(tableElement, "Table"))
yield return dataReader.createDatabaseTableWriter(childElement, parentWriter);
}
/// <summary>
/// Resolves a formula column. The formula is a '/'-separated path: each ".." moves to
/// the parent table writer, and the first other component names an XML column whose
/// current value in the writer reached so far is returned.
/// </summary>
/// <param name="databaseColumn">The formula column; its FormulaName holds the path.</param>
/// <returns>The referenced value, or null when the path consists only of ".." components.</returns>
/// <exception cref="InvalidOperationException">The path continues after ".." has stepped above the root table.</exception>
/// <exception cref="KeyNotFoundException">The named column is not an XML column of the writer reached.</exception>
private object evaluateFormula(DatabaseColumn databaseColumn)
{
    string path = databaseColumn.FormulaName;
    DatabaseTableWriter currentTableWriter = this;
    foreach (string component in StringUtils.splitAndTrim(path, '/'))
    {
        // A previous ".." walked past the root: fail with a message naming the
        // formula instead of a bare NullReferenceException.
        if (currentTableWriter == null)
            throw new InvalidOperationException(string.Format(
                "Formula [{0}] of table [{1}] navigates above the root table.", path, TableElementName));

        if (component.Equals(".."))
            currentTableWriter = currentTableWriter.parentTableWriter;
        else
            return currentTableWriter.rowValues[currentTableWriter.XmlColumns[component].ColumnIndex];
    }
    return null;
}
/// <summary>
/// Builds a column description from a "Column" configuration element and assigns it
/// the next free column index (fieldCount is incremented as a side effect).
/// </summary>
/// <param name="columnElement">The "Column" element to read attributes from.</param>
/// <returns>The populated column description.</returns>
private DatabaseColumn createColumn(XElement columnElement)
{
    DatabaseColumn created = new DatabaseColumn();

    created.DatabaseColumnName = XmlUtils.readAttribute(columnElement, "DatabaseColumnName");
    created.XmlColumnName = XmlUtils.readAttribute(columnElement, "XmlColumnName");
    // The same attribute, split on '.', gives the nested element path.
    created.XmlColumnNames = StringUtils.splitAndTrim(XmlUtils.readAttribute(columnElement, "XmlColumnName"), '.');
    created.FormulaName = XmlUtils.readAttribute(columnElement, "XmlFormula");
    created.ValueAttribute = XmlUtils.readAttribute(columnElement, "ValueAttribute");
    created.ColumnType = Type.GetType(XmlUtils.readAttribute(columnElement, "ColumnType"));
    created.Level = XmlUtils.readAttribute<int>(columnElement, "Level", val => int.Parse(val));
    created.Converters = StringUtils.splitAndTrim(XmlUtils.readAttribute(columnElement, "Converters"));

    // Claim the index last, so it is only consumed once every attribute has been read.
    created.ColumnIndex = fieldCount++;

    return created;
}
/// <summary>
/// Resets the row slot of every XML column to DBNull.Value. Slots of columns that are
/// not in XmlColumns are left untouched.
/// </summary>
private void clearRowValues()
{
    foreach (KeyValuePair<string, DatabaseColumn> entry in XmlColumns)
    {
        int slot = entry.Value.ColumnIndex;
        rowValues[slot] = DBNull.Value;
    }
}
/// <summary>
/// Tells whether the reader sits on an element node that is the start tag of one of
/// this table's XML columns, judged by name only.
/// </summary>
/// <param name="xmlReader">Reader to inspect.</param>
private bool xmlReaderIsAtTheStartOfAColumnElement(XmlReader xmlReader)
{
    bool onElement = xmlReader.NodeType == XmlNodeType.Element;
    return onElement
        && XmlColumns.Keys.Any(candidateName => xmlReader.IsStartElement(candidateName));
}
/// <summary>
/// Tells whether the innermost entries of the element-name stack spell out the
/// dotted path of the given XML column.
/// </summary>
/// <param name="columnName">Key of the column in XmlColumns.</param>
/// <param name="columnNameStack">Element names, innermost on top.</param>
/// <returns>
/// true when every path component matches the corresponding stack entry; false
/// otherwise, including when the stack holds fewer entries than the path has components.
/// </returns>
private bool columnNameMatchesStack(string columnName, Stack<string> columnNameStack)
{
    List<string> pathComponents = XmlColumns[columnName].XmlColumnNames.ToList();

    // A path deeper than the stack cannot match. Previously this case ran off the
    // end of the stack and threw InvalidOperationException from First().
    if (pathComponents.Count > columnNameStack.Count)
        return false;

    // Stack<T> enumerates from the top, so walk the path from its last component
    // backwards in a single pass over the stack.
    int pathIndex = pathComponents.Count - 1;
    foreach (string stackName in columnNameStack)
    {
        if (pathIndex < 0)
            break;
        if (!pathComponents[pathIndex].Equals(stackName))
            return false;
        --pathIndex;
    }
    return true;
}
/// <summary>
/// Reads the column's value from its configured attribute on the current element,
/// applies the column's converters and stores the result in the current row.
/// </summary>
/// <param name="xmlReader">Reader positioned on the column element.</param>
/// <param name="column">Column whose ValueAttribute names the attribute.</param>
/// <returns>The stored value, or null (row untouched) when the attribute is absent.</returns>
private object readColumnValueAsAttribute(XmlReader xmlReader, DatabaseColumn column)
{
    bool attributeFound = xmlReader.MoveToAttribute(column.ValueAttribute);
    if (!attributeFound)
    {
        return null;
    }

    xmlReader.ReadAttributeValue();
    object converted = applyConverters(xmlReader.Value, column);
    rowValues[column.ColumnIndex] = converted;
    return converted;
}
/// <summary>
/// Pops the current element name off the stack, reads the element content as the
/// column's ColumnType, applies the column's converters and stores the result in the current row.
/// </summary>
/// <param name="xmlReader">Reader positioned on the column element.</param>
/// <param name="columnNameStack">Element-name stack; its top entry is removed.</param>
/// <param name="column">Column being read.</param>
/// <returns>The stored value.</returns>
private object readColumnValueAsElementContent(XmlReader xmlReader, Stack<string> columnNameStack, DatabaseColumn column)
{
    // The pop happens before the read, exactly once per element.
    columnNameStack.Pop();

    object rawContent = xmlReader.ReadElementContentAs(column.ColumnType, null);
    object converted = applyConverters(rawContent, column);
    rowValues[column.ColumnIndex] = converted;
    return converted;
}
/// <summary>
/// Runs the value through each of the column's converters, in the order listed.
/// </summary>
/// <param name="val">Value to convert.</param>
/// <param name="column">Column whose Converters name the functions in valueConverter.</param>
/// <returns>The result of the last converter, or <paramref name="val"/> when none are listed.</returns>
private object applyConverters(object val, DatabaseColumn column)
{
    List<string> converterKeys = column.Converters.ToList();
    object current = val;
    foreach (string converterKey in converterKeys)
    {
        Func<object, object> convert = valueConverter[converterKey];
        current = convert(current);
    }
    return current;
}
/// <summary>
/// Finds the XML column for the element on top of the stack. The plain element name is
/// tried first, then progressively longer dotted names built by prefixing ancestor
/// names ("parent.child", "grandparent.parent.child", ...).
/// </summary>
/// <param name="columnNameStack">Element names, innermost on top.</param>
/// <returns>The first column whose key matches.</returns>
/// <exception cref="KeyNotFoundException">No candidate name is a key of XmlColumns.</exception>
private DatabaseColumn findXmlColumn(Stack<string> columnNameStack)
{
    DatabaseColumn match;

    string candidate = columnNameStack.First();
    if (XmlColumns.TryGetValue(candidate, out match))
    {
        return match;
    }

    // Snapshot in stack order (top first) so ancestors can be indexed directly.
    string[] names = columnNameStack.ToArray();
    for (int depth = 1; depth < names.Length; ++depth)
    {
        candidate = string.Format("{0}.{1}", names[depth], candidate);
        if (XmlColumns.TryGetValue(candidate, out match))
        {
            return match;
        }
    }

    string stackDescription = string.Join("].[", names);
    throw new KeyNotFoundException(string.Format("Xml Column not found for stack [{0}].", stackDescription));
}
/// <summary>
/// Copies a slice of the string form of a field into a char[] or byte[] buffer.
/// Shared implementation of GetChars() and GetBytes().
/// </summary>
/// <param name="field">Zero-based column index.</param>
/// <param name="fieldOffset">Index within the field's string form at which to start.</param>
/// <param name="destinationArray">A char[] or byte[]; when null, nothing is copied and the field length is returned.</param>
/// <param name="destinationOffset">Index in the buffer at which to start writing.</param>
/// <param name="length">Maximum number of characters to copy.</param>
/// <returns>
/// The number of characters actually copied, which is less than <paramref name="length"/>
/// when the field ends first; or the length of the field when the buffer is null.
/// </returns>
private long copyFieldToArray(int field, long fieldOffset, Array destinationArray, int destinationOffset, int length)
{
    // Nothing requested: return before touching the field at all.
    if (destinationArray != null && length == 0)
        return 0;

    string value = this[field].ToString() ?? string.Empty;

    // A null buffer is how IDataRecord callers ask for the length of the field.
    if (destinationArray == null)
        return value.Length;

    // Starting at or past the end of the value leaves nothing to copy.
    if (fieldOffset >= value.Length)
        return 0;

    // Clamp to what is left; asking for more than remains used to make ToCharArray throw.
    int start = (int)fieldOffset;
    int count = Math.Min(length, value.Length - start);

    Debug.Assert(destinationArray.GetType() == typeof(char[]) || destinationArray.GetType() == typeof(byte[]));
    char[] chars = value.ToCharArray(start, count);
    if (destinationArray.GetType() == typeof(char[]))
    {
        Array.Copy(chars, 0, destinationArray, destinationOffset, count);
    }
    else
    {
        // One byte per character; Convert.ToByte throws OverflowException for characters above 255.
        byte[] source = new byte[chars.Length];
        for (int i = 0; i < chars.Length; i++)
            source[i] = Convert.ToByte(chars[i]);
        Array.Copy(source, 0, destinationArray, destinationOffset, count);
    }
    return count;
}
/// <summary>
/// Bulk-copies the rows produced by this reader into [dbo].[TableName] of the working
/// database. Does nothing beyond switching database when TableName is null.
/// </summary>
/// <param name="connString">Connection string handed to ConnUtils.execute().</param>
/// <param name="workingDatabase">Database to switch the connection to before copying.</param>
public void executeBulkCopy(string connString, string workingDatabase)
{
    ConnUtils.execute(connString, conn =>
    {
        conn.ChangeDatabase(workingDatabase);
        if (TableName != null)
        {
            try
            {
                // "using" guarantees the bulk copy is closed even when WriteToServer throws.
                using (SqlBulkCopy bulkCopy = new SqlBulkCopy(conn))
                {
                    SqlRowsCopiedEventHandler rowsCopiedHandler = new SqlRowsCopiedEventHandler(SqlRowsCopied);
                    bulkCopy.NotifyAfter = NotifyAfter;
                    bulkCopy.DestinationTableName = string.Format("[dbo].[{0}]", TableName);
                    bulkCopy.BulkCopyTimeout = 0;
                    bulkCopy.SqlRowsCopied += rowsCopiedHandler;
                    try
                    {
                        bulkCopy.WriteToServer(this);
                    }
                    finally
                    {
                        // Unsubscribe on the failure path as well.
                        bulkCopy.SqlRowsCopied -= rowsCopiedHandler;
                    }
                }
                fireRemainingEvents();
            }
            catch (Exception ex)
            {
                propagateException(ex);
            }
            finally
            {
                // I am removing the connection from the pool so that I can tear
                // the database down when I have finished with it. You shouldn't
                // have to do this: closing the connection will be enough.
                SqlConnection.ClearPool(conn);
                conn.Close();
            }
        }
    });
}
/// <summary>
/// Handles the bulk copy's progress notification: requests an abort when the writer has
/// been cancelled, raises <see cref="RowsUploaded"/> with the rows read since the last
/// notification, and resets that counter.
/// </summary>
/// <param name="sender">The bulk copy raising the notification (unused).</param>
/// <param name="e">Notification arguments; Abort is set to true when cancelled.</param>
private void SqlRowsCopied(object sender, SqlRowsCopiedEventArgs e)
{
    if (cancelled)
        e.Abort = true;

    // Copy the delegate first: a subscriber removed on another thread between the
    // null check and the call would otherwise cause a NullReferenceException.
    EventHandler<RowsUploadedEventArgs> handler = RowsUploaded;
    if (handler != null)
        handler(this, new RowsUploadedEventArgs(TableName, rowCount));

    rowCount = 0;
}
#endregion
}
}