A C++ delegate library that supports synchronous, asynchronous and remote function invocation on any C++03 or higher based system. Any serialization format can be used, such as binary, JSON or XML, and any communication protocol can carry the data, including UDP, TCP, serial and data pipes.
Introduction
C++ delegates simplify usage of a publish/subscribe pattern. With delegates, client code anonymously registers a callback function pointer to receive runtime notification. In other languages, delegates are a first-class feature and built into the language. Not so in C++, which leaves developers to create custom libraries to emulate delegates.
Delegates normally support synchronous execution; that is, when invoked, the bound function is executed within the caller’s thread of control. A few years ago, I wrote an article entitled “Asynchronous Multicast Delegates in C++”. The library offers synchronous and asynchronous function invocations on any callable function. Simply put, the callback function and callback thread are specified during subscriber registration. During notification, the bound callback function is invoked on the subscriber’s desired thread of control.
This article explains an extension to my original C++ delegate library: remote delegates. A remote delegate invokes a function (with data) on a remote system. A remote system is defined as an application running on a different CPU separated by a communication link or a program executing within a separate process. To a user, the delegate appears local; however, the library invokes the remote function with little effort. Think of remote delegates as a C++ standard-compliant remote procedure call (RPC) implemented using C++ delegates.
The features of the remote delegate library are:
- Remote Invocation – remotely invoke any callable function (up to 5 arguments)
- Any Protocol – supports any transmission medium: UDP, TCP, serial, named pipes
- Any Serialization – supports any object serialization method: binary, JSON, XML
- Endianness – handles different CPU architectures
- Any Compiler – standard C++ code for any compiler without weird hacks
- Any OS – easy porting to any OS. Win32, POSIX and std::thread ports included
- Any Function – invoke any callable function: member, static, or free
- Any Argument Type – supports any argument type: value, reference, pointer, pointer to pointer
- Multiple Arguments – supports multiple function arguments
The remote delegate implementation significantly eases passing data and objects between remote applications. A small amount of platform-specific code is written to tailor the library to a particular OS and communication protocol. After that, the framework handles all of the low-level machinery to safely invoke any function signature on a remote system.
The original asynchronous delegate implementation strived to ease inter-thread communication by invoking functions and passing data between threads using C++ delegates. Remote delegates extend the library to include inter-process and inter-processor communications.
The original article Asynchronous Multicast Delegates in C++ covers all synchronous and asynchronous delegate library features. The focus of this article is the new remote delegate enhancements.
Delegates Background
If you’re not familiar with a delegate, the concept is quite simple. A delegate can be thought of as a super function pointer. In C++, there's no pointer type capable of pointing to all the possible function variations: instance member, virtual, const, static, and free (global). A function pointer can’t point to instance member functions, and pointers to member functions have all sorts of limitations. However, delegate classes can, in a type-safe way point to any function provided the function signature matches. In short, a delegate points to any function with a matching signature to support anonymous function invocation.
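To make the idea concrete, here is a minimal sketch of the “super function pointer” concept. It is not the library’s implementation: std::function (C++11) stands in for a delegate class, and IntDelegate, Accumulator, BindMember() and Notify() are hypothetical names invented for this illustration.

```cpp
#include <functional>

// Hypothetical stand-in for a delegate with signature void(int).
// The library's own delegate classes are not used here.
typedef std::function<void(int)> IntDelegate;

static int g_freeTotal = 0;

// A free function with the target signature.
void AddToFreeTotal(int value) { g_freeTotal += value; }

// A class with an instance member function of the same signature.
class Accumulator {
public:
    Accumulator() : m_total(0) {}
    void Add(int value) { m_total += value; }
    int Total() const { return m_total; }
private:
    int m_total;
};

// Bind an instance member function; the object pointer is captured.
// The caller must keep the object alive while the delegate is in use.
IntDelegate BindMember(Accumulator* obj) {
    return [obj](int value) { obj->Add(value); };
}

// The publisher only knows the signature, not what is bound.
void Notify(const IntDelegate& d, int value) {
    if (d) d(value);  // an empty delegate is simply skipped
}
```

Calling Notify(IntDelegate(&AddToFreeTotal), 5) and Notify(BindMember(&acc), 7) goes through the same call site, yet the first reaches a free function and the second an instance member.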
This C++ delegate implementation is full featured and allows calling any function, even instance member functions, with any arguments either synchronously or asynchronously. The delegate library makes binding to and invoking any function a snap. The addition of remote delegates extends the delegate paradigm to include invoking functions executing within a separate context.
Using the Code
I’ll first present how to use the code, and then get into the implementation details.
The core delegate library is supported by any C++03 or higher compiler. However, all remote delegate examples are written using some C++11 features. Visual Studio/Win32 and Eclipse/POSIX projects and examples are included.
The delegate library is comprised of delegates and delegate containers. A delegate is capable of binding to a single callable function. A multicast delegate container holds one or more delegates in a list to be invoked sequentially. A single cast delegate container holds at most one delegate.
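The difference between the two container kinds can be sketched as follows. MulticastSketch and SinglecastSketch are hypothetical stand-ins built on std::function, not the library’s container classes, and they leave out removal and thread safety.

```cpp
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical multicast container for callbacks of signature void(int).
class MulticastSketch {
public:
    void Add(const std::function<void(int)>& cb) { m_callbacks.push_back(cb); }

    // Invoke every registered callback sequentially, in registration order.
    void operator()(int value) const {
        for (std::size_t i = 0; i < m_callbacks.size(); ++i)
            m_callbacks[i](value);
    }

    bool Empty() const { return m_callbacks.empty(); }

private:
    std::vector<std::function<void(int)> > m_callbacks;
};

// Hypothetical single cast container: holds at most one callback.
class SinglecastSketch {
public:
    // Setting a callback replaces whatever was bound before.
    void Set(const std::function<void(int)>& cb) { m_callback = cb; }

    void operator()(int value) const {
        if (m_callback) m_callback(value);
    }

private:
    std::function<void(int)> m_callback;
};
```

With two callbacks added to a MulticastSketch, one invocation runs both in order; with a SinglecastSketch, only the most recently set callback runs.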
The new remote delegate classes are shown below, where X is the number of function arguments in the target function signature.
- DelegateRemoteSendX<> – initiates invoking a remote function executing on a remote system. The sending system creates this object.
- DelegateFreeRemoteRecvX<> – synchronously invokes a local free callback function located on the receiving remote system.
- DelegateMemberRemoteRecvX<> – synchronously invokes a local member callback function located on the receiving remote system.
Remote delegates can be inserted into any existing delegate container:
- SinglecastDelegateX<>
- MulticastDelegateX<>
- MulticastDelegateSafeX<>
Send Data Example
The send application shows how to invoke a remote function with a single RemoteDataPoint& argument. The key points in the code below are:
- MakeDelegate() creates a send remote delegate.
- Invoke the send remote delegate.
int main(void)
{
BOOL result = AfxWinInit(GetModuleHandle(NULL), NULL, ::GetCommandLine(), 0);
ASSERT_TRUE(result == TRUE);
result = AfxSocketInit(NULL);
ASSERT_TRUE(result == TRUE);
UdpDelegateSend::GetInstance().Initialize();
stringstream ss(ios::in | ios::out | ios::binary);
auto sendDataPointDelegate =
MakeDelegate<RemoteDataPoint&>(UdpDelegateSend::GetInstance(), ss, REMOTE_DATA_POINT_ID);
cout << "Press any key to exit program." << endl;
int x = 1;
int y = 1;
while (!_kbhit())
{
RemoteDataPoint dataPoint(x++, y++);
sendDataPointDelegate(dataPoint);
}
return 0;
}
MakeDelegate() is an overloaded function that helps create delegate objects. Normally MakeDelegate() uses template argument deduction to create the correct instance type based on the arguments. However, a sending delegate doesn’t bind to a function; the bound function is on the remote system. Therefore, when creating a send delegate, the function argument types are supplied explicitly as template arguments.
auto sendDataPointDelegate =
MakeDelegate<RemoteDataPoint&>(UdpDelegateSend::GetInstance(), ss, REMOTE_DATA_POINT_ID);
The first argument is the transport object. The second argument is outgoing data byte stream. The last argument is a shared ID between the sender and receiver.
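The reason the send side needs explicit template arguments can be shown with a simplified sketch. SendSketch, RecvSketch and MakeSketch() are hypothetical names, and the real MakeDelegate() overloads take different parameters; the point is only that deduction needs an argument whose type mentions the parameter type.

```cpp
#include <type_traits>

// Hypothetical simplified send delegate: it holds no function pointer.
template <class Param1>
struct SendSketch {
    typedef Param1 ParamType;
    explicit SendSketch(int id) : m_id(id) {}
    int m_id;
};

// Hypothetical simplified receive delegate: it binds a free function.
template <class Param1>
struct RecvSketch {
    typedef void (*FreeFunc)(Param1);
    RecvSketch(FreeFunc func, int id) : m_func(func), m_id(id) {}
    FreeFunc m_func;
    int m_id;
};

// Receive side: Param1 is deduced from the function pointer argument.
template <class Param1>
RecvSketch<Param1> MakeSketch(void (*func)(Param1), int id) {
    return RecvSketch<Param1>(func, id);
}

// Send side: no argument mentions Param1, so the caller must spell it out.
template <class Param1>
SendSketch<Param1> MakeSketch(int id) {
    return SendSketch<Param1>(id);
}

// Example callback used for deduction.
void OnValue(const int&) {}
```

Here MakeSketch(&OnValue, 7) deduces const int& on its own, while the send side has to be written as MakeSketch<const int&>(7).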
The sender invokes the remote delegate using the correct function arguments. The RemoteDataPoint object is serialized and a message is sent to the receiver.
RemoteDataPoint dataPoint(x++, y++);
sendDataPointDelegate(dataPoint);
Receive Delegate Example
The receive application shows how to register for a remote delegate callback. The key points in the code below are:
- MakeDelegate() creates a receive remote delegate.
- RecvDataPointCb() is called when the sender invokes the delegate.
static void RecvDataPointCb(RemoteDataPoint& data)
{
cout << "RemoteDataPoint: " << data.GetX() << " " << data.GetY() << endl;
}
int main(void)
{
BOOL result = AfxWinInit(GetModuleHandle(NULL), NULL, ::GetCommandLine(), 0);
ASSERT_TRUE(result == TRUE);
result = AfxSocketInit(NULL);
ASSERT_TRUE(result == TRUE);
UdpDelegateRecv::GetInstance().Initialize();
auto recvDataPointDelegate = MakeDelegate(&RecvDataPointCb, REMOTE_DATA_POINT_ID);
cout << "Press any key to exit program." << endl;
while (!_kbhit())
Sleep(10);
return 0;
}
The receiver creates a delegate using the same ID as the sender.
auto recvDataPointDelegate = MakeDelegate(&RecvDataPointCb, REMOTE_DATA_POINT_ID);
The first argument is a pointer to the callback function. The second argument is the shared ID. The delegate library receives the message, deserializes the RemoteDataPoint object, and invokes the callback function.
Of course, the delegate library supports member function callbacks, in addition to multiple arguments and different argument types.
SysData Example
The remote delegates can be inserted into a delegate container, just like any other delegate type. The types of delegates are summarized below:
- Synchronous
- Asynchronous
- Asynchronous Blocking
- Remote
This example shows how to register for notification using each delegate type. SysData is a simple class that stores the system mode and notifies clients when it changes.
class SysData
{
public:
static MulticastDelegateSafe1<const SystemModeChanged&> SystemModeChangedDelegate;
static SysData& GetInstance();
void SetSystemMode(SystemMode::Type systemMode);
private:
SysData();
~SysData();
SystemMode::Type m_systemMode;
LOCK m_lock;
};
The SystemModeChangedDelegate container is used by subscribers to register. The function callback signature is void (const SystemModeChanged&).
static MulticastDelegateSafe1<const SystemModeChanged&> SystemModeChangedDelegate;
The SystemModeChanged object is the callback argument type.
class SystemModeChanged
{
public:
SystemModeChanged() :
PreviousSystemMode(SystemMode::STARTING),
CurrentSystemMode(SystemMode::STARTING)
{
}
SystemMode::Type PreviousSystemMode;
SystemMode::Type CurrentSystemMode;
friend std::ostream& operator<< (std::ostream &out, const SystemModeChanged& data);
friend std::istream& operator>> (std::istream &in, SystemModeChanged& data);
};
When someone calls SysData::SetSystemMode(), the new mode is saved and all registered subscribers are notified.
void SysData::SetSystemMode(SystemMode::Type systemMode)
{
LockGuard lockGuard(&m_lock);
SystemModeChanged callbackData;
callbackData.PreviousSystemMode = m_systemMode;
callbackData.CurrentSystemMode = systemMode;
m_systemMode = systemMode;
if (SystemModeChangedDelegate)
SystemModeChangedDelegate(callbackData);
}
TestSysData() registers four callbacks with SysData::SystemModeChangedDelegate. Each callback represents a different notification type: remote, synchronous, asynchronous and asynchronous blocking.
Notice that SysData just exposes a generic delegate container. Each anonymous client decides how to be notified. Also note how the arguments used in calling MakeDelegate() dictate the delegate type created.
static void SystemModeChangedCb(const SystemModeChanged& data)
{
cout << "SystemModeChangedCb: " << data.CurrentSystemMode << " " <<
data.PreviousSystemMode << endl;
}
void TestSysData()
{
sysDataWorkerThread.CreateThread();
stringstream ss(ios::in | ios::out | ios::binary);
SysData::SystemModeChangedDelegate +=
MakeDelegate<const SystemModeChanged&>(UdpDelegateSend::GetInstance(),
ss, REMOTE_SYSTEM_MODE_CHANGED_ID);
auto recvDataPointDelegate =
MakeDelegate(&SystemModeChangedCb, REMOTE_SYSTEM_MODE_CHANGED_ID);
SysData::SystemModeChangedDelegate += MakeDelegate(&SystemModeChangedCb);
SysData::SystemModeChangedDelegate +=
MakeDelegate(&SystemModeChangedCb, &sysDataWorkerThread);
SysData::SystemModeChangedDelegate +=
MakeDelegate(&SystemModeChangedCb, &sysDataWorkerThread, 5000);
SysData::GetInstance().SetSystemMode(SystemMode::STARTING);
SysData::GetInstance().SetSystemMode(SystemMode::NORMAL);
std::this_thread::sleep_for(std::chrono::seconds(1));
SysData::SystemModeChangedDelegate -= MakeDelegate(&SystemModeChangedCb);
SysData::SystemModeChangedDelegate -=
MakeDelegate(&SystemModeChangedCb, &sysDataWorkerThread);
SysData::SystemModeChangedDelegate -=
MakeDelegate(&SystemModeChangedCb, &sysDataWorkerThread, 5000);
SysData::SystemModeChangedDelegate -=
MakeDelegate<const SystemModeChanged&>
(UdpDelegateSend::GetInstance(), ss, REMOTE_SYSTEM_MODE_CHANGED_ID);
sysDataWorkerThread.ExitThread();
}
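How the argument list alone can select the delegate type is ordinary overload resolution. The sketch below reduces that to its essentials; KindSketch, ThreadSketch and MakeKind() are hypothetical names, and the parameter types are assumptions that only mirror the shapes of the MakeDelegate() calls in TestSysData().

```cpp
// Hypothetical tags for the four notification styles in the example.
enum KindSketch { KIND_SYNC, KIND_ASYNC, KIND_ASYNC_BLOCKING, KIND_REMOTE_RECV };

// Hypothetical stand-in for a worker thread object.
struct ThreadSketch {};

typedef void (*CallbackSketch)(int);

// Function only: synchronous delegate.
KindSketch MakeKind(CallbackSketch) { return KIND_SYNC; }

// Function plus target thread: asynchronous delegate.
KindSketch MakeKind(CallbackSketch, ThreadSketch*) { return KIND_ASYNC; }

// Function, target thread and timeout: asynchronous blocking delegate.
KindSketch MakeKind(CallbackSketch, ThreadSketch*, int /*timeoutMs*/) {
    return KIND_ASYNC_BLOCKING;
}

// Function plus integer ID: remote receive delegate.
KindSketch MakeKind(CallbackSketch, int /*id*/) { return KIND_REMOTE_RECV; }

// Example callback.
void OnModeChanged(int) {}
```

The subscriber never names a delegate class; passing a thread pointer, a timeout or an ID is enough for the compiler to pick the matching overload.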
Porting Details
The remote delegate library is abstracted from object serialization and the communication protocol. Generic interfaces are used by the library to perform these actions.
Serialization
Each user-defined data type sent as a remote delegate argument must:
- Implement a default constructor.
- Overload the insertion and extraction operators to serialize the object (i.e., operator<< and operator>>).
The example below serializes (i.e., inserts/extracts) a RemoteDataPoint object. Each field is written to the stream as formatted text, one value per line.
struct RemoteDataPoint
{
public:
RemoteDataPoint(int x, int y) : m_x(x), m_y(y) {}
RemoteDataPoint() : m_x(0), m_y(0) {}
int GetX() const { return m_x; }
int GetY() const { return m_y; }
private:
int m_y;
int m_x;
friend std::ostream& operator<< (std::ostream &out, const RemoteDataPoint& data);
friend std::istream& operator>> (std::istream &in, RemoteDataPoint& data);
};
std::ostream& operator<< (std::ostream &out, const RemoteDataPoint& data)
{
out << data.m_x << std::endl;
out << data.m_y << std::endl;
return out;
}
std::istream& operator>> (std::istream &in, RemoteDataPoint& data)
{
in >> data.m_x;
in >> data.m_y;
return in;
}
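A quick way to check that a type meets both requirements is to round-trip it through a std::stringstream before involving any transport. The sketch below does that with a hypothetical PointSketch type and a hypothetical RoundTrip() helper; neither is part of the library.

```cpp
#include <iostream>
#include <sstream>

// Hypothetical argument type that follows the two serialization rules.
class PointSketch {
public:
    PointSketch() : m_x(0), m_y(0) {}  // default constructor: rule 1
    PointSketch(int x, int y) : m_x(x), m_y(y) {}
    int GetX() const { return m_x; }
    int GetY() const { return m_y; }

private:
    int m_x;
    int m_y;
    friend std::ostream& operator<<(std::ostream& out, const PointSketch& p);
    friend std::istream& operator>>(std::istream& in, PointSketch& p);
};

// Insertion operator: write each field followed by a separator (rule 2).
std::ostream& operator<<(std::ostream& out, const PointSketch& p) {
    out << p.m_x << '\n' << p.m_y << '\n';
    return out;
}

// Extraction operator: read the fields back in the same order (rule 2).
std::istream& operator>>(std::istream& in, PointSketch& p) {
    in >> p.m_x >> p.m_y;
    return in;
}

// Serialize into a stream, then rebuild a fresh object from the same bytes.
PointSketch RoundTrip(const PointSketch& original) {
    std::stringstream ss(std::ios::in | std::ios::out | std::ios::binary);
    ss << original;
    PointSketch copy;  // the receiving side starts from a default object
    ss >> copy;
    return copy;
}
```

The default constructor matters because the receiving side has to create an empty object before the extraction operator can fill it in.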
A similar RemoteDataPointJson object is serialized with RapidJSON.
std::ostream& operator<< (std::ostream &out, const RemoteDataPointJson& data)
{
StringBuffer sb;
PrettyWriter<StringBuffer> writer(sb);
data.Serialize(writer);
out << sb.GetLength() + 1;
out << sb.GetString();
return out;
}
std::istream& operator>> (std::istream &in, RemoteDataPointJson& data)
{
size_t bufLen = 0;
in >> bufLen;
char* buf = (char*)malloc(bufLen);
in.rdbuf()->sgetn(buf, bufLen);
Document d;
d.Parse(buf);
data.m_x = d["m_x"].GetInt();
data.m_y = d["m_y"].GetInt();
free(buf);
return in;
}
The serialization and deserialization method employed is up to you. The only requirement is that an input or output stream is used to hold the serialized object.
A std::stringstream is used in the examples, but any class deriving from std::iostream can be utilized.
Transport
The remote delegate library uses the IDelegateTransport class to send data to the remote system.
class IDelegateTransport
{
public:
virtual void DispatchDelegate(std::iostream& s) = 0;
};
A sender inherits from IDelegateTransport and implements DispatchDelegate(). The UDP implementation is shown below:
void UdpDelegateSend::DispatchDelegate(std::iostream& s)
{
size_t len = (size_t)s.tellp();
char* sendBuf = (char*)malloc(len);
s.rdbuf()->sgetn(sendBuf, len);
int result = m_sendSocket.Send((void*)sendBuf, len, 0);
ASSERT_TRUE(result == len);
free(sendBuf);
s.seekg(0);
s.seekp(0);
}
Similarly, a Windows named pipe sends remote delegates between processes.
void PipeDelegateSend::DispatchDelegate(std::iostream& s)
{
size_t len = (size_t)s.tellp();
char* sendBuf = (char*)malloc(len);
s.rdbuf()->sgetn(sendBuf, len);
DWORD sentLen = 0;
BOOL success = WriteFile(m_hPipe, sendBuf, len, &sentLen, NULL);
ASSERT_TRUE(success && sentLen == len);
free(sendBuf);
s.seekg(0);
s.seekp(0);
}
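Both senders follow the same pattern: measure the message with tellp(), copy the bytes out of the stream buffer, hand them to the medium, then rewind the stream for the next message. A minimal in-memory sketch of that pattern is shown below. MemoryTransportSketch is a hypothetical class for experimentation, and the virtual destructor on the interface is an addition made here for safe deletion; the DispatchDelegate() signature is the one shown above.

```cpp
#include <cstddef>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

// Interface with the DispatchDelegate() signature shown in the article.
class IDelegateTransport {
public:
    virtual ~IDelegateTransport() {}  // added in this sketch
    virtual void DispatchDelegate(std::iostream& s) = 0;
};

// Hypothetical transport that stores each message instead of sending it.
class MemoryTransportSketch : public IDelegateTransport {
public:
    virtual void DispatchDelegate(std::iostream& s) {
        // The put position equals the number of bytes written so far.
        std::streamsize len = static_cast<std::streamsize>(s.tellp());
        if (len > 0) {
            std::vector<char> buf(static_cast<std::size_t>(len));
            s.rdbuf()->sgetn(&buf[0], len);
            m_messages.push_back(std::string(&buf[0], buf.size()));
        }
        // Rewind both positions so the stream can be reused.
        s.seekg(0);
        s.seekp(0);
    }

    std::size_t Count() const { return m_messages.size(); }
    const std::string& Message(std::size_t i) const { return m_messages[i]; }

private:
    std::vector<std::string> m_messages;
};
```

Because the stream is rewound rather than cleared, a shorter second message overwrites only the start of the buffer; tellp() still reports the correct length, so stale bytes past that point are never sent.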
A receiver thread obtains data from a UDP socket. The only requirement is that DelegateRemoteInvoker::Invoke() is called with the incoming stream of data.
unsigned long UdpDelegateRecv::Process(void* parameter)
{
MSG msg;
const int BUF_SIZE = 1024;
char recvBuf[BUF_SIZE];
SOCKADDR_IN addr;
int addrLen = sizeof(addr);
BOOL success = AfxSocketInit(NULL);
ASSERT_TRUE(success == TRUE);
success = m_recvSocket.Create(514, SOCK_DGRAM, NULL);
ASSERT_TRUE(success);
m_started = true;
for (;;)
{
if (PeekMessage(&msg, NULL, WM_USER_BEGIN, WM_USER_END, PM_REMOVE) != 0)
{
switch (msg.message)
{
case WM_EXIT_THREAD:
m_recvSocket.Close();
return 0;
}
}
int recvMsgSize = m_recvSocket.Receive(recvBuf, BUF_SIZE, 0);
if (recvMsgSize > 0)
{
stringstream ss(ios::in | ios::out | ios::binary);
ss.write(recvBuf, recvMsgSize);
DelegateRemoteInvoker::Invoke(ss);
}
else
{
Sleep(100);
}
}
return 0;
}
A named pipe is implemented similarly. Notice the same DelegateRemoteInvoker::Invoke() function is called, only in this case the data bytes are obtained from a named pipe and not a UDP socket.
unsigned long PipeDelegateRecv::Process(void* parameter)
{
MSG msg;
BOOL connected = FALSE;
char recvBuf[BUF_SIZE];
for (;;)
{
if (PeekMessage(&msg, NULL, WM_USER_BEGIN, WM_USER_END, PM_REMOVE) != 0)
{
switch (msg.message)
{
case WM_EXIT_THREAD:
CloseHandle(m_hPipe);
return 0;
}
}
if (!connected)
{
connected = ConnectNamedPipe(m_hPipe, NULL) ?
TRUE : (GetLastError() == ERROR_PIPE_CONNECTED);
}
else
{
DWORD recvMsgSize = 0;
BOOL success = ReadFile(m_hPipe, recvBuf, BUF_SIZE, &recvMsgSize, NULL);
if (success && recvMsgSize > 0)
{
stringstream ss(ios::in | ios::out | ios::binary);
ss.write(recvBuf, recvMsgSize);
DelegateRemoteInvoker::Invoke(ss);
}
else
{
Sleep(100);
}
}
}
return 0;
}
POSIX UdpDelegateSend and UdpDelegateRecv examples are available in the attached source code.
The DelegateRemoteInvoker::Invoke() library function simply looks up the matching receive delegate instance by ID and calls DelegateInvoke().
bool DelegateRemoteInvoker::Invoke(std::istream& s)
{
DelegateIdType id;
s >> id;
s.seekg(0);
std::map<DelegateIdType, DelegateRemoteInvoker*>::iterator it;
{
LockGuard lockGuard(GetLock());
it = GetMap().find(id);
}
if (it != GetMap().end())
{
(*it).second->DelegateInvoke(s);
return true;
}
else
{
return false;
}
}
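The lookup-by-ID dispatch can be tried in isolation with a small sketch. RegistrySketch, IdSketch and HandlerSketch are hypothetical names; std::function replaces the receive delegate instances, int is assumed for the ID type, and locking is omitted.

```cpp
#include <functional>
#include <istream>
#include <map>
#include <sstream>  // for std::stringstream in the usage below

typedef int IdSketch;  // assumption: stands in for DelegateIdType
typedef std::function<void(std::istream&)> HandlerSketch;

// Hypothetical registry mapping a message ID to its receive handler.
class RegistrySketch {
public:
    void Register(IdSketch id, const HandlerSketch& handler) {
        m_map[id] = handler;
    }

    // Peek at the leading ID, rewind, then hand the whole stream over.
    bool Invoke(std::istream& s) {
        IdSketch id;
        if (!(s >> id))
            return false;  // no readable ID at the start of the message
        s.seekg(0);        // rewind so the handler re-reads the ID itself

        std::map<IdSketch, HandlerSketch>::iterator it = m_map.find(id);
        if (it == m_map.end())
            return false;  // nobody registered for this ID
        it->second(s);
        return true;
    }

private:
    std::map<IdSketch, HandlerSketch> m_map;
};
```

A message for an unregistered ID returns false and is otherwise ignored, which is why the sender and receiver must agree on the ID value.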
The transport mechanism is completely user defined. Any communication medium is supported using a small amount of platform specific code.
Remote Delegate Details
DelegateRemoteSend1<> implements a single-parameter send remote delegate.
template <class Param1>
class DelegateRemoteSend1 : public Delegate1<Param1> {
public:
DelegateRemoteSend1(IDelegateTransport& transport, std::iostream& stream, DelegateIdType id) :
m_transport(transport), m_stream(stream), m_id(id) { }
virtual DelegateRemoteSend1* Clone() const { return new DelegateRemoteSend1(*this); }
virtual void operator()(Param1 p1) {
m_stream << m_id << std::ends;
m_stream << p1 << std::ends;
m_transport.DispatchDelegate(m_stream);
}
virtual bool operator==(const DelegateBase& rhs) const {
const DelegateRemoteSend1<Param1>* derivedRhs = dynamic_cast<const DelegateRemoteSend1<Param1>*>(&rhs);
return derivedRhs &&
m_id == derivedRhs->m_id &&
&m_transport == &derivedRhs->m_transport;
}
private:
IDelegateTransport& m_transport;
std::iostream& m_stream;
DelegateIdType m_id;
};
The constructor requires an IDelegateTransport&, a std::iostream& and a DelegateIdType.
The operator() inserts m_id into the stream, then calls operator<< on each parameter to serialize it. After that, DispatchDelegate() is called to send the delegate to the remote system.
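The resulting message layout is easy to inspect by running the same two insertions against a local stream. EncodeCallSketch() below is a hypothetical helper that mirrors the operator() body shown above, with int assumed for the ID type.

```cpp
#include <sstream>
#include <string>

// Hypothetical helper: builds the bytes for a one-parameter remote call.
// Layout: <id> '\0' <param1> '\0', matching the two insertions in operator().
template <class Param1>
std::string EncodeCallSketch(int id, const Param1& p1) {
    std::stringstream ss(std::ios::in | std::ios::out | std::ios::binary);
    ss << id << std::ends;  // shared ID, then a null separator
    ss << p1 << std::ends;  // serialized argument, then a null separator
    return ss.str();
}
```

For example, EncodeCallSketch(1, 42) yields the five bytes '1', '\0', '4', '2', '\0'. The null separators written by std::ends are what the receiving side skips over between fields.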
DelegateFreeRemoteRecv1<> implements a single-parameter receive remote delegate.
template <class Param1>
class DelegateFreeRemoteRecv1 : public DelegateFree1<Param1>, public DelegateRemoteInvoker {
public:
typedef void(*FreeFunc)(Param1);
DelegateFreeRemoteRecv1(FreeFunc func, DelegateIdType id) : DelegateRemoteInvoker(id) { Bind(func, id); }
void Bind(FreeFunc func, DelegateIdType id) {
m_id = id;
DelegateFree1<Param1>::Bind(func);
}
virtual DelegateFreeRemoteRecv1* Clone() const { return new DelegateFreeRemoteRecv1(*this); }
virtual void DelegateInvoke(std::istream& stream) {
RemoteParam<Param1> param1;
Param1 p1 = param1.Get();
stream >> m_id;
stream.seekg(stream.tellg() + std::streampos(1));
stream >> p1;
stream.seekg(stream.tellg() + std::streampos(1));
DelegateFree1<Param1>::operator()(p1);
}
virtual bool operator==(const DelegateBase& rhs) const {
const DelegateFreeRemoteRecv1<Param1>* derivedRhs = dynamic_cast<const DelegateFreeRemoteRecv1<Param1>*>(&rhs);
return derivedRhs &&
m_id == derivedRhs->m_id &&
DelegateFree1<Param1>::operator == (rhs);
}
private:
DelegateIdType m_id;
};
The constructor takes a FreeFunc function pointer and a DelegateIdType.
DelegateRemoteInvoker::Invoke() calls DelegateInvoke() with the incoming data stream. Each function parameter is deserialized using operator>> and the FreeFunc target function is called.
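The extraction sequence can be sketched on its own as follows. DecodeCallSketch() is a hypothetical helper with int assumed for the ID type; it uses ignore(1) to step over each null separator, where the class above advances the read position with seekg().

```cpp
#include <istream>
#include <sstream>  // for std::stringstream in the usage below

// Hypothetical helper: reads <id> '\0' <param1> '\0' from a stream.
// Returns false if any extraction failed.
template <class Param1>
bool DecodeCallSketch(std::istream& s, int& id, Param1& p1) {
    s >> id;      // leading ID
    s.ignore(1);  // skip the null separator after the ID
    s >> p1;      // argument, via its extraction operator
    s.ignore(1);  // skip the null separator after the argument
    return !s.fail();
}
```

Fed the five bytes '7', '\0', '9', '9', '\0', the helper produces id 7 and argument 99, after which the target function could be called with the decoded value.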
The member function delegate variants are implemented similarly.
Asynchronous Delegate Details
Asynchronous delegates are also part of the delegate library. The porting details for those features are covered within the Asynchronous Multicast Delegates in C++ article.
Source Code
The attached source code contains the entire delegate library and numerous examples for Visual Studio and Eclipse.
Visual Studio
The Visual Studio/Win32 examples have three separate projects:
- RemoteDelegate – many samples where the remote sender and receiver are executing within the same application.
- RemoteDelegateSend – a UDP remote delegate sending console application.
- RemoteDelegateRecv – a UDP receiving console application that receives from RemoteDelegateSend.
To run the JSON examples, you’ll need to:
- Clone the RapidJSON library into a rapidjson directory within your RemoteDelegate source directory: https://github.com/Tencent/rapidjson/
- Define RAPID_JSON in the C/C++ > Preprocessor > Preprocessor Definitions within Visual Studio.
Eclipse
The Eclipse/POSIX examples are imported into a workspace using File > Import... > Existing Projects into Workspace.
To run the JSON examples, you’ll need to:
- Clone the RapidJSON library into a rapidjson directory within your RemoteDelegate source directory: https://github.com/Tencent/rapidjson/
- Define RAPID_JSON in Properties > C/C++ General > Paths and Symbols > Symbols tab.
Conclusion
I’ve been using the C++ delegate library on a few different projects now. The ease at which functions and data can be moved between threads has really changed how I create multi-threaded applications.
The remote delegate enhancements extend the library by allowing remote notification. The library eases application development by passing data between remote systems using a simple delegate mechanism.
History
- 15th March, 2020
- 18th March, 2020
- Added Eclipse/POSIX examples.
- 19th March, 2020
- Fixed built-in data type handling.