using Microsoft.SqlServer.Server;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Data.SqlTypes;
using System.Diagnostics;
using System.IO;
using System.Threading;
/// <summary>
/// Small helper that owns one <see cref="SqlConnection"/>, one <see cref="SqlCommand"/> and one
/// <see cref="SqlDataReader"/>, and wraps connecting and executing in simple retry loops.
/// Failures are reported through return values (false / null); the most recent exception that a
/// retry loop swallowed is kept in <see cref="last_error"/> so callers can inspect it.
/// </summary>
public class MSSQL : IDisposable
{
    public SqlConnection connection;
    public SqlCommand command;
    public SqlDataReader reader;
    public String conn_string;
    public Int32 retry;
    /// <summary>Last exception swallowed by a retry loop, or null when none has occurred.</summary>
    public Exception last_error;

    #region constructor
    /// <summary>
    /// Stores the connection string and retry count, then tries to open the connection.
    /// A failed connect does not throw; later Execute* calls try to connect again.
    /// </summary>
    /// <param name="sql_conn_string">ADO.NET connection string.</param>
    /// <param name="retry">Maximum number of connection attempts made here.</param>
    public MSSQL(String sql_conn_string, Int32 retry)
    {
        // Fixed: the assignment used to be inverted (parameter = field), which left the field
        // at 0 and turned the Connect call below into a no-op.
        this.retry = retry;
        conn_string = sql_conn_string;
        Connect(retry);
    }
    #endregion

    #region dispose - command
    /// <summary>Disposes the current command, if any, and clears the field.</summary>
    public void DisposeCommand()
    {
        if (command != null)
        {
            command.Dispose();
            command = null;
        }
    }
    #endregion

    #region dispose - reader
    /// <summary>Closes and disposes the current reader, if any, and clears the field.</summary>
    public void DisposeReader()
    {
        if (reader != null)
        {
            reader.Close();
            reader.Dispose();
            reader = null;
        }
    }
    #endregion

    #region dispose - connection
    /// <summary>Closes and disposes the connection, if any, and clears the field.</summary>
    public void DisposeConnection()
    {
        if (connection != null)
        {
            connection.Close();
            connection.Dispose();
            connection = null;
        }
    }
    #endregion

    #region dispose - MSSQL
    /// <summary>Releases reader, command and connection and blanks the connection string.</summary>
    public void Dispose()
    {
        // The reader goes first: it depends on both the command and the connection.
        DisposeReader();
        DisposeCommand();
        DisposeConnection();
        conn_string = String.Empty;
    }
    #endregion

    #region connect
    /// <summary>
    /// Ensures the connection is open, making at most <paramref name="retry"/> attempts.
    /// </summary>
    /// <param name="retry">Maximum number of attempts to open the connection.</param>
    /// <returns>true when the connection is open; false when every attempt failed.</returns>
    public Boolean Connect(Int32 retry)
    {
        // An already-open connection needs no attempt at all, whatever the retry budget is.
        if (connection != null && connection.State == ConnectionState.Open)
        {
            return true;
        }

        Boolean is_success = false;
        while (!is_success && retry-- > 0)
        {
            try
            {
                if (connection == null)
                {
                    connection = new SqlConnection(conn_string);
                }
                if (connection.State != ConnectionState.Open)
                {
                    connection.Open();
                }
                is_success = true;
            }
            catch (Exception e)
            {
                // Best effort by design: remember the failure and try again.
                last_error = e;
            }
        }
        return is_success;
    }
    #endregion

    #region create - new command
    /// <summary>Replaces the current command with a fresh, parameterless one.</summary>
    public void CreateNewCommand()
    {
        // Release the previous command instead of just dropping the reference.
        DisposeCommand();
        command = new SqlCommand();
    }
    #endregion

    #region add - params
    /// <summary>Adds an input parameter named "@" + <paramref name="param_name"/> to the current command.</summary>
    public void AddParameter(String param_name, Object param_value)
    {
        command.Parameters.Add(new SqlParameter("@" + param_name, param_value));
    }
    #endregion

    #region add - params
    /// <summary>Adds a parameter named "@" + <paramref name="name"/> with an explicit type and direction.</summary>
    public void AddParameter(String name, Object value, SqlDbType type, ParameterDirection direction)
    {
        SqlParameter p = new SqlParameter();
        p.ParameterName = "@" + name;
        p.SqlDbType = type;
        p.Value = value;
        p.Direction = direction;
        command.Parameters.Add(p);
    }
    #endregion

    #region execute - Ad Hoc
    /// <summary>
    /// Runs a text command on a brand-new command object (any previously added parameters are discarded).
    /// </summary>
    /// <param name="sql_command">SQL text to execute.</param>
    /// <param name="retry">Maximum number of execution attempts.</param>
    /// <param name="is_non_query">true to call ExecuteNonQuery; false to open <see cref="reader"/>.</param>
    /// <returns>true when one attempt completed without an exception.</returns>
    public Boolean ExecuteAdHoc(String sql_command, Int32 retry, bool is_non_query)
    {
        command = new SqlCommand();
        Boolean is_success = RunWithRetry(sql_command, CommandType.Text, retry, is_non_query);
        command.Dispose();
        return is_success;
    }
    #endregion

    #region execute - procedure
    /// <summary>
    /// Runs a stored procedure using the current command and the parameters added to it.
    /// </summary>
    /// <param name="proc_name">Stored procedure name.</param>
    /// <param name="retry">Maximum number of execution attempts.</param>
    /// <param name="is_non_query">true to call ExecuteNonQuery; false to open <see cref="reader"/>.</param>
    /// <returns>true when one attempt completed without an exception.</returns>
    /// <exception cref="InvalidOperationException">No command exists (CreateNewCommand was not called).</exception>
    public Boolean ExecuteProcedure(String proc_name, Int32 retry, bool is_non_query)
    {
        RequireCommand("ExecuteProcedure");
        Boolean is_success = RunWithRetry(proc_name, CommandType.StoredProcedure, retry, is_non_query);
        command.Dispose();
        return is_success;
    }
    #endregion

    #region execute - function
    /// <summary>
    /// Invokes <paramref name="func_name"/> with CommandType.StoredProcedure and returns the value of
    /// the "@return" parameter, which the caller must have added beforehand.
    /// </summary>
    /// <param name="func_name">Name of the routine to invoke.</param>
    /// <param name="retry">Maximum number of execution attempts.</param>
    /// <returns>The "@return" value, or null when execution failed, the parameter is missing, or it is DBNull.</returns>
    /// <exception cref="InvalidOperationException">No command exists (CreateNewCommand was not called).</exception>
    public Object ExecuteFunction(String func_name, Int32 retry)
    {
        RequireCommand("ExecuteFunction");
        Object res = null;
        if (RunWithRetry(func_name, CommandType.StoredProcedure, retry, true)
            && command.Parameters.Contains("@return"))
        {
            Object value = command.Parameters["@return"].Value;
            if (value != DBNull.Value)
            {
                res = value;
            }
        }
        command.Dispose();
        return res;
    }
    #endregion

    #region helpers
    // Throws a descriptive error instead of a NullReferenceException when no command was created.
    private void RequireCommand(String caller)
    {
        if (command == null)
        {
            throw new InvalidOperationException("CreateNewCommand must be called before " + caller);
        }
    }

    // Shared retry loop: (re)connects, configures the current command and executes it.
    private Boolean RunWithRetry(String text, CommandType type, Int32 retry, bool is_non_query)
    {
        Boolean is_success = false;
        for (Int32 attempt = 0; !is_success && attempt < retry; attempt++)
        {
            // Fixed off-by-one: Connect used to receive the already-decremented counter, so the
            // final attempt got a budget of 0 and a retry count of 1 never executed anything.
            if (!Connect(retry - attempt))
            {
                continue;
            }
            try
            {
                command.Connection = connection;
                command.CommandType = type;
                command.CommandText = text;
                if (is_non_query)
                {
                    command.ExecuteNonQuery();
                }
                else
                {
                    // A connection supports one open reader by default; close any stale one first.
                    DisposeReader();
                    reader = command.ExecuteReader();
                }
                is_success = true;
            }
            catch (Exception e)
            {
                // Best effort by design: remember the failure and let the loop retry.
                last_error = e;
            }
        }
        return is_success;
    }
    #endregion
}
/// <summary>
/// Copies a local source file to a target file in fixed-size chunks by calling routines on a
/// remote SQL Server ("clr_filerelay_get_file_size" and "clr_filerelay_write_bytes").
/// The copy starts at the size the target already has, so an interrupted copy is continued
/// rather than restarted.
/// </summary>
public class FileStreaming
{
    public String instance_name = String.Empty;
    public String source_file = String.Empty;
    public String target_file = String.Empty;
    public String target_conn_string = String.Empty;
    public Int32 copy_block_size;
    public Int64 source_size;
    public Int64 target_size;
    public Int64 pos = 0;
    public Int32 required = 0;

    #region constructor
    /// <summary>Captures the copy settings; no I/O happens here.</summary>
    /// <param name="inst_name">Key of the progress entry registered in FileRelay.Stat.</param>
    /// <param name="source_file_stream">Path of the local file to read.</param>
    /// <param name="target_file_stream">Path of the file to write on the target server.</param>
    /// <param name="target_server_conn_string">Connection string of the target server.</param>
    /// <param name="stream_copy_batch_mb">Chunk size; one unit is 1024 * 1000 bytes.</param>
    /// <exception cref="OverflowException">The chunk size in bytes does not fit into an Int32.</exception>
    public FileStreaming(String inst_name, String source_file_stream, String target_file_stream, String target_server_conn_string, Int32 stream_copy_batch_mb)
    {
        instance_name = inst_name;
        source_file = source_file_stream;
        target_file = target_file_stream;
        target_conn_string = target_server_conn_string;
        // checked: values above 2097 would otherwise wrap around silently into a bogus size.
        copy_block_size = checked(1024 * 1000 * stream_copy_batch_mb);
    }
    #endregion

    #region relay
    /// <summary>
    /// Runs the copy loop: reads the next chunk from the source file, sends it to the target
    /// server, and updates the progress entry after every chunk.
    /// </summary>
    /// <returns>true when the target reached the source size; false when a chunk write was rejected.</returns>
    /// <exception cref="Exception">Any failure, rethrown with the original as InnerException.</exception>
    [SqlFunction(DataAccess = DataAccessKind.Read)]
    public Boolean Relay()
    {
        Stopwatch timer = new Stopwatch();
        Boolean is_success = false;
        BinaryReader br = null;
        MSSQL target_sql = null;
        FileRelayStat stat = null;
        byte[] bytes = null;
        try
        {
            timer.Start();
            FileRelay.FindStat(instance_name, ref stat);
            if (stat == null)
            {
                // No progress entry is registered under this name: track progress on a detached
                // object instead of failing with a NullReferenceException.
                stat = new FileRelayStat(String.Empty, source_file, target_file, copy_block_size);
            }
            source_size = new FileInfo(source_file).Length;
            stat.source_size = source_size;
            if (source_size == 0)
            {
                throw new Exception("source file size must be grather than 0");
            }
            if (String.IsNullOrEmpty(target_file))
            {
                throw new Exception("target file ad path cannot be empty");
            }
            if (copy_block_size <= 0)
            {
                // A non-positive chunk size could never make progress.
                throw new Exception("copy block size must be greater than 0");
            }
            target_sql = new MSSQL(target_conn_string, 10);
            GetTargetFileSize(ref target_sql);
            stat.target_start_size = stat.target_size = target_size;
            stat.current_step = "Open file to read: " + source_file;
            br = new BinaryReader(File.Open(source_file, FileMode.Open, FileAccess.Read));
            Boolean is_write_ok = true;
            stat.current_step = "Start Relaying";
            while (is_write_ok && target_size < source_size)
            {
                pos = target_size;
                Int64 remaining = source_size - target_size;
                required = remaining < copy_block_size ? (Int32)remaining : copy_block_size;
                // Continue exactly where the target file currently ends.
                br.BaseStream.Seek(pos, SeekOrigin.Begin);
                bytes = br.ReadBytes(required);
                if (bytes.Length == 0)
                {
                    // The source shrank after its size was measured; without this the loop would spin forever.
                    throw new EndOfStreamException("source file ended before the expected size was reached");
                }
                Int64 read_ms = timer.ElapsedMilliseconds;
                stat.ellapsed_ms_reader_last = read_ms;
                stat.ellapsed_ms_reader_total += read_ms;
                stat.ellapsed_ms_total += read_ms;
                timer.Restart();
                is_write_ok = RelayBytes(ref bytes, ref target_sql, 10);
                Int64 write_ms = timer.ElapsedMilliseconds;
                stat.ellapsed_ms_writer_last = write_ms;
                stat.ellapsed_ms_writer_total += write_ms;
                stat.ellapsed_ms_total += write_ms;
                timer.Restart();
                if (is_write_ok)
                {
                    // Fixed: advance by the bytes actually written (the last chunk is usually short)
                    // and only after a successful write, so progress never overshoots source_size.
                    target_size += bytes.Length;
                    stat.target_size = target_size;
                    stat.EstimateTimes();
                }
            }
            is_success = is_write_ok;
        }
        catch (Exception e)
        {
            // Same exception type and message as before; the original is kept as InnerException.
            throw new Exception(e.Message, e);
        }
        finally
        {
            if (br != null)
            {
                br.Close();
            }
            if (target_sql != null)
            {
                // Fixed: the connection to the target server used to be leaked.
                target_sql.Dispose();
            }
        }
        return is_success;
    }
    #endregion

    #region relay - chunk of bytes
    /// <summary>
    /// Sends one chunk to the target server by invoking "clr_filerelay_write_bytes" with the
    /// target path, the current <see cref="target_size"/> as write position, and the bytes.
    /// </summary>
    /// <param name="bytes">Chunk to write.</param>
    /// <param name="sql">Helper connected to the target server.</param>
    /// <param name="retry">Forwarded to the remote routine as its "retry" argument.</param>
    /// <returns>The boolean returned by the remote routine; false when no value came back.</returns>
    /// <exception cref="Exception">Any failure, rethrown with the original as InnerException.</exception>
    [SqlFunction(DataAccess = DataAccessKind.Read)]
    public Boolean RelayBytes(ref byte[] bytes, ref MSSQL sql, Int32 retry)
    {
        Boolean is_success = false;
        try
        {
            sql.CreateNewCommand();
            sql.AddParameter("target_file_stream", target_file, SqlDbType.VarChar, ParameterDirection.Input);
            sql.AddParameter("position", target_size, SqlDbType.BigInt, ParameterDirection.Input);
            sql.AddParameter("bytes", bytes, SqlDbType.VarBinary, ParameterDirection.Input);
            sql.AddParameter("retry", retry, SqlDbType.Int, ParameterDirection.Input);
            sql.AddParameter("return", DBNull.Value, SqlDbType.Bit, ParameterDirection.ReturnValue);
            Object res = sql.ExecuteFunction("clr_filerelay_write_bytes", 10);
            if (res != null)
            {
                is_success = Convert.ToBoolean(res);
            }
        }
        catch (Exception e)
        {
            throw new Exception(e.Message, e);
        }
        finally
        {
            sql.DisposeReader();
        }
        return is_success;
    }
    #endregion

    #region get - target file size
    /// <summary>
    /// Asks the target server for the current size of the target file and stores it in
    /// <see cref="target_size"/>. The field is left unchanged when no value comes back.
    /// </summary>
    /// <param name="sql">Helper connected to the target server.</param>
    /// <exception cref="Exception">Any failure, rethrown with the original as InnerException.</exception>
    public void GetTargetFileSize(ref MSSQL sql)
    {
        try
        {
            sql.CreateNewCommand();
            sql.AddParameter("filename", target_file, SqlDbType.VarChar, ParameterDirection.Input);
            // Fixed: the size is a 64-bit value. Declaring the return parameter as Int produced a
            // boxed Int32, and unboxing that with an (Int64) cast throws InvalidCastException.
            sql.AddParameter("return", DBNull.Value, SqlDbType.BigInt, ParameterDirection.ReturnValue);
            Object res = sql.ExecuteFunction("clr_filerelay_get_file_size", 10);
            if (res != null)
            {
                target_size = Convert.ToInt64(res);
            }
        }
        catch (Exception e)
        {
            throw new Exception(e.Message, e);
        }
    }
    #endregion
}
/// <summary>
/// Progress counters for one running file copy: sizes in bytes, elapsed times in milliseconds,
/// and the figures derived from them by <see cref="EstimateTimes"/>.
/// </summary>
public class FileRelayStat
{
    public String current_step;
    public String source_file_stream;
    public String target_file_stream;
    public Int32 stream_copy_batch_bytes;
    public Int64 source_size = 0;
    public Int64 target_start_size = 0;
    public Int64 target_size = 0;
    public Int64 ellapsed_ms_total = 0;
    public Int64 ellapsed_ms_reader_total = 0;
    public Int64 ellapsed_ms_reader_last = 0;
    public Int64 ellapsed_ms_writer_total = 0;
    public Int64 ellapsed_ms_writer_last = 0;
    public Int64 estimated_sec_remaining = 0;
    public Int32 percent_completed = 0;
    public Int32 avg_kb_reads_per_sec = 0;
    public Int32 avg_kb_writes_per_sec = 0;
    public Int64 total_bytes_copied = 0;

    /// <summary>Creates a progress record with all counters at zero.</summary>
    /// <param name="current_step">Initial step description.</param>
    /// <param name="src_file_stream">Source file path.</param>
    /// <param name="trgt_file_stream">Target file path.</param>
    /// <param name="strm_copy_batch_bytes">Chunk size used by the copy.</param>
    public FileRelayStat(String current_step, String src_file_stream, String trgt_file_stream, Int32 strm_copy_batch_bytes)
    {
        // Fixed: this argument used to be dropped, leaving the step null until the first update.
        this.current_step = current_step;
        source_file_stream = src_file_stream;
        target_file_stream = trgt_file_stream;
        stream_copy_batch_bytes = strm_copy_batch_bytes;
    }

    /// <summary>
    /// Recomputes the derived figures from the raw counters. Never throws on zero counters:
    /// the remaining time is 0 while nothing has been copied, the percentage is 0 for an empty
    /// source, and elapsed times below one millisecond are counted as one millisecond.
    /// </summary>
    public void EstimateTimes()
    {
        total_bytes_copied = target_size - target_start_size;

        Int64 bytes_left = Math.Max(0L, source_size - target_size);
        if (total_bytes_copied > 0)
        {
            // Floating point keeps sub-second precision (seconds used to be truncated before the
            // multiplication) and cannot overflow on very large files.
            estimated_sec_remaining = (Int64)((Double)bytes_left * ellapsed_ms_total / total_bytes_copied / 1000.0);
        }
        else
        {
            estimated_sec_remaining = 0;
        }

        percent_completed = source_size > 0
            ? (Int32)Math.Min(100L, target_size * 100 / source_size)
            : 0;

        // bytes per millisecond is (decimal) kilobytes per second.
        avg_kb_reads_per_sec = (Int32)(total_bytes_copied / Math.Max(1L, ellapsed_ms_reader_total));
        avg_kb_writes_per_sec = (Int32)(total_bytes_copied / Math.Max(1L, ellapsed_ms_writer_total));
    }
}
/// <summary>
/// One recorded error: the name of the relay instance it belongs to and the error text.
/// </summary>
public class FileRelayError
{
    /// <summary>Name of the relay instance the error was recorded for.</summary>
    public string instance;

    /// <summary>Error text.</summary>
    public string message;
}
/// <summary>
/// SQL CLR entry points of the file relay: starting a copy, writing a chunk on the receiving
/// side, reading a file size, and exposing per-instance progress and recorded errors.
/// The static <see cref="Stat"/> and <see cref="Errors"/> dictionaries are guarded by
/// <see cref="sync_root"/>.
/// </summary>
public class FileRelay
{
    public static Object sync_root = new Object();
    public static Dictionary<String, FileRelayStat> Stat = new Dictionary<String, FileRelayStat>();
    public static Dictionary<Int32, FileRelayError> Errors = new Dictionary<Int32, FileRelayError>();

    #region file relay
    #region Copy
    /// <summary>
    /// Copies <paramref name="source_file_stream"/> to <paramref name="target_file_stream"/> on the
    /// target server, publishing progress under <paramref name="instance_name"/> while it runs.
    /// Failures do not throw; they are recorded through <see cref="AddError"/>.
    /// </summary>
    /// <param name="instance_name">Unique name of this copy run.</param>
    /// <param name="source_file_stream">Path of the local file to read.</param>
    /// <param name="target_file_stream">Path of the file to write on the target server.</param>
    /// <param name="target_server_conn_string">Connection string of the target server.</param>
    /// <param name="stream_copy_batch_mb">Chunk size; one unit is 1024 * 1000 bytes.</param>
    /// <returns>true when the copy completed; false otherwise.</returns>
    [SqlFunction(DataAccess = DataAccessKind.Read)]
    public static Boolean Copy(String instance_name, String source_file_stream, String target_file_stream, String target_server_conn_string, Int32 stream_copy_batch_mb)
    {
        Boolean is_done = false;
        Boolean is_registered = false;
        try
        {
            // Fixed: the stat field is expressed in bytes but used to receive the MB figure.
            // The factor matches the chunk size FileStreaming computes.
            FileRelayStat stat = new FileRelayStat("Stat - Initialized", source_file_stream, target_file_stream, 1024 * 1000 * stream_copy_batch_mb);
            CreateStatInstance(instance_name, stat);
            is_registered = true;
            FileStreaming fs = new FileStreaming(instance_name, source_file_stream, target_file_stream, target_server_conn_string, stream_copy_batch_mb);
            stat.current_step = "FileStreaming - Initialized";
            is_done = fs.Relay();
            stat.current_step = "Relay - Finished";
        }
        catch (Exception e)
        {
            FileRelay.AddError(instance_name, e.Message);
        }
        finally
        {
            // Fixed: only remove the entry this call created. When registration failed because the
            // name is already in use, the entry belongs to another running copy.
            if (is_registered)
            {
                RemoveStatInstance(instance_name);
            }
        }
        return is_done;
    }
    #endregion

    #region WriteBytes
    /// <summary>
    /// Writes <paramref name="bytes"/> into <paramref name="target_file_stream"/> at
    /// <paramref name="position"/>, creating the file when it does not exist.
    /// </summary>
    /// <param name="target_file_stream">Path of the file to write.</param>
    /// <param name="position">Byte offset at which to start writing.</param>
    /// <param name="bytes">Data to write.</param>
    /// <param name="retry">Maximum number of attempts; one second is waited between attempts.</param>
    /// <returns>true when an attempt succeeded; false when all failed (the last failure is recorded via AddError).</returns>
    [SqlFunction(DataAccess = DataAccessKind.Read)]
    public static Boolean WriteBytes(String target_file_stream, Int64 position, byte[] bytes, Int32 retry)
    {
        Boolean is_success = false;
        while (!is_success && retry-- > 0)
        {
            try
            {
                // Fixed: the old finally block dereferenced the writer even when File.Open had
                // failed, and the resulting NullReferenceException escaped the retry loop.
                using (FileStream fs = File.Open(target_file_stream, FileMode.OpenOrCreate, FileAccess.Write))
                using (BinaryWriter bw = new BinaryWriter(fs))
                {
                    bw.BaseStream.Seek(position, SeekOrigin.Begin);
                    bw.Write(bytes);
                    bw.Flush();
                }
                is_success = true;
            }
            catch (Exception e)
            {
                if (retry > 0)
                {
                    // Give a transient problem (e.g. a sharing violation) time to clear.
                    Thread.Sleep(1000);
                }
                else
                {
                    // Out of attempts: keep the reason instead of dropping it silently.
                    AddError(target_file_stream, e.Message);
                }
            }
        }
        return is_success;
    }
    #endregion

    #region GetFileSize
    /// <summary>Returns the size of <paramref name="filename"/> in bytes.</summary>
    /// <param name="filename">Path of the file to inspect.</param>
    /// <returns>The file size, or 0 when the file is missing or cannot be inspected.</returns>
    [SqlFunction(DataAccess = DataAccessKind.Read)]
    public static Int64 GetFileSize(String filename)
    {
        try
        {
            return new FileInfo(filename).Length;
        }
        catch (Exception)
        {
            // Deliberate best effort: any failure (including a missing file) is reported as size 0.
            return 0;
        }
    }
    #endregion
    #endregion

    #region stat
    #region stat - GetStat - instance
    /// <summary>
    /// Table-valued function source: yields at most one row, the progress of <paramref name="instance_name"/>.
    /// </summary>
    /// <param name="instance_name">Name of the copy run to look up.</param>
    /// <returns>A collection of key/value pairs consumed by the GetStat fill-row method.</returns>
    // Fixed: the byte-size columns are declared bigint so they match the SqlInt64 fill-row parameters.
    [SqlFunction(FillRowMethodName = "GetStat", TableDefinition = "current_step nvarchar(1000), source_file_stream nvarchar(4000), target_file_stream nvarchar(4000), stream_copy_batch_bytes int, source_size bigint, target_start_size bigint, target_size bigint, ellapsed_sec_total int, ellapsed_sec_reader_total int, ellapsed_sec_reader_last int, ellapsed_sec_writer_total int, ellapsed_sec_writer_last int, estimated_sec_remaining int, percent_completed int, avg_kb_reads_per_sec int, avg_kb_writes_per_sec int, total_bytes_copied bigint")]
    public static IEnumerable GetInstanceStats(String instance_name)
    {
        Dictionary<Int32, FileRelayStat> relay_stats = new Dictionary<Int32, FileRelayStat>();
        lock (FileRelay.sync_root)
        {
            FileRelayStat frs;
            if (FileRelay.Stat.TryGetValue(instance_name, out frs) && frs != null)
            {
                relay_stats.Add(1, frs);
            }
        }
        return relay_stats;
    }
    #endregion

    #region stat - GetStat : fill row
    /// <summary>Fill-row method: maps one progress entry to the output columns (times in whole seconds).</summary>
    private static void GetStat(Object obj,
        out SqlString current_step,
        out SqlString source_file_stream,
        out SqlString target_file_stream,
        out SqlInt32 stream_copy_batch_bytes,
        out SqlInt64 source_size,
        out SqlInt64 target_start_size,
        out SqlInt64 target_size,
        out SqlInt32 ellapsed_sec_total,
        out SqlInt32 ellapsed_sec_reader_total,
        out SqlInt32 ellapsed_sec_reader_last,
        out SqlInt32 ellapsed_sec_writer_total,
        out SqlInt32 ellapsed_sec_writer_last,
        out SqlInt32 estimated_sec_remaining,
        out SqlInt32 percent_completed,
        out SqlInt32 avg_kb_reads_per_sec,
        out SqlInt32 avg_kb_writes_per_sec,
        out SqlInt64 total_bytes_copied)
    {
        FileRelayStat row = ((KeyValuePair<Int32, FileRelayStat>)obj).Value;
        current_step = new SqlString(row.current_step);
        source_file_stream = new SqlString(row.source_file_stream);
        target_file_stream = new SqlString(row.target_file_stream);
        stream_copy_batch_bytes = new SqlInt32(row.stream_copy_batch_bytes);
        source_size = new SqlInt64(row.source_size);
        target_start_size = new SqlInt64(row.target_start_size);
        target_size = new SqlInt64(row.target_size);
        // Fixed: divide first, then narrow. The cast used to bind to the millisecond value
        // alone and truncated it to 32 bits before the division.
        ellapsed_sec_total = new SqlInt32((Int32)(row.ellapsed_ms_total / 1000));
        ellapsed_sec_reader_total = new SqlInt32((Int32)(row.ellapsed_ms_reader_total / 1000));
        ellapsed_sec_reader_last = new SqlInt32((Int32)(row.ellapsed_ms_reader_last / 1000));
        ellapsed_sec_writer_total = new SqlInt32((Int32)(row.ellapsed_ms_writer_total / 1000));
        ellapsed_sec_writer_last = new SqlInt32((Int32)(row.ellapsed_ms_writer_last / 1000));
        estimated_sec_remaining = new SqlInt32((Int32)row.estimated_sec_remaining);
        percent_completed = new SqlInt32(row.percent_completed);
        avg_kb_reads_per_sec = new SqlInt32(row.avg_kb_reads_per_sec);
        avg_kb_writes_per_sec = new SqlInt32(row.avg_kb_writes_per_sec);
        total_bytes_copied = new SqlInt64(row.total_bytes_copied);
    }
    #endregion

    #region stat - remove an instance
    /// <summary>Removes the progress entry of <paramref name="instance_name"/>; does nothing when absent.</summary>
    [SqlFunction(DataAccess = DataAccessKind.Read)]
    private static void RemoveStatInstance(String instance_name)
    {
        lock (FileRelay.sync_root)
        {
            // Remove is already a no-op for a missing key.
            Stat.Remove(instance_name);
        }
    }
    #endregion

    #region stat - create an instance
    /// <summary>Registers a progress entry under <paramref name="instance_name"/>.</summary>
    /// <exception cref="Exception">An entry with that name already exists.</exception>
    [SqlFunction(DataAccess = DataAccessKind.Read)]
    private static void CreateStatInstance(String instance_name, FileRelayStat stat)
    {
        lock (FileRelay.sync_root)
        {
            if (Stat.ContainsKey(instance_name))
            {
                throw new Exception("Instance Name already exists: " + instance_name);
            }
            Stat.Add(instance_name, stat);
        }
    }
    #endregion

    #region stat - modify instance
    /// <summary>Replaces the progress entry of an existing instance; does nothing when it is not registered.</summary>
    [SqlFunction(DataAccess = DataAccessKind.Read)]
    public static void ModifyStat(String instance_name, FileRelayStat stat)
    {
        lock (FileRelay.sync_root)
        {
            if (Stat.ContainsKey(instance_name))
            {
                FileRelay.Stat[instance_name] = stat;
            }
        }
    }
    #endregion

    #region stat - find an instance
    /// <summary>
    /// Looks up the progress entry of <paramref name="instance_name"/>.
    /// <paramref name="stat"/> is left untouched when no entry is registered.
    /// </summary>
    [SqlFunction(DataAccess = DataAccessKind.Read)]
    public static void FindStat(String instance_name, ref FileRelayStat stat)
    {
        lock (FileRelay.sync_root)
        {
            FileRelayStat found;
            if (Stat.TryGetValue(instance_name, out found))
            {
                stat = found;
            }
        }
    }
    #endregion
    #endregion

    #region error
    #region error - add
    /// <summary>Appends an error entry; the key is the number of entries recorded so far.</summary>
    [SqlFunction(DataAccess = DataAccessKind.Read)]
    public static void AddError(String instance, String message)
    {
        lock (sync_root)
        {
            Errors.Add(Errors.Count, new FileRelayError() { instance = instance, message = message });
        }
    }
    #endregion

    #region errors - clear
    /// <summary>Removes all recorded errors.</summary>
    /// <returns>Always true.</returns>
    [SqlFunction(DataAccess = DataAccessKind.Read)]
    public static Boolean ClearErrors()
    {
        lock (sync_root)
        {
            Errors.Clear();
        }
        return true;
    }
    #endregion

    #region error - ListErrors - all
    /// <summary>Table-valued function source: yields one row per recorded error.</summary>
    /// <returns>A snapshot of the recorded errors, consumed by the GetError fill-row method.</returns>
    [SqlFunction(FillRowMethodName = "GetError", TableDefinition = "instance nvarchar(1000), error_msg nvarchar(max)")]
    public static IEnumerable ListErrors()
    {
        lock (sync_root)
        {
            // Fixed: hand out a copy. Enumerating the live dictionary while another session
            // adds or clears errors throws InvalidOperationException.
            return new List<KeyValuePair<Int32, FileRelayError>>(Errors);
        }
    }
    #endregion

    #region error - ListErrors : fill rows
    /// <summary>Fill-row method: maps one error entry to the output columns.</summary>
    private static void GetError(Object obj, out SqlString instance, out SqlString error_msg)
    {
        FileRelayError row = ((KeyValuePair<Int32, FileRelayError>)obj).Value;
        instance = new SqlString(row.instance);
        error_msg = new SqlString(row.message);
    }
    #endregion
    #endregion
}