SW Message Bus






4.74/5 (21 votes)
SW Message Bus represents message flow abstraction mechanism in a complex, multithreaded SW system.
Introduction
SW Message Bus represents message flow abstraction mechanism in a complex, multithreaded SW system.
The term "Bus" was chosen because the main idea is very similar to HW Data Bus. Once HW unit puts the data, another HW unit, that is interested in this data, picks it up from the bus.
In case of the SW Message Bus that works by the same way. One SW module puts a message on the bus, another, that is interested in this message picks it up from the bus. Bundling between the Publisher (who puts the message) and the Subscriber (who picks up the message) is done only by Message Type. No Hard Coded, no Run Time, no Configuration registration between Subscriber and Publisher is required. The only shared information between any Publisher and any Subscriber is the type of the message. Type only. Nothing else.
Another big advantage is a strong type checking all the way from the Publisher to Subscriber. No casting. Never.
The Idea
The main idea behind this infrastructure is to use run time type unique identification, for any published message, and to call any subscriber, that subscribed for this message type. In C++, that may be done using typeid
from typeinfo
, but I preferred, simpler and more efficient method, that didn't involve RTTI.
using TypeId = uintptr_t;
template < typename T >
static TypeId GetTypeId()
{
static uint32_t placeHolder;
return (reinterpret_cast<TypeId>(&placeHolder));
}
Once we know to get unique identification of Message type - the rest is simple.
Publisher puts message on a bus, using unique id of the message we know to find (in internal Message Bus repository) all subscribers that are registered to this message type.
That's it.
Subscriber
Who may subscribe for messages?
Any callable target. That may be function, lambda expression, functor or bind expression. Subscriber type is defined as:
template < typename MSG_TYPE > using Subscriber = std::function<void(MSG_TYPE)>;
Publisher
Who may publish messages?
Everybody. No restrictions. Message of any Message type may be published. If nobody subscribed for some type, nobody will get this message, but message may be published in any case.
Message Bus API
API of Message Bus is extremely simple:
template < int BUS_ID = 0 > class MsgBus
{
public:
/*!***************************************************************************
* @brief Subscribe for receiving messages of the specific Message Type.
*
* @tparam MSG_TYPE Type for which new subscriber will be added.
*
* @param subscriber Callable target.
*
* @return Handle associated with a registered subscriber. Use IsValidHandle()
* for operation success checking.
*
*****************************************************************************/
template < typename MSG_TYPE >
static SubscriberHandle Subscribe( Subscriber< MSG_TYPE > subscriber );
/*!***************************************************************************
* @brief UnSubscribe from receiving messages of the specific Message Type.
*
* @param handle Subscriber handle.
*
*****************************************************************************/
static void UnSubscribe( SubscriberHandle& handle );
/*!***************************************************************************
* @brief Publish message by blocking call. The method will return only
* when all subscribers will receive published message.
*
* @tparam MSG_TYPE Message type - optional, will be deducted by compiler.
*
* @param msg Message to be published.
*
*****************************************************************************/
template < typename MSG_TYPE >
static void PublishBlocking( const MSG_TYPE& msg );
/*!***************************************************************************
* @brief Publish message by asynchronous call. The method will return
* immediately, the message will be delivered asynchronously.
*
* @tparam MSG_TYPE Message type - optional, will be deducted by compiler.
*
* @param msg Message to be published.
*
*****************************************************************************/
template < typename MSG_TYPE >
static void PublishAsync( const MSG_TYPE& msg );
/*!***************************************************************************
* @brief Check Subscriber handle validity.
*
* @param handle Subscriber handle.
*
* @return true - valid handle, false else.
*
*****************************************************************************/
static bool IsValidHandle( SubscriberHandle& handle );
private:
static MsgBusRepository msgBusRepository;
private:
/// Instantiation, coping, moving and deleting of MsgBus class is prohibited.
MsgBus() = delete;
~MsgBus() = delete;
MsgBus( MsgBus& ) = delete;
MsgBus( MsgBus&& ) = delete;
MsgBus& operator= ( MsgBus& ) = delete;
MsgBus& operator= ( MsgBus&& ) = delete;
};
////////////////////////////////////////////////////////////////////////////////
template < int BUS_ID >
template < typename MSG_TYPE >
SubscriberHandle MsgBus< BUS_ID >::Subscribe( Subscriber< MSG_TYPE > subscriber)
{
return msgBusRepository.Subscribe< MSG_TYPE >( subscriber );
}
////////////////////////////////////////////////////////////////////////////////
template < int BUS_ID >
void MsgBus< BUS_ID >::UnSubscribe( SubscriberHandle& handle )
{
msgBusRepository.UnSubscribe( handle );
}
////////////////////////////////////////////////////////////////////////////////
template < int BUS_ID >
template < typename MSG_TYPE >
void MsgBus< BUS_ID >::PublishBlocking( const MSG_TYPE& msg )
{
msgBusRepository.Publish( msg );
}
////////////////////////////////////////////////////////////////////////////////
template < int BUS_ID >
template < typename MSG_TYPE >
void MsgBus< BUS_ID >::PublishAsync( const MSG_TYPE& msg )
{
std::async( std::launch::async,
MsgBus< BUS_ID >::PublishBlocking< MSG_TYPE >,
msg
);
}
////////////////////////////////////////////////////////////////////////////////
template < int BUS_ID >
bool MsgBus< BUS_ID >::IsValidHandle( SubscriberHandle& handle )
{
return handle.IsValid();
}
////////////////////////////////////////////////////////////////////////////////
template < int MSG_BUS_NUM >
MsgBusRepository MsgBus< MSG_BUS_NUM >::msgBusRepository;
Example
Following is a simple example of Message Bus usage. Let's define message type and some subscribers:
using namespace std;
struct MSG_TYPE_1
{
int i;
};
void RegularFunctionSubscriber( MSG_TYPE_1 msg )
{
cout<< "FunctionSubscriber " << msg.i << endl;
}
class FunctorSubscriber
{
public:
void operator()( MSG_TYPE_1 msg )
{ cout<< "FunctorSubscriber " << msg.i << endl; }
};
Now we are ready to use our Message Bus:
MSG_TYPE_1 msg1 = { 10 };
FunctorSubscriber functorSubscriber;
// Regular Function Subscriber
SubscriberHandle handle1 = MsgBus<>::Subscribe< MSG_TYPE_1 >( RegularFunctionSubscriber );
// Functor Subscriber
SubscriberHandle handle2 = MsgBus<>::Subscribe< MSG_TYPE_1 >( functorSubscriber );
// Lambda Function Subscriber
SubscriberHandle handle3 = MsgBus<>::Subscribe< MSG_TYPE_1 >( [](MSG_TYPE_1 msg)
{ cout<< "Lambda Subscriber " << msg.i << endl; } );
MsgBus<>::PublishBlocking( msg1 );
MsgBus<>::PublishAsync( msg1 );
MsgBus<>::UnSubscribe( handle1 );
MsgBus<>::UnSubscribe( handle2 );
MsgBus<>::UnSubscribe( handle3 );
Implementation
As I described above, Message Bus keeps all registered subscribers in internal repository, and knows to call them based on Message Type, when message is published.
Non typed template parameter, that MsgBus
receives (with a zero as a default), gives us possibility to create multiple Message Buses.
Message Bus repository is implemented as a map, where index is message id, and content is another map, that contains all subscribers for the specific message type.
/*!*****************************************************************************
* @file MsgBusRepository.h
*
* @brief Repository of all callable targets for specific bus.
*
* @author Evgeny Zavalkovsky
*
* @date February 2014
*******************************************************************************/
#ifndef MSGBUSREPOSITORY_H_
#define MSGBUSREPOSITORY_H_
#include <map>
#include "../API/MessageBusDefs.h"
#include "Infra/TypeId.h"
#include "Infra/SharedMutex.h"
#include "Infra/SubscriberHandle.h"
#include "MsgTypeContainer.h"
/*!*****************************************************************************
* @class MsgBusRepository
*
* @brief Repository of all callable targets for specific bus.
*
*******************************************************************************/
class MsgBusRepository
{
public:
/*!***************************************************************************
*
* @brief Constructor.
*
*****************************************************************************/
MsgBusRepository() : operational( true ) {}
/*!***************************************************************************
*
* @brief Destructor.
*
*****************************************************************************/
~MsgBusRepository()
{
mutex.LockExclusive();
for (auto iter: repositoryMap )
{
delete iter.second;
}
operational = false;
mutex.UnlockExclusive();
}
/*!***************************************************************************
*
* @brief Subscribe.
* Add new Subscriber to the repository.
*
*****************************************************************************/
template < typename MSG_TYPE >
SubscriberHandle Subscribe( Subscriber< MSG_TYPE > subscriber )
{
TypeId typeId = GetTypeId< MSG_TYPE >();
mutex.LockExclusive();
SubscriberHandleTyped< MSG_TYPE > handle;
if ( operational )
{
auto ret = repositoryMap.insert(
MsgBusRepositoryMapPair( typeId, nullptr ) );
/// Check if this is the first subscriber for the MSG_TYPE.
if ( ret.second == true )
{
ret.first->second = new MsgTypeContainer< MSG_TYPE >;
}
MsgTypeContainer< MSG_TYPE >*
container = static_cast<MsgTypeContainer< MSG_TYPE >*>(ret.first->second);
/// Add subscriber to the container.
container->Add( handle, subscriber);
}
else
{
handle.SetInvalid();
}
mutex.UnlockExclusive();
return handle;
}
/*!***************************************************************************
*
* @brief UnSubscribe.
* Remove subscriber from repository.
*
*****************************************************************************/
void UnSubscribe( SubscriberHandle& handle )
{
mutex.LockExclusive();
if( operational && handle.IsValid() )
{
TypeId typeId = handle.GetTypeid();
auto iter = repositoryMap.find( typeId );
if ( iter != repositoryMap.end() )
{
MsgTypeContainerBase* container =iter->second;
container->Remove( handle );
/// Check if this is the last subscriber in the container
if( container->Empty() )
{
repositoryMap.erase( iter );
delete container;
}
}
}
handle.SetInvalid();
mutex.UnlockExclusive();
}
/*!***************************************************************************
*
* @brief Publish.
* Publish message for all subscribers for MSG_TYPE.
*
*****************************************************************************/
template < typename MSG_TYPE > void Publish( const MSG_TYPE& msg )
{
TypeId typeId = GetTypeId< MSG_TYPE >();
mutex.LockShared();
if( operational )
{
auto iter = repositoryMap.find( typeId );
if ( iter != repositoryMap.end() )
{
MsgTypeContainer< MSG_TYPE >*
container = static_cast< MsgTypeContainer< MSG_TYPE >* >(iter->second);
container->Publish( msg );
}
}
mutex.UnlockShared();
}
/// Disable coping and moving.
MsgBusRepository( MsgBusRepository& ) = delete;
MsgBusRepository( MsgBusRepository&& ) = delete;
MsgBusRepository& operator= ( MsgBusRepository& ) = delete;
MsgBusRepository& operator= ( MsgBusRepository&& ) = delete;
private:
using MsgBusRepositoryMap = std::map< TypeId, MsgTypeContainerBase* >;
using MsgBusRepositoryMapPair = std::pair< TypeId, MsgTypeContainerBase* >;
bool operational;
MsgBusRepositoryMap repositoryMap;
/// Multiple Readers - Single Writer Lock.
SharedMutex mutex;
};
#endif /* MSGBUSREPOSITORY_H_ */
The last snippet is implementation of the subscribers' container, per specific message type.
/*!*****************************************************************************
* @file MsgTypeContainer.h
*
* @brief Holds all callable targets of the specific MSG_TYPE.
*
* @author Evgeny Zavalkovsky
*
* @date February 2014
*******************************************************************************/
#ifndef MSGTYPECONTAINER_H_
#define MSGTYPECONTAINER_H_
#include <map>
#include "../API/MessageBusDefs.h"
#include "Infra/SubscriberHandle.h"
/*!*****************************************************************************
* @class MsgTypeContainerBase
*
* @brief Non template base of MsgTypeContainer class
* Required for omitting template parameter dependency
* in MsgTypeContainer class
*
*******************************************************************************/
class MsgTypeContainerBase
{
public:
MsgTypeContainerBase() = default;
virtual ~MsgTypeContainerBase() = default;
MsgTypeContainerBase( MsgTypeContainerBase& ) = delete;
MsgTypeContainerBase( MsgTypeContainerBase&& ) = delete;
MsgTypeContainerBase& operator= ( MsgTypeContainerBase& ) = delete;
MsgTypeContainerBase& operator= ( MsgTypeContainerBase&& ) = delete;
virtual void Remove( SubscriberHandle handle ) = 0;
virtual bool Empty() = 0;
};
/*!*****************************************************************************
* @class MsgTypeContainer
*
* @brief Holds all callable targets of the specific MSG_TYPE
*
*******************************************************************************/
template < typename MSG_TYPE >
class MsgTypeContainer : public MsgTypeContainerBase
{
public:
/*!***************************************************************************
*
* @brief Add.
* Add new callable target.
*
*****************************************************************************/
void Add( SubscriberHandle handle, Subscriber< MSG_TYPE > subscriber )
{
containerMap.insert( MsgBusContainerMapPair( handle, subscriber ) );
}
/*!***************************************************************************
*
* @brief Remove.
* Remove callable target.
*
*****************************************************************************/
void Remove( SubscriberHandle handle )
{
containerMap.erase( handle );
}
/*!***************************************************************************
*
* @brief Empty.
* Check if container is empty.
*
*****************************************************************************/
bool Empty()
{
return containerMap.empty();
}
/*!***************************************************************************
*
* @brief Publish.
* Publish message to all targets in conatiner.
*
*****************************************************************************/
void Publish( const MSG_TYPE& msg )
{
for (auto& iter: containerMap )
{
iter.second( msg );
}
}
/// Default Constructor and Destructor
// Deleted Move and Copy Constructors and Assign Operators
MsgTypeContainer() = default;
virtual ~MsgTypeContainer() noexcept = default;
MsgTypeContainer( MsgTypeContainer& ) = delete;
MsgTypeContainer( MsgTypeContainer&& ) = delete;
MsgTypeContainer& operator= ( MsgTypeContainer& ) = delete;
MsgTypeContainer& operator= ( MsgTypeContainer&& ) = delete;
private:
using MsgBusContainerMap = std::map< SubscriberHandle,
Subscriber< MSG_TYPE >
>;
using MsgBusContainerMapPair = std::pair< SubscriberHandle,
Subscriber< MSG_TYPE >
>;
MsgBusContainerMap containerMap;
};
#endif /* MSGTYPECONTAINER_H_ */
Hidden Details
I didn't overview in this article two generic infrastructure implementations:
- Counting Semaphore that for some reason is not a part of C++11
- Multiple Readers - Single Writer Semaphore that is planned to be in C++14, as a
std::shared_lock
For details, please look at the source code.
OS Dependency
Message Bus is absolutely OS independent.
Documentation
Message Bus has a full DoxyGen generated documentation.
Please look at Documentation.html.
Compiler Support
Message Bus was tested using gcc 4.8.1 and VS2013 compilers.
For gcc 4.8.1 compiler: compiler flag -std=c++11 and linker flag -pthread have to be added.