Click here to Skip to main content
15,893,594 members
Articles / Database Development / SQL Server / SQL Server 2008

Binary copy of files in T-SQL with the help of CLR

Rate me:
Please Sign up or sign in to vote.
4.17/5 (4 votes)
22 Jan 2013CPOL3 min read 14.7K   8   10  
Managing SQL backups can be made simpler by using this FileRelay CLR
using Microsoft.SqlServer.Server;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Data.SqlTypes;
using System.Diagnostics;
using System.IO;
using System.Threading;



    /// <summary>
    /// Small helper that owns one <see cref="SqlConnection"/>, one current
    /// <see cref="SqlCommand"/> and one current <see cref="SqlDataReader"/>, and runs
    /// ad-hoc text, stored procedures and scalar functions with a simple retry loop.
    /// Failures are not thrown by the Execute* methods; they return false/null and the
    /// most recent exception is kept in <see cref="last_error"/>.
    /// </summary>
    public class MSSQL : IDisposable
    {
        public SqlConnection connection;
        public SqlCommand command;
        public SqlDataReader reader;
        public String conn_string;
        public Int32 retry;

        /// <summary>Most recent exception swallowed by Connect or an Execute* method (null if none).</summary>
        public Exception last_error;


        #region constructor
        /// <summary>
        /// Stores the connection string and retry count, then tries to open the connection.
        /// A failed initial connect does not throw; later Execute* calls try again.
        /// </summary>
        /// <param name="sql_conn_string">Connection string passed to <see cref="SqlConnection"/>.</param>
        /// <param name="retry">Maximum number of attempts for the initial connect.</param>
        public MSSQL(String sql_conn_string, Int32 retry)
        {
            // The original assigned the field to the parameter (retry = this.retry), which
            // zeroed the parameter and made the initial Connect a no-op.
            this.retry = retry;
            conn_string = sql_conn_string;
            Connect(retry);
        }
        #endregion

        #region dispose - command
        /// <summary>Disposes the current command, if any, and clears the field.</summary>
        public void DisposeCommand()
        {
            if (command != null)
            {
                command.Dispose();
                command = null;
            }

        }
        #endregion

        #region dispose - reader
        /// <summary>Closes and disposes the current reader, if any, and clears the field.</summary>
        public void DisposeReader()
        {
            if (reader != null)
            {
                reader.Close();
                reader.Dispose();
                reader = null;
            }

        }
        #endregion

        #region dispose - connection
        /// <summary>Closes and disposes the connection, if any, and clears the field.</summary>
        public void DisposeConnection()
        {
            if (connection != null)
            {
                connection.Close();
                connection.Dispose();
                connection = null;
            }

        }
        #endregion

        #region dispose - MSSQL
        /// <summary>Releases the command, reader and connection and blanks the connection string.</summary>
        public void Dispose()
        {
            DisposeCommand();
            DisposeReader();
            DisposeConnection();
            conn_string = String.Empty;
        }
        #endregion


        #region connect
        /// <summary>
        /// Ensures the connection is open, making up to <paramref name="retry"/> attempts.
        /// </summary>
        /// <param name="retry">Maximum number of open attempts.</param>
        /// <returns>
        /// true when the connection is open (including when it was already open before the
        /// call); false when every attempt failed or no attempt was allowed.
        /// </returns>
        public Boolean Connect(Int32 retry)
        {
            // An already-open connection is a success regardless of the retry budget;
            // previously Connect(0) reported failure even on a healthy connection.
            if (connection != null && connection.State == ConnectionState.Open)
            {
                return true;
            }

            Boolean is_success = false;

            while (!is_success && retry-- > 0)
            {
                try
                {
                    if (connection == null)
                    {
                        connection = new SqlConnection(conn_string);
                    }

                    if (connection.State != ConnectionState.Open)
                    {
                        connection.Open();
                    }

                    is_success = true;
                }
                catch (Exception e)
                {
                    // Keep retrying, but remember why the attempt failed.
                    last_error = e;
                }
            }

            return is_success;
        }
        #endregion

        #region create - new command
        /// <summary>Replaces the current command with a fresh, parameterless one.</summary>
        public void CreateNewCommand()
        {
            command = new SqlCommand();
        }
        #endregion

        #region add - params
        /// <summary>Adds an input parameter named "@" + <paramref name="param_name"/> to the current command.</summary>
        /// <param name="param_name">Parameter name without the leading "@".</param>
        /// <param name="param_value">Parameter value.</param>
        public void AddParameter(String param_name, Object param_value)
        {
            EnsureCommand();
            command.Parameters.Add(new SqlParameter("@" + param_name, param_value));
        }
        #endregion

        #region add - params
        /// <summary>Adds a typed parameter named "@" + <paramref name="name"/> to the current command.</summary>
        /// <param name="name">Parameter name without the leading "@".</param>
        /// <param name="value">Parameter value.</param>
        /// <param name="type">SQL type of the parameter.</param>
        /// <param name="direction">Input, output or return value.</param>
        public void AddParameter(String name, Object value, SqlDbType type, ParameterDirection direction)
        {
            EnsureCommand();

            SqlParameter p = new SqlParameter();

            p.ParameterName = "@" + name;
            p.SqlDbType = type;
            p.Value = value;
            p.Direction = direction;

            command.Parameters.Add(p);
        }
        #endregion

        #region execute - Ad Hoc
        /// <summary>
        /// Runs a text command on a brand-new command object (any previously added
        /// parameters are discarded). For queries the result is left in <see cref="reader"/>.
        /// </summary>
        /// <param name="sql_command">SQL text to execute.</param>
        /// <param name="retry">Maximum number of execution attempts.</param>
        /// <param name="is_non_query">true to use ExecuteNonQuery, false to use ExecuteReader.</param>
        /// <returns>true when one attempt succeeded.</returns>
        public Boolean ExecuteAdHoc(String sql_command, Int32 retry, bool is_non_query)
        {
            DisposeCommand();
            command = new SqlCommand();

            Boolean is_success = TryExecute(CommandType.Text, sql_command, retry, is_non_query);

            command.Dispose();

            return is_success;
        }
        #endregion


        #region execute - procedure
        /// <summary>
        /// Runs a stored procedure using the current command and its parameters.
        /// For queries the result is left in <see cref="reader"/>.
        /// </summary>
        /// <param name="proc_name">Stored procedure name.</param>
        /// <param name="retry">Maximum number of execution attempts.</param>
        /// <param name="is_non_query">true to use ExecuteNonQuery, false to use ExecuteReader.</param>
        /// <returns>true when one attempt succeeded.</returns>
        public Boolean ExecuteProcedure(String proc_name, Int32 retry, bool is_non_query)
        {
            Boolean is_success = TryExecute(CommandType.StoredProcedure, proc_name, retry, is_non_query);

            // The command object is kept in the field so callers can still read output parameters.
            command.Dispose();

            return is_success;
        }
        #endregion

        #region execute - function
        /// <summary>
        /// Invokes a function through the current command and returns the value of its
        /// "@return" parameter.
        /// </summary>
        /// <param name="func_name">Function name.</param>
        /// <param name="retry">Maximum number of execution attempts.</param>
        /// <returns>
        /// The "@return" parameter value, or null when execution failed, the parameter is
        /// missing, or its value is DBNull.
        /// </returns>
        public Object ExecuteFunction(String func_name, Int32 retry)
        {
            Object res = null;

            if (TryExecute(CommandType.StoredProcedure, func_name, retry, true))
            {
                try
                {
                    SqlParameter par = command.Parameters["@return"];

                    if (par.Value != DBNull.Value)
                    {
                        res = par.Value;
                    }
                }
                catch (Exception e)
                {
                    // No "@return" parameter was registered; report as "no result".
                    last_error = e;
                }
            }

            command.Dispose();

            return res;
        }
        #endregion

        #region helpers
        // Creates the command lazily so that a missing CreateNewCommand() call does not
        // surface as a NullReferenceException.
        private void EnsureCommand()
        {
            if (command == null)
            {
                CreateNewCommand();
            }
        }

        // Shared retry loop behind ExecuteAdHoc / ExecuteProcedure / ExecuteFunction.
        private Boolean TryExecute(CommandType type, String text, Int32 retry, Boolean is_non_query)
        {
            Boolean is_success = false;

            EnsureCommand();

            while (!is_success && retry-- > 0)
            {
                // retry has already been decremented; always allow at least one connect
                // attempt so the last execution attempt is not silently skipped.
                if (!Connect(Math.Max(retry, 1)))
                {
                    continue;
                }

                try
                {
                    command.Connection = connection;
                    command.CommandType = type;
                    command.CommandText = text;

                    if (is_non_query)
                    {
                        command.ExecuteNonQuery();
                    }
                    else
                    {
                        reader = command.ExecuteReader();
                    }

                    is_success = true;
                }
                catch (Exception e)
                {
                    // Keep retrying, but remember why the attempt failed.
                    last_error = e;
                }
            }

            return is_success;
        }
        #endregion


    }


    /// <summary>
    /// Copies a local source file to a target file in fixed-size blocks. Each block is read
    /// locally and handed to the target server by calling the SQL functions
    /// "clr_filerelay_get_file_size" and "clr_filerelay_write_bytes" over
    /// <see cref="target_conn_string"/>. The copy starts at the size the target reports,
    /// so an interrupted copy continues where it stopped.
    /// </summary>
    public class FileStreaming
    {

        public String instance_name = String.Empty;
        public String source_file = String.Empty;
        public String target_file = String.Empty;
        public String target_conn_string = String.Empty;

        public Int32 copy_block_size;
        public Int64 source_size;
        public Int64 target_size;


        public Int64 pos = 0;
        public Int32 required = 0;



        #region constructor
        /// <summary>Captures the copy settings; no I/O happens until <see cref="Relay"/>.</summary>
        /// <param name="inst_name">Name used to look up the progress entry in FileRelay.</param>
        /// <param name="source_file_stream">Path of the file to read.</param>
        /// <param name="target_file_stream">Path of the file to write on the target server.</param>
        /// <param name="target_server_conn_string">Connection string of the target server.</param>
        /// <param name="stream_copy_batch_mb">Block size; multiplied by 1024 * 1000 to get bytes.</param>
        public FileStreaming(String inst_name, String source_file_stream, String target_file_stream, String target_server_conn_string, Int32 stream_copy_batch_mb)
        {
            instance_name = inst_name;
            source_file = source_file_stream;
            target_file = target_file_stream;
            target_conn_string = target_server_conn_string;

            // Multiply in 64 bits and clamp: the Int32 product wrapped around for large
            // batch sizes. Non-positive sizes become 0 and are rejected by Relay().
            Int64 block_bytes = 1024L * 1000L * stream_copy_batch_mb;
            copy_block_size = (Int32)Math.Max(0L, Math.Min(block_bytes, (Int64)Int32.MaxValue));
        }
        #endregion


        #region relay
        /// <summary>
        /// Runs the copy loop until the target has reached the source size or a block
        /// could not be written.
        /// </summary>
        /// <returns>true when the loop ended without a failed write.</returns>
        /// <remarks>
        /// Validation, file and SQL exceptions propagate to the caller with their original
        /// type and stack trace. The reader and the SQL connection are always released.
        /// </remarks>
        [SqlFunction(DataAccess = DataAccessKind.Read)]
        public Boolean Relay()
        {
            Stopwatch timer = new Stopwatch();

            Boolean is_success = false;
            BinaryReader br = null;
            MSSQL target_sql = null;
            FileRelayStat stat = null;
            byte[] bytes = null;

            try
            {
                timer.Start();

                FileRelay.FindStat(instance_name, ref stat);

                if (stat == null)
                {
                    // No progress entry was registered for this instance; track progress
                    // in a private object instead of failing on a null reference.
                    stat = new FileRelayStat("Stat - Detached", source_file, target_file, copy_block_size);
                }

                source_size = new FileInfo(source_file).Length;
                stat.source_size = source_size;

                if (source_size == 0)
                {
                    throw new Exception("source file size must be grather than 0");
                }

                if (String.IsNullOrEmpty(target_file))
                {
                    throw new Exception("target file ad path cannot be empty");
                }

                if (copy_block_size <= 0)
                {
                    // A zero-sized block would never advance the loop below.
                    throw new InvalidOperationException("copy block size must be greater than 0");
                }

                target_sql = new MSSQL(target_conn_string, 10);

                GetTargetFileSize(ref target_sql);

                stat.target_start_size = stat.target_size = target_size;

                stat.current_step = "Open file to read: " + source_file;

                br = new BinaryReader(File.Open(source_file, FileMode.Open, FileAccess.Read));
                Boolean is_write_ok = true;

                stat.current_step = "Start Relaying";

                while (is_write_ok && target_size < source_size)
                {
                    pos = target_size;

                    // The last block is usually shorter than copy_block_size.
                    Int64 remaining = source_size - target_size;
                    required = remaining < copy_block_size ? (Int32)remaining : copy_block_size;

                    br.BaseStream.Seek(pos, SeekOrigin.Begin);
                    bytes = br.ReadBytes(required);

                    if (bytes.Length == 0)
                    {
                        // The file is shorter than the size measured above; without this
                        // check the loop would spin forever.
                        throw new EndOfStreamException("source file ended before the expected size was read: " + source_file);
                    }

                    stat.ellapsed_ms_reader_last = timer.ElapsedMilliseconds;
                    stat.ellapsed_ms_reader_total += timer.ElapsedMilliseconds;
                    stat.ellapsed_ms_total += timer.ElapsedMilliseconds;
                    timer.Restart();

                    is_write_ok = RelayBytes(ref bytes, ref target_sql, 10);

                    stat.ellapsed_ms_writer_last = timer.ElapsedMilliseconds;
                    stat.ellapsed_ms_writer_total += timer.ElapsedMilliseconds;
                    stat.ellapsed_ms_total += timer.ElapsedMilliseconds;
                    timer.Restart();

                    if (is_write_ok)
                    {
                        // Advance by what was actually transferred, and only on success;
                        // adding copy_block_size unconditionally overshot the source size
                        // and counted failed writes as progress.
                        target_size += bytes.Length;
                        stat.target_size = target_size;

                        try
                        {
                            stat.EstimateTimes();
                        }
                        catch (DivideByZeroException)
                        {
                            // Statistics are informational only; a zero elapsed time must
                            // not abort the copy.
                        }
                    }

                    bytes = null;
                }

                is_success = is_write_ok;
            }
            finally
            {
                if (br != null)
                {
                    br.Close();
                }

                if (target_sql != null)
                {
                    // The connection to the target server was previously never released.
                    target_sql.Dispose();
                }
            }

            return is_success;
        }
        #endregion


        #region relay - chunk of bytes
        /// <summary>
        /// Sends one block to the target by calling "clr_filerelay_write_bytes" with the
        /// target path, the current <see cref="target_size"/> as write position, the bytes
        /// and the retry count.
        /// </summary>
        /// <param name="bytes">Block to write.</param>
        /// <param name="sql">Open helper for the target server.</param>
        /// <param name="retry">Retry count forwarded to the remote function.</param>
        /// <returns>The remote function's result, or false when it returned nothing.</returns>
        [SqlFunction(DataAccess = DataAccessKind.Read)]
        public Boolean RelayBytes(ref byte[] bytes, ref MSSQL sql, Int32 retry)
        {
            Boolean is_success = false;

            try
            {
                sql.CreateNewCommand();
                sql.AddParameter("target_file_stream", target_file, SqlDbType.VarChar, ParameterDirection.Input);
                sql.AddParameter("position", target_size, SqlDbType.BigInt, ParameterDirection.Input);
                sql.AddParameter("bytes", bytes, SqlDbType.VarBinary, ParameterDirection.Input);
                sql.AddParameter("retry", retry, SqlDbType.Int, ParameterDirection.Input);
                sql.AddParameter("return", DBNull.Value, SqlDbType.Bit, ParameterDirection.ReturnValue);

                Object res = sql.ExecuteFunction("clr_filerelay_write_bytes", 10);

                if (res != null)
                {
                    is_success = Convert.ToBoolean(res);
                }
            }
            finally
            {
                sql.DisposeReader();
            }

            return is_success;
        }
        #endregion


        #region get - target file size
        /// <summary>
        /// Asks the target server for the current size of <see cref="target_file"/> via
        /// "clr_filerelay_get_file_size" and stores it in <see cref="target_size"/>.
        /// When the call yields no value, <see cref="target_size"/> is left unchanged.
        /// </summary>
        /// <param name="sql">Open helper for the target server.</param>
        public void GetTargetFileSize(ref MSSQL sql)
        {
            sql.CreateNewCommand();
            sql.AddParameter("filename", target_file, SqlDbType.VarChar, ParameterDirection.Input);

            // The size is a 64-bit value. Declaring the return parameter as Int and then
            // unboxing with (Int64) could fail with InvalidCastException and would
            // truncate files larger than 2 GB.
            sql.AddParameter("return", DBNull.Value, SqlDbType.BigInt, ParameterDirection.ReturnValue);

            Object res = sql.ExecuteFunction("clr_filerelay_get_file_size", 10);

            if (res != null)
            {
                target_size = Convert.ToInt64(res);
            }
        }
        #endregion





    }

   
    /// <summary>
    /// Progress counters for one running file relay: sizes in bytes, elapsed times in
    /// milliseconds, and the figures derived from them by <see cref="EstimateTimes"/>.
    /// </summary>
    public class FileRelayStat
    {
        public String current_step;
        public String source_file_stream;
        public String target_file_stream;
        public Int32 stream_copy_batch_bytes;
        public Int64 source_size = 0;
        public Int64 target_start_size = 0;
        public Int64 target_size = 0;
        public Int64 ellapsed_ms_total = 0;
        public Int64 ellapsed_ms_reader_total = 0;
        public Int64 ellapsed_ms_reader_last = 0;
        public Int64 ellapsed_ms_writer_total = 0;
        public Int64 ellapsed_ms_writer_last = 0;

        public Int64 estimated_sec_remaining = 0;
        public Int32 percent_completed = 0;
        public Int32 avg_kb_reads_per_sec = 0;
        public Int32 avg_kb_writes_per_sec = 0;
        public Int64 total_bytes_copied = 0;

        /// <summary>Creates a stat entry with its descriptive fields filled in and all counters at 0.</summary>
        /// <param name="current_step">Initial value of <see cref="current_step"/>.</param>
        /// <param name="src_file_stream">Source file path.</param>
        /// <param name="trgt_file_stream">Target file path.</param>
        /// <param name="strm_copy_batch_bytes">Block size stored in <see cref="stream_copy_batch_bytes"/>.</param>
        public FileRelayStat(String current_step, String src_file_stream, String trgt_file_stream, Int32 strm_copy_batch_bytes)
        {
            // The parameter hides the field of the same name; it was previously never stored.
            this.current_step = current_step;
            source_file_stream = src_file_stream;
            target_file_stream = trgt_file_stream;
            stream_copy_batch_bytes = strm_copy_batch_bytes;
        }

        /// <summary>
        /// Recomputes bytes copied, estimated seconds remaining, percent completed and the
        /// average read/write rates (bytes per millisecond, reported as KB per second)
        /// from the current counters. Any figure whose divisor is still 0 is reported as 0
        /// instead of throwing.
        /// </summary>
        public void EstimateTimes()
        {
            total_bytes_copied = target_size - target_start_size;

            // Never report a negative amount of remaining work.
            Int64 remaining_bytes = Math.Max(source_size - target_size, 0L);

            if (total_bytes_copied > 0)
            {
                // Floating point avoids both 64-bit overflow of bytes * milliseconds and
                // the loss of the sub-second part of the elapsed time.
                estimated_sec_remaining = (Int64)((Double)remaining_bytes * ellapsed_ms_total / total_bytes_copied / 1000);
            }
            else
            {
                estimated_sec_remaining = 0;
            }

            percent_completed = source_size > 0
                ? (Int32)Math.Min(target_size * 100 / source_size, 100L)
                : 0;

            avg_kb_reads_per_sec = ellapsed_ms_reader_total > 0
                ? (Int32)(total_bytes_copied / ellapsed_ms_reader_total)
                : 0;

            avg_kb_writes_per_sec = ellapsed_ms_writer_total > 0
                ? (Int32)(total_bytes_copied / ellapsed_ms_writer_total)
                : 0;
        }

    }


    /// <summary>
    /// One recorded relay failure: the relay instance it belongs to and the error text.
    /// </summary>
    public class FileRelayError
    {
        /// <summary>Name of the relay instance the error was recorded for.</summary>
        public string instance;

        /// <summary>Error message text.</summary>
        public string message;
    }


    /// <summary>
    /// SQL CLR entry points for relaying a file to another server, plus two in-memory
    /// registries shared by all callers: per-instance progress (<see cref="Stat"/>) and
    /// recorded errors (<see cref="Errors"/>). Both registries are guarded by
    /// <see cref="sync_root"/>.
    /// </summary>
    public class FileRelay
    {
        public static Object sync_root = new Object();
        public static Dictionary<String, FileRelayStat> Stat = new Dictionary<String, FileRelayStat>();
        public static Dictionary<Int32, FileRelayError> Errors = new Dictionary<Int32, FileRelayError>();


        #region file relay

        #region Copy
        /// <summary>
        /// Copies <paramref name="source_file_stream"/> to <paramref name="target_file_stream"/>
        /// on the target server, publishing progress under <paramref name="instance_name"/>
        /// while it runs.
        /// </summary>
        /// <param name="instance_name">Unique name for this copy; used as the progress key.</param>
        /// <param name="source_file_stream">Path of the file to read.</param>
        /// <param name="target_file_stream">Path of the file to write on the target server.</param>
        /// <param name="target_server_conn_string">Connection string of the target server.</param>
        /// <param name="stream_copy_batch_mb">Block size setting passed to FileStreaming.</param>
        /// <returns>
        /// true when the relay reported success. Exceptions are not thrown; they are
        /// recorded through <see cref="AddError"/> and false is returned.
        /// </returns>
        [SqlFunction(DataAccess = DataAccessKind.Read)]
        public static Boolean Copy(String instance_name, String source_file_stream, String target_file_stream, String target_server_conn_string, Int32 stream_copy_batch_mb)
        {
            Boolean is_done = false;
            Boolean is_registered = false;

            try
            {
                FileRelayStat stat = new FileRelayStat("Stat - Initialized", source_file_stream, target_file_stream, stream_copy_batch_mb);
                stat.current_step = "Stat - Initialized";

                CreateStatInstance(instance_name, stat);
                is_registered = true;

                FileStreaming fs = new FileStreaming(instance_name, source_file_stream, target_file_stream, target_server_conn_string, stream_copy_batch_mb);

                // The stat field is in bytes, the argument above is in megabytes; report
                // the block size the relay will actually use.
                stat.stream_copy_batch_bytes = fs.copy_block_size;
                stat.current_step = "FileStreaming - Initialized";

                is_done = fs.Relay();

                stat.current_step = "Relay - Finished";
            }
            catch (Exception e)
            {
                FileRelay.AddError(instance_name, e.Message);
            }
            finally
            {
                // Only remove the entry this call created. Removing unconditionally deleted
                // the progress entry of another running copy when the name was a duplicate.
                if (is_registered)
                {
                    RemoveStatInstance(instance_name);
                }
            }

            return is_done;
        }
        #endregion

        #region WriteBytes
        /// <summary>
        /// Writes <paramref name="bytes"/> into <paramref name="target_file_stream"/> at
        /// <paramref name="position"/>, creating the file if needed. A failed attempt is
        /// retried after one second.
        /// </summary>
        /// <param name="target_file_stream">Path of the file to write.</param>
        /// <param name="position">Byte offset from the start of the file.</param>
        /// <param name="bytes">Data to write.</param>
        /// <param name="retry">Maximum number of attempts.</param>
        /// <returns>true when the data was written and the file closed without error.</returns>
        [SqlFunction(DataAccess = DataAccessKind.Read)]
        public static Boolean WriteBytes(String target_file_stream, Int64 position, byte[] bytes, Int32 retry)
        {
            Boolean is_success = false;

            if (target_file_stream == null || bytes == null)
            {
                // Retrying cannot fix a missing argument.
                return false;
            }

            while (!is_success && retry-- > 0)
            {
                try
                {
                    // using closes the stream on every path. The previous finally block
                    // dereferenced a null writer when File.Open itself had failed, which
                    // threw out of the retry loop.
                    using (FileStream target = File.Open(target_file_stream, FileMode.OpenOrCreate, FileAccess.Write))
                    {
                        target.Seek(position, SeekOrigin.Begin);
                        target.Write(bytes, 0, bytes.Length);
                    }

                    is_success = true;
                }
                catch (Exception)
                {
                    if (retry > 0)
                    {
                        Thread.Sleep(1000);
                    }
                }

            }

            return is_success;
        }
        #endregion

        #region GetFileSize
        /// <summary>Returns the length of <paramref name="filename"/> in bytes.</summary>
        /// <param name="filename">Path of the file to inspect.</param>
        /// <returns>The file length, or 0 when the file does not exist or cannot be inspected.</returns>
        [SqlFunction(DataAccess = DataAccessKind.Read)]
        public static Int64 GetFileSize(String filename)
        {
            Int64 file_size = 0;

            try
            {
                FileInfo fi = new FileInfo(filename);

                if (fi.Exists)
                {
                    file_size = fi.Length;
                }
            }
            catch (Exception)
            {
                // Best effort by design: any failure is reported as size 0.
            }

            return file_size;
        }
        #endregion

        #endregion


        #region stat

        #region stat - GetStat - instance
        /// <summary>
        /// Table-valued function source: yields at most one row, the progress entry
        /// registered under <paramref name="instance_name"/>.
        /// </summary>
        /// <param name="instance_name">Progress key to look up.</param>
        /// <returns>A collection with zero or one key/value pairs consumed by GetStat.</returns>
        [SqlFunction(FillRowMethodName = "GetStat", TableDefinition = "current_step nvarchar(1000), source_file_stream nvarchar(4000), target_file_stream nvarchar(4000), stream_copy_batch_bytes int, source_size bigint, target_start_size bigint, target_size bigint, ellapsed_sec_total int,  ellapsed_sec_reader_total int, ellapsed_sec_reader_last int, ellapsed_sec_writer_total int, ellapsed_sec_writer_last int, estimated_sec_remaining int, percent_completed int, avg_kb_reads_per_sec int, avg_kb_writes_per_sec int, total_bytes_copied bigint")]
        public static IEnumerable GetInstanceStats(String instance_name)
        {
            Dictionary<Int32, FileRelayStat> relay_stats = new Dictionary<Int32, FileRelayStat>();

            if (instance_name == null)
            {
                return relay_stats;
            }

            lock (FileRelay.sync_root)
            {
                FileRelayStat frs;

                if (FileRelay.Stat.TryGetValue(instance_name, out frs) && frs != null)
                {
                    relay_stats.Add(1, frs);
                }
            }

            return relay_stats;
        }
        #endregion

        #region stat - GetStat : fill row
        // Fill-row method for GetInstanceStats: unpacks one key/value pair into the output columns.
        private static void GetStat(Object obj,
                                    out SqlString current_step,
                                    out SqlString source_file_stream,
                                    out SqlString target_file_stream,
                                    out SqlInt32 stream_copy_batch_bytes,
                                    out SqlInt64 source_size,
                                    out SqlInt64 target_start_size,
                                    out SqlInt64 target_size,
                                    out SqlInt32 ellapsed_sec_total,
                                    out SqlInt32 ellapsed_sec_reader_total,
                                    out SqlInt32 ellapsed_sec_reader_last,
                                    out SqlInt32 ellapsed_sec_writer_total,
                                    out SqlInt32 ellapsed_sec_writer_last,
                                    out SqlInt32 estimated_sec_remaining,
                                    out SqlInt32 percent_completed,
                                    out SqlInt32 avg_kb_reads_per_sec,
                                    out SqlInt32 avg_kb_writes_per_sec,
                                    out SqlInt64 total_bytes_copied)
        {

            FileRelayStat row = ((KeyValuePair<Int32, FileRelayStat>)obj).Value;

            current_step = new SqlString(row.current_step);
            source_file_stream = new SqlString(row.source_file_stream);
            target_file_stream = new SqlString(row.target_file_stream);
            stream_copy_batch_bytes = new SqlInt32(row.stream_copy_batch_bytes);
            source_size = new SqlInt64(row.source_size);
            target_start_size = new SqlInt64(row.target_start_size);
            target_size = new SqlInt64(row.target_size);

            // Divide in 64 bits first, then narrow: casting the millisecond count to
            // Int32 before dividing wrapped around for long-running copies.
            ellapsed_sec_total = new SqlInt32((Int32)(row.ellapsed_ms_total / 1000));
            ellapsed_sec_reader_total = new SqlInt32((Int32)(row.ellapsed_ms_reader_total / 1000));
            ellapsed_sec_reader_last = new SqlInt32((Int32)(row.ellapsed_ms_reader_last / 1000));
            ellapsed_sec_writer_total = new SqlInt32((Int32)(row.ellapsed_ms_writer_total / 1000));
            ellapsed_sec_writer_last = new SqlInt32((Int32)(row.ellapsed_ms_writer_last / 1000));

            estimated_sec_remaining = new SqlInt32((Int32)row.estimated_sec_remaining);
            percent_completed = new SqlInt32(row.percent_completed);
            avg_kb_reads_per_sec = new SqlInt32(row.avg_kb_reads_per_sec);
            avg_kb_writes_per_sec = new SqlInt32(row.avg_kb_writes_per_sec);
            total_bytes_copied = new SqlInt64(row.total_bytes_copied);

        }
        #endregion

        #region stat - remove an instance
        // Removes the progress entry for instance_name; does nothing when there is none.
        [SqlFunction(DataAccess = DataAccessKind.Read)]
        private static void RemoveStatInstance(String instance_name)
        {
            if (instance_name == null)
            {
                return;
            }

            lock (FileRelay.sync_root)
            {
                Stat.Remove(instance_name);
            }
        }
        #endregion

        #region stat - create an instance
        // Registers a progress entry; throws when the name is already in use.
        [SqlFunction(DataAccess = DataAccessKind.Read)]
        private static void CreateStatInstance(String instance_name, FileRelayStat stat)
        {
            lock (FileRelay.sync_root)
            {
                if (Stat.ContainsKey(instance_name))
                {
                    throw new Exception("Instance Name already exists: " + instance_name);
                }

                Stat.Add(instance_name, stat);
            }
        }
        #endregion

        #region stat - modify instance
        /// <summary>Replaces the progress entry of an existing instance; unknown names are ignored.</summary>
        /// <param name="instance_name">Progress key.</param>
        /// <param name="stat">New entry.</param>
        [SqlFunction(DataAccess = DataAccessKind.Read)]
        public static void ModifyStat(String instance_name, FileRelayStat stat)
        {
            if (instance_name == null)
            {
                return;
            }

            lock (FileRelay.sync_root)
            {
                if (Stat.ContainsKey(instance_name))
                {
                    FileRelay.Stat[instance_name] = stat;
                }
            }
        }
        #endregion

        #region stat - find an instance
        /// <summary>
        /// Looks up the progress entry for <paramref name="instance_name"/>.
        /// </summary>
        /// <param name="instance_name">Progress key.</param>
        /// <param name="stat">Receives the entry when found; left untouched otherwise.</param>
        [SqlFunction(DataAccess = DataAccessKind.Read)]
        public static void FindStat(String instance_name, ref FileRelayStat stat)
        {
            lock (FileRelay.sync_root)
            {
                FileRelayStat found;

                if (instance_name != null && Stat.TryGetValue(instance_name, out found))
                {
                    stat = found;
                }
            }

        }
        #endregion 
       
        #endregion


        #region error

        #region error - add
        /// <summary>Appends an error to the shared error list.</summary>
        /// <param name="instance">Relay instance the error belongs to.</param>
        /// <param name="message">Error text.</param>
        [SqlFunction(DataAccess = DataAccessKind.Read)]
        public static void AddError(String instance, String message)
        {
            lock (sync_root)
            {
                // Entries are only ever appended or cleared as a whole, so Count is a free key.
                Errors.Add(Errors.Count, new FileRelayError() { instance = instance, message = message });
            }
        }
        #endregion

        #region errors - clear
        /// <summary>Removes all recorded errors.</summary>
        /// <returns>Always true.</returns>
        [SqlFunction(DataAccess = DataAccessKind.Read)]
        public static Boolean ClearErrors()
        {
            lock (sync_root)
            {
                Errors.Clear();
            }

            return true;
        }
        #endregion

        #region error - ListErrors - all
        /// <summary>Table-valued function source: yields one row per recorded error.</summary>
        /// <returns>A snapshot of the error list, consumed by GetError.</returns>
        [SqlFunction(FillRowMethodName = "GetError", TableDefinition = "instance nvarchar(1000), error_msg nvarchar(max)")]
        public static IEnumerable ListErrors()
        {
            // Hand out a copy taken under the lock: enumerating the live dictionary while
            // another session adds an error throws InvalidOperationException.
            lock (sync_root)
            {
                return new Dictionary<Int32, FileRelayError>(Errors);
            }
        }
        #endregion

        #region error - ListErrors : fill rows
        // Fill-row method for ListErrors: unpacks one key/value pair into the output columns.
        private static void GetError(Object obj, out SqlString instance, out SqlString error_msg)
        {
            FileRelayError row = ((KeyValuePair<Int32, FileRelayError>)obj).Value;

            instance = new SqlString(row.instance);
            error_msg = new SqlString(row.message);
        }
        #endregion

        
        #endregion

    }

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Database Developer
Hungary Hungary
2010-2012 - DB. Architect & Senior DB. Developer - www.livejasmin.com
2008-2010 - Senior DB. Developer - www.logmein.com
2006-2008 - DBA & DB. Developer - www.eurotaxglass.co.uk

Comments and Discussions