/* psync - A PostgreSQL synchronization tool
*
* The contents of this file are subject to the terms
* of the Common Development and Distribution License
* (the "License"). You may not use this file except
* in compliance with the License.
*
* You can obtain a copy of the license at
* http://www.opensource.org/licenses/cddl1.php.
* See the License for the specific language governing
* permissions and limitations under the License.
*
* Copyright 2009 by martin.faust@e56.de
*/
using System;
using System.Collections.Generic;
using npq;
namespace psync {
/// <summary>
/// The synchronization mode
/// </summary>
public enum SyncMode {
    /// <summary>
    /// Every source table is synchronized; tables missing on the destination are created first
    /// </summary>
    Complete = 0,
    /// <summary>
    /// Only source tables missing on the destination are created and filled; tables that already exist there are skipped
    /// </summary>
    Incremental = 1
}
/// <summary>
/// PostgreSQL sync
/// </summary>
public class PSync : IDisposable {
// Connections to the two databases; assigned by the constructor and released by Dispose
PostgreSQL src;
PostgreSQL dst;
// Table names of each side as read by Start
List<string> srcTables;
List<string> dstTables;
// Backing fields of the public options below
bool delete;
SyncMode mode = SyncMode.Incremental;
/// <value>
/// Whether tables that exist on the destination but not on the source are dropped
/// </value>
public bool Delete {
    get { return delete; }
    set { delete = value; }
}
/// <value>
/// The synchronization mode (defaults to <see cref="SyncMode.Incremental"/>)
/// </value>
public SyncMode Mode {
    get { return mode; }
    set { mode = value; }
}
/// <summary>
/// Constructor
/// </summary>
/// <remarks>
/// If the destination connection cannot be created, the already created source
/// connection is disposed before the exception is passed on.
/// </remarks>
/// <param name="_src">The configuration string for the source database</param>
/// <param name="_dst">The configuration string for the destination database</param>
public PSync(string _src, string _dst) {
    src = new PostgreSQL(_src);
    try {
        dst = new PostgreSQL(_dst);
    } catch {
        // The caller never receives this instance, so nobody else could release the source connection
        src.Dispose();
        src = null;
        throw;
    }
}
/// <summary>
/// Start synchronization
/// </summary>
/// <remarks>
/// Reads the table lists of both databases, creates and fills tables according to
/// <see cref="Mode"/>, and finally drops destination-only tables when
/// <see cref="Delete"/> is set. Returns without doing anything when either table
/// list could not be read.
/// </remarks>
public void Start() {
    srcTables = ListAllTables(src);
    dstTables = ListAllTables(dst);
    // ListAllTables signals a failed query by returning null (the error has already been printed)
    if (srcTables == null || dstTables == null)
        return;
    Console.WriteLine("Source has #{0} tables...", srcTables.Count);
    Console.WriteLine("Destination has #{0} tables...", dstTables.Count);
    Console.WriteLine("Synchronizing tables...");
    switch(mode) {
    case SyncMode.Complete:
        // Every source table is synchronized; missing ones are created first
        foreach(string table in srcTables) {
            if (!dstTables.Contains(table))
                CreateTable(table);
            SyncContent(table);
        }
        break;
    case SyncMode.Incremental:
        // Tables already present on the destination are left untouched
        foreach(string table in srcTables) {
            if (dstTables.Contains(table))
                continue;
            CreateTable(table);
            SyncContent(table);
        }
        break;
    }
    if (delete) {
        Console.WriteLine("Deleting tables...");
        foreach(string table in dstTables) {
            if (srcTables.Contains(table))
                continue;
            Console.WriteLine(" - {0}", table);
            // The name is the exact name reported by the destination catalog. Quote it as an
            // identifier so that case-sensitive names or names containing special characters
            // drop this very table instead of a case-folded namesake.
            string quoted = "\"" + table.Replace("\"", "\"\"") + "\"";
            dst.Exec(string.Format("drop table {0}", quoted)).Dispose();
        }
    }
}
/// <summary>
/// Free resources
/// </summary>
/// <remarks>
/// Disposes both database connections. Calling it again is a no-op, and the
/// destination connection is released even if releasing the source one throws.
/// </remarks>
public void Dispose () {
    PostgreSQL source = src;
    PostgreSQL destination = dst;
    // Forget the connections first so that a repeated call finds nothing left to dispose
    src = null;
    dst = null;
    try {
        if (source != null)
            source.Dispose();
    } finally {
        if (destination != null)
            destination.Dispose();
    }
}
#region Private Methods
/// <summary>
/// Writes a one-line text progress bar to the console
/// </summary>
/// <remarks>
/// The line ends with a carriage return instead of a line feed, so the next call
/// overwrites the bar in place.
/// </remarks>
/// <param name="title">Label printed right-aligned in front of the bar</param>
/// <param name="p">Completed fraction, 0.0 to 1.0</param>
/// <param name="size">Width of the bar in characters</param>
static void Progress(string title, float p, int size) {
    int width = Math.Max(0, size);
    // Number of completed segments: none at 0 %, all of them at 100 %
    int filled = (int) Math.Floor(p * size);
    filled = Math.Max(0, Math.Min(width, filled));
    Console.Write("{0,20} |", title);
    Console.Write(new string('=', filled));
    Console.Write(new string('-', width - filled));
    Console.Write("| {0,3}%\r", Math.Floor(p*100.0f));
}
/// <summary>
/// Lists the primary key columns of a source table
/// </summary>
/// <param name="table">Name of a table in the public schema of the source database</param>
/// <returns>The key column names in key order; empty if the table has no primary key</returns>
List<string> PrimaryKeys(string table) {
    // Constraint names are not unique across tables, so the join also has to match the
    // owning table; otherwise the columns of a same-named constraint on another table
    // would be reported as key columns. The schema filter mirrors ListAllTables.
    string query = string.Format(
        "SELECT kcu.column_name" +
        " FROM information_schema.table_constraints tc" +
        " JOIN information_schema.key_column_usage kcu" +
        " ON tc.constraint_catalog = kcu.constraint_catalog" +
        " AND tc.constraint_schema = kcu.constraint_schema" +
        " AND tc.constraint_name = kcu.constraint_name" +
        " AND tc.table_schema = kcu.table_schema" +
        " AND tc.table_name = kcu.table_name" +
        " WHERE kcu.table_schema = 'public' AND kcu.table_name = '{0}'" +
        " AND tc.constraint_type = 'PRIMARY KEY'" +
        " ORDER BY kcu.ordinal_position",
        table.Replace("'", "''")); // the name is embedded in a string literal
    List<string> pkeys = new List<string>();
    Result result = src.Exec(query);
    try {
        for(int row=0; row<result.Rows; row++)
            pkeys.Add(result[row, 0]);
    } finally {
        result.Dispose();
    }
    return pkeys;
}
/// <summary>
/// Finds the columns of a source table whose data type is oid
/// </summary>
/// <remarks>
/// The returned numbers are positions in the catalog listing ordered by
/// ordinal_position; callers use them as column indices into a
/// "select *" result of the same table.
/// </remarks>
/// <param name="table">Name of a table in the public schema of the source database</param>
/// <returns>Zero-based positions of the oid columns; empty if there are none</returns>
List<int> AllOidColumns(string table) {
    List<int> oids = new List<int>();
    // Without the schema filter a same-named table in another schema would add rows and
    // shift the positions. The filter mirrors ListAllTables.
    Result result = src.Exec(string.Format(
        "select data_type from information_schema.columns where table_schema = 'public' and table_name = '{0}' order by ordinal_position",
        table.Replace("'", "''"))); // the name is embedded in a string literal
    try {
        for(int row=0; row<result.Rows; row++) {
            // Ordinal comparison: culture-specific lower-casing must not affect a type name
            if (string.Equals(result[row, "data_type"], "oid", StringComparison.OrdinalIgnoreCase))
                oids.Add(row);
        }
    } finally {
        result.Dispose();
    }
    return oids;
}
/// <summary>
/// Creates a table on destination
/// </summary>
/// <remarks>
/// The column list (name, type, nullability) is read from the source catalog and the
/// primary key is added as one table-level constraint, so composite keys are valid.
/// The statement is printed before it is executed; a PostgreSQLException raised by the
/// destination is printed and not passed on.
/// NOTE(review): only information_schema's data_type is copied, so type modifiers
/// (lengths, precision) and presumably array or user-defined types are not reproduced,
/// and column names are emitted unquoted - confirm whether that is acceptable.
/// </remarks>
/// <param name="table">Table to be created</param>
void CreateTable(string table) {
    Console.WriteLine("Creating table {0}...", table);
    List<string> pkeys = PrimaryKeys(table);
    List<string> parts = new List<string>();
    // The schema filter mirrors ListAllTables; the name is embedded in a string literal
    Result result = src.Exec(string.Format(
        "select column_name, is_nullable, data_type from information_schema.columns where table_schema = 'public' and table_name = '{0}' order by ordinal_position",
        table.Replace("'", "''")));
    try {
        for(int row=0; row<result.Rows; row++) {
            string definition = string.Format("{0} {1}", result[row, "column_name"], result[row, "data_type"]);
            if (result[row, "is_nullable"] != "YES")
                definition += " not null";
            parts.Add(definition);
        }
    } finally {
        result.Dispose();
    }
    // A column-level "primary key" on each key column is rejected for composite keys,
    // so all key columns go into a single table constraint (duplicates removed).
    List<string> keyColumns = new List<string>();
    foreach(string key in pkeys) {
        if (!keyColumns.Contains(key))
            keyColumns.Add(key);
    }
    if (keyColumns.Count > 0)
        parts.Add(string.Format("primary key ({0})", string.Join(", ", keyColumns.ToArray())));
    string command = string.Format("create table {0}({1})", table, string.Join(", ", parts.ToArray()));
    Console.WriteLine(command);
    try {
        dst.Exec(command).Dispose();
    } catch(PostgreSQLException e) {
        Console.WriteLine(e.Message);
    }
}
/// <summary>
/// Copies the rows of a table from source to destination
/// </summary>
/// <remarks>
/// Tables with oid columns take the large-object path, all others the plain path.
/// The final empty line moves the console past the progress bar, which ends in a
/// carriage return.
/// </remarks>
/// <param name="table">Table whose content is synchronized</param>
void SyncContent(string table) {
    List<int> oidColumns = AllOidColumns(table);
    bool hasOidColumns = oidColumns.Count != 0;
    if (hasOidColumns) {
        SyncContentWithOids(table, oidColumns);
    } else {
        SyncContentNoOids(table);
    }
    Console.WriteLine();
}
/// <summary>
/// Copies one large object from source to destination
/// </summary>
/// <remarks>
/// A new large object is created on the destination and filled in 2048-byte chunks.
/// Both objects are closed again even when reading or writing throws.
/// </remarks>
/// <param name="oid">Identifier of the large object on the source</param>
/// <returns>Identifier of the newly created large object on the destination</returns>
int CopyOid(int oid) {
    LargeObject lsrc = new LargeObject(src);
    LargeObject ldst = new LargeObject(dst);
    int id = ldst.Create();
    lsrc.Open(oid);
    try {
        ldst.Open(id);
        try {
            byte[] buf = new byte[2048];
            int s;
            // Read reports the number of bytes delivered; anything not above 0 ends the copy
            while ((s = lsrc.Read(buf, buf.Length)) > 0)
                ldst.Write(buf, s);
        } finally {
            ldst.Close();
        }
    } finally {
        lsrc.Close();
    }
    return id;
}
/// <summary>
/// Synchronizes a table that contains oid (large object) columns
/// </summary>
/// <remarks>
/// For every source row the destination is probed by primary key. If the row exists,
/// each oid column whose md5 differs between source and destination gets a fresh copy
/// of the large object; otherwise the row is inserted and all its large objects are
/// copied. A PostgreSQLException during these steps falls back to copying all large
/// objects of the row and updating the destination row (best effort, as before).
/// Tables without a primary key cannot be probed; their rows are appended, so running
/// this again against an already filled table duplicates them.
/// NOTE(review): the md5 is taken over the oid value, not over the large object's
/// content, and a copied object gets a new oid - so existing rows presumably always
/// compare as different and are copied again; confirm the intended comparison.
/// NOTE(review): values are always sent as quoted literals, so whatever the result
/// indexer returns for SQL NULL is written as text - confirm.
/// </remarks>
/// <param name="table">Table to synchronize</param>
/// <param name="oids">Column indices (into "select *") of the oid columns</param>
void SyncContentWithOids(string table, List<int> oids) {
    List<string> primaryKeys = PrimaryKeys(table);
    Result result = src.Exec(string.Format("select * from {0}", table));
    try {
        // The select list is the same for every row: one md5 per oid column. md5() takes
        // text, and an oid is not converted implicitly, hence the explicit cast.
        string select = "select";
        for(int i=0; i<oids.Count; i++) {
            if (i > 0)
                select += ",";
            select += string.Format(" md5({0}::text)", result.ColumnName(oids[i]));
        }
        for(int row=0; row<result.Rows; row++) {
            Progress(table, (float) row / (float) result.Rows, 60);
            if (primaryKeys.Count == 0) {
                // No key to probe or update by: copy the large objects first and insert
                // the row with the new identifiers already in place.
                string append = string.Format("insert into {0} values(", table);
                for(int col=0; col<result.Columns; col++) {
                    if (col > 0)
                        append += ",";
                    if (oids.Contains(col))
                        append += string.Format(" '{0}'", CopyOid(result.GetInt(row, col)));
                    else
                        append += " " + SqlLiteral(result[row, col]);
                }
                append += ")";
                dst.Exec(append).Dispose();
                continue;
            }
            // The row identified by the primary keys of the table
            string where = " where";
            for(int i=0; i<primaryKeys.Count; i++) {
                if (i > 0)
                    where += " and";
                where += string.Format(" {0}={1}", primaryKeys[i], SqlLiteral(result[row, result.ColumnIndex(primaryKeys[i])]));
            }
            // The blank after "from" was missing, which made this statement malformed
            string command = select + " from " + table + where;
            try {
                bool exists;
                Result r1 = dst.Exec(command);
                try {
                    exists = r1.Rows > 0;
                    if (exists) {
                        Result r2 = src.Exec(command);
                        try {
                            // Compare large objects
                            for(int c=0; c<r1.Columns; c++) {
                                if (r1[0, c] != r2[0, c])
                                    ReplaceLargeObject(table, result, row, oids[c], where);
                            }
                        } finally {
                            r2.Dispose();
                        }
                    }
                } finally {
                    r1.Dispose();
                }
                if (!exists) {
                    string insert = string.Format("insert into {0} values(", table);
                    for(int col=0; col<result.Columns; col++) {
                        if (col > 0)
                            insert += ",";
                        insert += " " + SqlLiteral(result[row, col]);
                    }
                    insert += ")";
                    dst.Exec(insert).Dispose();
                    // The inserted oid values are the source's; replace them by copies
                    foreach(int oid in oids)
                        ReplaceLargeObject(table, result, row, oid, where);
                }
            } catch(PostgreSQLException) {
                // Best effort: refresh all large objects of the row
                foreach(int oid in oids)
                    ReplaceLargeObject(table, result, row, oid, where);
            }
        }
    } finally {
        result.Dispose();
    }
    Progress(table, 1.0f, 60);
}
/// <summary>
/// Copies the large object referenced by one oid column of a source row and stores the new identifier in the matching destination row
/// </summary>
void ReplaceLargeObject(string table, Result result, int row, int column, string where) {
    int id = CopyOid(result.GetInt(row, column));
    dst.Exec(string.Format("update {0} set {1}='{2}'{3}", table, result.ColumnName(column), id, where)).Dispose();
}
/// <summary>
/// Renders a value as an escape string literal: backslashes and single quotes are doubled so the value cannot end the literal early
/// </summary>
static string SqlLiteral(string value) {
    string text = (value == null ? "" : value);
    return "E'" + text.Replace("\\", "\\\\").Replace("'", "''") + "'";
}
/// <summary>
/// Synchronizes a table that has no oid (large object) columns
/// </summary>
/// <remarks>
/// Each source row is first written as an update addressed by primary key; if that
/// did not hit exactly one row, the row is inserted. A table consisting only of key
/// columns is probed with a select instead, because there is nothing to update.
/// Tables without a primary key cannot be addressed; their rows are appended, so
/// running this again against an already filled table duplicates them.
/// NOTE(review): values are always sent as quoted literals, so whatever the result
/// indexer returns for SQL NULL is written as text - confirm.
/// </remarks>
/// <param name="table">Table to synchronize</param>
void SyncContentNoOids(string table) {
    List<string> pkeys = PrimaryKeys(table);
    Result result = src.Exec(string.Format("select * from {0}", table));
    try {
        for(int row=0; row<result.Rows; row++) {
            Progress(table, (float) row / (float) result.Rows, 60);
            if (pkeys.Count > 0) {
                // The row identified by the primary keys of the table
                string where = " where";
                for(int i=0; i<pkeys.Count; i++) {
                    if (i != 0)
                        where += " and";
                    where += string.Format(" {0}={1}", pkeys[i], QuoteValue(result[row, pkeys[i]]));
                }
                // Every non-key column is overwritten
                string assignments = "";
                for(int col=0; col<result.Columns; col++) {
                    string column = result.ColumnName(col);
                    if (pkeys.Contains(column))
                        continue;
                    if (assignments.Length > 0)
                        assignments += ",";
                    assignments += string.Format("{0}={1}", column, QuoteValue(result[row, col]));
                }
                // try update first
                bool canUpdate = assignments.Length > 0;
                Result probe = canUpdate
                    ? dst.Exec(string.Format("update {0} set {1}{2}", table, assignments, where))
                    : dst.Exec(string.Format("select 1 from {0}{1}", table, where));
                bool present;
                try {
                    present = canUpdate ? probe.AffectedRows == 1 : probe.Rows > 0;
                } finally {
                    probe.Dispose();
                }
                if (present)
                    continue;
            }
            // insert values
            string insert = string.Format("insert into {0} values(", table);
            for(int col=0; col<result.Columns; col++) {
                if (col > 0)
                    insert += ",";
                insert += QuoteValue(result[row, col]);
            }
            insert += ")";
            dst.Exec(insert).Dispose();
        }
    } finally {
        result.Dispose();
    }
    Progress(table, 1.0f, 60);
}
/// <summary>
/// Renders a value as an escape string literal: backslashes and single quotes are doubled so the value cannot end the literal early
/// </summary>
static string QuoteValue(string value) {
    string text = (value == null ? "" : value);
    return "E'" + text.Replace("\\", "\\\\").Replace("'", "''") + "'";
}
/// <summary>
/// Lists all tables in a database
/// </summary>
/// <remarks>
/// Only base tables of the public schema are returned. A PostgreSQLException is
/// printed to the console and reported to the caller as a null list.
/// </remarks>
/// <param name="db">Connection of the database to inspect</param>
/// <returns>The table names, or null if the query failed</returns>
List<string> ListAllTables(PostgreSQL db) {
    Result listing = null;
    try {
        List<string> names = new List<string>();
        listing = db.Exec("select table_name from information_schema.tables where table_schema='public' and table_type='BASE TABLE'");
        int index = 0;
        while (index < listing.Rows) {
            names.Add(listing[index, 0]);
            index++;
        }
        listing.Dispose();
        return names;
    } catch(PostgreSQLException error) {
        Console.WriteLine(error.Message);
        // Release the result only if one was obtained and it is still valid
        bool releasable = listing != null && listing.Valid;
        if (releasable) {
            listing.Dispose();
        }
        return null;
    }
}
#endregion
}
}