
Real-time Feed Distribution using Circular Shared Queue

22 Jan 2015 · CPOL · 10 min read
This article demonstrates how to forward real-time feed to multiple clients with different bandwidths.


Introduction

A real-time feed is a continuous flow of data that carries instantaneous information. A feed can be associated with one client or with several, and in the multi-client case either the whole feed goes to every client or each client receives only its own part (packets) of the feed. We can enumerate the cases to clarify our scope:

  1. Feed is distributed to one client.
  2. Feed is distributed to all clients.
    1. Feed is divided between clients (each one takes only the data that belongs to it).
    2. The whole feed is distributed to all clients.

The first case is the simplest: with only one client, we can store arriving data in a single queue and manage it with a simple queue management technique. The second case is our interest, and its two sub-cases (2-1, 2-2) are handled very differently. Case (2-1) means the sender of the feed must include a destination id in each packet; a good example is a network router, which receives packets from senders and routes each one to the right destination. Case (2-2) means the whole feed is forwarded to every client, so packets need no destination id. Case (2-2) is the scope of this article. The final goal is to distribute a real-time feed to multiple clients with different bandwidths, as in the following figure:

[Image 1: real-time feed distributed to multiple clients with different bandwidths]

Feed Distribution

Real-time stream distribution is a critical problem that needs the right data structure to store and distribute the feed efficiently. The system receives the real-time feed and forwards it to all clients, as in the following figure:

[Image 2: the system receives the feed and forwards it to all clients]

Actually, most systems don't forward the feed directly to clients. They have two options:

  1. Store it in temporary storage, then forward it to all clients.

    [Image 3: feed stored in temporary storage, then forwarded to clients]

  2. Process the feed (transform, compress, encrypt), then store the results in temporary storage and send them to all clients.

    [Image 4: feed processed, results stored, then sent to clients]

There are two main reasons for saving the feed, or its processing results, in temporary storage before sending it to clients:

  1. To isolate the receiving operation from the sending and processing operations.
  2. To enable sending data from different threads, balancing the send operation according to clients' bandwidths.

So we need to decide: what is the most suitable data structure to store the feed, and how can we send to clients with different bandwidths? To answer, we first set our goals for this storage:

  • Fast data insertion and removal.
  • Receiving data must not be blocked while data is being sent from the same storage to each client.
  • Each client keeps its own offset into the storage, depending on its bandwidth.

Circular Shared Queue

The best storage to achieve these goals is a pipe or a queue: feed is received on one side and sent from the other side, First In First Out (FIFO). A queue is a collection of entities linked in order: a new entity is added at the tail and an old entity is removed from the head, so the first entity added is the first one manipulated. Any queue must keep track of its tail and head entities to handle addition and removal, and each entity in the queue points to the next one.

[Image 5: queue structure; entities linked in order from head to tail]

So, the choice suits our needs because:

  • Insertion and removal are very fast with a linked-list structure.
  • We can isolate receiving data from sending it in separate threads, even though for fast clients both operate on the tail node.
  • The owner of the shared queue keeps an object for each client. This object holds a pointer to the client's current node in the queue and the client's offset within that node.
  • We can therefore run the sending operation across many threads, each thread sending to its available clients. Each client object passes its current node pointer and send offset to the queue, so data is sent to each client from its own location in the queue.
  • The previous point is the key to the whole article, as it is the solution for slow (low-bandwidth) clients. As a result, fast clients always work on the tail node, and their send offsets stay synchronized with the "receive offset" of the queue's tail node.

[Image 6: shared queue with a node pointer and offset kept per client]

Shared Queue members:

  IsEmpty()         Checks whether the queue is empty
  Enqueue()         Adds a new node at the queue tail if needed
  Dequeue()         Gets the client's next node to send data from
  Recv()            Receives newly arrived data into the tail node
  Send()            Sends data to one client
  MaxQueueLength    Maximum queue length
  Head              Points to the head node of the queue
  Tail              Points to the tail node of the queue

Our case imposes some special requirements:

  • In real situations, client speeds are not equal: clients have different bandwidths, and bandwidth limits how much each client can receive at any instant. Fast clients are always ready to receive data, while slow clients may delay the removal of the queue's head entity and, with it, the whole sending process. The distributor must handle this so that fast clients receive real-time data on time, and slow clients receive it as their bandwidth allows.
  • Each queue node contains a 32 KB buffer to store received feed. The Enqueue() function therefore does not add a new node until the tail node has received its full 32 KB; only then does it add a new node and adjust the tail.
    [Image 7: queue nodes, each holding a 32 KB buffer]
    C++
    #define MAX_QUEUE_LENGTH	8
    #define MAX_NODE_LENGTH		32768
    
    // queue linked list node
    struct QNode {
    	QNode(int nSequence, int nLength) {
    		m_pBuffer = (char*)malloc(nLength);
    		Init(nSequence);
    	}
    	void Init(int nSequence) {
    		m_nTickCount = ::GetTickCount();
    		m_nSequence = nSequence;
    		m_nRecvOffset = 0;
    		m_pNext = NULL;
    	}
    	~QNode() {
    		free(m_pBuffer);
    	}
    public:
    	// node creation sequence in the queue
    	int m_nSequence;
    	// offset of received data in node buffer
    	int m_nRecvOffset;
    	// pointer to the next node in the queue
    	QNode* m_pNext;
    	// buffer of the node
    	char* m_pBuffer;
    	// creation tick count
    	unsigned long m_nTickCount;
    };
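
The snippets that follow use several SharedQueue members that the article never gathers in one place. The skeleton below is a reconstruction, not the author's code: the member names are taken from the snippets, while the constructor and its default values are assumptions.

C++
// Reconstructed SharedQueue skeleton (assumed). Requires <windows.h> for
// CRITICAL_SECTION and GetTickCount. Member names come from the article's
// snippets; the constructor and its default values are guesses.
class SharedQueue {
public:
	SharedQueue(bool bRealTime = true, unsigned int nDelay = 0)
		: m_pHead(NULL), m_pTail(NULL),
		m_nMaxQueueLength(MAX_QUEUE_LENGTH), m_nNodeLength(MAX_NODE_LENGTH),
		m_bRealTime(bRealTime), m_nDelay(nDelay)
	{	::InitializeCriticalSection(&m_cs);	}
	~SharedQueue()
	{	Clear(); ::DeleteCriticalSection(&m_cs);	}
	bool IsEmpty() const { return m_pHead == NULL; }
	int Recv(const char* pBuffer, int nLength);
	template<class RECEIVER> int Recv(RECEIVER& receiver);
	template<class SENDER> int Send(SENDER& sender,
		QNode*& pCurNode, int& nCurNodeSendOffset);
protected:
	QNode* Enqueue();
	bool Dequeue(QNode*& pCurNode, int& nCurNodeSendOffset,
		unsigned int nDelay = 0);
	void Clear();			// delete all nodes (not shown in the article)
	QNode* m_pHead;			// head node of the queue
	QNode* m_pTail;			// tail node of the queue
	int m_nMaxQueueLength;		// maximum node count before nodes are recycled
	int m_nNodeLength;		// bytes per node buffer (32 KB here)
	bool m_bRealTime;		// new clients join at the tail (true) or head (false)
	unsigned int m_nDelay;		// optional send delay, in minutes
	CRITICAL_SECTION m_cs;		// guards linked-list changes
};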

Enqueue

The standard use of Enqueue is to add a new node at the queue tail, but our queue does that only when the tail node is full. In other words, the queue uses Enqueue to get the current working (not yet full) tail node; the Recv functions then copy the received buffer into that node and advance its "receive offset" (m_nRecvOffset) by the amount received. Enqueue checks the tail's receive offset to decide whether it is full, as in the following flow-chart figure:

[Image 8: Enqueue flow chart]

C++
//************************************
// Method:    Enqueue
// Access:    protected 
// Returns:   QNode*
// Purpose:   add new node at queue tail if needed
//************************************
QNode* Enqueue() {
	QNode* pQE;
	::EnterCriticalSection(&m_cs);
	if(m_pTail == NULL)
		// initialize first node in the list
		pQE = m_pHead = m_pTail = new QNode(0, m_nNodeLength);
	// check if last received node reached its maximum length
	else	if(m_pTail->m_nRecvOffset >= m_nNodeLength) {
		...
		// add a new node to the list and make the last node point to it
		pQE = m_pTail->m_pNext = new QNode(m_pTail->m_nSequence + 1, m_nNodeLength);
		// make Tail point to the new node
		m_pTail = pQE;
	}
	else 
		// in the middle of the node
		pQE = m_pTail;
	::LeaveCriticalSection(&m_cs);
	return pQE;
}

We can modify the Enqueue function to work as a circular queue.

Circular Queue

In a circular queue, the last node is connected back to the first node to form a circle. Here, that happens each time a new node is needed to receive more feed: instead of allocating a node, the queue recycles the head node as the new tail. Head and Tail each advance one node, so the distance between them stays fixed at the queue length. For example, with MAX_QUEUE_LENGTH = 8, once node 7 fills, the old head (sequence 0) is re-initialized as node 8 and the head moves on to node 1.
[Image 9: circular queue; Head and Tail advance together around the ring]
Remember, data is received on the tail side. Also, for real-time clients, sending uses the client's node pointer, which for normal and fast clients is always the tail node. For slow, non-real-time clients, sending starts from the head node and works forward toward the tail, until the client behaves like a real-time one. The following Enqueue code implements the circular movement:

C++
//************************************
// Method:    Enqueue
// Access:    protected 
// Returns:   QNode*
// Purpose:   add new node at queue tail if needed
//************************************
QNode* Enqueue() {
	QNode* pQE;
	::EnterCriticalSection(&m_cs);
	if(m_pTail == NULL)
		// initialize first node in the list
		pQE = m_pHead = m_pTail = new QNode(0, m_nNodeLength);
	// check if last received node reached its maximum length
	else	if(m_pTail->m_nRecvOffset >= m_nNodeLength) {
		// check if queue reached its maximum length
		if(m_pTail->m_nSequence+1 >= m_nMaxQueueLength) {
			// remember the node after the head
			QNode* pNext = m_pHead->m_pNext;
			// move head node to be new tail
			pQE = m_pTail->m_pNext = m_pHead;
			// initialize node to be reused as a new node
			pQE->Init(m_pTail->m_nSequence + 1);
			// the next node becomes the new head
			m_pHead = pNext;
		}
		else
			// add a new node to the list and make the last node point to it
			pQE = m_pTail->m_pNext = new QNode(m_pTail->m_nSequence + 1, m_nNodeLength);
		// make Tail point to the new node
		m_pTail = pQE;
	}
	else 
		// in the middle of the node
		pQE = m_pTail;
	::LeaveCriticalSection(&m_cs);
	return pQE;
}

Dequeue

The queue uses its protected Dequeue() function to retrieve a node whose data should be sent to the client. Each client keeps a pointer to its working node (initialized to NULL) and its offset within that node; Dequeue uses these parameters to select the next working node and offset, as in the following flow-chart. Unlike a normal queue, Dequeue does not remove nodes, because the shared queue sends data to multiple clients; all nodes are kept until they can be reused by Enqueue to implement the circular queue. The last point is the client's type: a real-time client joins the queue for the first time at the tail, to receive the latest data, while a non-real-time client joins at the head, to receive all stored data.

[Image 10: Dequeue flow chart]

C++
//************************************
// Method:    Dequeue
// Access:    protected 
// Returns:   bool
// Purpose:   get the client's next node to send its data from
// Parameter: QNode * & pCurNode
// Parameter: int & nCurNodeSendOffset
//************************************
bool Dequeue(QNode*& pCurNode, int& nCurNodeSendOffset, unsigned int nDelay = 0)
{
	::EnterCriticalSection(&m_cs);
	// pCurNode == NULL for a new client
	if(pCurNode == NULL || pCurNode->m_nSequence < m_pHead->m_nSequence)
		// check if the client needs real-time data or all stored data
		if(m_bRealTime)
			// point directly at the tail
			pCurNode = m_pTail, nCurNodeSendOffset = 
					m_pTail ? m_pTail->m_nRecvOffset : 0;
		else
			// point at the head to get all stored data (first in)
			pCurNode = m_pHead, nCurNodeSendOffset = 0;
	// check if the client reached the end of this node's storage
	else	if(nCurNodeSendOffset >= m_nNodeLength && pCurNode->m_pNext)
		// move to the next node and reset the send offset
		pCurNode = pCurNode->m_pNext, nCurNodeSendOffset = 0;
	::LeaveCriticalSection(&m_cs);
	// guard against an empty queue before touching the node
	if (pCurNode != NULL && nDelay + m_nDelay > 0) {
		// node age in minutes (delays are specified in minutes)
		unsigned int nCurDelay = (::GetTickCount() - pCurNode->m_nTickCount) / 60000;
		// check if the node should still be held back
		if (nCurDelay < nDelay + m_nDelay)
			return false;
	}
	// success if a node was found
	return pCurNode != NULL;
}

Recv

The shared queue receives data in two ways.
The first is Recv(const char* pBuffer, int nLength), a simple call with a buffer to be stored in the queue.
The second is template<class RECEIVER> int Recv(RECEIVER& receiver), which asks a class of type RECEIVER to receive data directly into the queue.

Recv Raw Data

This is the first way to receive data. It accepts a buffer of any length and calls the Enqueue function to get the working queue node that will hold the data; it loops whenever the received length spans more than one node (for example, when it exceeds m_nNodeLength).

[Image 11: Recv (raw buffer) flow chart]

C++
//************************************
// Method:    Recv
// Access:    public 
// Returns:   int
// Purpose:   copy a raw buffer into queue nodes
// Parameter: const char * pBuffer
// Parameter: int nLength
//************************************
int Recv(const char* pBuffer, int nLength)
{
	int nRecvLength = 0;
	while(nRecvLength < nLength)
	{
		QNode* pQE = Enqueue();
		if(pQE->m_nRecvOffset < m_nNodeLength)
		{	// receive in last node
			int nRecv = min(m_nNodeLength - pQE->m_nRecvOffset, 
					nLength - nRecvLength);
			memcpy(pQE->m_pBuffer+pQE->m_nRecvOffset, 
					pBuffer+nRecvLength, nRecv);
			// increment node offset with the received bytes
			pQE->m_nRecvOffset += nRecv;
			nRecvLength += nRecv;
		}
	}
	// return total received bytes
	return nRecvLength;
}

Recv Template

This is the second way to receive data: it asks a class of type RECEIVER to receive data directly into the queue. It calls the Enqueue function to get the working queue node, then loops, requesting data from the RECEIVER's Recv function, until the node's "receive offset" reaches m_nNodeLength.

[Image 12: Recv (template) flow chart]

C++
//************************************
// Method:    Recv
// Access:    public 
// Returns:   int
// Purpose:	  ask class of type RECEIVER to receive data in queue nodes
// Parameter: RECEIVER& receiver
//************************************
template<class RECEIVER> int Recv(RECEIVER& receiver)
{	// get tail node to save received data
	QNode* pQE = Enqueue();
	int nRecvLength = 0, nRecv;
	while(pQE->m_nRecvOffset < m_nNodeLength)
	{	// receive in last node
		if((nRecv = receiver.Recv(pQE->m_pBuffer + 
		pQE->m_nRecvOffset, m_nNodeLength - pQE->m_nRecvOffset)) <= 0)
			return nRecv;
		// increment node offset with the received bytes
		pQE->m_nRecvOffset += nRecv;
		// increment total received bytes
		nRecvLength += nRecv;
	}
	// return total received bytes
	return nRecvLength;
}
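
The RECEIVER type only needs to expose a Recv(char*, int) function that returns the number of bytes received (zero or negative on failure). As an illustration, here is a hypothetical socket-backed receiver; the class name and the usage lines are mine, not the article's:

C++
// Hypothetical RECEIVER for the Recv template: wraps a connected Winsock
// socket so the queue pulls bytes straight into the tail node's buffer.
struct CSocketReceiver
{
	SOCKET m_sock;
	int Recv(char* pBuffer, int nLength)
	{	return ::recv(m_sock, pBuffer, nLength, 0);	}
};

// usage sketch:
//	CSocketReceiver receiver = { sock };
//	int nReceived = sq.Recv(receiver);	// fills the tail node from the socket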

Send Template

Although the Send template has a simple flow-chart and code, it is the key of this article. To send to multiple clients, the owner of the queue keeps the "send node" and the offset within that node for each client. The key is that each client is independent of the others: if the sender is a socket class with a good connection, it stays synchronized with the receive process; if its connection is slow, it may lag, but it does not affect other clients. The client's information (SendNode, NodeOffset) is passed to the Send function, as in the following flow-chart and code.

[Image 13: Send (template) flow chart]

C++
//************************************
// Method:    Send
// Access:    public 
// Returns:   int
// Purpose:   delegate queue buffer send to a sender of class SENDER
// Parameter: SENDER& sender
//            the class that must implement Send(char*, int) function
// Parameter: QNode * & pCurNode
//            client current node pointer
// Parameter: int & nCurNodeSendOffset
//            current offset in client node
//************************************
template<class SENDER> int Send(SENDER& sender, 
	QNode*& pCurNode, int& nCurNodeSendOffset)
{	// get the client's current node to send from
	if(m_pHead == NULL || Dequeue(pCurNode, nCurNodeSendOffset) == false)
		// return false as no nodes to send
		return 0;
	// calculate bytes to send
	int nSendBytes = pCurNode->m_nRecvOffset - nCurNodeSendOffset;
	// send bytes to client
	if(nSendBytes <= 0 || (nSendBytes = sender.Send(pCurNode->m_pBuffer + nCurNodeSendOffset, nSendBytes)) <= 0)
		return nSendBytes;
	// increment sent bytes offset
	nCurNodeSendOffset += nSendBytes;
	// return total sent bytes
	return nSendBytes;
}

Usage

You can define any number of queues in your application. The code listed here defines one queue, one thread to receive the feed, and one thread to distribute that feed to a clients list. You can define many send threads, each with its own clients list, so you can serve thousands of clients distributed across tens of threads.

C++
SharedQueue sq;

struct CFeedClient
{
	CFeedClient()
	{	// initialize send pointer and offset
		m_pCurSendElem = NULL;
		m_nCurSendElemOffset = 0;
	}
	QNode *m_pCurSendElem;
	int m_nCurSendElemOffset;
	SOCKET m_sock;
	int Send(const char* lpcs, int nLength)
	{	return ::send(m_sock, lpcs, nLength, 0);	}
	...
};

void RecvThreadFunc(LPVOID pParam)
{
	int nLength;
	char szbuf[10240];

	while(true)
	{
		// Recv feed from somewhere
		... // (fill szbuf and nLength); data can be processed or transformed here
		// save received feed in the queue
		sq.Recv(szbuf, nLength);
	}
}

void SendThreadFunc(LPVOID pParam)
{
	vector<CFeedClient>* pvClients = (vector<CFeedClient>*)pParam;
	while(true)
		// send saved feed to thread clients
		for(vector<CFeedClient>::iterator client = 
			pvClients->begin(); client != pvClients->end(); client++)
			sq.Send(*client, client->m_pCurSendElem, 
			client->m_nCurSendElemOffset);
}

void Usage()
{
	vector<CFeedClient> vClients;
	// create Recv thread
	_beginthread(RecvThreadFunc, 0, NULL);
	// create Send thread to send saved feed to thread clients
	_beginthread(SendThreadFunc, 0, (LPVOID)&vClients);
	// Note: you can create multiple threads with each has its own clients list
}

Points of Interest

  1. Thread Safe

    SharedQueue is a thread-safe class: it synchronizes changes to its linked list with a critical section. Linked-list changes are limited to adding or recycling queue nodes, which is done by the Enqueue, Dequeue, and Clear functions. The other functions, Recv and Send, are their only callers and are otherwise independent, so their access needs no extra synchronization. The user of the shared queue can therefore design one thread to receive the feed and multiple threads to send it, or its processed result, as in the section Feed Distribution.

  2. Send Timeout

    The sender may set the "send timeout" to a small value to make sure all clients get equal chances of being sent data, but in that case slow clients (slow connections) may lag behind fast ones. A slow client may prefer delayed data to lost data.
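
    On Winsock, for instance, a per-socket send timeout can be set with SO_SNDTIMEO; the 100 ms value below is purely illustrative:

    C++
    // set a short send timeout so one slow client cannot stall its send
    // thread; on Windows the option value is a DWORD in milliseconds
    DWORD dwSendTimeout = 100;	// illustrative value
    ::setsockopt(m_sock, SOL_SOCKET, SO_SNDTIMEO,
    	(const char*)&dwSendTimeout, sizeof(dwSendTimeout));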

  3. Sender Type

    "Sender Type" that is passed to the Send template can be a Socket class, that has a function Send, or it can be any type that has a Send function. User of this class can take data and transfer it to any type or do any processing like compression or encryption, then keep or send it.

  4. Thread Management

    To build a simple feed distribution system we need:

    1. One thread to receive feed in a primary queue.
    2. One thread to analyze feed and format a new feed suitable for clients and save it in a secondary queue.
    3. N threads to send the feed from the secondary queue, each thread serving its own clients. To join new clients to the threads, we can use simple round-robin scheduling, which gives us a simple and fast load-balancing technique, as in the sketch after the figure.

    [Image 14: one receive thread, one analyze thread, and N send threads]
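
    A minimal round-robin join sketch (all names are hypothetical); in a real system, access to each clients list must be synchronized with its send thread:

    C++
    const size_t nSendThreads = 4;	// illustrative thread count
    // one clients list per send thread
    vector< vector<CFeedClient> > vThreadClients(nSendThreads);
    size_t nNextThread = 0;
    
    void JoinClient(const CFeedClient& client)
    {	// assign new clients to the send threads' lists in rotation
    	vThreadClients[nNextThread].push_back(client);
    	nNextThread = (nNextThread + 1) % nSendThreads;
    }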

  5. Data Keeping for Disconnected Clients

    If a client disconnects due to network errors, the system should keep its client object for a certain period, in the hope that the client reconnects shortly. That way, the object keeps the client's offset in the queue, and on reconnect the client continues from its last location without losing any part of the data. If the keeping period is exceeded, the system deletes the client object; the next time the client connects, it joins at the queue tail as a new client and receives only newly arriving feed.
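
    A sketch of that bookkeeping follows, assuming CFeedClient (from the Usage section) gains one member, m_nDisconnectTick (0 while connected, otherwise the ::GetTickCount() value recorded at disconnect); the names and the period are mine:

    C++
    const unsigned long KEEP_PERIOD_MS = 5 * 60 * 1000;	// illustrative: 5 minutes
    
    void SweepDisconnectedClients(vector<CFeedClient>& vClients)
    {	// drop clients whose reconnect grace period has expired;
    	// surviving entries keep their queue node pointer and offset
    	for(vector<CFeedClient>::iterator it = vClients.begin();
    		it != vClients.end(); )
    		if(it->m_nDisconnectTick != 0 &&
    			::GetTickCount() - it->m_nDisconnectTick > KEEP_PERIOD_MS)
    			it = vClients.erase(it);
    		else
    			++it;
    }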

Updates

  • 12/07/2011: v0.9000 Initial post
  • 18/01/2015: v0.9001 Modified the queue to be circular and removed garbage collection


Thanks to...

God

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer (Senior)
Egypt Egypt

Comments and Discussions

 
Question: Feeds and Queues (FrankLaPiana, 18-Jul-11)
Nice article, good presentation on a somewhat complex subject.

Just some additional thoughts...

Feed is often a specific term used in financial processing, to refer to one (of a set) of sources, such as a financial exchange. Since there are multiple exchanges, there are multiple feeds.

Of course, in the general term, feed can refer to any single source.

But getting back to the first sense (multiple feeds), with all of them running at fairly high speeds (thousands to millions of messages per second), queuing becomes very important.

As an example of how to improve queuing, it would be absolute bliss to have a lockless & contentionless queue that allows multiple producers to feed (single or multiple) consumers. The issue with multiple consumers is that of course you have to serialize the data so that it is sequentially applied to the storage before being sent to consumers. You wouldn't want to send out-of-order data to a consumer.

With today's multi-core machines, it's practical to have multiple producers/multiple consumers to distribute the load across cores. In this case you'd probably want to use data decomposition.

Anyway, back to queues - using a spin-lock, or a CAS method, you can significantly reduce lock/contention overhead.

The critical section has had a spin variation since XP, and it should be used.

A user-mode spin-lock (usually this is CAS against the lock object itself) is very fast, and avoids the user-OS-user transition, but can result in random access into the queue when writing, since there is no order of acquisition.

The CAS method (perhaps against a singly-linked list) has the advantages of a spin lock but doesn't allow multiple values to be updated in a safe manner.

Using a "double-queue" algorithm - ie, locking on the producer side, and then moving the entire list to the consumer side - can significantly reduce contention between the producer(s) and consumers. The consumer can then read against the consumer-side copy of the queue without locking.

If you have multiple consumers, and data decomposition, you can actually have multiple queues to reduce queue lengths and contention.
