Posted 14 May 2015
Licenced CPOL

Revisiting the Active Object Pattern - with C++11 Closures

By Neta777, 14 May 2015

I have a confession: with the never-ending things going on (you know: life ;-) I missed the (fairly) recent changes in the C++ language. C++ was my first OO language and it probably remains my favorite. I can't help but love the mix of high-level abstractions with metal-grinding pointer arithmetic - it's like a cool sports car with manual transmission. Beauty and power. You have to be more alert, more involved. That's part of the fun - you are taking control. But for some time now, C++ had felt old, tired, disconnected from the endless stream of new languages. Until C++11 came along.

Wikipedia describes C++11 as follows:

C++11 (formerly known as C++0x) is the most recent version of the standard of the C++ programming language. It was approved by ISO on 12 August 2011. C++11 includes several additions to the core language and extends the C++ standard library, incorporating most of the C++ Technical Report 1 (TR1) libraries.

Bjarne Stroustrup wrote that:

Surprisingly, C++11 feels like a new language: The pieces just fit together better than they used to and I find a higher-level style of programming more natural than before and as efficient as ever.

And it does feel like a new language. And this is exciting for geeks like me. In this blog post, I discuss how I implemented Schmidt's Active Object pattern in a novel way using C++11 Closures.

First, another confession: for a long time I've suffered from Node envy. Node.js envy, to be precise. Look at this "Hello World" JavaScript code:

var http = require("http");

http.createServer(function(request, response) {
    response.writeHead(200, {"Content-Type": "text/plain"});
    response.write("Hello World");
    response.end();
}).listen(8888);

What I "envy" is not the use of asynchronous I/O operations with callbacks ("the callback pattern"), but the compelling beauty of lambda functions. Lambda functions simplify asynchronous programming because they allow us to write code that is seemingly synchronous. The code executed by the lambda function is temporally disjoint from the code that precedes it (it runs later, when the callback fires), and yet both parts are spatially co-located in the source. The outcome is smaller, tighter code that feels more natural and is easier to read and maintain. And this can be done in C++11.

I won't discuss C++11 lambda functions because others have done this better than I can. Alex Allain's article on the subject is an example of the great coverage you can find on the net (he has lots of interesting material to read). But I do want to touch on the difference between lambda functions and closures, since my implementation below uses closures. Lambda functions are anonymous functions: they don't need to be bound to a name and are specified as lambda expressions. A closure is a lambda function which "closes" over the environment in which it was specified, meaning that it can access the variables available in that referencing environment. Alex Allain's article doesn't make a big distinction between the two and simply treats closures as lambdas with "variable capture". Syntactically, C++ lambdas and closures are almost identical, so the distinction is slight, yet I think the semantic difference is important to note.

On to Active Object.

Douglas Schmidt describes the Active Object design pattern in Pattern Oriented Software Architecture (Volume 2: Patterns for Concurrent and Networked Objects):

 

The Active Object design pattern decouples method execution from method invocation to enhance concurrency and simplify synchronized access to objects that reside in their own threads of control.

Once again, I don't want to paraphrase the work of others, so I assume that you are knowledgeable about the details of the Active Object pattern. If not, you should probably familiarize yourself with the pattern before reading on.

To illustrate my ideas, I will concentrate on only one variation of the Active Object pattern. In this variation, the Client and Proxy are "folded" into the same object, and the Scheduler and ActivationList implement a simple message queue policy (this is reminiscent of Schmidt's original AO paper, which he later expanded on). I think this is probably the most prevalent variation of the pattern: we want to serialize access to an object, and we use an in-order queue (FIFO) to "bounce" the method invocation from one thread to another.

Let's look at the example code from the Wikipedia entry on Active Object. The Wikipedia code is written in Java, so I reimplemented it in C++11. The comments in the code explain the logic.

//
// Code to illustrate a simple yet powerful ActiveObject
// implementation in C++11.
// Based on the contrived example in:
//              http://en.wikipedia.org/wiki/Active_object
//
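#include "stdafx.h"   // Visual C++ precompiled header; drop this line on other compilers
#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>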
 
typedef std::function<void()> Operation;
 
class OriginalClass {
private:
    double val;
public:
    OriginalClass() : val(0) {}
    void doSomething() {
        val = 1.0;
    }
 
    void doSomethingElse() {
        val = 2.0;
    }
};
 
// I need to add a thread-safe FIFO.
// We use the FIFO to serialize access to the Active Object.
class DispatchQueue {
    std::mutex qlock;
    std::queue<Operation> ops_queue;
    std::condition_variable empty;
public:
    void put(Operation op) {
        std::lock_guard<std::mutex> guard(qlock);
        ops_queue.push(op);
        empty.notify_one();
    }
 
    Operation take() {
        std::unique_lock<std::mutex> lock(qlock);
        empty.wait(lock, [&]{ return !ops_queue.empty(); });
 
        Operation op = ops_queue.front();
        ops_queue.pop();
        return op;
    }
};
 
class BecomeActiveObject
{
private:
    double val;
    DispatchQueue dispatchQueue;
    std::atomic<bool> done;
    std::unique_ptr<std::thread> runnable;
public:
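    BecomeActiveObject() : val(0), done(false) { 
        // Start the thread that this object will "occupy".
        // The public methods of this object will run in the 
        // context of the new thread.
        // (std::make_unique is C++14, so plain C++11 needs reset(new ...).)
        runnable.reset(new std::thread(&BecomeActiveObject::run, this));
    }
    ~BecomeActiveObject() {
        // Queue a final operation that stops the loop in run(). Without it
        // 'done' never becomes true and join() blocks forever.
        dispatchQueue.put([&]() { done = true; });
        runnable->join();
    }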
 
    void run() {
        while (!done) {
            dispatchQueue.take()();
        }
    }
    
    // This is part of the public interface of this class.
    // Each method is composed of the actual code we want to execute,
    // and veneer code which queues our code.
    void doSomething() 
    {
        // This is the actual code to execute.
        // Store it in an std::function and push it to the FIFO
        auto runnable = [&]() {
            val = 1.0;
        };
 
        dispatchQueue.put(runnable);
    }
 
    void doSomethingElse() 
    {
        // This is a more succinct syntax, but it may also be
        // harder to read for some.
        dispatchQueue.put(( [&]() {
            val = 2.0;
        }
        ));
    }
    
};
 
int main(int argc, char **argv) {
    BecomeActiveObject active;
    active.doSomething();
    active.doSomethingElse();
    // In a blink of an eye object 'active' will be destroyed.
    // It may be that the two methods above have not executed yet,
    // so I placed a thread::join in the d'tor of BecomeActiveObject.
    // Not the prettiest piece of code, but it's good enough for this example.
    return 0;
}

The more "traditional" method of implementing ActiveObject in C++ involves defining two sets of interfaces: a public interface and a private interface. Every method in the public interface also appears in the private interface. Clients use the public interface to invoke methods on the object; each public method creates a message describing the request and its parameters and enqueues it. The dispatcher uses the private interface: it dequeues each message and invokes the matching private method. This works well enough, but it creates big classes with a lot of extraneous code that exists only to make the mechanics work. Every change to the interface requires a series of changes (public interface, private interface, and message definition).

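#include "stdafx.h"   // Visual C++ precompiled header; drop this line on other compilers
#include <atomic>
#include <condition_variable>
#include <cstdlib>    // free
#include <cstring>    // _strdup (Visual C++ specific)
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>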
 
// This is the same DispatchQueue class as before, except
// that now it is parameterized with an operation type
template<typename T>
class DispatchQueue {
    std::mutex qlock;
    std::queue<T> ops_queue;
    std::condition_variable empty;
public:
    void put(T op) {
        std::lock_guard<std::mutex> guard(qlock);
        ops_queue.push(op);
        empty.notify_one();
    }
 
    T take() {
        std::unique_lock<std::mutex> lock(qlock);
        empty.wait(lock, [&]{ return !ops_queue.empty(); });
 
        T op = ops_queue.front();
        ops_queue.pop();
        return op;
    }
};
 
class BecomeActiveObject
{
private:
    enum {
        id_doSomething,
        id_doSomethingElse
    };
 
    // Per-method message payloads
    struct MessageInt {
        int val;
    };
    struct MessageStr {
        char* val;
    };
 
    // union of all message data
    struct MessageData {
        size_t id;
        union {
            MessageInt intVal;
            MessageStr strVal;
        };
    };
 
    DispatchQueue<MessageData> dispatchQueue;
    std::atomic<bool> done;
    std::unique_ptr<std::thread> runnable;
 
    void __doSomething(int a)
    {
        std::cout << "In __doSomething(" << a << ")" << std::endl;
    }
 
    void __doSomethingElse(const std::string &a)
    {
        std::cout << "In __doSomethingElse(" << a << ")" << std::endl;
    }
 
public:
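    // Reserved message id, used only by the destructor to stop the worker.
    enum { id_quit = 1000 };

    BecomeActiveObject() : done(false) {
        // (std::make_unique is C++14, so plain C++11 needs reset(new ...).)
        runnable.reset(new std::thread(&BecomeActiveObject::run, this));
    }
    ~BecomeActiveObject() {
        // Queue a stop request behind any pending messages. Without it
        // 'done' never becomes true and join() blocks forever.
        MessageData quit;
        quit.id = id_quit;
        quit.intVal.val = 0;
        dispatchQueue.put(quit);
        runnable->join();
    }

    void run() {
        while (!done) {
            MessageData msg = dispatchQueue.take();
            switch (msg.id) {
            case id_doSomething:
                __doSomething(msg.intVal.val);
                break;
            case id_doSomethingElse:
                __doSomethingElse(msg.strVal.val);
                // _strdup allocates with malloc, so release with free(), not delete
                free(msg.strVal.val);
                break;
            case id_quit:
                done = true;
                break;
            default:
                // error!
                break;
            }
        }
    }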
 
    void doSomething(int a)
    {
        MessageData data;
        data.intVal.val = a;
        data.id = id_doSomething;
        dispatchQueue.put(data);
    }
 
    void doSomethingElse(const std::string &a)
    {
        MessageData data;
        data.strVal.val = _strdup(a.c_str());
        data.id = id_doSomethingElse;
        dispatchQueue.put(data);
    }
 
};
 
int main(int argc, char **argv) {
    BecomeActiveObject active;
    active.doSomething(5);
    active.doSomethingElse("HelloWorld");
    return 0;
}

A somewhat more sophisticated implementation uses functors. We no longer need the code that switches on the message type when we grab a message from the FIFO and dispatch it. But the added sophistication probably only adds a layer of obfuscation if you are not familiar with the underlying idiom, and we gain too little from it for it to be worthwhile.

//
// Functor implementation of ActiveObject
//
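#include "stdafx.h"   // Visual C++ precompiled header; drop this line on other compilers
#include <atomic>
#include <condition_variable>
#include <cstdlib>    // free
#include <cstring>    // _strdup (Visual C++ specific)
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>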
 
// This is the same DispatchQueue class as before, except
// that now it is parameterized with an operation type
template<typename T>
class DispatchQueue {
    std::mutex qlock;
    std::queue<T> ops_queue;
    std::condition_variable empty;
public:
    void put(T op) {
        std::lock_guard<std::mutex> guard(qlock);
        ops_queue.push(op);
        empty.notify_one();
    }
 
    T take() {
        std::unique_lock<std::mutex> lock(qlock);
        empty.wait(lock, [&]{ return !ops_queue.empty(); });
 
        T op = ops_queue.front();
        ops_queue.pop();
        return op;
    }
};
 
 
// Message functor interface
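struct IMessage {
    // Messages are deleted through IMessage*, so the destructor must be virtual.
    virtual ~IMessage() {}
    virtual int execute() = 0;
};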
 
 
// Message functor template
template <class TARGET, class METHOD, class PARAMS>
struct Message : public IMessage {
    Message(TARGET *target, METHOD handlerMethod, PARAMS params) : target(target),
    handlerMethod(handlerMethod),
    params(params) {}
 
    int execute();
private:
    TARGET *target;
    METHOD handlerMethod;
    PARAMS params;
};
 
template <class TARGET, class METHOD, class PARAMS>
int Message<TARGET, METHOD, PARAMS>::execute() {
    (target->*handlerMethod)(params);
    return 1;
}
 
class BecomeActiveObject
{
private:
    DispatchQueue<IMessage*> dispatchQueue;
    std::atomic<bool> done;
    std::thread runnable;
 
    // Same as before
    struct MessageInt {
        int val;
    };
    struct MessageStr {
        char* val;
    };
 
    // union of all message data
    union MessageData {
        MessageInt intVal;
        MessageStr strVal;
    };
 
    // message handler callback prototype
    typedef void (BecomeActiveObject::*MsgHandler)(MessageData msgData);
 
    // Active Object message type
    typedef Message<BecomeActiveObject, MsgHandler, MessageData> AoMsg;
 
    void __doSomething(MessageData data)
    {
        std::cout << "In __doSomething(" << data.intVal.val << ")" << std::endl;
    }
 
    void __doSomethingElse(MessageData data)
    {
        std::cout << "In __doSomethingElse(" << data.strVal.val << ")" << std::endl;
        free(data.strVal.val);   // _strdup allocates with malloc, so release with free(), not delete
    }
 
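    // Handler for the stop request queued by the destructor.
    void handleStop(MessageData)
    {
        done = true;
    }

public:
    BecomeActiveObject() : done(false) {
        runnable = std::thread(&BecomeActiveObject::run, this);
    }
    ~BecomeActiveObject() {
        // Queue a stop request behind any pending messages. Without it
        // 'done' never becomes true and join() blocks forever.
        MessageData data;
        data.intVal.val = 0;
        dispatchQueue.put(new AoMsg(this, &BecomeActiveObject::handleStop, data));
        runnable.join();
    }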
 
    void run() {
        while (!done) {
            IMessage* msg = dispatchQueue.take();
            msg->execute();
            delete msg;
        }
    }
 
    void doSomething(int a)
    {
        MessageData data;
        data.intVal.val = a;
        AoMsg* m = new AoMsg(this, &BecomeActiveObject::__doSomething, data);
        dispatchQueue.put(m);
    }
 
    void doSomethingElse(const std::string &a)
    {
        MessageData data;
        data.strVal.val = _strdup(a.c_str());
        AoMsg* m = new AoMsg(this, &BecomeActiveObject::__doSomethingElse, data);
        dispatchQueue.put(m);
    }
 
};
 
int main(int argc, char **argv) {
    BecomeActiveObject active;
    active.doSomething(5);
    active.doSomethingElse("HelloWorld");
    return 0;
}

Now let's come full circle and return to the Closure implementation of ActiveObject and add a few features to it.

//
// This version of the ActiveObject illustrates more advanced
// features such as calling public methods with references,
// calling methods with return values, and accessing the object's
// internal state.
//
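#include "stdafx.h"   // Visual C++ precompiled header; drop this line on other compilers
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>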
 
typedef std::function<void()> Operation;
 
class DispatchQueue {
    std::mutex qlock;
    std::queue<Operation> ops_queue;
    std::condition_variable empty;
public:
    void put(Operation op) {
        std::lock_guard<std::mutex> guard(qlock);
        ops_queue.push(op);
        empty.notify_one();
    }
 
    Operation take() {
        std::unique_lock<std::mutex> lock(qlock);
        empty.wait(lock, [&]{ return !ops_queue.empty(); });
 
        Operation op = ops_queue.front();
        ops_queue.pop();
        return op;
    }
};
 
class BecomeActiveObject
{
private:
    double val;
    DispatchQueue dispatchQueue;
    std::atomic<bool> done;
    std::thread runnable;
public:
    BecomeActiveObject() : val(0), done(false) {
        runnable = std::thread([this]{ run(); });
    }
    ~BecomeActiveObject() { 
        // Schedule a final operation that sets 'done'. It runs after everything
        // already queued, so the queue is flushed before the thread exits.
        dispatchQueue.put([&]() { done = true; });
        runnable.join();
    }
 
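    // Reads go through the queue as well. This avoids racing with the worker
    // thread and orders the read after every request queued before it.
    double getVal() {
        std::promise<double> result;
        dispatchQueue.put([this, &result]() { result.set_value(val); });
        return result.get_future().get();
    }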
 
    void run() {
        while (!done) {
            dispatchQueue.take()();
        }
    }
 
    // This method returns a value, so it is blocking on the future result
    int doSomething()
    {
        std::promise<int> return_val;
        auto runnable = [&]() {
            int ret = 999;
            return_val.set_value(ret);
        };
 
        dispatchQueue.put(runnable);
        return return_val.get_future().get();
    }
 
    // This method accesses the object's internal state from within the closure
    // Because the access to the ActiveObject is serialized, we can safely access 
    // the object's internal state.
    void doSomethingElse()
    {
        dispatchQueue.put(([this]() {
            this->val = 2.0;
        }
        ));
    }
 
    // This method takes two params which we want to reference in the closure
    void doSomethingWithParams(int a, int b) {
        // This lambda function code will execute later from the context of a different thread, 
        // but the integers {a, b} are bound now.
        // This is a beautiful way to write clear code
        dispatchQueue.put(([a,b]() {
            std::cout << "this is the internal implementation of doSomethingWithParams(";
            std::cout << a << "," << b << ")\n";
        }
        ));
    }
 
    // This method takes two reference parameters so it must execute blocking
    void doSomethingWithReferenceParams(int &a, int &b) {
        std::promise<void> return_val;
        // This lambda function code will execute later from the context of a different thread.
        // It captures {a, b} by reference, so the caller must block until it has run;
        // the promise signals completion.
        dispatchQueue.put(([&a, &b, &return_val]() {
            std::cout << "this is the internal implementation of doSomethingWithReferenceParams(";
            std::cout << a << "," << b << ")\n";
            a = 1234;
            b = 5678;
            return_val.set_value();
        }
        ));
 
        return_val.get_future().get();
    }
};
 
 
int main(int argc, char **argv) {
    BecomeActiveObject active;
    int i = active.doSomething();
    assert(i == 999);
    // mix things up by starting another thread
    std::thread t1(&BecomeActiveObject::doSomethingElse, &active);
    active.doSomethingWithParams(5, 7);
    int a=1, b=2;
    active.doSomethingWithReferenceParams(a, b);
    assert(a == 1234 && b == 5678);
    t1.join();
    assert(active.getVal() == 2.0);
    return 0;
}

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

Neta777
Israel

Article Copyright 2015 by Neta777