Posted 20 Sep 2017
Licenced CPOL

Brief Introduction of a Continuous SQL-stream Sending and Processing System (Part 2: MySQL)

Yuancai (Charlie) Ye, 27 May 2018
Application of SocketPro onto various databases for continuous inline request/result batching and real-time stream processing with bi-directional asynchronous data transferring

Introduction

Most client-server database systems support only synchronous communication between the client and the backend database, using blocking sockets and a chatty protocol that requires the client or server to wait for an acknowledgement before sending the next chunk of data. This wait time, also called latency, ranges from a few tenths of a millisecond on a local area network (LAN) to hundreds of milliseconds on a wide area network (WAN). Large wait times can significantly degrade the quality of an application.
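
To make the cost of waiting concrete, the following minimal sketch estimates the total time for a batch of requests under two simplified models: a synchronous client that pays one full round trip per request, and a streaming client that keeps requests in flight and pays the round trip roughly once. The class name, the method names, the assumed per-request server time and the model itself (which ignores bandwidth limits) are illustrative assumptions, not part of SocketPro.

```csharp
using System;

// Hypothetical helper for illustration only; not a SocketPro API.
static class LatencyModel
{
    // Synchronous model: every request waits for its own round trip.
    public static double SyncMilliseconds(int requests, double roundTripMs, double serverMsPerRequest)
    {
        return requests * (roundTripMs + serverMsPerRequest);
    }

    // Streaming model: requests are sent back to back, so the round trip is paid about once.
    public static double StreamingMilliseconds(int requests, double roundTripMs, double serverMsPerRequest)
    {
        return roundTripMs + requests * serverMsPerRequest;
    }

    static void Main()
    {
        const int requests = 10000;
        const double serverMs = 0.1;         // assumed per-request processing time
        double[] roundTrips = { 0.2, 34.0 }; // LAN and WAN latencies used in this article's performance study
        foreach (double rtt in roundTrips)
        {
            Console.WriteLine("RTT {0} ms: sync = {1:F0} ms, streaming = {2:F0} ms",
                rtt,
                SyncMilliseconds(requests, rtt, serverMs),
                StreamingMilliseconds(requests, rtt, serverMs));
        }
    }
}
```

Under these assumptions the synchronous estimate grows with the round-trip time multiplied by the number of requests, while the streaming estimate barely changes between LAN and WAN. Real numbers depend on bandwidth and server load, so treat the output as an order-of-magnitude guide only.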

Fortunately, UDAParts has developed a powerful and secure communication framework named SocketPro. It provides continuous inline request/result batching and real-time stream processing through asynchronous data transfer and parallel computation, aiming at the best network efficiency, development simplicity, performance and scalability, along with many other great and even unique features described at the site (https://github.com/udaparts/socketpro).

Further, UDAParts has applied the SocketPro framework to a number of popular databases, such as SQLite, MySQL and MS SQL Server, as well as others through ODBC drivers, to support continuous SQL-stream sending and processing. Additionally, most of these database components are totally free to the public forever. To reduce the learning curve, I recommend studying the SQL-stream sample for SQLite (Part 1: SQLite) before trying these MySQL sample projects, because the SQLite and MySQL samples share the same client API functions.

MySQL is currently the most popular open-source client-server distributed database management system. After studying the MySQL server plugin features, UDAParts applied SocketPro SQL-stream technology to MySQL and developed a plugin that supports continuous sending and processing of SQL statements at the server side for the best performance and scalability. Further, UDAParts has compared the performance of SQL-stream technology with that of MySQL Connector/Net. Our performance study shows that SQL-stream technology can be up to one thousand times faster than MySQL Connector/Net on WAN.

Source Codes and Samples

All related source code and samples are located at https://github.com/udaparts/socketpro. After cloning the repository to your computer with Git, pay attention to the subdirectory mysql inside the directory socketpro/stream_sql. The SocketPro MySQL server plugin source code is located at the directory socketpro/stream_sql/smysql. You will see that the samples are written for the .NET, C/C++, Java and Python development environments. In this article, however, we use the C# code (socketpro/stream_sql/mysql/test_csahrp) to explain client development.

In addition to the above samples, you can find performance study samples that use the MySQL sample database sakila at the directory socketpro/stream_sql/mysql/DBPerf. The subdirectory contains three performance study projects, cppperf, netperf and mysqlperf, which are written with C++/SocketPro SQL streaming, .NET/SocketPro SQL streaming, and ADO.NET provider technologies, respectively.

Further, SocketPro MySQL server plugin supports data table update events (DELETE, INSERT and UPDATE) through triggers. You can use this feature to push update events of selected tables onto clients. The sample project is located in the directory socketpro/stream_sql/mysql/test_cache.

Before running these sample applications, you should copy the system libraries inside the directory socketpro/bin into your system directory.

For the SocketPro communication framework itself, you may also refer to its development guide at socketpro/doc/SocketPro development guide.pdf.

Register SocketPro MySQL SQL-streaming Plugin and its Configuration Database

As described on this site, register the MySQL SQL-stream plugin by executing the statement INSTALL PLUGIN UDAParts_SQL_Streaming SONAME 'libsmysql.so' from the mysql client application. If successful, you should see a new database, sp_streaming_db, created as shown in Figure 1 below.

Figure 1: SocketPro SQL-streaming configuration database sp_streaming_db and table config

The configuration database has three simple tables, config, service and permission, as shown in Figure 1 above. The SocketPro MySQL SQL-streaming plugin is expected to support the industry security standards SSL3/TLSv1.x to secure communication between client and server. By default, a SocketPro client can use either IPv4 or IPv6 to access the MySQL database at port number 20902. Pay attention to the record cached_tables. If you set its value properly, all connected SocketPro clients can see data changes within the listed tables (for example, the tables actor, country, category and language within the database sakila) in real time. Referring to the sample test_cache at the directory socketpro/stream_sql/mysql, you can use this real-time cache feature to improve middle-tier performance and scalability by reducing data round trips between the middle tier and the database.

One SocketPro server can support many services at the same time through a single TCP port. If you like, you can enable WebSocket in the SocketPro MySQL SQL-streaming plugin by setting the value of the record enable_http_websocket to '1'. You can also embed other services by properly setting the value of the record services, as shown in Figure 1 above. After changing one or more values within the table config, you should restart MySQL. Otherwise, the changes will not function correctly.

As for the table permission, SocketPro MySQL SQL-streaming technology uses its records to authenticate clients for embedded services, as shown in Figure 2 below. The MySQL SQL-streaming plugin uses the two tables mysql.user and sp_streaming_db.permission to authenticate all clients for all services. However, the SQL-streaming service itself does not use records within the table sp_streaming_db.permission for authentication.

Figure 2: Three users (root, user_one and user_two) allowed for SocketPro asynchronous persistent message queue service (service id=257)

In most cases, you do not need to touch the table service. Note that the SocketPro MySQL server plugin supports MySQL server version 8.0.11 or later.

Main Function

SocketPro is written from the ground up to support parallel computation through one or more pools of non-blocking sockets. Each pool may be made of one or more threads, and each thread hosts one or more non-blocking sockets at the client side. For clarity, however, this sample uses just one pool at the client side, as shown in code snippet 1 below.

static void Main(string[] args)
{
    Console.WriteLine("Remote host: ");
    string host = Console.ReadLine();
    CConnectionContext cc = new CConnectionContext(host, 20902, "root", "Smash123");
    using (CSocketPool<CMysql> spMysql = new CSocketPool<CMysql>()) {
        //start one socket pool having 1 worker thread hosting 1 non-block socket
        if (!spMysql.StartSocketPool(cc, 1, 1)) {
            Console.WriteLine("Failed in connecting to remote async mysql server");
            Console.WriteLine("Press any key to close the application ......");
            Console.Read();
            return;
        }
        CMysql mysql = spMysql.Seek(); //get an async handler

        //start to stream all types of requests including SQL statements
        //dr and er are result-tracking callbacks defined elsewhere in the full sample (not shown here)
        bool ok = mysql.Open("", dr); //open a default MySQL database

        //create a container to receive all queries data
        List<KeyValuePair<CDBColumnInfoArray, CDBVariantArray>> ra = 
                       new List<KeyValuePair<CDBColumnInfoArray, CDBVariantArray>>();

        CMysql.DRows r = (handler, rowData) => {
            //rowset data come here
            int last = ra.Count - 1;
            KeyValuePair<CDBColumnInfoArray, CDBVariantArray> item = ra[last];
            item.Value.AddRange(rowData); //populate record data into receiving container ra
        };

        CMysql.DRowsetHeader rh = (handler) => {
            //rowset header comes here
            KeyValuePair<CDBColumnInfoArray, CDBVariantArray> item =  
                       new KeyValuePair<CDBColumnInfoArray, CDBVariantArray>
                                    (handler.ColumnInfo, new CDBVariantArray());
            ra.Add(item); //populate query column meta into receiving container ra
        };
            
        TestCreateTables(mysql); //there are 2 DDL requests inside the call
        ok = mysql.Execute("delete from employee;delete from company", er);
        TestPreparedStatements(mysql); //there are 2 requests (1 prepare + 
                                       //1 parameterized statements) inside the call
        InsertBLOBByPreparedStatement(mysql); //there are 2 requests (1 prepare + 
                                              //1 parameterized statements) inside the call
        ok = mysql.Execute("SELECT * from company;select * from employee;select curtime()", er, r, rh);
        CDBVariantArray vPData = new CDBVariantArray();
        //first set
        vPData.Add(1); //input
        vPData.Add(1.4); //inputoutput
        vPData.Add(0); //output

        //second set
        vPData.Add(2); //input
        vPData.Add(2.5); //inputoutput
        vPData.Add(0); //output

        //Test MySQL stored procedure
        TestStoredProcedure(mysql, ra, vPData); //there are 2 requests (1 prepare + 
                                                //1 parameterized statements) inside the call

        //end streaming all types of requests

        ok = mysql.WaitAll(); //make sure all streamed requests are processed and returned

        Console.WriteLine();
        Console.WriteLine("There are {0} output data returned", mysql.Outputs * 2);

        int index = 0;
        Console.WriteLine();
        Console.WriteLine("+++++ Start rowsets +++");
        foreach (KeyValuePair<CDBColumnInfoArray, CDBVariantArray> it in ra) {
            Console.Write("Statement index = {0}", index);
            if (it.Key.Count > 0)
                Console.WriteLine(", rowset with columns = {0}, records = {1}.", 
                                  it.Key.Count, it.Value.Count / it.Key.Count);
            else
                Console.WriteLine(", no rowset received.");
            ++index;
        }
        Console.WriteLine("+++++ End rowsets +++");
        Console.WriteLine();
        Console.WriteLine("Press any key to close the application ......");
        Console.Read();
    }
}
Code snippet 1: Main function for demonstration of SocketPro MySQL SQL-stream system at client side

Starting one socket pool: Code snippet 1 above starts one socket pool from one connection context. For clarity, the pool has only one worker thread, which hosts only one non-blocking socket. Note that you can create multiple pools within one client application if necessary. Afterwards, we get one asynchronous MySQL handler.

Opening database: We can send a request to open a MySQL server database. If the first input is an empty or null string, as in this example, we open the default database for the connected user. To open a specific database, simply pass a valid, non-empty database name. If you like, you can also set a callback or lambda expression to track an error message returned from the server side. Note that SocketPro supports only asynchronous data transfer between client and server, so a request can take one or more callbacks for processing returned data. This is completely different from synchronous data transfer. In addition, we create a container ra, which receives all record sets of the coming queries.

Streaming SQL statements: Keep in mind that, by design, SocketPro supports streaming any number of requests of any type over one non-blocking socket session. We can therefore easily stream SQL statements along with other requests. All SocketPro SQL-stream services support this feature for the best network efficiency, which significantly improves data access performance. As far as we know, no other technology offers such a feature. If you find one, please let us know. Like normal database access APIs, SocketPro SQL-stream technology also supports manual transactions, as shown in the previous article.

Waiting until all processed: Since SocketPro supports only asynchronous data transfer, it must provide a way to wait until all requests are sent and processed and all results are returned. The method WaitAll at the client side serves this purpose. If you like, you can use this method to convert asynchronous requests into synchronous ones.
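
The general idea of firing many asynchronous operations and then blocking once until all of them have completed can be tried with standard .NET tasks alone. The sketch below is only an analogy under that assumption: it uses Task.Run and Task.WaitAll from the .NET base class library, not SocketPro, and the class and method names are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Analogy with plain .NET tasks; not SocketPro code.
static class WaitAllSketch
{
    // Starts 'count' asynchronous operations and blocks until every one has finished.
    public static int RunAndWait(int count)
    {
        int completed = 0;
        Task[] pending = new Task[count];
        for (int n = 0; n < count; ++n)
        {
            pending[n] = Task.Run(() =>
            {
                // Stands for a result callback running on another thread.
                Interlocked.Increment(ref completed);
            });
        }
        // One blocking call after everything has been started,
        // similar in spirit to calling WaitAll after streaming requests.
        Task.WaitAll(pending);
        return completed;
    }

    static void Main()
    {
        Console.WriteLine("Completed callbacks: {0}", RunAndWait(100));
    }
}
```

The point of the design is that the waiting happens once, after all work has been submitted, instead of once per request.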

TestCreateTables, TestPreparedStatements and InsertBLOBByPreparedStatement

Code snippet 1 above has three function calls, TestCreateTables, TestPreparedStatements and InsertBLOBByPreparedStatement. We do not explain them again because they are the same as the ones in the previous article. Let’s focus on executing MySQL stored procedures with input-output and output parameters.

TestStoredProcedure

MySQL fully supports stored procedures, and so does SocketPro SQL-stream technology. Further, SocketPro SQL-stream technology supports executing multiple sets of MySQL stored procedure parameters, including input-output and output parameters, in one call, as shown in code snippet 1 above. Code snippet 2 below shows how to call a MySQL stored procedure that may have input, input/output and output parameters and may return multiple sets of records.

static void TestStoredProcedure(CMysql mysql, List<KeyValuePair<CDBColumnInfoArray, 
                                CDBVariantArray>> ra, CDBVariantArray vPData) {
    bool ok = mysql.Prepare("call sp_TestProc(?,?,?)", dr);
    CMysql.DRows r = (handler, rowData) => {
        //rowset data come here if available
        int last = ra.Count - 1;
        KeyValuePair<CDBColumnInfoArray, CDBVariantArray> item = ra[last];
        item.Value.AddRange(rowData); //populate record data into receiving container ra
    };
    CMysql.DRowsetHeader rh = (handler) => {
        //rowset header comes here if available
        KeyValuePair<CDBColumnInfoArray, CDBVariantArray> item = 
        new KeyValuePair<CDBColumnInfoArray, CDBVariantArray>(handler.ColumnInfo, new CDBVariantArray());
        ra.Add(item); //populate query column meta into receiving container ra
    };
    ok = mysql.Execute(vPData, er, r, rh);
}
Code snippet 2: Call MySQL stored procedure which returns multiple sets of records and output parameters

In the end, it is very simple to call a stored procedure through SocketPro SQL-stream technology, as shown in code snippet 2 above. Note that all output parameter data are copied directly into the passed parameter data array vPData. The callback rh is called when record set metadata arrive, if available. Whenever an array of record data arrives, the callback r is called. From these two callbacks, you can populate all queried metadata and record data into an arbitrary container such as ra.
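
The container pattern used by the two callbacks can be tried without a server. The sketch below is a self-contained stand-in: plain string arrays and object lists replace CDBColumnInfoArray and CDBVariantArray, and the methods OnHeader and OnRows are hypothetical names that play the roles of rh and r. It shows why the record count is the number of received values divided by the number of columns.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for the receiving container ra; none of these names are SocketPro APIs.
class RowsetCollector
{
    public readonly List<KeyValuePair<string[], List<object>>> Rowsets =
        new List<KeyValuePair<string[], List<object>>>();

    // Plays the role of rh: a new rowset starts, so append an empty entry with its column meta.
    public void OnHeader(string[] columns)
    {
        Rowsets.Add(new KeyValuePair<string[], List<object>>(columns, new List<object>()));
    }

    // Plays the role of r: a chunk of flattened record data belongs to the latest rowset.
    public void OnRows(IEnumerable<object> chunk)
    {
        Rowsets[Rowsets.Count - 1].Value.AddRange(chunk);
    }

    // Values arrive flattened row by row, so records = values / columns.
    public static int RecordCount(KeyValuePair<string[], List<object>> rowset)
    {
        return rowset.Key.Length == 0 ? 0 : rowset.Value.Count / rowset.Key.Length;
    }

    static void Main()
    {
        var c = new RowsetCollector();
        c.OnHeader(new[] { "id", "name" });
        c.OnRows(new object[] { 1, "Google", 2, "Microsoft" }); // first chunk: two records
        c.OnRows(new object[] { 3, "Apple" });                  // second chunk: one record
        c.OnHeader(new[] { "curtime" });
        c.OnRows(new object[] { "12:00:00" });
        foreach (var rs in c.Rowsets)
            Console.WriteLine("columns = {0}, records = {1}", rs.Key.Length, RecordCount(rs));
    }
}
```

Because the header callback always appends a new entry before any data for that rowset arrives, the rows callback can safely append to the last entry, which is the same ordering assumption the snippets above rely on.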

Performance Study

SocketPro SQL-stream technology has excellent performance in database access for both queries and updates. Two MySQL performance test projects (cppperf and netperf) are available at socketpro/stream_sql/mysql/DBPerf/. The first sample is written in C++ and the other in C#. A sample project, mysqlperf, written in C#, is provided so that you can compare the performance of SocketPro SQL-stream technology with that of the MySQL .NET provider.

See the performance study data in Figure 3 below, which were obtained from three cheap Google cloud virtual machines with solid state drives for free evaluation. All data are times in milliseconds required for executing 10,000 queries and 50,000 inserts. The performance study also focuses on the influence of network latency on MySQL access speed.

Figure 3: MySQL streaming performance study data of SocketPro SQL-stream technology on three cheap Google cloud virtual machines

Our performance study shows that it is easy to execute queries at a rate of 6,500 (10,000/1.54) per second per socket connection. For inserting records, you can easily reach about 43,000 (50,000/1.17) inserts per second for MySQL on a local area network (LAN, cross-machine, 0.2 ms/2.0 Gbps). On LAN, SocketPro streaming could improve query performance by 150% over the traditional non-streaming approach (SocketPro + Sync). For SQL inserts, the improvement would be more than eight times (10,400/1,170 = 8.9). SocketPro streaming and inline batching make network efficiency very high, which leads to a significant improvement in comparison with the existing MySQL socket communication approach.

Let’s consider a wide area network (WAN, cross-region, 34 ms/40 Mbps). SocketPro SQL streaming query speed could be 5,000 (10,000/2.00) queries per second per socket connection. For inserting records, the speed could easily be 17,600 records (50,000/2.84) per second. In contrast, the query speed is as low as 30 queries per second on WAN if a client uses the traditional communication approach (SocketPro + Sync or MySQL .NET provider) for database access, because of the high latency, as shown in Figure 3 above. SocketPro SQL streaming can be more than 170 times (349,000/2,000 = 174.5) faster for queries than non-streaming technology, assuming that database backend processing time is negligible on a high-latency WAN (cross-region). For SQL inserts, the improvement could be over 600 times (1,726,000/2,840 = 607).
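
The rates and ratios quoted in the two paragraphs above can be rechecked with a few lines of arithmetic. The sketch below uses only the timings already cited in the text, expressed in milliseconds; the class and method names are illustrative.

```csharp
using System;

// Rechecks the arithmetic quoted in the text; helper names are illustrative.
static class PerfArithmetic
{
    // Operations per second for a batch that took elapsedMs milliseconds.
    public static double PerSecond(int operations, double elapsedMs)
    {
        return operations / (elapsedMs / 1000.0);
    }

    // How many times faster the streaming run is than the baseline run.
    public static double Speedup(double baselineMs, double streamingMs)
    {
        return baselineMs / streamingMs;
    }

    static void Main()
    {
        Console.WriteLine("LAN queries/s: {0:F0}", PerSecond(10000, 1540));
        Console.WriteLine("LAN inserts/s: {0:F0}", PerSecond(50000, 1170));
        Console.WriteLine("WAN queries/s: {0:F0}", PerSecond(10000, 2000));
        Console.WriteLine("WAN inserts/s: {0:F0}", PerSecond(50000, 2840));
        Console.WriteLine("LAN insert speedup: {0:F1}", Speedup(10400, 1170));
        Console.WriteLine("WAN query speedup:  {0:F1}", Speedup(349000, 2000));
        Console.WriteLine("WAN insert speedup: {0:F1}", Speedup(1726000, 2840));
    }
}
```

The computed values agree with the rounded figures given in the text.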

After analyzing the performance data in Figure 3, you will find that SocketPro streaming technology is truly effective at speeding up not only local but also remote database access. Second, the performance data for WAN would be much better if the test WAN had better network bandwidth. Further, SocketPro supports inline compression, but this study does not use it. If the inline compression feature were employed, the streaming test data would improve further on WAN. Finally, the performance study was completed on cheap virtual machines with only one or two CPUs. The performance data would be better if dedicated machines were used for testing.

Executing SQLs in Parallel with Fault Auto Recovery

Parallel computation: After studying the previous two simple examples, it is time to study the third sample at the directory socketpro/samples/auto_recovery/(test_cplusplus|test_java|test_python|test_sharp). SocketPro is designed from the ground up to support parallel computation. You can distribute multiple SQL statements onto different backend databases for concurrent processing. This feature is designed to improve application scalability, as shown in code snippet 3 below.

using System;
using SocketProAdapter;
using SocketProAdapter.ClientSide;
class Program {
    static void Main(string[] args) {
        const int sessions_per_host = 2;
        string[] vHost = { "localhost", "192.168.2.172" };
        const int cycles = 10000;
        using (CSocketPool<CMysql> sp = new CSocketPool<CMysql>()) {
            //set a local message queue to backup requests for auto fault recovery
            sp.QueueName = "ar_sharp";
            
            //one thread enough
            CConnectionContext[,] ppCc = new CConnectionContext[1, vHost.Length * sessions_per_host];
            for (int n = 0; n < vHost.Length; ++n) {
                for (int j = 0; j < sessions_per_host; ++j) {
                    ppCc[0, n * sessions_per_host + j] = 
                         new CConnectionContext(vHost[n], 20902, "root", "Smash123");
                }
            }
            bool ok = sp.StartSocketPool(ppCc);
            if (!ok) {
                Console.WriteLine("No connection and press any key to close the application ......");
                Console.Read(); return;
            }
            string sql = "SELECT max(amount), min(amount), avg(amount) FROM payment";
            Console.WriteLine("Input a filter for payment_id"); string filter = Console.ReadLine();
            if (filter.Length > 0) sql += (" WHERE " + filter); var v = sp.AsyncHandlers;
            foreach (var h in v) {
                ok = h.Open("sakila", (hsqlite, res, errMsg) => {
                    if (res != 0) Console.WriteLine("Error code: {0}, error message: {1}", res, errMsg);
                });
            }
            int returned = 0;
            double dmax = 0.0, dmin = 0.0, davg = 0.0;
            SocketProAdapter.UDB.CDBVariantArray row = new SocketProAdapter.UDB.CDBVariantArray();
            CAsyncDBHandler.DExecuteResult er = (h, res, errMsg, affected, fail_ok, lastId) => {
                if (res != 0)
                    Console.WriteLine("Error code: {0}, error message: {1}", res, errMsg);
                else {
                    dmax += double.Parse(row[0].ToString());
                    dmin += double.Parse(row[1].ToString());
                    davg += double.Parse(row[2].ToString());
                }
                ++returned;
            };
            CAsyncDBHandler.DRows r = (h, vData) => {
                row.Clear(); row.AddRange(vData);
            };
            CMysql mysql = sp.SeekByQueue(); //get one handler for querying one record
            ok = mysql.Execute(sql, er, r);
            ok = mysql.WaitAll();
            Console.WriteLine("Result: max = {0}, min = {1}, avg = {2}", dmax, dmin, davg);
            returned = 0; dmax = 0.0; dmin = 0.0; davg = 0.0;
            Console.WriteLine("Going to get {0} queries for max, min and avg", cycles);
            for (int n = 0; n < cycles; ++n) {
                mysql = sp.SeekByQueue();
                ok = mysql.Execute(sql, er, r);
            }
            foreach (var h in v) {
                ok = h.WaitAll();
            }
            Console.WriteLine("Returned = {0}, max = {1}, min = {2}, avg = {3}", 
                               returned, dmax, dmin, davg);
            Console.WriteLine("Press any key to close the application ......"); Console.Read();
        }
    }
}
Code snippet 3: Demonstration of SocketPro parallel computation and fault auto recovery features

As shown in code snippet 3 above, we can start multiple non-blocking sockets to different machines (localhost and 192.168.2.172), with two sockets connected to each of the two database machines. The code opens the database sakila for each connection (foreach (var h in v) {......}). First, the code executes the query ‘SELECT max(amount), min(amount), avg(amount) FROM payment …’ once for one record. Then, the code sends the query 10,000 times onto the two machines for parallel processing (for (int n = 0; n < cycles; ++n) {......}). Each returned record is summed inside a lambda expression (CAsyncDBHandler.DExecuteResult er = (h, res, errMsg, affected, fail_ok, lastId) => {......};) that serves as a callback for the method Execute. Note that you can create multiple pools for different services hosted on different machines. As you can see, a SocketPro socket pool can significantly improve application scalability.

Auto fault recovery: SocketPro can open a local file and save all request data into it before sending the requests to a server over the network. The file is called a local message queue or client message queue. The idea is simply to back up all requests for automatic fault recovery. To use this feature, you must set a local message queue name (sp.QueueName = "ar_sharp";), as shown in code snippet 3 above. When developing a real application, it is very common to write a lot of code to deal properly with various communication errors, and this is usually a challenge for software developers. The SocketPro client message queue makes communication error handling very simple. Suppose the machine 192.168.2.172 becomes inaccessible for whatever reason, such as a power failure, an unhandled exception, software or hardware maintenance, or an unplugged network cable. The socket close event will be notified either immediately or some time later. Once the socket pool finds that a socket is closed, SocketPro automatically merges all requests associated with that socket connection onto another socket that is still open for processing.
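
The merge step can be pictured with a small in-memory model. The sketch below is a conceptual illustration only: the Session and Pool classes are hypothetical, requests are plain strings, the choice of the least-loaded open session is an assumption, and nothing is persisted to a file as the real client message queue does. It shows the idea that requests not yet answered on a closed connection are moved to a connection that is still open, so none are lost.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Conceptual model only; these types are not part of SocketPro.
class Session
{
    public string Host;
    public bool Open = true;
    public readonly Queue<string> Pending = new Queue<string>(); // sent but not yet answered
    public Session(string host) { Host = host; }
}

class Pool
{
    public readonly List<Session> Sessions = new List<Session>();

    // Send on the open session with the fewest pending requests (assumed policy).
    // Throws InvalidOperationException if no session is open.
    public void Send(string request)
    {
        Session target = Sessions.Where(s => s.Open).OrderBy(s => s.Pending.Count).First();
        target.Pending.Enqueue(request);
    }

    // When a session closes, move its unanswered requests onto sessions that are still open.
    public void OnClosed(Session closed)
    {
        closed.Open = false;
        while (closed.Pending.Count > 0)
            Send(closed.Pending.Dequeue());
    }

    static void Main()
    {
        var pool = new Pool();
        pool.Sessions.Add(new Session("localhost"));
        pool.Sessions.Add(new Session("192.168.2.172"));
        for (int n = 0; n < 10; ++n) pool.Send("query " + n);
        pool.OnClosed(pool.Sessions[1]); // simulate the second machine going down
        foreach (Session s in pool.Sessions)
            Console.WriteLine("{0}: open = {1}, pending = {2}", s.Host, s.Open, s.Pending.Count);
    }
}
```

In this model the surviving session ends up holding every unanswered request, which is why the final totals in code snippet 3 should remain correct even if one server disappears during the run.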

To verify this feature, you can abruptly shut down one of the MySQL servers while the above queries are executing, and check whether the final results are still correct.

Note that UDAParts has applied this feature to all SocketPro SQL-stream services, the asynchronous persistent message queue service and the remote file exchange service to simplify your development.

Points of interest

Finally, the SocketPro MySQL SQL-stream plugin doesn’t support cursors at all, but it does provide all the required basic client/server database features. In addition, the SQL-stream plugin has the following unique features.

  1. Continuous inline request/result batching and real-time SQL-stream processing for the best network efficiency especially on WAN
  2. Bi-directional asynchronous data transferring between client and server by default, but all asynchronous requests can be converted into synchronous ones if required
  3. Superior performance and scalability because of powerful SocketPro communication architecture
  4. Real-time cache for table update, insert and delete as shown at the sample project test_cache at the directory socketpro/stream_sql/mysql/test_cache
  5. All requests are cancelable by executing the method Cancel of class CClientSocket at client side
  6. Both Windows and Linux are supported
  7. Simple development for all supported development languages
  8. Both client and server components are thread-safe. They can be easily reused within your multi-threaded applications with much fewer thread related issues
  9. All requests can be backed up at client side and resent to another server automatically for processing in case a server is down for any reason -- fault auto recovery

History

  • 09/20/2017 ==> Initial release
  • 02/28/2018 ==> Add two new sections, Performance study and Executing SQLs in parallel with fault auto recovery
  • 05/27/2018 ==> Update MySQL server SQL-streaming plugin to support MySQL version 8.0.11 or later

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

Yuancai (Charlie) Ye
Software Developer (Senior)
United States
Yuancai (Charlie) Ye, an experienced C/C++ software engineer, lives in Atlanta, Georgia. He is an expert in continuous inline request/result batching, real-time stream processing, asynchronous data transfer and parallel computation for the best communication throughput and latency. He has been working on SocketPro (https://github.com/udaparts/socketpro) for more than fifteen years.

Article Copyright 2017 by Yuancai (Charlie) Ye