Brief Introduction of a Continuous SQL-stream Sending and Processing System (Part 1: SQLite)





Application of SocketPro to various databases for continuous inline request/result batching and real-time stream processing with bi-directional asynchronous data transferring
Introduction
Most client-server database systems support only synchronous communication between the client and the backend database, using blocking sockets and a chatty protocol that requires the client or server to wait for an acknowledgement before sending a new chunk of data. The wait time, also called latency, ranges from roughly a tenth of a millisecond on a local area network (LAN) to hundreds of milliseconds on a wide area network (WAN). Large wait times can significantly degrade the quality of an application.
Fortunately, UDAParts has developed a powerful and secure communication framework named SocketPro, which is built around continuous inline request/result batching and real-time stream processing, using asynchronous data transferring and parallel computation for the best network efficiency, development simplicity, performance, and scalability, along with many other great and even unique features (https://github.com/udaparts/socketpro). For details, refer to its short development guide at ../socketpro/doc/dev_guide.htm. In particular, it is highly recommended that you take a quick look at the SocketPro design ideas in the file ../socketpro/doc/sp_arch.htm.
Further, UDAParts has applied the SocketPro framework to a number of popular databases, such as SQLite, MySQL/MariaDB and MS SQL, as well as others through ODBC drivers, to support continuous SQL-stream sending and processing. Additionally, most of these database components are completely free to the public, with open source code that you can study and extend to meet your own complex needs.
To reduce learning complexity, we use the SQLite database as the sample for this first article, and MySQL as the sample for the second article.
Source Code and Samples
All related source code and samples are located at https://github.com/udaparts/socketpro. After cloning the repository onto your computer with GIT, pay attention to the directory socketpro/stream_sql/usqlite.
You can see these samples are created for the .NET, C/C++, Java, Python and Node.js development environments. They can be compiled and run on either Linux or Windows platforms. UDAParts also distributes a pre-compiled test server application, all_servers, as shown at the doc page ../socketpro/doc/get_started.htm. Please follow that documentation guide to deploy the SocketPro components. We are going to use the sample server all_servers for the coming tests.
The test code that follows comes from the file ../socketpro/stream_sql/usqlite/test_sharp/Program.cs. We are going to focus on C# development within this short article.
Two Basic Structures, ErrInfo and SQLExeInfo
The below code snippet 1 lists the two basic structures, ErrInfo and SQLExeInfo, which are used for returning the results of various DB-related requests.
//../socketpro/src/SproAdapter/socketerror.cs
public class ErrInfo
{
    public ErrInfo(int res, string errMsg)
    {
        ec = res;
        em = (null == errMsg) ? "" : errMsg;
    }
    public int ec = 0;
    public string em = "";
    public override string ToString()
    {
        string s = "ec: " + ec.ToString() + ", em: " + ((null == em) ? "" : em);
        return s;
    }
};
//../socketpro/src/sproadapter/asyncdbhandler.cs
public class SQLExeInfo : ErrInfo
{
    public long affected;
    public uint oks;
    public uint fails;
    public object lastId;
    public SQLExeInfo(int res, string errMsg, long aff, uint oks, uint fails, object id)
        : base(res, errMsg)
    {
        affected = aff;
        this.oks = oks;
        this.fails = fails;
        lastId = id;
    }
    public override string ToString()
    {
        String s = base.ToString();
        s += (", affected: " + affected);
        s += (", oks: " + oks);
        s += (", fails: " + fails);
        s += (", lastId: " + lastId);
        return s;
    }
}
The first one, ErrInfo, is used for returning the results of the database requests open, close, prepare, beginTrans and endTrans. It is easy to understand because these requests return just two pieces of data, an error code ec and its corresponding error message em.
The second one, SQLExeInfo, is designed for returning the results of the SQL requests execute and executeBatch. In addition to an error code and its corresponding message, executing a SQL statement can return the number of affected records and the last insert identification number, which correspond to the members affected and lastId of the structure SQLExeInfo, respectively. SocketPro database plugins support executing a complex SQL statement that consists of multiple basic SQL statements, which can generate multiple results, one per basic statement. SocketPro database plugins count the successful and failed ones and return the counts to the client through the two members oks and fails. However, no matter how many failures happen within a batch of basic SQL statements, SocketPro database plugins always return just one error code and message, those of the first failed SQL statement.
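For illustration only, here is a minimal sketch (not part of the sample) of how the two structures might be consumed once their tasks complete; the handler variable sqlite is assumed to be a connected CSqlite handler as in code snippet 2 below, the DELETE statement is made up, and the member names come from code snippet 1.
// A hedged sketch: "sqlite" is a connected CSqlite handler; the DELETE statement is just an example.
var topen = sqlite.open("");                        // task that will produce an ErrInfo
var texec = sqlite.execute("DELETE FROM COMPANY");  // task that will produce a SQLExeInfo

var oi = topen.Result;                              // block until the open result arrives
if (oi.ec != 0)
    Console.WriteLine("open failed, ec: {0}, em: {1}", oi.ec, oi.em);

var ei = texec.Result;
Console.WriteLine("affected: {0}, oks: {1}, fails: {2}, lastId: {3}",
    ei.affected, ei.oks, ei.fails, ei.lastId);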
Main Function
SocketPro is written from the bottom up to support parallel computation by use of one or more pools of non-blocking sockets. Each pool may consist of one or more threads, and each thread hosts one or more non-blocking sockets at the client side. However, for clear demonstration we use just one pool here, and the pool consists of one thread and one socket at the client side, as shown in the below code snippet 2.
using System;
using System.Collections.Generic;
using SocketProAdapter;
using SocketProAdapter.ClientSide;
using SocketProAdapter.UDB;
using System.Threading.Tasks;
using KeyValue = System.Collections.Generic.KeyValuePair
    <SocketProAdapter.UDB.CDBColumnInfoArray, SocketProAdapter.UDB.CDBVariantArray>;
class Program
{
    static readonly string m_wstr;
    static readonly string m_str;
    static Program()
    {
        // ......
    }
    static void Main(string[] args)
    {
        Console.WriteLine("Remote host: ");
        string host = Console.ReadLine();
        CConnectionContext cc = new CConnectionContext(host, 20901,
            "usqlite_client", "password_for_usqlite");
        using (CSocketPool<CSqlite> spSqlite = new CSocketPool<CSqlite>())
        {
            //spSqlite.QueueName = "qsqlite";
            if (!spSqlite.StartSocketPool(cc, 1)) //line 29, one async socket/one worker thread
            {
                Console.WriteLine("Failed in connecting to remote async sqlite server");
                Console.WriteLine("Press any key to close the application ......");
                Console.Read();
                return;
            }
            CSqlite sqlite = spSqlite.Seek(); //line 36
            //a container for receiving all tables data
            List<KeyValue> lstRowset = new List<KeyValue>(); //line 38
            try
            {
                //stream all DB requests with in-line batching for the best network efficiency
                //open a global database at server side because an empty string is given
                var topen = sqlite.open(""); //line 43
                //prepare two test tables, COMPANY and EMPLOYEE
                Task<CAsyncDBHandler.SQLExeInfo>[] vT = TestCreateTables(sqlite);
                var tbt = sqlite.beginTrans(); //line 46, start manual transaction
                //test both prepare and query statements
                var tp0 = TestPreparedStatements(sqlite, lstRowset);
                //test both prepare and query with reading/updating BLOB and large text
                var tp1 = TestBLOBByPreparedStatement(sqlite, lstRowset);
                var tet = sqlite.endTrans(); //line 51, end manual transaction
                var vB = TestBatch(sqlite, lstRowset); //line 52
                Console.WriteLine("All DB/SQL requests streamed & waiting for their results");
                Console.WriteLine(topen.Result); //line 55
                foreach (var e in vT)
                {
                    Console.WriteLine(e.Result);
                }
                Console.WriteLine(tbt.Result);
                Console.WriteLine(tp0.Result);
                Console.WriteLine(tp1.Result);
                Console.WriteLine(tet.Result);
                foreach (var e in vB)
                {
                    Console.WriteLine(e.Result);
                } //line 67
            }
            catch (AggregateException ex) //line 69
            {
                foreach (Exception e in ex.InnerExceptions)
                {
                    //An exception from server (CServerError), Socket closed after
                    //sending a request (CSocketError) or request canceled (CSocketError),
                    Console.WriteLine(e);
                }
            }
            catch (CSocketError ex)
            {
                //Socket is already closed before sending a request
                Console.WriteLine(ex);
            }
            catch (Exception ex)
            {
                //bad operations such as invalid arguments, bad operations and
                //de-serialization errors, and so on
                Console.WriteLine(ex);
            } //line 88
            //display received rowsets
            int index = 0;
            Console.WriteLine();
            Console.WriteLine("+++++ Start rowsets +++");
            foreach (KeyValue it in lstRowset)
            {
                Console.Write("Statement index = {0}", index);
                if (it.Key.Count > 0)
                    Console.WriteLine(", rowset with columns: {0}, records: {1}.",
                        it.Key.Count, it.Value.Count / it.Key.Count);
                else
                    Console.WriteLine(", no rowset received.");
                ++index;
            }
            Console.WriteLine("+++++ End rowsets +++");
            Console.WriteLine();
            Console.WriteLine("Press any key to close the application ......");
            Console.Read();
        }
    }
    static Task<CAsyncDBHandler.SQLExeInfo>[] TestBatch(CSqlite sqlite, List<KeyValue> ra)
    {
        // ......
    }
    static Task<CAsyncDBHandler.SQLExeInfo> TestPreparedStatements(CSqlite sqlite, List<KeyValue> ra)
    {
        // ......
    }
    static Task<CAsyncDBHandler.SQLExeInfo> TestBLOBByPreparedStatement(CSqlite sqlite, List<KeyValue> ra)
    {
        // ......
    }
    static Task<CAsyncDBHandler.SQLExeInfo>[] TestCreateTables(CSqlite sqlite)
    {
        // ......
    }
}
Starting one socket pool: The above code snippet 2 starts one socket pool at line 29 by use of one instance of connection context; for demonstration clarity, the pool has only one worker thread, which hosts only one non-blocking socket. However, you can create multiple pools within one client application if necessary. Afterwards, we obtain one asynchronous SQLite handler at line 36.
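If more parallelism is needed, a pool can be started from an array of connection contexts instead, the same pattern used later in code snippet 7. The following is only a sketch; the host name and credentials are placeholders.
// A hedged sketch: one pool with two worker threads, each hosting two non-blocking sockets.
CConnectionContext[,] ppCc = new CConnectionContext[2, 2];
for (int thread = 0; thread < 2; ++thread)
{
    for (int s = 0; s < 2; ++s)
    {
        ppCc[thread, s] = new CConnectionContext("localhost", 20901,
            "usqlite_client", "password_for_usqlite"); // placeholder host and credentials
    }
}
using (CSocketPool<CSqlite> pool = new CSocketPool<CSqlite>())
{
    if (pool.StartSocketPool(ppCc))
    {
        CSqlite handler = pool.Seek(); // pick one of the four asynchronous handlers
        // ...... stream DB/SQL requests through "handler" as shown in code snippet 2
    }
}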
Opening database: We send a request to open a SQLite server database as shown at line 43. If the first input is an empty or null string, as in this example, we open one instance of the server's global database, usqlite.db. If you would like to create your own database, simply pass a non-empty valid string. The return value topen is a task that will produce an ErrInfo structure in the future.
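As a small sketch (the file name below is made up), a non-empty string asks the server plugin to open or create that SQLite database file instead of the global usqlite.db:
var topen = sqlite.open("my_own.db"); // hypothetical file name
var oi = topen.Result;                // ErrInfo with error code (ec) and message (em)
if (oi.ec != 0)
    Console.WriteLine(oi);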
Streaming SQL statements: Keep in mind that SocketPro, by design, effortlessly supports streaming any number of requests of all types over one non-blocking socket session. Certainly, we are able to stream all the SQL statements, as well as other requests, as shown at lines 43 through 52. All SocketPro SQL-stream services support this particular feature for the best network efficiency, which significantly improves data access performance. As far as we know, you cannot find such a feature in other technologies; if you do, please let us know. Like normal database access APIs, SocketPro SQL-stream technology also supports manual transactions, as shown at lines 46 and 51. We are going to elaborate the four functions, TestCreateTables, TestPreparedStatements, TestBLOBByPreparedStatement and TestBatch, in successive sections. Note that all four methods immediately return one or more tasks that will produce SQLExeInfo structures in the future.
Note that SocketPro supports only asynchronous data transferring between client and server, so all requests and results are streamed and batched with an inline algorithm at both the client and server sides for the best network efficiency. This is completely different from synchronous data transferring.
Waiting until all processed: Since SocketPro uses asynchronous data transferring by default, it must provide a way to wait until all requests and their results have been sent, returned and processed. As shown at lines 55 through 67, we can use the task property Result to wait for completion. In addition, you can also use the keyword await or the task method Wait to wait for the completion of all DB/SQL requests.
Error handling: The code at lines 69 through 88 shows how to deal with a variety of errors, including server errors (CServerError), socket communication and request cancellation errors (CSocketError), as well as other types of errors.
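For reference, here is a minimal sketch of the await alternative combined with the exception types named above; it is assumed to be a method added to the Program class of code snippet 2, reusing the same using directives, and the SQL text is made up.
// A hedged sketch of awaiting streamed requests instead of blocking on the Result property.
static async Task StreamAndAwait(CSqlite sqlite)
{
    try
    {
        // all four requests are streamed immediately, before any result comes back
        var topen = sqlite.open("");
        var tbt = sqlite.beginTrans();
        var te = sqlite.execute("delete from EMPLOYEE;delete from COMPANY"); // example SQL
        var tet = sqlite.endTrans();
        // now await the results in order
        Console.WriteLine(await topen);
        Console.WriteLine(await tbt);
        Console.WriteLine(await te);
        Console.WriteLine(await tet);
    }
    catch (CServerError ex)
    {
        Console.WriteLine(ex); // an error raised at the server side
    }
    catch (CSocketError ex)
    {
        Console.WriteLine(ex); // socket closed or request canceled
    }
}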
TestCreateTables
This function sends two SQL DDL statements that create the two tables COMPANY and EMPLOYEE, as shown in the below code snippet 3.
static Task<CAsyncDBHandler.SQLExeInfo>[] TestCreateTables(CSqlite sqlite)
{
    var v = new Task<CAsyncDBHandler.SQLExeInfo>[2];
    v[0] = sqlite.execute("CREATE TABLE COMPANY(ID INT8 PRIMARY KEY NOT NULL,name CHAR(64)" +
        "NOT NULL,ADDRESS varCHAR(256)not null,Income float not null)");
    v[1] = sqlite.execute("CREATE TABLE EMPLOYEE(EMPLOYEEID INT8 PRIMARY KEY NOT NULL unique," +
        "CompanyId INT8 not null,name NCHAR(64)NOT NULL,JoinDate DATETIME not null " +
        "default(datetime('now')),IMAGE BLOB,DESCRIPTION NTEXT,Salary real," +
        "FOREIGN KEY(CompanyId)REFERENCES COMPANY(id))");
    return v;
}
You can execute any number of SQL statements in a stream without waiting for prior requests to return, as shown in the code snippet 3. Each request takes one input SQL statement and immediately returns a task that will produce a SQLExeInfo structure in the future. Again, this is different from the common database access approach, because SocketPro uses asynchronous data transferring for communication.
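As a variation (not in the sample), the array of DDL tasks can also be aggregated with the standard Task.WhenAll inside an async method:
// A hedged sketch, assuming an async context and the using directives of code snippet 2.
Task<CAsyncDBHandler.SQLExeInfo>[] vT = TestCreateTables(sqlite);
CAsyncDBHandler.SQLExeInfo[] results = await Task.WhenAll(vT);
foreach (var r in results)
{
    Console.WriteLine(r); // prints ec, em, affected, oks, fails and lastId
}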
TestPreparedStatements
SocketPro SQL-stream technology supports preparing SQL statements just like common database access APIs. In particular, SocketPro SQL-stream technology even supports preparing multiple SQL statements in one shot against a SQLite server database, as shown in the below code snippet 4.
static Task<CAsyncDBHandler.SQLExeInfo> TestPreparedStatements(CSqlite sqlite, List<KeyValue> ra)
{
    //a complex SQL statement combined with query and insert prepare statements
    sqlite.Prepare("Select datetime('now');" + //line 4
        "INSERT OR REPLACE INTO COMPANY(ID,NAME,ADDRESS,Income)VALUES(?,?,?,?)"); //line 5
    CDBVariantArray vData = new CDBVariantArray();
    vData.Add(1);
    vData.Add("Google Inc.");
    vData.Add("1600 Amphitheatre Parkway, Mountain View, CA 94043, USA");
    vData.Add(66000000000.0);
    vData.Add(2);
    vData.Add("Microsoft Inc.");
    vData.Add("700 Bellevue Way NE- 22nd Floor, Bellevue, WA 98804, USA");
    vData.Add(93600000000.0);
    vData.Add(3);
    vData.Add("Apple Inc.");
    vData.Add("1 Infinite Loop, Cupertino, CA 95014, USA");
    vData.Add(234000000000.0);
    //send three sets of parameterized data in one shot for processing
    return sqlite.execute(vData, (handler, rowData) =>
    {
        //rowset data come here
        int last = ra.Count - 1;
        KeyValue item = ra[last];
        item.Value.AddRange(rowData);
    }, (handler) =>
    {
        //rowset header meta info comes here
        KeyValue item = new KeyValue(handler.ColumnInfo, new CDBVariantArray());
        ra.Add(item);
    });
}
Note that the sample prepared SQL statement consists of one query and one insert statement. When the function is called, the client expects three sets of records to be returned and three records to be inserted into the table COMPANY. The sample is designed to demonstrate the power of SocketPro SQL-stream technology; in reality, you probably wouldn't prepare a combined SQL statement made of multiple basic SQL statements. If you use a parameterized statement, you are required to send a prepare request first, as shown at lines 4 and 5. After building an array of parameter data as shown in the above code snippet 4, you can send multiple sets of parameter data from client to server in one single shot at the end. If you have a large amount of data, you can call the method execute repeatedly without preparing the statement again.
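For example, the sketch below (the company rows are made-up values) prepares the insert once and then streams two separate chunks of parameter data with two execute calls:
// A hedged sketch: one Prepare, then execute called twice with different parameter sets.
sqlite.Prepare("INSERT OR REPLACE INTO COMPANY(ID,NAME,ADDRESS,Income)VALUES(?,?,?,?)");

CDBVariantArray vData = new CDBVariantArray();
vData.Add(4); vData.Add("Amazon Inc.");                    // made-up sample row
vData.Add("410 Terry Ave N, Seattle, WA 98109, USA"); vData.Add(280000000000.0);
var t0 = sqlite.execute(vData, (h, rowData) => { }, (h) => { }); // first chunk streamed

vData = new CDBVariantArray();
vData.Add(5); vData.Add("Salesforce Inc.");                // made-up sample row
vData.Add("415 Mission Street, San Francisco, CA 94105, USA"); vData.Add(21000000000.0);
var t1 = sqlite.execute(vData, (h, rowData) => { }, (h) => { }); // second chunk, no new Prepare

Console.WriteLine(t0.Result);
Console.WriteLine(t1.Result);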
Next, we need more details on how to handle the returned record sets. Besides the first input, the parameter data array, the method execute takes two callbacks or lambda expressions as its second and third input parameters. Whenever a record set arrives, the second callback ((handler) => { ...... }) is automatically called by the SQLite client handler with the record set's column meta information. If actual records are available, the first callback ((handler, rowData) => { ...... }) is called, and you can populate the data into a container such as ra. For this code snippet 4, both callbacks will be called three times, but in general the number of times the first callback is called depends on both the number of records and the size of each record.
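Since each entry of the received container ra pairs one CDBColumnInfoArray with one flat CDBVariantArray, a small helper like the sketch below (not part of the sample, assumed to be added to the Program class of code snippet 2, and assuming CDBVariantArray can be indexed like a list) can slice the flat array by the column count to print individual records.
// A hedged helper sketch for walking one received rowset.
static void PrintRowset(KeyValue kv)
{
    int columns = kv.Key.Count;      // number of columns in the meta data
    if (columns == 0)
        return;                      // this statement produced no rowset
    for (int start = 0; start < kv.Value.Count; start += columns)
    {
        for (int c = 0; c < columns; ++c)
        {
            Console.Write("{0}\t", kv.Value[start + c]);
        }
        Console.WriteLine();
    }
}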
TestBLOBByPreparedStatement
Now you can see that SocketPro SQL-stream technology provides all the required features for accessing a backend database. Next, we are going to demonstrate how to handle large binary and text objects with SocketPro SQL-stream technology. Usually, it is difficult to access large objects inside databases efficiently. However, it is truly simple with SocketPro SQL-stream technology, for both development and efficiency, as shown in the below code snippet 5.
After looking through the code snippet 5, you will find that it is really the same as the previous code snippet 4, only longer. This is a good thing for a software developer: SocketPro SQL-stream technology handles all types of database table fields in the same coding style, which keeps development easy.
SocketPro always divides a large binary or text object into chunks at both the client and server sides, sends the smaller chunks to the other side, and finally reconstructs the original large object from the collected chunks. This happens silently at run time to reduce the memory footprint.
static Task<CAsyncDBHandler.SQLExeInfo> TestBLOBByPreparedStatement(CSqlite sqlite, List<KeyValue> ra)
{
    //a complex SQL statement combining insert and query prepared statements
    sqlite.Prepare("insert or replace into employee(EMPLOYEEID,CompanyId,name,JoinDate,image," +
        "DESCRIPTION,Salary)values(?,?,?,?,?,?,?);select * from employee where employeeid=?");
    CDBVariantArray vData = new CDBVariantArray();
    using (CScopeUQueue sbBlob = new CScopeUQueue())
    {
        //first set of data
        vData.Add(1);
        vData.Add(1); //google company id
        vData.Add("Ted Cruz");
        vData.Add(DateTime.Now);
        sbBlob.Save(m_wstr);
        vData.Add(sbBlob.UQueue.GetBuffer());
        vData.Add(m_wstr);
        vData.Add(254000.0);
        vData.Add(1);
        //second set of data
        vData.Add(2);
        vData.Add(1); //google company id
        vData.Add("Donald Trump");
        vData.Add(DateTime.Now);
        sbBlob.UQueue.SetSize(0);
        sbBlob.Save(m_str);
        vData.Add(sbBlob.UQueue.GetBuffer());
        vData.Add(m_str);
        vData.Add(20254000.0);
        vData.Add(2);
        //third set of data
        vData.Add(3);
        vData.Add(2); //Microsoft company id
        vData.Add("Hillary Clinton");
        vData.Add(DateTime.Now);
        sbBlob.Save(m_wstr);
        vData.Add(sbBlob.UQueue.GetBuffer());
        vData.Add(m_wstr);
        vData.Add(6254000.0);
        vData.Add(3);
    }
    //send three sets of parameterized data in one shot for processing
    return sqlite.execute(vData, (handler, rowData) =>
    {
        //rowset data come here
        int last = ra.Count - 1;
        KeyValue item = ra[last];
        item.Value.AddRange(rowData);
    }, (handler) =>
    {
        //rowset header meta info comes here
        KeyValue item = new KeyValue(handler.ColumnInfo, new CDBVariantArray());
        ra.Add(item);
    });
}
TestBatch
SocketPro also provides a special method to group DB and SQL requests such as prepare, beginTrans, endTrans and execute into one large batch request, as shown in the below code snippet 6.
static Task<CAsyncDBHandler.SQLExeInfo>[] TestBatch(CSqlite sqlite, List<KeyValue> ra)
{
    var v = new Task<CAsyncDBHandler.SQLExeInfo>[2];
    CDBVariantArray vParam = new CDBVariantArray();
    vParam.Add(1); //ID
    vParam.Add(2); //EMPLOYEEID
    //there is no manual transaction if isolation is tiUnspecified
    v[0] = sqlite.executeBatch(tagTransactionIsolation.tiUnspecified, //line 8
        "Select datetime('now');select * from COMPANY where ID=?;" +
        "select * from EMPLOYEE where EMPLOYEEID=?",
        vParam, (handler, rowData) =>
        {
            //rowset data come here
            int last = ra.Count - 1;
            KeyValue item = ra[last];
            item.Value.AddRange(rowData);
        }, (handler) =>
        {
            //rowset header meta info comes here
            KeyValue item = new KeyValue(handler.ColumnInfo, new CDBVariantArray());
            ra.Add(item);
        });
    vParam.Clear();
    vParam.Add(1); //ID
    vParam.Add(2); //EMPLOYEEID
    vParam.Add(2); //ID
    vParam.Add(3); //EMPLOYEEID
    //Same as sqlite.beginTrans();
    //Select datetime('now');
    //select * from COMPANY where ID=1;
    //select * from COMPANY where ID=2;
    //Select datetime('now');
    //select * from EMPLOYEE where EMPLOYEEID=2;
    //select * from EMPLOYEE where EMPLOYEEID=3
    //ok = sqlite.endTrans(tagRollbackPlan.rpDefault);
    v[1] = sqlite.executeBatch(tagTransactionIsolation.tiReadCommited, //line 36
        "Select datetime('now');select * from COMPANY where ID=?;" +
        "Select datetime('now');select * from EMPLOYEE where EMPLOYEEID=?",
        vParam, (handler, rowData) =>
        {
            //rowset data come here
            int last = ra.Count - 1;
            KeyValue item = ra[last];
            item.Value.AddRange(rowData);
        }, (handler) =>
        {
            //rowset header meta info comes here
            KeyValue item = new KeyValue(handler.ColumnInfo, new CDBVariantArray());
            ra.Add(item);
        }); //line 50
    return v;
}
Inside the method TestBatch, we call the method executeBatch at line 8 with tiUnspecified as the first input. The batch is executed at the server side by querying three sets of records plus executing one prepared statement, because an array of input parameter data is supplied. If the third input parameter were empty, the method executeBatch would behave the same as the method execute.
We can also group a bunch of DB and SQL requests with a first input other than tiUnspecified, as shown at lines 36 through 50, although the sample prepared statement does not really require a manual transaction here. As commented inside the code snippet, the call involves the methods beginTrans/endTrans, prepare, execute, and so on.
The method executeBatch has a number of advantages, such as better performance, cleaner code, and better integration with the SocketPro client message queue for automatic fault recovery, which will be discussed in the last section.
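To see why the code becomes cleaner, the sketch below spells out roughly what the second executeBatch call bundles, using only requests already demonstrated in earlier snippets; it is only an approximation of what the plugin does, and the row and meta callbacks are left empty here, so the returned rowsets are simply discarded.
// A hedged, approximate expansion of the second executeBatch call in code snippet 6.
var tbt = sqlite.beginTrans();
var t0 = sqlite.execute("Select datetime('now')");
sqlite.Prepare("select * from COMPANY where ID=?");
CDBVariantArray vId = new CDBVariantArray();
vId.Add(1); vId.Add(2);
var t1 = sqlite.execute(vId, (h, rowData) => { }, (h) => { });
var t2 = sqlite.execute("Select datetime('now')");
sqlite.Prepare("select * from EMPLOYEE where EMPLOYEEID=?");
CDBVariantArray vEid = new CDBVariantArray();
vEid.Add(2); vEid.Add(3);
var t3 = sqlite.execute(vEid, (h, rowData) => { }, (h) => { });
var tet = sqlite.endTrans();
// a single executeBatch call replaces all of the above and returns one SQLExeInfo task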
Performance Study
SocketPro SQL-stream technology has excellent performance in accessing database data, for both queries and updates. You can find two performance test projects (cppperf and netperf) at ../socketpro/stream_sql/usqlite/DBPerf. The first sample is written in C++ and the other in C#. In addition, the MySQL sakila sample database, located in the directory ../socketpro/bin, is available for you to play with after running the sample server all_servers, which creates a global SQLite database usqlite.db.
See the performance study data in Figure 1 below, which were obtained from three inexpensive Google cloud virtual machines with solid-state drives, available for free evaluation. All data are the times, in milliseconds, required to execute 10,000 queries or 50,000 inserts. The study also focuses on the influence of network latency on SQL access speed.
Figure 1: SQLite streaming performance study data of SocketPro SQL-stream technology on three cheap Google cloud virtual machines
Our performance study shows that it is easy to execute queries at a rate of about 7,400 (10,000/1.36) per second per socket connection. For inserting records, you can easily reach about 120,000 (50,000/0.42) inserts per second into SQLite over a local area network (LAN, cross-machine). SocketPro streaming improves performance by about 140% over the traditional non-streaming approach (SocketPro + Sync).
In regards to a wide area network (WAN, cross-region), the query rate could be about 4,000 (10,000/2.24) per second per socket connection. For inserting records, the rate could easily be about 20,000 records (50,000/2.51) per second. By contrast, the query rate drops to as low as 30 queries per second on a WAN if a client uses the traditional (non-streaming) way of database access, because of the high latency. SocketPro SQL streaming can be more than 150 (346000/2240) times faster than the non-streaming approach when backend database processing time is negligible in comparison to I/O communication time on a high-latency WAN (cross-region). After analyzing the performance data in the above Figure 1, you will find that SocketPro streaming technology truly speeds up not only local but also remote database access.
The above performance study was completed on a WAN with a bandwidth of around 40 Mbps for cross-region communication. The WAN numbers would be much better with more network bandwidth. Further, SocketPro also supports inline compression, but this study did not use that feature; enabling it would further improve the streaming results on a WAN. Finally, the study was completed on inexpensive virtual machines with only one or two CPUs each; the numbers would improve considerably on dedicated machines.
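For orientation only, the sketch below shows how such a throughput figure can be measured at the client side; it is not the actual benchmark code in cppperf or netperf, the test query is made up, and it assumes a connected CSqlite handler named sqlite plus the using directives of code snippet 2.
// A hedged timing sketch using a standard Stopwatch around 10,000 streamed queries.
var tasks = new List<Task<CAsyncDBHandler.SQLExeInfo>>();
var sw = System.Diagnostics.Stopwatch.StartNew();
for (int n = 0; n < 10000; ++n)
{
    tasks.Add(sqlite.execute("Select datetime('now')")); // made-up test query
}
foreach (var t in tasks)
{
    var ignored = t.Result; // wait until every streamed request has returned
}
sw.Stop();
Console.WriteLine("{0:N0} queries per second", 10000 / sw.Elapsed.TotalSeconds);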
Executing SQL statements in parallel with fault auto recovery
Parallel computation: After studying the previous two simple examples, it is time to study a third sample, located at the directory socketpro/samples/auto_recovery/(test_cplusplus|test_java|test_python|test_sharp). SocketPro is created from the bottom up to support parallel computation. You can distribute multiple SQL statements onto different backend databases for concurrent processing. This feature is designed to improve application scalability, as shown in the below code snippet 7.
using System;
using SocketProAdapter;
using SocketProAdapter.ClientSide;
class Program {
    static void Main(string[] args) {
        const int sessions_per_host = 2;
        const int cycles = 10000;
        string[] vHost = { "localhost", "192.168.2.172" };
        using (CSocketPool<CSqlite> sp = new CSocketPool<CSqlite>()) {
            sp.QueueName = "ar_sharp"; //set a local message queue to backup requests for auto fault recovery
            CConnectionContext[,] ppCc = new CConnectionContext[1, vHost.Length * sessions_per_host]; //one thread enough
            for (int n = 0; n < vHost.Length; ++n) {
                for (int j = 0; j < sessions_per_host; ++j) {
                    ppCc[0, n * sessions_per_host + j] = new CConnectionContext(vHost[n], 20901, "AClientUserId", "Mypassword");
                }
            }
            bool ok = sp.StartSocketPool(ppCc);
            if (!ok) {
                Console.WriteLine("There is no connection and press any key to close the application ......");
                Console.Read(); return;
            }
            string sql = "SELECT max(amount), min(amount), avg(amount) FROM payment";
            Console.WriteLine("Input a filter for payment_id");
            string filter = Console.ReadLine();
            if (filter.Length > 0) sql += (" WHERE " + filter);
            var v = sp.AsyncHandlers;
            foreach (var h in v) {
                ok = h.Open("sakila.db", (hsqlite, res, errMsg) => {
                    if (res != 0)
                        Console.WriteLine("Error code: {0}, error message: {1}", res, errMsg);
                });
            }
            int returned = 0;
            double dmax = 0.0, dmin = 0.0, davg = 0.0;
            SocketProAdapter.UDB.CDBVariantArray row = new SocketProAdapter.UDB.CDBVariantArray();
            CAsyncDBHandler.DExecuteResult er = (h, res, errMsg, affected, fail_ok, lastId) => {
                if (res != 0)
                    Console.WriteLine("Error code: {0}, error message: {1}", res, errMsg);
                else {
                    dmax += double.Parse(row[0].ToString());
                    dmin += double.Parse(row[1].ToString());
                    davg += double.Parse(row[2].ToString());
                }
                ++returned;
            };
            CAsyncDBHandler.DRows r = (h, vData) => {
                row.Clear();
                row.AddRange(vData);
            };
            CSqlite sqlite = sp.SeekByQueue(); //get one handler for querying one record
            ok = sqlite.Execute(sql, er, r);
            ok = sqlite.WaitAll();
            Console.WriteLine("Result: max = {0}, min = {1}, avg = {2}", dmax, dmin, davg);
            returned = 0;
            dmax = 0.0; dmin = 0.0; davg = 0.0;
            Console.WriteLine("Going to get {0} queries for max, min and avg", cycles);
            for (int n = 0; n < cycles; ++n) {
                sqlite = sp.SeekByQueue();
                ok = sqlite.Execute(sql, er, r);
            }
            foreach (var h in v) {
                ok = h.WaitAll();
            }
            Console.WriteLine("Returned = {0}, max = {1}, min = {2}, avg = {3}", returned, dmax, dmin, davg);
            Console.WriteLine("Press any key to close the application ......"); Console.Read();
        }
    }
}
As shown in the above code snippet, we start multiple non-blocking sockets to two different machines (localhost and 192.168.2.172), with two sockets connected to each database machine (const int sessions_per_host = 2;). The code opens the default database sakila.db (ok = h.Open("sakila.db" ......)) for each connection. First of all, the code executes the query 'SELECT max(amount), min(amount), avg(amount) FROM payment ...' once for a single record. At last, the code sends the query 10,000 times to the two machines for parallel processing. Each returned record is summed inside a lambda expression used as a callback of the method Execute. Note that you can create multiple pools for different services hosted on different machines. As you can see, the SocketPro socket pool can be used to significantly improve application scalability.
Auto fault recovery: SocketPro is able to open a file locally and save all request data into it before sending the requests to a server over the network. This file is called the local message queue or client message queue. The idea is simply to back up all requests for automatic fault recovery. To use this feature, you only have to set a local message queue name (sp.QueueName = "ar_sharp";). When developing a real application, it is very common to write lots of code to handle various communication errors properly, which is usually a challenge for software developers. The SocketPro client message queue makes communication error handling very simple. Suppose the machine 192.168.2.172 becomes inaccessible for any reason, such as a power outage, an unhandled exception, software/hardware maintenance or an unplugged network cable; the socket close event will be raised either immediately or some time later. Once the socket pool finds that a socket is closed, SocketPro automatically merges all requests associated with that socket connection onto another socket that is still open for processing.
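A minimal sketch of turning the feature on is shown below; the queue name is arbitrary and everything else follows code snippet 7.
// A hedged sketch: enable the client message queue before starting the pool.
using (CSocketPool<CSqlite> sp = new CSocketPool<CSqlite>())
{
    sp.QueueName = "ar_sharp"; // requests are first persisted into a local queue file
    // ...... build the CConnectionContext array and call sp.StartSocketPool as in code snippet 7;
    // if one connection drops, its queued requests are merged onto a surviving socket automatically
}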
To verify this feature, you can brutally shut down one of the SQLite servers (all_servers) while the above queries are executing and check whether the final results are still correct.
Note that UDAParts has applied this feature to all SocketPro SQL-stream services, the asynchronous persistent message queue service and the remote file exchange service to simplify your development.
Points of Interest
SocketPro SQLite SQL-stream service provides all the required basic client/server database features, and it also delivers the following unique features:
- Continuous inline request/result batching and real-time SQL-stream processing for the best network efficiency especially on WAN
- Bi-directional asynchronous data transferring between client and server, but all asynchronous requests can be converted into synchronous ones
- Superior performance and scalability because of powerful SocketPro communication architecture
- Real-time cache for table update, insert and delete. You can set a callback at the client side for tracking table record add, delete and update events, as shown in the sample project test_cache in the directory ../socketpro/stream_sql/usqlite/test_cache
- All requests are cancelable by executing the method Cancel of the class CClientSocket at the client side
- Both Windows and Linux are supported
- Simple development for all supported development languages
- Both client and server components are thread-safe. They can easily be reused within your multi-threaded applications with far fewer thread-related issues
- All requests can be backed up at the client side and re-sent to another server for processing in case a server goes down for any reason – auto fault recovery
History
- 09/06/2017 ==> Initial release
- 09/30/2017 ==> Remove pictures and use code snippets instead as a codeproject.com officer suggested
- 03/28/2018 ==> Add two new sections, Performance study and Executing SQLs in parallel with fault auto recovery
- 12/30/2020 ==> Add a new section Two Basic Structures, ErrInfo and SQLExeInfo
- 12/30/2020 ==> Add a new section TestBatch
- 12/30/2020 ==> Main sample code updated with Tasks and fewer callbacks, and a text error fixed in code snippet 5