
Real-time Feed Distribution using Shared Queue

6 Aug 2011, CPOL
This article demonstrates how to forward a real-time feed to multiple clients with different bandwidths.


Introduction

A real-time feed is a continuous flow of data that carries instantaneous information. A feed can be delivered to one client or to several clients; with several clients, either each client receives only its own parts (packets) of the feed, or the whole feed goes to every client. We can list the cases to clarify our scope:

  1. Feed is distributed to one client.
  2. Feed is distributed to all clients.
    1. Feed is distributed between clients (each client takes only the data that belongs to it).
    2. The whole feed is distributed to all clients.

The first case is the simplest: with only one client, arrived data can be stored in a single queue and managed with a simple queue-management technique. The second case is our interest, and its two sub-cases (2-1, 2-2) are handled very differently. In case 2-1, the sender of the feed must include a destination ID in each packet; a network router is a good example, since it receives packets from senders and routes them to the right destination. In case 2-2, the whole feed is forwarded to all clients, so packets need no destination ID. Case 2-2 is the scope of this article. The final goal of the article is to distribute a real-time feed to multiple clients with different bandwidths, as in the following figure:

Feed Distribution

Real-time stream distribution is a critical issue: it needs a well-suited data structure to store and distribute the feed efficiently. The system receives a real-time feed and forwards it to all clients, as in the following figure:

In practice, most systems don't forward the feed directly to clients. They have two options:

  1. Store it in temporary storage, then forward it to all clients.

  2. Process the feed (transform, compress, encrypt), store the results in temporary storage, then send the results to all clients.

There are two main reasons for saving the feed, or its processing results, in temporary storage before sending it to clients:

  1. To isolate the receiving operation from the sending or processing operations.
  2. To allow sending from different threads, so that send operations can be balanced according to clients' bandwidths.

So, what is the most suitable data structure to store the feed, and how can we send it to clients with different bandwidths? To answer these questions, we first state our goals for this storage:

  • Fast data insertion and removal.
  • Receiving data is not blocked while data is being sent from the same storage to each client.
  • Each client has its own offset in the storage, depending on its bandwidth.

Shared Queue

The best storage to achieve these goals is a pipe or a queue: the feed is received at one end and sent from the other, First In First Out (FIFO). A queue is a collection of entities linked in order. A new entity is added at the tail of the queue, and the oldest entity is removed from the head, so the first added entity is the first one processed. A queue keeps pointers to its head and tail entities to handle insertion and removal, and each entity in the queue points to the next one.
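To make the FIFO behaviour concrete, here is a minimal sketch of an ordinary singly linked queue of int values. The names Node and LinkedQueue are illustrative and are not part of the article's SharedQueue; unlike SharedQueue, this plain queue frees each node as soon as it is dequeued.

```cpp
// Illustrative node of a plain singly linked FIFO queue.
struct Node {
    int value;
    Node* next;
};

class LinkedQueue {
public:
    LinkedQueue() = default;
    LinkedQueue(const LinkedQueue&) = delete;             // owns raw nodes
    LinkedQueue& operator=(const LinkedQueue&) = delete;

    ~LinkedQueue() {
        int ignored;
        while (Dequeue(ignored)) {
        }
    }

    bool IsEmpty() const { return head_ == nullptr; }

    // A new entity is linked at the tail in O(1).
    void Enqueue(int value) {
        Node* node = new Node{value, nullptr};
        if (tail_ != nullptr)
            tail_->next = node;
        else
            head_ = node;  // first node is both head and tail
        tail_ = node;
    }

    // The oldest entity is unlinked from the head in O(1).
    // Returns false when the queue is empty.
    bool Dequeue(int& value) {
        if (head_ == nullptr)
            return false;
        Node* node = head_;
        value = node->value;
        head_ = node->next;
        if (head_ == nullptr)
            tail_ = nullptr;  // queue became empty
        delete node;
        return true;
    }

private:
    Node* head_ = nullptr;
    Node* tail_ = nullptr;
};
```

For example, enqueuing 1, 2 and 3 and then dequeuing returns 1 first, because it was the first one added.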

This choice suits our needs because:

  • Insertion and removal are very fast with a linked-list structure.
  • Receiving and sending can run in separate threads: the receiver only appends data to the tail element, and senders only read data that has already been received.
  • The owner of the Shared Queue keeps an object for each client. This object holds a pointer to the client's current node in the queue and the send offset within that node.
  • So, the sending operation can be spread over many threads, each sending to its own clients. Each client object passes its working element pointer and send offset to the queue, which sends to that client according to its position in the queue.
  • The previous point is the key to the whole article, as it solves the problem of slow (low-bandwidth) clients. Fast clients always work on the tail element, and their send offset stays synchronized with the "receive offset" of the queue's tail element.
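The per-client cursor idea can be sketched without threads or sockets. In this hypothetical model, ClientCursor and ServeRound are illustrative names, the shared storage is a flat std::string rather than linked 32 KB elements, and a client's bandwidth is simply the maximum number of bytes it accepts per round. Each client advances only its own offset, so a slow client falls behind without holding back a fast one.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical per-client state: where this client is in the shared feed.
struct ClientCursor {
    std::size_t offset = 0;     // bytes already sent to this client
    std::size_t bandwidth = 0;  // max bytes this client accepts per round
};

// One distribution round over a shared, append-only feed. Every client reads
// from its own offset, so no client waits for another.
void ServeRound(const std::string& feed, std::vector<ClientCursor>& clients) {
    for (ClientCursor& client : clients) {
        std::size_t available = feed.size() - client.offset;
        std::size_t n = std::min(available, client.bandwidth);
        // A real sender would transmit feed.data() + client.offset, n bytes.
        client.offset += n;
    }
}
```

If 50 bytes arrive per round, a client with a bandwidth of 100 stays level with the end of the feed, while a client with a bandwidth of 10 falls 40 bytes further behind each round.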

Shared Queue members:

Member            Description
IsEmpty()         Checks whether the queue is empty
Enqueue()         Adds a new element at the queue tail if needed
Dequeue()         Gets the element whose data is sent next
Recv()            Receives newly arrived data into the tail node
Send()            Sends data to one client
MaxQueueLength    Maximum queue length
Head              Points to the head node of the queue
Tail              Points to the tail node of the queue
Garbage           Points to the first garbage node

In our case, we need to handle some special cases:

  • In real situations, client speeds are not equal, because clients have different bandwidths. Bandwidth limits how much data each client can receive at a time. Fast clients are always ready to receive data, so slow clients could delay removal of the queue's head entity and, with it, the whole sending process. The distributor must handle this case so that fast clients receive real-time data on time, and slow clients receive it as fast as their bandwidth allows.
  • Each queue element contains a 32 KB (32,768-byte) buffer to store the received feed. So, Enqueue() doesn't add a new element until the tail element has received its full 32 KB; only then does it add a new element and make it the new tail.
    #define MAX_ELEMENT_SIZE	32768
    struct QElement
    {
    	QElement(int nSequence)
    	{
    		m_nSequence = nSequence;
    		m_nRecvOffset = 0;
    		m_pNext = NULL;
    	}
    	// element creation sequence in the queue
    	int m_nSequence;
    	// offset of received data in element buffer
    	int m_nRecvOffset;
    	// pointer to the next element in the queue
    	QElement* m_pNext;
    	// buffer of the element
    	char m_pBuffer[MAX_ELEMENT_SIZE];
    };
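Because Enqueue creates a new element only when the tail is completely full, every element before the tail holds exactly MAX_ELEMENT_SIZE bytes. Under that assumption, the element and in-element offset of any byte of the feed follow from simple division. The helper names below (kElementSize, ElementPos, Locate, ElementsFor) are hypothetical, only illustrate the arithmetic, and ignore elements already freed by garbage collection.

```cpp
#include <cstddef>

// Hypothetical mirror of MAX_ELEMENT_SIZE.
constexpr std::size_t kElementSize = 32768;

struct ElementPos {
    std::size_t sequence;  // element (m_nSequence) that holds the byte
    std::size_t offset;    // position of the byte inside that element's buffer
};

// Assumes every element before the tail is completely full, which is what
// Enqueue guarantees, and that no element has been freed yet.
ElementPos Locate(std::size_t bytePos) {
    return ElementPos{bytePos / kElementSize, bytePos % kElementSize};
}

// Number of elements needed to store totalBytes bytes of feed.
std::size_t ElementsFor(std::size_t totalBytes) {
    return (totalBytes + kElementSize - 1) / kElementSize;
}
```

For example, byte 40,000 of the feed (counting from 0) lands in element 1 at offset 7,232 (40,000 - 32,768), and 70,000 bytes occupy three elements.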

Enqueue

The standard Enqueue adds a new node at the queue tail, but ours does so only when the tail node is full of data. In other words, the queue uses Enqueue to get the working (not full) tail node. The caller (Recv) then copies received data into that element and increments its "receive offset" (m_nRecvOffset) by the amount received. Enqueue checks the tail's receive offset to decide whether it is full, as in the following flow chart:

//************************************
// Method:    Enqueue
// Access:    protected 
// Returns:   QElement*
// Purpose:   add new element at queue tail if needed
//************************************
QElement* Enqueue()
{
	QElement* pQE;
	::EnterCriticalSection(&m_cs);
	if(m_pTail == NULL)
		// initialize first element in the list
		pQE = m_pHead = m_pTail = m_pGarbage = new QElement(0);
	// check if last received element reached its element end
	else	if(m_pTail->m_nRecvOffset >= MAX_ELEMENT_SIZE)
	{	// add new element to the list and let last element points to it
		pQE = m_pTail->m_pNext = new QElement(m_pTail->m_nSequence + 1);
		// increment Tail to the new created element
		m_pTail = pQE;
		// increment Head
		if(m_pTail->m_nSequence > m_nMaxQueueLength)
			// advance head pointer to next element
			m_pHead = m_pHead->m_pNext;
		// ... (garbage collection, shown in the Garbage Collection section)
	}
	else 
		// in the middle of the element
		pQE = m_pTail;
	::LeaveCriticalSection(&m_cs);
	return pQE;
}
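The same tail rule can be written portably with std::mutex and std::lock_guard in place of the Win32 critical section. This is a sketch only: Element and TailAllocator are hypothetical names, and the head-advancing and garbage-collection steps of the article's Enqueue are deliberately left out, so this version keeps every element until it is destroyed.

```cpp
#include <mutex>

constexpr int kMaxElementSize = 32768;  // mirrors MAX_ELEMENT_SIZE

// Hypothetical element, shaped like QElement.
struct Element {
    explicit Element(int seq) : sequence(seq) {}
    int sequence;
    int recvOffset = 0;        // bytes received into buffer so far
    Element* next = nullptr;
    char buffer[kMaxElementSize];
};

// Sketch of Enqueue's tail rule with std::mutex instead of a critical
// section. Head advancing and garbage collection are omitted.
class TailAllocator {
public:
    TailAllocator() = default;
    TailAllocator(const TailAllocator&) = delete;
    TailAllocator& operator=(const TailAllocator&) = delete;

    ~TailAllocator() {
        while (first_ != nullptr) {
            Element* next = first_->next;
            delete first_;
            first_ = next;
        }
    }

    // Returns the working tail; appends a new one only if the tail is full.
    Element* Enqueue() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (tail_ == nullptr) {
            first_ = tail_ = new Element(0);  // first element of the list
        } else if (tail_->recvOffset >= kMaxElementSize) {
            tail_->next = new Element(tail_->sequence + 1);
            tail_ = tail_->next;              // new element becomes the tail
        }
        return tail_;  // otherwise: still filling the current tail
    }

private:
    std::mutex mutex_;
    Element* first_ = nullptr;  // oldest element (nothing is freed early here)
    Element* tail_ = nullptr;
};
```

Repeated calls return the same element until its receive offset reaches the element size; only then does a new element with the next sequence number appear.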

Dequeue

The queue uses its protected Dequeue() function to retrieve the element whose data is to be sent to a client. Each client must keep a pointer to its working node (initialized to NULL) and its offset in that node. Dequeue uses these parameters to select the next working node and offset, as in the following flow chart. Unlike in a normal queue, Dequeue doesn't remove elements, because the shared queue sends the same data to multiple clients. Instead, all nodes are kept until the garbage collection code in Enqueue frees them. The last point to mention is the client type: a real-time client joins the queue at its tail the first time, to receive the most recently received data, while a non-real-time client joins at the head to receive all stored data. In the code below, this choice is made by the queue-level flag m_bRealTime.

//************************************
// Method:    Dequeue
// Access:    protected 
// Returns:   bool
// Purpose:   get head element to send its data
// Parameter: QElement * & pCurElement
// Parameter: int & nCurElementSendOffset
//************************************
bool Dequeue(QElement*& pCurElement, int& nCurElementSendOffset)
{
	::EnterCriticalSection(&m_cs);
	// pCurElement = NULL for new client
	if(pCurElement == NULL || pCurElement->m_nSequence < m_pHead->m_nSequence)
	{
		// check if client need real time data or all out of date data
		if(m_bRealTime)
		{	// point to the tail directly
			pCurElement = m_pTail;
			nCurElementSendOffset = m_pTail ? m_pTail->m_nRecvOffset : 0;
		}
		else
		{	// point to the head to get all stored data (first in)
			pCurElement = m_pHead;
			nCurElementSendOffset = 0;
		}
	}
	// check if the client has sent its whole current element
	else if(nCurElementSendOffset >= MAX_ELEMENT_SIZE && pCurElement->m_pNext)
	{	// get next element and reset send offset
		pCurElement = pCurElement->m_pNext;
		nCurElementSendOffset = 0;
	}
	::LeaveCriticalSection(&m_cs);
	// success if an element is found
	return pCurElement != NULL;
}
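The positioning rules of Dequeue can be isolated in a small helper to see how a client's cursor moves. In this sketch, Element and Position are hypothetical names, locking is omitted, and the real-time choice is passed as a parameter instead of being read from the queue's m_bRealTime member. The helper also checks head for NULL itself, which the article's code leaves to Send.

```cpp
constexpr int kMaxElementSize = 32768;  // mirrors MAX_ELEMENT_SIZE

// Hypothetical element: only the fields Dequeue looks at.
struct Element {
    int sequence = 0;
    int recvOffset = 0;        // bytes received into this element so far
    Element* next = nullptr;
};

// Sketch of Dequeue's positioning rules (no locking). Returns true if the
// client now has an element to send from.
bool Position(Element* head, Element* tail, bool realTime,
              Element*& cur, int& sendOffset) {
    bool lagging = cur != nullptr && head != nullptr &&
                   cur->sequence < head->sequence;
    if (cur == nullptr || lagging) {
        if (realTime) {
            cur = tail;                                // join at the live edge
            sendOffset = tail ? tail->recvOffset : 0;  // skip old bytes
        } else {
            cur = head;                                // replay stored data
            sendOffset = 0;
        }
    } else if (sendOffset >= kMaxElementSize && cur->next != nullptr) {
        cur = cur->next;                               // element fully sent
        sendOffset = 0;
    }
    return cur != nullptr;
}
```

A new real-time client lands at the tail's current receive offset, a new non-real-time client starts at offset 0 of the head, and a client that has fallen behind the head is moved forward to it.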

Recv

The shared queue receives data in two ways.
The first is Recv(const char* pBuffer, int nLength), a simple call that passes the buffer to be saved in the queue.
The second is template<class RECEIVER> int Recv(RECEIVER& receiver), which asks an object of class RECEIVER to receive the data.

Recv Raw Data

This is the first way to receive data. It accepts a buffer of any length and calls Enqueue to get the working queue node that stores the data. Because one element holds at most MAX_ELEMENT_SIZE bytes, it loops whenever the data does not fit in the space left in the tail element.

//************************************
// Method:    Recv
// Access:    public 
// Returns:   int
// Purpose:   copy a buffer of any length into queue nodes
// Parameter: const char * pBuffer
// Parameter: int nLength
//************************************
int Recv(const char* pBuffer, int nLength)
{
	int nRecvLength = 0;
	while(nRecvLength < nLength)
	{
		QElement* pQE = Enqueue();
		if(pQE->m_nRecvOffset < MAX_ELEMENT_SIZE)
		{	// receive in last element
			int nRecv = min(MAX_ELEMENT_SIZE - pQE->m_nRecvOffset, 
					nLength - nRecvLength);
			memcpy(pQE->m_pBuffer+pQE->m_nRecvOffset, 
					pBuffer+nRecvLength, nRecv);
			// increment element offset with the received bytes
			pQE->m_nRecvOffset += nRecv;
			nRecvLength += nRecv;
		}
	}
	// return total received bytes
	return nRecvLength;
}
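The splitting loop of Recv can be checked with a simplified store. In this sketch, Chunk and ChunkStore are hypothetical names, and the chunk size is a constructor parameter so that small values can be tried; Tail() plays the role of Enqueue by reusing the tail chunk unless it is full.

```cpp
#include <algorithm>
#include <cstring>
#include <vector>

// Hypothetical fixed-capacity chunk, standing in for QElement.
struct Chunk {
    explicit Chunk(int capacity) : buffer(capacity) {}
    std::vector<char> buffer;
    int recvOffset = 0;  // bytes stored in this chunk so far
};

class ChunkStore {
public:
    explicit ChunkStore(int chunkSize) : chunkSize_(chunkSize) {}

    // Same loop shape as Recv(const char*, int): fill the tail chunk, then
    // continue in a fresh chunk until the whole input has been stored.
    int Recv(const char* data, int length) {
        int stored = 0;
        while (stored < length) {
            Chunk& tail = Tail();
            int n = std::min(chunkSize_ - tail.recvOffset, length - stored);
            std::memcpy(tail.buffer.data() + tail.recvOffset, data + stored, n);
            tail.recvOffset += n;
            stored += n;
        }
        return stored;  // total bytes stored
    }

    const std::vector<Chunk>& Chunks() const { return chunks_; }

private:
    // Counterpart of Enqueue(): reuse the tail unless it is full.
    Chunk& Tail() {
        if (chunks_.empty() || chunks_.back().recvOffset >= chunkSize_)
            chunks_.emplace_back(chunkSize_);
        return chunks_.back();
    }

    int chunkSize_;
    std::vector<Chunk> chunks_;
};
```

With a chunk size of 8, storing 20 bytes fills two chunks and puts 4 bytes in a third. A std::vector keeps the sketch short, but growing it can relocate the Chunk objects and invalidate pointers to them; the article's linked list avoids this, which matters because each client holds a raw pointer to its current element.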

Recv Template

This is the second way to receive data: it asks an object of class RECEIVER to receive the data. It calls Enqueue to get the working queue node, then keeps requesting data from the RECEIVER's Recv function until the node's "receive offset" reaches MAX_ELEMENT_SIZE, or until Recv returns zero or a negative value.

//************************************
// Method:    Recv
// Access:    public 
// Returns:   int
// Purpose:	  ask class of type RECEIVER to receive data in queue nodes
// Parameter: RECEIVER& receiver
//************************************
template<class RECEIVER> int Recv(RECEIVER& receiver)
{	// get tail element to save received data
	QElement* pQE = Enqueue();
	int nRecvLength = 0, nRecv;
	while(pQE->m_nRecvOffset < MAX_ELEMENT_SIZE)
	{	// receive in last element
		if((nRecv = receiver.Recv(pQE->m_pBuffer + 
		pQE->m_nRecvOffset, MAX_ELEMENT_SIZE - pQE->m_nRecvOffset)) <= 0)
			return nRecv;
		// increment element offset with the received bytes
		pQE->m_nRecvOffset += nRecv;
		// increment total received bytes
		nRecvLength += nRecv;
	}
	// return total received bytes
	return nRecvLength;
}
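Any class with an int Recv(char*, int) member can serve as RECEIVER. The hypothetical StringReceiver below reads from an in-memory string and returns at most maxPerCall bytes per call, to imitate a socket that delivers data in pieces, and returns 0 when the data is exhausted. FillElement (also hypothetical) repeats the loop of the template Recv on a single buffer; like the original, it returns the receiver's result (0 or negative) when the receiver stops, even if some bytes were already stored and counted in the receive offset.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstring>
#include <string>
#include <utility>

// Hypothetical RECEIVER: int Recv(char*, int) returns bytes written,
// or 0 when no more data is available.
class StringReceiver {
public:
    StringReceiver(std::string data, int maxPerCall)
        : data_(std::move(data)), maxPerCall_(maxPerCall) {}

    int Recv(char* buffer, int length) {
        int remaining = static_cast<int>(data_.size() - pos_);
        int n = std::min({length, maxPerCall_, remaining});
        if (n <= 0)
            return 0;  // like recv() on a closed connection
        std::memcpy(buffer, data_.data() + pos_, n);
        pos_ += n;
        return n;
    }

private:
    std::string data_;
    int maxPerCall_;
    std::size_t pos_ = 0;
};

// Same loop as the template Recv, on one element-sized buffer: keep asking
// the receiver until the buffer is full; stop early on a result <= 0.
template <class RECEIVER>
int FillElement(RECEIVER& receiver, char* buffer, int capacity, int& recvOffset) {
    int total = 0;
    while (recvOffset < capacity) {
        int n = receiver.Recv(buffer + recvOffset, capacity - recvOffset);
        if (n <= 0)
            return n;
        recvOffset += n;
        total += n;
    }
    return total;
}
```

Filling an 8-byte buffer from "hello world" with 4 bytes per call takes two calls; the next buffer receives the remaining 3 bytes before the receiver reports 0.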

Send Template

Although the Send template has a simple flow chart and code, it is the key of this article. To send to multiple clients, the owner of the queue must keep, for each client, the "send element" and the "offset" in that element. The key point is that each client is independent of the others. If the sender is a socket class with a good connection, it stays synchronized with the receive process; if its connection is slow, it may lag, but it does not affect other clients. Each client's information (SendElement, ElementOffset) is passed to the Send function and used as in the following flow chart and code.

//************************************
// Method:    Send
// Access:    public 
// Returns:   int
// Purpose:   delegate queue buffer send to a sender of class SENDER
// Parameter: SENDER& sender
//            the class that must implement Send(char*, int) function
// Parameter: QElement * & pCurElement
//            client current element pointer
// Parameter: int & nCurElementSendOffset
//            current offset in client element
//************************************
template<class SENDER> int Send(SENDER& sender, 
	QElement*& pCurElement, int& nCurElementSendOffset)
{	// get head element to send its data
	if(m_pHead == NULL || Dequeue(pCurElement, nCurElementSendOffset) == false)
		// return 0 as there are no elements to send
		return 0;
	// calculate bytes to send
	int nSendBytes = min(MAX_ELEMENT_SIZE/2, 
		pCurElement->m_nRecvOffset - nCurElementSendOffset);
	// send bytes to client
	if(nSendBytes <= 0 || (nSendBytes = sender.Send
		(pCurElement->m_pBuffer + nCurElementSendOffset, nSendBytes)) <= 0)
		return nSendBytes;
	// increment sent bytes offset
	nCurElementSendOffset += nSendBytes;
	// return total sent bytes
	return nSendBytes;
}
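The SENDER requirement is equally small: a member int Send(const char*, int) that returns the number of bytes accepted. The hypothetical CappedSender below accepts at most maxPerCall bytes per call to mimic a slow connection, and SendStep (also hypothetical) repeats the byte calculation of the Send template on one element, without the Dequeue step that moves a client to the next element. Each client keeps its own send offset, so two senders reading the same buffer progress independently.

```cpp
#include <algorithm>
#include <string>

constexpr int kMaxElementSize = 32768;  // mirrors MAX_ELEMENT_SIZE

// Hypothetical SENDER: accepts at most maxPerCall bytes per call, as a slow
// socket might, and records what the "client" received.
struct CappedSender {
    int maxPerCall;
    std::string received;
    int Send(const char* data, int length) {
        int n = std::min(length, maxPerCall);
        received.append(data, n);
        return n;
    }
};

// One Send step on a single element: send at most half an element of the
// data received but not yet sent to this client.
template <class SENDER>
int SendStep(SENDER& sender, const char* buffer, int recvOffset, int& sendOffset) {
    int n = std::min(kMaxElementSize / 2, recvOffset - sendOffset);
    if (n <= 0 || (n = sender.Send(buffer + sendOffset, n)) <= 0)
        return n;
    sendOffset += n;  // advance only this client's offset
    return n;
}
```

With 10 bytes received, a fast sender takes all 10 in one call, while a sender capped at 3 bytes needs four calls; neither changes the other's offset.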

Garbage Collection

The Shared Queue sends the feed to multiple clients with different speeds, so some low-bandwidth clients may lag a few nodes behind. A normal dequeue frees nodes once they are sent, but here we can't free a node until we are sure it has been sent to all clients; alternatively, we can delay freeing it up to a certain limit. My queue follows the second strategy: it frees nodes that lie a certain distance before the queue head, as in the following figure:

Remember that data is received at the tail side, so freeing nodes at the garbage side does not affect receiving the feed. Also, real-time clients send through their own node pointer, which is always the tail node for normal and fast clients. Slow non-real-time clients start from the head node and quickly move toward the tail node, after which they work like real-time clients. Consequently, garbage collection is safe. To make the process safer, I put the garbage collection code in Enqueue, so that nodes are allocated and freed in the same function. The garbage collection code is the block under the "increment Garbage" comment in the following Enqueue code:

//************************************
// Method:    Enqueue
// Access:    protected 
// Returns:   QElement*
// Purpose:   add new element at queue tail if needed
//************************************
QElement* Enqueue()
{
	QElement* pQE;
	::EnterCriticalSection(&m_cs);
	if(m_pTail == NULL)
		// initialize first element in the list
		pQE = m_pHead = m_pTail = m_pGarbage = new QElement(0);
	// check if last received element reached its element end
	else	if(m_pTail->m_nRecvOffset >= MAX_ELEMENT_SIZE)
	{	// add new element to the list and let last element points to it
		pQE = m_pTail->m_pNext = new QElement(m_pTail->m_nSequence + 1);
		// increment Tail to the new created element
		m_pTail = pQE;
		// increment Head
		if(m_pTail->m_nSequence > m_nMaxQueueLength)
			// advance head pointer to next element
			m_pHead = m_pHead->m_pNext;
		// increment Garbage
		if(m_pHead->m_nSequence > min(m_nMaxQueueLength, MAX_QUEUE_LENGTH))
		{	// keep next element
			QElement* pNext = m_pGarbage->m_pNext;
			// clear element
			delete m_pGarbage;
			// point to next element
			m_pGarbage = pNext;
		}
	}
	else 
		// in the middle of the element
		pQE = m_pTail;
	::LeaveCriticalSection(&m_cs);
	return pQE;
}
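The pointer movement in this Enqueue can be traced with sequence numbers alone. In the sketch below, Window and AppendElement are hypothetical names, and maxQueueLengthLimit stands for MAX_QUEUE_LENGTH, whose value the article does not give. Each call applies the same two checks that Enqueue performs after appending a new tail.

```cpp
#include <algorithm>

// Hypothetical trace of Enqueue's three pointers by sequence number.
struct Window {
    int garbage = 0;  // oldest node not yet freed (m_pGarbage)
    int head = 0;     // m_pHead
    int tail = 0;     // m_pTail
};

// Applies the checks Enqueue performs after appending a new tail element.
// maxQueueLengthLimit stands in for MAX_QUEUE_LENGTH (value not given).
void AppendElement(Window& w, int maxQueueLength, int maxQueueLengthLimit) {
    ++w.tail;                      // new tail element
    if (w.tail > maxQueueLength)
        ++w.head;                  // head follows the tail at a fixed distance
    if (w.head > std::min(maxQueueLength, maxQueueLengthLimit))
        ++w.garbage;               // the oldest node would be deleted here
}
```

With a maximum queue length of 4 and a large limit, ten appended elements leave the tail at sequence 10, the head at 6, and the garbage pointer at 2, so the nodes with sequences 2 to 5 are still kept behind the head for lagging clients.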

Usage

You can define any number of queues in your application. The code listed here defines one queue, one thread to receive the feed, and one thread to distribute the feed to a list of clients. You can define many send threads, each with its own client list, so you can send to thousands of clients distributed among tens of threads.

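#include <winsock2.h>	// SOCKET, ::send
#include <windows.h>	// LPVOID
#include <process.h>	// _beginthread
#include <vector>
using namespace std;

SharedQueue sq;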

struct CFeedClient
{
	CFeedClient()
	{	// initialize send pointer and offset
		m_pCurSendElem = NULL;
		m_nCurSendElemOffset = 0;
	}
	QElement *m_pCurSendElem;
	int m_nCurSendElemOffset;
	SOCKET m_sock;
	int Send(const char* lpcs, int nLength)
	{	return ::send(m_sock, lpcs, nLength, 0);	}
	...
};

void RecvThreadFunc(LPVOID pParam)
{
	int nLength;
	char szbuf[10240];

	while(true)
	{
		// Recv feed from some where
		... // (fill szbuf and nLength), data can be processed or transformed
		// save received feed in the queue
		sq.Recv(szbuf, nLength);
	}
}

void SendThreadFunc(LPVOID pParam)
{
	vector<CFeedClient>* pvClients = (vector<CFeedClient>*)pParam;
	while(true)
		// send saved feed to thread clients
		for(vector<CFeedClient>::iterator client = 
			pvClients->begin(); client != pvClients->end(); client++)
			sq.Send(*client, client->m_pCurSendElem, 
			client->m_nCurSendElemOffset);
}

void Usage()
{
	// static: the send thread keeps using this list after Usage() returns
	static vector<CFeedClient> vClients;
	// create Recv thread
	_beginthread(RecvThreadFunc, 0, NULL);
	// create Send thread to send saved feed to thread clients
	_beginthread(SendThreadFunc, 0, (LPVOID)&vClients);
	// Note: you can create multiple threads with each has its own clients list
}

Points of Interest

  1. Thread Safe

    SharedQueue is thread safe: it synchronizes changes to its linked list with a critical section. Changes to the linked list are limited to adding or removing nodes, which is done by the Enqueue, Dequeue, or Clear functions. The other functions, Recv and Send, work independently of each other, so their access needs no extra synchronization, and they are the only callers of Enqueue and Dequeue. So, the user of the shared queue can design one thread to receive the feed and multiple threads to send it, or its processed result, as in section Feed Distribution.

  2. Send Timeout

    A sender may set a small "send timeout" to give all clients an equal share of sending time; in this case, slow clients (slow connections) may lag behind fast clients (fast connections). A slow client would rather receive delayed data than lose data.

  3. Sender Type

    The "sender type" passed to the Send template can be a socket class with a Send function, or any other type that has a Send function. The user of this class can take the data and transform it, or do any processing such as compression or encryption, then keep or send it.

  4. Thread Management

    To build a simple feed distribution system we need:

    1. One thread to receive feed in a primary queue.
    2. One thread to analyze feed and format a new feed suitable for clients and save it in a secondary queue.
    3. N threads to send the feed from the secondary queue to each thread's clients. To add new clients to the threads, we can use simple round-robin scheduling, which gives a simple and fast load-balancing technique.

  5. Data Keeping for Disconnected Clients

    If a client disconnects due to network errors, our system should keep its object in the thread's data for a certain period, hoping that the client will reconnect shortly. The client object then still holds its position in the queue, so the client continues from its last location in the data without losing any part of it, provided that its node has not been freed by garbage collection in the meantime. If the keeping period is exceeded, the system should delete the client object; the next time the client connects, it joins the queue at the tail to receive the just-arrived feed, like a new client.
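Point 4 suggests round-robin assignment of new clients to send threads. A minimal sketch of that policy follows; RoundRobinAssigner is a hypothetical name, it assumes at least one thread list, and it does no locking, so in a real system adding a client to a list that a send thread is iterating would still need synchronization.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical helper: gives each new client to the next thread list in turn.
template <class CLIENT>
class RoundRobinAssigner {
public:
    // threadCount must be at least 1.
    explicit RoundRobinAssigner(std::size_t threadCount) : lists_(threadCount) {}

    // Appends the client to the next list and returns that list's index.
    std::size_t Add(const CLIENT& client) {
        std::size_t index = next_;
        lists_[index].push_back(client);
        next_ = (next_ + 1) % lists_.size();
        return index;
    }

    const std::vector<CLIENT>& List(std::size_t index) const { return lists_[index]; }

private:
    std::vector<std::vector<CLIENT>> lists_;  // one client list per send thread
    std::size_t next_ = 0;                    // list that gets the next client
};
```

With three send threads, seven new clients are spread as 3, 2 and 2, so no thread gets more than one client above any other.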

Updates

  • 12/07/2011: Posted version v0.9000


Thanks to...

God

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Last Updated 7 Aug 2011
Article Copyright 2011 by Hatem Mostafa