Dequeue workitems using MERGE

25 Feb 2014, CPOL
This article demonstrates how to safely dequeue items from a shared table

Introduction

In response to my previous article, a reader posted an interesting challenge: "could this technique be utilized, as means to thread-safely de-queue items from a shared table".

Background

The SQL MERGE command is a powerful tool in the hands of DBAs and architects. It lets us perform, in a single statement, operations that would otherwise require careful planning and synchronization. If I understood the reader correctly, his challenge was how to update and delete records from multiple threads on multiple servers without violating thread safety and without the same work item being processed by more than one server.

Using the code

As usual, we will start with the sample code first:

using System;
using System.Collections.Concurrent;
using System.Data.SqlClient;
using System.Reactive.Linq;
using System.Text;
using System.Xml.Linq;
 
namespace QueueWithMerge
{
    internal class Program
    {
        private static readonly ObjectPool<SqlConnection> ConnectionPool = new ObjectPool<SqlConnection>(() =>
        {
            var c = new SqlConnection(Scsb.ConnectionString);
            c.Open();
            return c;
        });
 
        private static readonly SqlConnectionStringBuilder Scsb = new SqlConnectionStringBuilder
        {
            DataSource = "localhost",
            InitialCatalog = "Scratch",
            IntegratedSecurity = true
        };
 
        private static void Main(string[] args)
        {
            IDisposable generator = Observable.Interval(TimeSpan.FromMilliseconds(50)).Take(10).Subscribe(i =>
            {
                SqlConnection c = ConnectionPool.GetObject();
                try
                {
                    var xdoc =
                        new XDocument(new XElement("Root", new XElement("Id", i),
                            new XElement("Guid", Guid.NewGuid().ToString())));
                    using (var command = new SqlCommand("INSERT INTO [dbo].[Queue] ([Payload]) VALUES (@payload)", c))
                    {
                        command.Parameters.AddWithValue("@payload",
                            Encoding.Unicode.GetBytes(xdoc.ToString(SaveOptions.DisableFormatting)));
                        command.ExecuteNonQuery();
                    }
                    ConnectionPool.PutObject(c);
                }
                catch (Exception ex)
                {
                    c.Dispose();
                    Console.WriteLine(ex.Message);
                }
            });
 
            const string consumerSql = @"MERGE [Scratch].[dbo].[Queue] WITH (TABLOCKX) as target
USING (SELECT TOP 1 Id, AgentId FROM [Scratch].[dbo].[Queue]) as source (Id, AgentId)
ON (target.Id = source.Id)
WHEN MATCHED AND target.AgentId IS NOT NULL
THEN DELETE
WHEN MATCHED 
THEN UPDATE SET target.AgentId = @agentId
OUTPUT $action, Inserted.Id, Inserted.AgentId, Inserted.Payload, Deleted.Id, Deleted.AgentId;
";
 
            Action<int> consumer = i =>
            {
                SqlConnection c = ConnectionPool.GetObject();
                try
                {
                    using (var command = new SqlCommand(consumerSql, c))
                    {
                        command.Parameters.AddWithValue("@agentId", i);
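                        using (SqlDataReader reader = command.ExecuteReader())
                        {
                            // OUTPUT columns: 0 = $action, 1 = Inserted.Id, 2 = Inserted.AgentId,
                            // 3 = Inserted.Payload, 4 = Deleted.Id, 5 = Deleted.AgentId.
                            // Inserted.* is NULL for a DELETE, so fall back to Deleted.Id.
                            while (reader.Read())
                            {
                                Console.WriteLine("Processing {0} Action {1} Agent {2}",
                                    reader.IsDBNull(1) ? reader.GetInt64(4) : reader.GetInt64(1),
                                    reader.GetString(0),
                                    reader.IsDBNull(5) ? "" : reader.GetInt32(5).ToString());
                            }
                        }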
                    }
                    ConnectionPool.PutObject(c);
                }
                catch (Exception ex)
                {
                    c.Dispose();
                    Console.WriteLine(ex.Message);
                }
            };
 
            IDisposable con1 = Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(i => consumer.Invoke(1));
            IDisposable con2 = Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(i => consumer.Invoke(2));
            IDisposable con3 = Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(i => consumer.Invoke(3));
 
            Console.ReadKey();
        }
    }
 
}

I am using a few helper classes here to get to the simulated environment faster. First, using ObjectPool<T> from Parallel Extensions Extras, I create as many database connections as the threads need.

An object pool is a mechanism/pattern to avoid the repeated creation and destruction of objects.  When code is done with an object, rather than allowing it to be garbage collected (and finalized if it’s finalizable), you put the object back into a special collection known as an object pool.  Then, when you need an object, rather than always creating one, you ask the pool for one: if it has one, it gives it to you, otherwise it creates one and gives it to you.  In many situations where creation and destruction is expensive, and where many objects are needed but where only a few at a time are needed, this can result in significant performance gains.

Object pools are just as relevant in multi-threaded scenarios as they are in single-threaded scenarios, but of course when dealing with multiple threads, you need to synchronize correctly (unless a separate pool is maintained per thread, in which case you’re trading synchronization cost for potentially creating more objects than you otherwise would).  ParallelExtensionsExtras contains a simple ObjectPool<T> implementation in the ObjectPool.cs file, built on top of IProducerConsumerCollection<T>. 
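
To make the pattern concrete, here is a minimal sketch of such a pool. It is not the ParallelExtensionsExtras source: the class name SimplePool<T> is hypothetical, and the choice of ConcurrentBag<T> as the backing IProducerConsumerCollection<T> is an assumption made for illustration.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical stand-in for ObjectPool<T>; not the ParallelExtensionsExtras source.
public sealed class SimplePool<T>
{
    // Assumption: a ConcurrentBag<T> serves as the thread-safe backing store.
    private readonly IProducerConsumerCollection<T> _items = new ConcurrentBag<T>();
    private readonly Func<T> _generator;

    public SimplePool(Func<T> generator)
    {
        if (generator == null) throw new ArgumentNullException("generator");
        _generator = generator;
    }

    // Hand out a pooled object if one is available, otherwise create a new one.
    public T GetObject()
    {
        T item;
        return _items.TryTake(out item) ? item : _generator();
    }

    // Return an object so that a later GetObject call can reuse it.
    public void PutObject(T item)
    {
        _items.TryAdd(item);
    }

    // Number of idle objects currently held by the pool.
    public int Count
    {
        get { return _items.Count; }
    }
}
```

With this shape, the generator delegate runs only when the pool is empty, so a connection that is returned after each call is opened once and then reused.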

Then, with the help of Reactive Extensions, I create a generator that immediately starts queuing new records into the shared table. This simulates work requests being submitted by multiple distributed clients. The sample is limited to just 10 requests. The generator simply fires an event at a fixed interval, in this case every 50 milliseconds.

If anything goes wrong while creating a new record, I dispose of the database connection instead of returning it to the pool. That way I never have to deal with connections in a failed state.
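
The dispose-on-failure rule can be factored into a small helper so that every caller follows it the same way. The sketch below is one possible shape, not part of the sample: PooledRunner and Run are hypothetical names, and a ConcurrentBag<T> stands in for the ObjectPool<T> used above. Unlike the sample, it rethrows the exception and leaves logging to the caller.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper; a ConcurrentBag<T> stands in for ObjectPool<T>.
public static class PooledRunner
{
    // Runs work with a pooled resource. The resource goes back to the pool only
    // if work completes; on any exception it is disposed and the exception is rethrown.
    public static void Run<T>(ConcurrentBag<T> pool, Func<T> create, Action<T> work)
        where T : IDisposable
    {
        T resource;
        if (!pool.TryTake(out resource))
            resource = create();

        try
        {
            work(resource);
        }
        catch
        {
            resource.Dispose(); // never return a possibly broken resource
            throw;
        }

        pool.Add(resource);
    }
}
```

A caller would pass the connection factory as create and the command execution as work; a failed command then costs one connection, which the next call replaces with a fresh one.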

Defining the consumer as an Action<int> lets me reuse the same code for three separate consumers of the data provided by the generator. Each consumer is started with its own unique id. Here again I use Observable.Interval from Reactive Extensions, this time firing every 100 milliseconds.

I hope by now you are starting to see how a readily available library simplifies our day-to-day work. Steps and subroutines that used to take days to write have become one-liners. I could fill an entire book with uses for Rx (Reactive Extensions). Instead, here is one from Microsoft you can download for free.

The actual magic is performed by the SQL MERGE statement:

Performs insert, update, or delete operations on a target table based on the results of a join with a source table. For example, you can synchronize two tables by inserting, updating, or deleting rows in one table based on differences found in the other table. 

MERGE [Scratch].[dbo].[Queue] WITH (TABLOCKX) as target
USING (SELECT TOP 1 Id, AgentId FROM [Scratch].[dbo].[Queue]) as source (Id, AgentId)
ON (target.Id = source.Id)
WHEN MATCHED AND target.AgentId IS NOT NULL
THEN DELETE
WHEN MATCHED
THEN UPDATE SET target.AgentId = @agentId
OUTPUT $action, Inserted.Id, Inserted.AgentId, Inserted.Payload, Deleted.Id, Deleted.AgentId; 

After defining our source (single record) and target table, the logic can be translated as follows:

  • Select a single record from the available records as the source.
  • If the matching target record (matched by Id, the table's identity column) already has an agent assigned (AgentId IS NOT NULL), delete it. In other words, the record has already been picked up for processing and needs to be removed.
  • Otherwise, assign the processing agent's id to the record by way of the @agentId parameter.
  • In either case, return the updated (Inserted) or deleted (Deleted) row values, along with the action performed ($action), as a result set.
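
The logic in the list above can be simulated without a database. The sketch below is only an in-memory model of the statement, not how SQL Server executes it: QueueModel, Row and Step are hypothetical names, a C# lock stands in for TABLOCKX, and the model always picks the lowest Id, whereas TOP 1 without ORDER BY makes no such promise.

```csharp
using System;
using System.Collections.Generic;

public sealed class Row
{
    public long Id;
    public int? AgentId;
}

// Hypothetical in-memory model of the MERGE logic; not SQL Server's implementation.
public sealed class QueueModel
{
    private readonly object _tableLock = new object(); // stands in for TABLOCKX
    private readonly SortedDictionary<long, Row> _rows = new SortedDictionary<long, Row>();
    private long _nextId = 1;

    public long Enqueue()
    {
        lock (_tableLock)
        {
            var row = new Row { Id = _nextId++ };
            _rows.Add(row.Id, row);
            return row.Id;
        }
    }

    // One "MERGE" call: returns the action and the affected row, or null if the table is empty.
    public Tuple<string, Row> Step(int agentId)
    {
        lock (_tableLock)
        {
            Row first = null;
            foreach (var row in _rows.Values) { first = row; break; } // SELECT TOP 1 (assumed: lowest Id)
            if (first == null) return null;

            if (first.AgentId != null)
            {
                _rows.Remove(first.Id);              // WHEN MATCHED AND AgentId IS NOT NULL THEN DELETE
                return Tuple.Create("DELETE", first);
            }

            first.AgentId = agentId;                 // WHEN MATCHED THEN UPDATE SET AgentId = @agentId
            return Tuple.Create("UPDATE", first);
        }
    }

    public int Count
    {
        get { lock (_tableLock) { return _rows.Count; } }
    }
}
```

On a one-row queue, Step(1) followed by Step(2) yields UPDATE and then DELETE, and the deleted row still carries agent 1's id. The delete branch does not check which agent is calling, which mirrors the SQL statement.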

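The C# consumer simply retrieves the returned values and is free to perform any operation on them; what it does with them is beyond the scope of this article. Each record takes two passes: the first MERGE call assigns an agent to it, and a later call deletes it. The process continues until all records have been removed from the table.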
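
As one example of what a consumer might do with a returned row, the sketch below decodes a payload back into the values written by the generator. It assumes the bytes were produced exactly as in the sample (Encoding.Unicode over XDocument.ToString with a Root/Id/Guid layout); PayloadCodec and its methods are hypothetical names.

```csharp
using System;
using System.Text;
using System.Xml.Linq;

// Hypothetical helper; mirrors the encoding used by the generator in the sample.
public static class PayloadCodec
{
    // Builds the same XML shape as the generator and encodes it as UTF-16 bytes.
    public static byte[] Encode(long id, Guid guid)
    {
        var xdoc = new XDocument(new XElement("Root",
            new XElement("Id", id),
            new XElement("Guid", guid.ToString())));
        return Encoding.Unicode.GetBytes(xdoc.ToString(SaveOptions.DisableFormatting));
    }

    // Reverses Encode: UTF-16 bytes -> XML -> (Id, Guid).
    public static Tuple<long, Guid> Decode(byte[] payload)
    {
        XDocument xdoc = XDocument.Parse(Encoding.Unicode.GetString(payload));
        XElement root = xdoc.Root;
        return Tuple.Create((long)root.Element("Id"), (Guid)root.Element("Guid"));
    }
}
```

In the sample's reader, the bytes would come from column 3 (Inserted.Payload), which is populated only on the UPDATE pass and is NULL on the DELETE pass.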

Initially I used the TABLOCK hint to lock the table during the UPDATE/DELETE operation, but through trial and error I found the more restrictive TABLOCKX to be more predictable. It comes with a very minor performance penalty, which I could observe only in an extremely high-throughput scenario.

TABLOCK 

Specifies that the acquired lock is applied at the table level. The type of lock that is acquired depends on the statement being executed. For example, a SELECT statement may acquire a shared lock. By specifying TABLOCK, the shared lock is applied to the entire table instead of at the row or page level. If HOLDLOCK is also specified, the table lock is held until the end of the transaction.

TABLOCKX 

Specifies that an exclusive lock is taken on the table. 

In other words, TABLOCKX locks the table exclusively for both reads and updates, which is exactly what we need here, since the MERGE statement first reads a record and then updates or deletes it.

Here is the DDL you will need to create the sample queue table, although the technique should work with your own table as well.

USE [Scratch]
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
SET ANSI_PADDING ON
GO
CREATE TABLE [dbo].[Queue](
    [Id] [bigint] IDENTITY(1,1) NOT NULL,
    [Payload] [varbinary](max) NOT NULL,
    [CreatedOn] [datetime] NOT NULL,
    [AgentId] [int] NULL
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
GO
SET ANSI_PADDING OFF
GO
ALTER TABLE [dbo].[Queue] ADD  CONSTRAINT [DF_Queue_CreatedOn]  DEFAULT (sysdatetime()) FOR [CreatedOn]
GO 

I truly enjoyed responding to this challenge. If you think MERGE could be used for other synchronized tasks, drop me a note.  

Until the next time ...

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

Darek Danielewski
Architect BI Software, Inc.
United States
A seasoned IT Professional. Programming and data processing artist. Contributor to StackOverflow.

Article Copyright 2014 by Darek Danielewski