Click here to Skip to main content
15,893,644 members
Articles / Programming Languages / C#

PostgreSQL Synchronization Tool

Rate me:
Please Sign up or sign in to vote.
4.11/5 (4 votes)
19 Aug 2009 | CDDL | 3 min read | 39.8K | 688 | 19
Idea and implementation of a simple and easy database synchronization tool.
/* psync - A PostgreSQL synchronization tool
 * 
 * The contents of this file are subject to the terms
 * of the Common Development and Distribution License
 * (the "License").  You may not use this file except
 * in compliance with the License.
 *
 * You can obtain a copy of the license at
 * http://www.opensource.org/licenses/cddl1.php.
 * See the License for the specific language governing
 * permissions and limitations under the License.
 *
 * Copyright 2009 by martin.faust@e56.de
 */
using System;
using System.Collections.Generic;
using npq;

namespace psync {
	
	/// <summary>
	/// Selects which source tables a synchronization run processes.
	/// </summary>
	public enum SyncMode {
		/// <summary>
		/// Every source table is synchronized; tables missing on the
		/// destination are created first.
		/// </summary>
		Complete = 0,

		/// <summary>
		/// Only tables that do not yet exist on the destination are created
		/// and filled; tables already present there are left untouched.
		/// </summary>
		Incremental = 1
	}
	
	/// <summary>
	/// Copies the base tables of the "public" schema, and their rows, from a
	/// source PostgreSQL database to a destination database.
	/// </summary>
	/// <remarks>
	/// Rows are matched between the two databases by primary key. Tables
	/// without a primary key are created on the destination when missing, but
	/// their content is skipped because rows cannot be matched.
	/// Columns of type <c>oid</c> are treated as large-object references: the
	/// referenced object is copied and the destination row is pointed at the copy.
	/// </remarks>
	public class PSync : IDisposable {
		/// <summary>Width, in characters, of the console progress bar.</summary>
		const int ProgressWidth = 60;

		/// <summary>Chunk size, in bytes, used when copying a large object.</summary>
		const int CopyBufferSize = 2048;

		PostgreSQL src = null;
		PostgreSQL dst = null;
		
		List<string> srcTables;
		List<string> dstTables;
		
		bool delete = false;
		SyncMode mode = SyncMode.Incremental;
		
		/// <value>
		/// Whether destination tables that do not exist on the source are dropped
		/// at the end of <see cref="Start"/>. Defaults to <c>false</c>.
		/// </value>
		public bool Delete { get { return delete; } set { delete = value; }}
		
		/// <value>
		/// The synchronization mode. Defaults to <see cref="SyncMode.Incremental"/>.
		/// </value>
		public SyncMode Mode { get { return mode; } set { mode = value; }}
		
		/// <summary>
		/// Opens the source and the destination connection.
		/// </summary>
		/// <param name="_src">The configuration string for the source database</param>
		/// <param name="_dst">The configuration string for the destination database</param>
		public PSync(string _src, string _dst) {
			src = new PostgreSQL(_src);
			try {
				dst = new PostgreSQL(_dst);
			} catch {
				// Do not leak the already opened source connection when the
				// destination cannot be opened.
				src.Dispose();
				src = null;
				throw;
			}
		}
		
		/// <summary>
		/// Runs one synchronization pass according to <see cref="Mode"/> and
		/// <see cref="Delete"/>. Returns without doing anything when the table
		/// list of either database cannot be read.
		/// </summary>
		public void Start() {
			srcTables = ListAllTables(src);
			dstTables = ListAllTables(dst);

			if (srcTables == null || dstTables == null)
				return;
			
			Console.WriteLine("Source has #{0} tables...", srcTables.Count);
			Console.WriteLine("Destination has #{0} tables...", dstTables.Count);
			
			Console.WriteLine("Synchronizing tables...");
							
			switch(mode) {
			case SyncMode.Complete:
			case SyncMode.Incremental:
				foreach(string table in srcTables) {
					bool exists = dstTables.Contains(table);

					// Incremental mode only handles tables that are new on the destination.
					if (exists && mode == SyncMode.Incremental)
						continue;

					// A table that could not be created cannot receive rows; the
					// error was already printed, so carry on with the next table.
					if (!exists && !CreateTable(table))
						continue;

					SyncContent(table);
				}
				break;
			}
						
			if (delete) {
				Console.WriteLine("Deleting tables...");
				foreach(string table in dstTables) {
					if (srcTables.Contains(table))
						continue;
					Console.WriteLine(" - {0}", table);
					dst.Exec(string.Format("drop table {0}", QuoteIdent(table))).Dispose();
				}
			}
		}
		
		/// <summary>
		/// Releases both database connections. Safe to call more than once.
		/// </summary>
		public void Dispose () {
			PostgreSQL source = src;
			PostgreSQL destination = dst;
			src = null;
			dst = null;

			try {
				if (source != null)
					source.Dispose();
			} finally {
				// Release the destination even if releasing the source failed.
				if (destination != null)
					destination.Dispose();
			}
		}

#region Private Methods

		/// <summary>
		/// Quotes an identifier (table or column name) for use in SQL, so that
		/// mixed-case names and reserved words are addressed exactly as they
		/// are reported by the catalog.
		/// </summary>
		static string QuoteIdent(string name) {
			return "\"" + name.Replace("\"", "\"\"") + "\"";
		}

		/// <summary>
		/// Renders a value as an SQL literal. Embedded single quotes are doubled;
		/// values containing a backslash are written as an escape string
		/// (<c>E'...'</c>) with the backslashes doubled, which is read the same
		/// way regardless of the server's standard_conforming_strings setting.
		/// A <c>null</c> value is rendered as the SQL keyword <c>null</c>.
		/// </summary>
		static string QuoteLiteral(string value) {
			if (value == null)
				return "null";

			string escaped = value.Replace("'", "''");
			if (escaped.IndexOf('\\') < 0)
				return "'" + escaped + "'";
			return "E'" + escaped.Replace("\\", "\\\\") + "'";
		}

		/// <summary>
		/// Draws a one-line progress bar and returns the cursor to the start of the line.
		/// </summary>
		/// <param name="title">Label printed in front of the bar</param>
		/// <param name="p">Fraction completed, 0.0 to 1.0</param>
		/// <param name="size">Width of the bar in characters</param>
		static void Progress(string title, float p, int size) {
			int width = Math.Max(size, 0);
			int filled = (int) Math.Floor(p * size);
			if (filled < 0)
				filled = 0;
			else if (filled > width)
				filled = width;

			Console.Write("{0,20} |", title);
			// Exactly 'filled' cells are drawn as done, so 0% shows an empty bar.
			Console.Write(new string('=', filled));
			Console.Write(new string('-', width - filled));
			Console.Write("| {0,3}%\r", Math.Floor(p*100.0f));
		}
		
		/// <summary>
		/// Returns the primary-key column names of a source table in the
		/// "public" schema, in key order. The list is empty when the table has
		/// no primary key.
		/// </summary>
		List<string> PrimaryKeys(string table) {
			// The join and filter include schema and table name so that a
			// same-named constraint or table in another schema is not mixed in.
			Result result = src.Exec(string.Format(
				"SELECT kcu.column_name FROM information_schema.table_constraints tc " +
				"JOIN information_schema.key_column_usage kcu " +
				"ON tc.constraint_catalog = kcu.constraint_catalog " +
				"AND tc.constraint_schema = kcu.constraint_schema " +
				"AND tc.constraint_name = kcu.constraint_name " +
				"AND tc.table_name = kcu.table_name " +
				"WHERE kcu.table_schema = 'public' AND kcu.table_name = {0} " +
				"AND tc.constraint_type = 'PRIMARY KEY' " +
				"ORDER BY kcu.ordinal_position", QuoteLiteral(table)));
			try {
				List<string> pkeys = new List<string>();
				for(int row=0; row<result.Rows; row++)
					pkeys.Add(result[row, 0]);
				return pkeys;
			} finally {
				result.Dispose();
			}
		}
		
		/// <summary>
		/// Returns the zero-based positions (in ordinal order) of all columns of
		/// type <c>oid</c> in a source table of the "public" schema.
		/// </summary>
		List<int> AllOidColumns(string table) {
			Result result = src.Exec(string.Format(
				"select data_type from information_schema.columns " +
				"where table_schema = 'public' and table_name = {0} " +
				"order by ordinal_position", QuoteLiteral(table)));
			try {
				List<int> oids = new List<int>();
				for(int row=0; row<result.Rows; row++) {
					// Ordinal comparison: a catalog type name is not linguistic text.
					if (string.Equals(result[row, "data_type"], "oid", StringComparison.OrdinalIgnoreCase))
						oids.Add(row);
				}
				return oids;
			} finally {
				result.Dispose();
			}
		}
		
		/// <summary>
		/// Creates a table on the destination with the column names, types,
		/// nullability and primary key of the source table.
		/// </summary>
		/// <param name="table">Table to be created</param>
		/// <returns>
		/// <c>true</c> when the table was created; <c>false</c> when the
		/// destination rejected the statement (the error message is printed).
		/// </returns>
		bool CreateTable(string table) {
			Console.WriteLine("Creating table {0}...", table);
			
			List<string> pkeys = PrimaryKeys(table);
			List<string> definitions = new List<string>();

			Result result = src.Exec(string.Format(
				"select column_name, is_nullable, data_type from information_schema.columns " +
				"where table_schema = 'public' and table_name = {0} " +
				"order by ordinal_position", QuoteLiteral(table)));
			try {
				for(int row=0; row<result.Rows; row++) {
					string name = result[row, "column_name"];
					string definition = QuoteIdent(name) + " " + result[row, "data_type"];

					if (result[row, "is_nullable"] != "YES")
						definition += " not null";

					// A column-level "primary key" is only legal for a single-column key.
					if (pkeys.Count == 1 && pkeys[0] == name)
						definition += " primary key";

					definitions.Add(definition);
				}
			} finally {
				result.Dispose();
			}

			// A composite key has to be declared once, as a table constraint.
			if (pkeys.Count > 1) {
				List<string> quoted = new List<string>(pkeys.Count);
				foreach(string key in pkeys)
					quoted.Add(QuoteIdent(key));
				definitions.Add(string.Format("primary key ({0})", string.Join(", ", quoted.ToArray())));
			}

			string command = string.Format("create table {0}({1})",
				QuoteIdent(table), string.Join(", ", definitions.ToArray()));
			Console.WriteLine(command);
			
			try {
				dst.Exec(command).Dispose();
				return true;
			} catch(PostgreSQLException e) {
				Console.WriteLine(e.Message);
				return false;
			}
		}
		
		/// <summary>
		/// Copies the rows of one table to the destination, choosing the
		/// large-object aware path when the table has <c>oid</c> columns.
		/// Tables without a primary key are skipped with a message.
		/// </summary>
		void SyncContent(string table) {
			List<string> pkeys = PrimaryKeys(table);
			if (pkeys.Count == 0) {
				// Without a key no "where" clause can be built to match rows.
				Console.WriteLine("Skipping content of {0}: no primary key to match rows on", table);
				return;
			}

			List<int> oids = AllOidColumns(table);
			if (oids.Count == 0)
				SyncContentNoOids(table, pkeys);
			else
				SyncContentWithOids(table, oids, pkeys);
			Console.WriteLine();
		}
		
		/// <summary>
		/// Copies one large object from the source to a newly created large
		/// object on the destination.
		/// </summary>
		/// <param name="oid">Identifier of the large object on the source</param>
		/// <returns>Identifier of the new large object on the destination</returns>
		int CopyOid(int oid) {
			// NOTE(review): PostgreSQL large-object access normally has to run
			// inside a transaction; presumably LargeObject takes care of that - confirm.
			LargeObject lsrc = new LargeObject(src);
			LargeObject ldst = new LargeObject(dst);
			int id = ldst.Create();

			lsrc.Open(oid);
			try {
				ldst.Open(id);
				try {
					byte[] buf = new byte[CopyBufferSize];
					int s;
					while ((s = lsrc.Read(buf, buf.Length)) > 0)
						ldst.Write(buf, s);
				} finally {
					ldst.Close();
				}
			} finally {
				lsrc.Close();
			}
			return id;
		}

		/// <summary>
		/// Builds the " where ..." clause that selects the destination row
		/// having the same primary-key values as the given source row.
		/// </summary>
		static string WhereClause(Result result, int row, List<string> pkeys) {
			List<string> conditions = new List<string>(pkeys.Count);
			foreach(string key in pkeys)
				conditions.Add(QuoteIdent(key) + "=" + QuoteLiteral(result[row, key]));
			return " where " + string.Join(" and ", conditions.ToArray());
		}

		/// <summary>
		/// Inserts the given source row, all columns in table order, into the destination table.
		/// </summary>
		void InsertRow(string table, Result result, int row) {
			List<string> values = new List<string>(result.Columns);
			for(int col=0; col<result.Columns; col++)
				values.Add(QuoteLiteral(result[row, col]));

			dst.Exec(string.Format("insert into {0} values({1})",
				QuoteIdent(table), string.Join(",", values.ToArray()))).Dispose();
		}

		/// <summary>
		/// Copies the large object referenced by one column of the source row
		/// and stores the identifier of the copy in the matching destination row.
		/// </summary>
		void ReplaceLargeObject(string table, Result result, int row, int column, string where) {
			int id = CopyOid(result.GetInt(row, column));
			dst.Exec(string.Format("update {0} set {1}='{2}'{3}",
				QuoteIdent(table), QuoteIdent(result.ColumnName(column)), id, where)).Dispose();
		}

		/// <summary>
		/// Synchronizes a table that has <c>oid</c> columns. A row missing on
		/// the destination is inserted and its large objects are copied. For
		/// an existing row, each large object is copied again when the digest
		/// of its column differs between source and destination.
		/// </summary>
		/// <param name="table">Table to synchronize</param>
		/// <param name="oids">Zero-based positions of the oid columns</param>
		/// <param name="pkeys">Primary-key column names (not empty)</param>
		void SyncContentWithOids(string table, List<int> oids, List<string> pkeys) {
			Result result = src.Exec(string.Format("select * from {0}", QuoteIdent(table)));
			try {
				// The select list is the same for every row, so build it once.
				// The explicit cast keeps md5() applicable on servers without
				// an implicit oid-to-text conversion.
				List<string> digests = new List<string>(oids.Count);
				foreach(int oid in oids)
					digests.Add(string.Format("md5(cast({0} as text))", QuoteIdent(result.ColumnName(oid))));
				string select = string.Format("select {0} from {1}",
					string.Join(", ", digests.ToArray()), QuoteIdent(table));

				for(int row=0; row<result.Rows; row++) {
					Progress(table, (float) row / (float) result.Rows, ProgressWidth);

					string where = WhereClause(result, row, pkeys);
					string probe = select + where;

					try {
						bool exists;
						Result r1 = dst.Exec(probe);
						try {
							exists = r1.Rows > 0;
							if (exists) {
								Result r2 = src.Exec(probe);
								try {
									// Compare large objects column by column
									for(int c=0; c<r1.Columns; c++) {
										if (r1[0, c] != r2[0, c])
											ReplaceLargeObject(table, result, row, oids[c], where);
									}
								} finally {
									r2.Dispose();
								}
							}
						} finally {
							r1.Dispose();
						}

						if (!exists) {
							InsertRow(table, result, row);
							foreach(int oid in oids)
								ReplaceLargeObject(table, result, row, oid, where);
						}
					} catch(PostgreSQLException) {
						// Best-effort fallback kept from the original design: when
						// the comparison or the insert fails, still try to copy
						// the large objects and attach them to the row.
						foreach(int oid in oids)
							ReplaceLargeObject(table, result, row, oid, where);
					}
				}
			} finally {
				result.Dispose();
			}
			Progress(table, 1.0f, ProgressWidth);
		}

		/// <summary>
		/// Brings the destination row with the same primary key up to date.
		/// </summary>
		/// <returns>
		/// <c>true</c> when the destination already holds the row (it was
		/// updated, or it consists of key columns only); <c>false</c> when the
		/// row has to be inserted.
		/// </returns>
		bool UpdateRow(string table, Result result, int row, List<string> pkeys, string where) {
			List<string> assignments = new List<string>();
			for(int col=0; col<result.Columns; col++) {
				string name = result.ColumnName(col);
				if (pkeys.Contains(name))
					continue;
				assignments.Add(QuoteIdent(name) + "=" + QuoteLiteral(result[row, col]));
			}

			if (assignments.Count == 0) {
				// Every column belongs to the key: "update ... set" would be
				// empty and invalid, so only check whether the row exists.
				Result found = dst.Exec(string.Format("select 1 from {0}{1}", QuoteIdent(table), where));
				try {
					return found.Rows > 0;
				} finally {
					found.Dispose();
				}
			}

			Result updated = dst.Exec(string.Format("update {0} set {1}{2}",
				QuoteIdent(table), string.Join(",", assignments.ToArray()), where));
			try {
				return updated.AffectedRows == 1;
			} finally {
				updated.Dispose();
			}
		}
		
		/// <summary>
		/// Synchronizes a table without <c>oid</c> columns: every source row is
		/// first applied as an update and inserted when no destination row matched.
		/// </summary>
		/// <param name="table">Table to synchronize</param>
		/// <param name="pkeys">Primary-key column names (not empty)</param>
		void SyncContentNoOids(string table, List<string> pkeys) {
			Result result = src.Exec(string.Format("select * from {0}", QuoteIdent(table)));
			try {
				for(int row=0; row<result.Rows; row++) {
					Progress(table, (float) row / (float) result.Rows, ProgressWidth);

					string where = WhereClause(result, row, pkeys);

					// try update first, insert only when nothing matched
					if (UpdateRow(table, result, row, pkeys, where))
						continue;

					InsertRow(table, result, row);
				}
			} finally {
				result.Dispose();
			}
			Progress(table, 1.0f, ProgressWidth);
		}
		
		/// <summary>
		/// Lists all base tables of the "public" schema of a database.
		/// </summary>
		/// <returns>
		/// The table names, or <c>null</c> when the query failed (the error
		/// message is printed).
		/// </returns>
		List<string> ListAllTables(PostgreSQL db) {
			Result result = null;
			try {			
				List<string> tables = new List<string>();

				result = db.Exec("select table_name from information_schema.tables where table_schema='public' and table_type='BASE TABLE'");
				for(int row=0; row<result.Rows; row++)
					tables.Add(result[row, 0]);
				result.Dispose();
				return tables;
			} catch(PostgreSQLException e) {
				Console.WriteLine(e.Message);
				if (result != null && result.Valid)
					result.Dispose();
				return null;
			}
		}
#endregion
	}
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Common Development and Distribution License (CDDL)


Written By
Architect
Germany
Drawing on my experience in industry-scale real-time graphics programming, software design and development, and European research projects, I want to bring new ideas to creative projects.
Companies:
- 2009-now BTC AG: software for the renewable energy sector.
- 2007-2009 Digital Media: Audio, Graphics and GIS Web Services(http://maps.bremen.de)
- 2001-2007 artecLab://art/work/technology: Mixed Reality, Computer Games and eLearning
- 1998-2001 STN ATLAS Elektronik: real-time graphics for ground warfare simulation

For a complete resume see http://e56.de/download/resume.pdf

Comments and Discussions