Click here to Skip to main content
Click here to Skip to main content

Concurrent Event Sink Template Under MultiThreading in C++

, 24 Jan 2014 CPOL
Rate this:
Please Sign up or sign in to vote.
A new concurrent event sink template class under multithreading in C++

Introduction 

There are some articles here talking about event sink in C++, but none of them is under multithreading environment. In this article, I developed a concurrent event sink template class supporting raising multiple types of events from any number of thread simultaneously. This template is very good and convenient utility to do publishing, subscribing, unsubscribing events from multiple threads, and I implemented it in terms of map, spin lock and thread local storage.

Background 

Event sinks are classes designed to receive incoming events from other objects. The relationship between the other objects who raise events, and event sinks who act upon receiving events can be fitted as observer pattern described in the famous design pattern book. The other objects are publishers who generate all events, the event sink classes are the subscriber or listeners to listen to those events and then react upon on them.  

There are a lot of applications using this and some of them have been documented in the literature. I come from financial trading industry. The low latency and high throughput trading applications need not only listen to events like price update event, order update event and some financial news updates etc, but also they need synchronize these events together, analyze those information together and then make timing critical trading decisions.  

In C++, an event sink is implemented as a callback, while in C# or Java, have built in support for event sinks by using delegate functions. As mentioned above, more often than not, we need put those event sink together in order to synchronize and analyze them together, There are some articles talking about event sink[1][2], but none of them is under multithreading environment. Most of the software systems now have to support parallelism/concurrency, for speed, throughput, efficiency. Nowadays with modern multi core hardware, a running process has to have multiple threads - some are spawned on demand, few are waiting for some message to do work, some are just waiting for other threads/processes (or other kernel objects), and some are just "on bench", participating in thread pool party for the process. In this article, I implement a concurrent event sink template class supporting raising multiple types of events from any number of thread simultaneously in Windows. I will implement Linux version later.

Event Sink Template Class Implementation

Let us take a brief look at data members of this event sink template class
template < class _TySinkClass >
class EventSink
{
typedef std::multimap< _TySinkClass*, SinkItem,  std::less< _TySinkClass* > > EVENTMAP;
mutable SpinLock m_cs;
mutable EVENTMAP m_map;
mutable ThreadStorage m_sd;
bool m_bInitialized;
} 

The key data member in Event Sink template class is a STL multi map, which is used to store all event sink classes, in order for quicker search. The key of the map is the pointer to the event sink object, the value is an object of a class called Sink Item shown below, with each data member comment out to describe its purpose. The sink item class inherit from a reference count base class, which is to count how many reference to an actual event sink object.

struct SinkItem : public RefCount
{		
// Set to true if this item is in the process of being removed.
bool bRemoved;
// Set to erase when the callback completes.
bool bForceErase;
// Number of active callbacks or threads
unsigned int uiActive;
// Wait on remove handle.
HANDLE hEvent;
}

When an object of an event sink class is interested in any event from the the publisher, it needs subscribe itself to by calling AddSink() function. In this function, it search whether this item exist or not, if it does, then add an reference, if not, insert a new item into the map. I use a spin lock to guarantee only one thread at a time.

When the Event Sink template class needs publish an event, it call a macro (PUBLISH_X) to do that. In this macro, it use a event walker, essentially the iterator of the map, to walk through the map find all valid event items, and call the callback function of the event sink object. Please read the detail comments there to understand how it is done. Valid event item means that the event is not in the process of removing ( bRemoved flag is false ), and certainly not to be forced to erase. (bForceErase flag is true). The trick part is that how the uiActive is used to count the number of concurrent thread.  

When an object of an event sink class is not interested in any event any more, it needs unsubscribe itself by calling RemoveSink() function. In this function, it first search this event item, and then decrement the reference to this item, if the reference count go down to zero, we need start to mark this item as to be removed. If the number of the concurrent thread is zero, we can really remove it from the map. If there is an thread accessing this item concurrently, then we need check whether the thread where removsink() is running is the same thread as the concurrent publishing thread, if not, we can wait for some time to let the other thread finish. The way to judge whether the threads are the same or not, is that I use a thread local storage (TLS). When starting to publish, I set this TLS variable as 1, when done publishing, set back TLS variable as 0, since thread local storage cannot be seen by other threads, by checking this variable, I know whether removesink() is running in the same thread as the concurrent publishing thread. About thread local variable, you can check this article[3].

Using the code and Testing

The typical usage of the above Event Sink template class is that any publisher can inherit this template class, with the event class as the template argument.  The consumers that consumes those events should inherit the event class. The example is shown below:

class Publisher : public EventSink < Event >
class Consumer : public Event 

The event data and the event class are defined as below:

struct EventData
{
int EventType;
long Data;
};
class Event
{
public:
virtual void OnNewData(EventData * ) {}
}; 

In publisher class, I add a vector of thread handle in order that a publisher can publish any number events from any number of threads. As mentioned above, publisher inherit from Event Sink class, so a publisher object has only one map, which store all events from any number of different threads. Let us take a look at data members in the publisher class.

class Publisher : public EventSink < Event >
{
private:
bool m_bIsInitialized; // To judge whether publisher is initialized with proper event data or not
int m_iCurrentIndex; //To identify the index the current thread or event data
int m_iNumThread;	// The number of publishing thread
std::vector<EventData *> m_vecEventData; // vector to store all event data pointers
std::vector<HANDLE> m_vecThread;		  // vector to store all thread handles
};

In consumer class, I have one thread handle as a data member, so that the subscription is from different thread, while unsubscribe is from main thread as you can see from the code.

In main function, I can run any number of thread for a publisher, and assign corresponding number of event data for each thread. Also I can create any number of consumers, who subscribe to this publisher from a separate thread, and later, I can unsubscribe some consumers from the main thread. The code is below.

I have tested up to 1500 threads, 100 consumers. When running 1500 threads, 100 consumers,  the program has been running fine with exact 1501 thread ( 1500 publishing thread plus a main thread), average 7.1% CPU load, Peak 12% CPU (during 1500 thread initializing period, Peak CPU and average CPU is higher than the period after 1500 thread initializing period)   and about 53 MB memory usage in an duel core Intel i7- 3520M@2.9GHz, 2.9GHz machine.

int _tmain(int argc, _TCHAR* argv[])
{
EventData myEventData[NUM_THREADS];
std::vector< EventData *> pEd;
pEd.resize(NUM_THREADS);
for ( int i = 0; i < NUM_THREADS; i++ )
{
// Each event data has it own event type
myEventData[i].EventType = i + 1;
// the address of each event data is assigned to the member in the vector
pEd[i] = &myEventData[i];
}
//myPublisher has any number of thread
Publisher myPublisher(NUM_THREADS);
// SetEventData for this publisher
myPublisher.SetEventData( pEd );
Consumer myConsumers[NUM_CONSUMERS];
for ( int i = 0; i < NUM_CONSUMERS; i++ )
{
// set this consumer up with the publisher
myConsumers[i].SetAPublisher( &myPublisher );
// Subscribe this consumer to the publisher
myConsumers[i].SubScribeToPublisher();
}
// the publisher start to publish
myPublisher.initialize();
Sleep(10000 );
// UnSubscribe the first consumer
myConsumers[0].UnSubScribe();
for ( int i = 0; i < NUM_THREADS; i++ )
joinThread( myPublisher.GetThreadHandle( i ) );
return 0;
} 

Revision History 

January 23, 2014. Correct some terminology about Event and Event Sink, according to some feedback

Literature  

  1. http://www.codeproject.com/Articles/18198/An-event-mechanism-for-C-using-an-interface-based
  2. http://www.codeproject.com/Articles/6614/Generic-Observer-Pattern-and-Events-in-C
  3. http://www.codeproject.com/Articles/8113/Thread-Local-Storage-The-C-Way

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

PengHeProfessor
Software Developer (Senior)
United States United States
I have been developing low latency high throughput applications, services and platforms in financial trading industry since 2004, mostly in C++, some in C#.net and Java.

Comments and Discussions

 
QuestionWhy all the trouble to "re-create the wheel" PinmemberAndy Bantly18-Aug-14 10:37 
QuestionEvent sink Pinmembergeoyar6-Jan-14 9:41 
AnswerRe: Event sink PinprofessionalPengHeProfessor7-Jan-14 6:51 
GeneralRe: Event sink PinmemberStefan_Lang8-Jan-14 5:37 
GeneralRe: Event sink PinprofessionalPengHeProfessor8-Jan-14 10:27 
GeneralRe: Event sink PinmemberStefan_Lang8-Jan-14 21:26 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.

| Advertise | Privacy | Terms of Use | Mobile
Web01 | 2.8.150129.1 | Last Updated 24 Jan 2014
Article Copyright 2014 by PengHeProfessor
Everything else Copyright © CodeProject, 1999-2015
Layout: fixed | fluid