Intraprocess Thread Messaging

WillSkou, 18 May 2010, CPOL
A class library that creates worker threads and the communications structures for messaging between them.

Introduction

This library implements messaging between worker threads within a process. Each thread has a single point of access, its message queue or mailbox, through which all other threads communicate with it. The thread processes its messages serially, and the mailbox holds incoming messages until the thread can get around to them, so none are lost. Messaging is asynchronous, so sending threads don't block. Because all the data and code for a thread are declared inside one class, and only that thread touches them, the thread's data has no concurrency issues. I find this approach much cleaner and easier to maintain than sharing objects between threads and peppering lock statements throughout the code, with the inevitable deadlocks.

History

In the past, I've used operating systems that provided message queues: VRTX-32 and OS/2. For some reason, Windows has never provided such an object.

The diagram above gives a quick overview of how this works:

  • Some external thread sends a message to Thread 1.
  • After completing the processing, Thread 1 sends Results 1 Message off to Thread 2.
  • Thread 2 then performs some processing, and when it completes, it sends a Results 2 Message to Thread 3 and an Ack Message to Thread 1.
  • Thread 3 may just update a database as a result of processing the Results 2 Message.

Internals of the Library

The library consists of four parts:

  • Message Queue
  • Managed Thread
  • Message Classes
  • Thread Manager

The Message Queue

The message queue, or mailbox, allows any thread to send a message to the owning thread, and allows the owning thread to receive those messages. It consists of three components: a Queue, a ManualResetEvent, and an object used to lock access to the other two.

The message queue has three methods: SendMessage() for any thread to send a message to the thread owning the queue, GetMessage() which is used only by the thread owning the queue and waits forever until a message arrives, and GetMessage(int waitTime) which does the same as GetMessage(), but will return after waitTime milliseconds if no message is received. This would allow your thread to perform some periodic processing. Alternatively, you could create a timer that would send a message to your thread to perform some periodic function.

Implementation

using System.Collections.Generic;
using System.Threading;

/// <summary>
/// Implements a message queue for a thread.
/// </summary>
internal class MessageQueue
{
    /// <summary>
    /// Queue for FIFO processing of messages
    /// </summary>
    private Queue<BaseMessage> msgQueue;

    /// <summary>
    /// Event signalling to the thread that a message has arrived
    /// </summary>
    private ManualResetEvent messageArrivedEvent;

    /// <summary>
    /// Mutual exclusion of threads to protected resources
    /// </summary>
    private object mutexObject;

    /// <summary>
    /// Name of the managed thread that owns this message queue
    /// </summary>
    private readonly string threadName;

    /// <summary>
    /// Constructor
    /// </summary>
    public MessageQueue(string threadName)
    {
        msgQueue = new Queue<BaseMessage>();
        messageArrivedEvent = new ManualResetEvent(false);
        mutexObject = new object();
        this.threadName = threadName;
    }

    /// <summary>
    /// Send a message to the thread that owns this message queue
    /// </summary>
    public void SendMessage(BaseMessage msg)
    {
        lock(mutexObject)
        {
            msgQueue.Enqueue(msg);
            messageArrivedEvent.Set();
        }
    }

    /// <summary>
    /// Get a message. If no messages wait indefinitely for one.
    /// </summary>
    public BaseMessage GetMessage()
    {
        BaseMessage msg = null;

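        // Only wait for a signal if no messages are present. Count is read
        // under the lock because Queue<T> is not safe for concurrent access.
        bool isEmpty;
        lock (mutexObject)
        {
            isEmpty = msgQueue.Count == 0;
        }
        if (isEmpty)
            messageArrivedEvent.WaitOne();
        lock (mutexObject)
        {
            msg = msgQueue.Dequeue();
            messageArrivedEvent.Reset();
        }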

        return msg;
    }

    /// <summary>
    /// Get a message. Only wait for the time out period
    /// before returning, null return value indicates
    /// no messages were received.
    /// </summary>
    public BaseMessage GetMessage(int waitTime)
    {
        BaseMessage msg = null;

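        // Don't wait for a signal if a message is present. Count is read
        // under the lock because Queue<T> is not safe for concurrent access.
        bool hasMessage;
        lock (mutexObject)
        {
            hasMessage = msgQueue.Count > 0;
        }
        if (hasMessage || messageArrivedEvent.WaitOne(waitTime))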
        {
            lock (mutexObject)
            {
                msg = msgQueue.Dequeue();
                messageArrivedEvent.Reset();
            }
        }
        return msg;
    }
}
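For comparison, the same mailbox semantics (non-blocking send, blocking receive, receive with a time-out) can be sketched on top of BlockingCollection<T> from System.Collections.Concurrent, which ships with .NET 4. This is not the library's implementation; it swaps the Queue, ManualResetEvent and lock object for a framework collection. The names BlockingMailbox and BlockingMailboxDemo are made up for this illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical stand-in for the article's MessageQueue, generic over the message type.
public sealed class BlockingMailbox<T> where T : class
{
    // BlockingCollection<T> wraps a ConcurrentQueue<T> by default, so delivery is FIFO.
    private readonly BlockingCollection<T> queue = new BlockingCollection<T>();

    // Any thread may call this; it does not block because the collection is unbounded.
    public void SendMessage(T msg)
    {
        queue.Add(msg);
    }

    // Blocks until a message is available.
    public T GetMessage()
    {
        return queue.Take();
    }

    // Waits up to waitTime milliseconds; returns null on a time-out.
    public T GetMessage(int waitTime)
    {
        T msg;
        return queue.TryTake(out msg, waitTime) ? msg : null;
    }
}

public static class BlockingMailboxDemo
{
    public static void Main()
    {
        var mailbox = new BlockingMailbox<string>();

        // A sender thread posts two messages in order.
        var sender = new Thread(() =>
        {
            mailbox.SendMessage("first");
            mailbox.SendMessage("second");
        });
        sender.Start();

        Console.WriteLine(mailbox.GetMessage());   // prints "first"
        Console.WriteLine(mailbox.GetMessage());   // prints "second"
        sender.Join();

        // The mailbox is now empty, so the timed wait gives up and returns null.
        Console.WriteLine(mailbox.GetMessage(50) == null);   // prints "True"
    }
}
```

The trade-off is that the collection hides the signalling details shown in the listing above, which is convenient in production code but less instructive when the goal is to see how a mailbox works.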

The Managed Thread

This class wraps the functionality of a thread from the thread pool and a message queue. It implements the IThread interface, which allows manipulating the thread and sending it messages.

This class provides public methods that perform the following:

  • Start the thread running after creating it.
  • Stop the thread.
  • Send messages to the thread.

The IThread interface declares these operations:

/// <summary>
/// Interface to a managed thread object
/// </summary>
public interface IThread
{
    /// <summary>
    /// The thread name
    /// </summary>
    string Name { get; }

    /// <summary>
    /// Start this thread running
    /// </summary>
    void StartThread();

    /// <summary>
    /// Stop this thread
    /// </summary>
    void StopThread();

    /// <summary>
    /// Send a message to this thread
    /// </summary>
    void SendMessage(BaseMessage msg);
}

During construction, a reference to a message handling function is provided. This is where you would implement processing of your specific messages.

Once the managed thread starts running, it enters a loop where it waits at the message queue for the next message to process. The stop thread message causes the thread to exit the loop.

/// <summary>
/// Main processing loop for the thread. This one implements both the infinite
/// and timed wait for new messages.
/// </summary>
private void mainLoop(object state)
{
    Thread.CurrentThread.Name = name;
    bool run = true;
    while (run == true)
    {
        // Call the appropriate GetMessage() method
        BaseMessage msg = null;
        if (waitTimeInMilliseconds == -1)
            msg = msgQueue.GetMessage();
        else
            msg = msgQueue.GetMessage(waitTimeInMilliseconds);

        // If a message returned, check its type
        if (msg != null)
        {
            switch (msg.MsgType)
            {
                // Stop message, clear the flag so we exit the loop
                // and end the thread
                case MessageType.StopThread:
                    run = false;
                    break;
                default:
                    // Call the user's supplied message handler
                    messageHandler(msg);
                    break;
            }
        }
        else
            // If we get here the caller has opted for time outs
            messageHandler(msg);
    }
}
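The timer alternative mentioned earlier, where a timer posts a message instead of the thread using a timed wait, might look roughly like the sketch below. It is self-contained, so a BlockingCollection<string> stands in for the thread's mailbox and the string "tick" stands in for a user-defined message type; both are assumptions made for the example, not part of the library.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class TimerTickDemo
{
    // Starts a worker that handles "tick" messages until it has seen tickCount of them,
    // and returns how many ticks the worker processed.
    public static int Run(int tickCount, int periodMs)
    {
        // Stand-in mailbox; in the library this would be the managed thread's queue.
        var mailbox = new BlockingCollection<string>();
        int ticksSeen = 0;

        var worker = new Thread(() =>
        {
            while (ticksSeen < tickCount)
            {
                string msg = mailbox.Take();   // infinite wait, no time-out needed
                if (msg == "tick")
                    ticksSeen++;               // periodic processing would go here
            }
        });
        worker.Start();

        // The timer callback runs on a thread-pool thread and does nothing but post
        // a message, so the periodic work itself still runs on the worker thread.
        using (new Timer(_ => mailbox.Add("tick"), null, periodMs, periodMs))
        {
            worker.Join();
        }
        return ticksSeen;
    }

    public static void Main()
    {
        Console.WriteLine(Run(3, 20));   // prints 3
    }
}
```

With this arrangement the handler never sees a null message, because a time-out becomes an ordinary message type of its own.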

Message Classes

All useful messages inherit from BaseMessage. The MessageType enum lists the specific message types you want for your design, and each message carries its type in the MsgType field.

/// <summary>
/// The different messages the managed threads send to each other.
/// Add your own definitions here as needed.
/// </summary>
public enum MessageType
{
    StopThread,     // Default one to have each thread exit its main processing loop

    // Add your message definitions here
    Message1,
    Message2A,
    Message2B
}

/// <summary>
/// Base for all specific messages
/// </summary>
public abstract partial class BaseMessage
{
    /// <summary>
    /// The specific message being instantiated
    /// </summary>
    public readonly MessageType MsgType;

    /// <summary>
    /// Constructor
    /// </summary>
    public BaseMessage(MessageType msgType)
    {
        this.MsgType = msgType;
    }
}

For this example program, I've created three specific message classes.

This is the message passed around for the loop test:

/// <summary>
/// Example user defined message class for the loop test
/// </summary>
public class Message1 : BaseMessage
{
    /// <summary>
    /// Each thread in the chain will sleep this long before passing the message on
    /// </summary>
    public readonly int DelayInMilliSeconds;

    /// <summary>
    /// Constructor
    /// </summary>
    public Message1(int delayInMilliSeconds)
        : base(MessageType.Message1)
    {
        DelayInMilliSeconds = delayInMilliSeconds;
    }
}

This is the message that starts the load and sequence test:

/// <summary>
/// This message signals the worker thread to go into a loop sending out Message2Bs
/// </summary>
public class Message2A : BaseMessage
{
    /// <summary>
    /// Constructor
    /// </summary>
    public Message2A()
        : base(MessageType.Message2A)
    {
    }
}

This is the message sent back for the load and sequence test:

/// <summary>
/// This message is sent back by a worker thread during the load and sequence test.
/// It carries the sender's name and an ascending sequence number.
/// </summary>
public class Message2B : BaseMessage
{
    /// <summary>
    /// The current sequence number
    /// </summary>
    public readonly int SequenceNumber;

    /// <summary>
    /// The name of the managed thread sending this message
    /// </summary>
    public readonly string ThreadName;

    /// <summary>
    /// Constructor
    /// </summary>
    public Message2B(int sequenceNumber, string threadName)
        : base(MessageType.Message2B)
    {
        SequenceNumber = sequenceNumber;
        ThreadName = threadName;
    }
}

The Thread Manager

This class is the connection point for your application code. It implements the IThreadManager interface.

/// <summary>
/// Interface to the thread manager
/// </summary>
public interface IThreadManager
{
    /// <summary>
    /// Create a thread; it must be started separately.
    /// Once running, the thread pends at its message queue until a message arrives
    /// and then calls your message handler.
    /// </summary>
    IThread CreateThread(string threadName, Action<BaseMessage> messageHandler);

    /// <summary>
    /// Create a thread; it must be started separately.
    /// Once running, the thread pends at its message queue until either a message
    /// arrives or the time-out period expires, and then calls your message handler.
    /// On a time-out, the handler is called with null instead of a message object.
    /// </summary>
    IThread CreateThread(
        string threadName, 
        Action<BaseMessage> messageHandler,
        int waitTimeInMilliseconds);

    /// <summary>
    /// Get a list of all the threads under management
    /// </summary>
    List<IThread> GetThreads();

    /// <summary>
    /// Start all threads running
    /// </summary>
    void StartAllThreads();

    /// <summary>
    /// Stop all threads
    /// </summary>
    void StopAllThreads();

    /// <summary>
    /// Removes all threads from the manager
    /// </summary>
    void ClearAllThreads();

    /// <summary>
    /// Send a message to the thread that created the Thread Manager
    /// </summary>
    void SendMessage(BaseMessage msg);

    /// <summary>
    /// If your application code wants to receive messages,
    /// it should wire in this event.
    /// </summary>
    event Action<BaseMessage> ReceivedMessage;
}

This allows your application code to:

  • Obtain a reference to a ThreadManager instance.
  • Create any number of threads for your system.
  • Start and stop all threads running.
  • Obtain an IThread list of all the current threads.
  • Send a message to any of those threads.
  • By wiring in an event, receive messages from the threads.

Using the Library

I haven't found a clean way of adding to the message enumerations and classes without touching the message class code, so currently, the library source code has to be included in your solution and custom changes made for your specific application.
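One possible way around this, offered only as a sketch and not something the library does, is to drop the enum and dispatch on the runtime type of the message. The library would then own an open base class and its stop message, and applications would add message types as ordinary subclasses in their own code. The names OpenMessage, StopThreadMessage, SleepRequest and TypeDispatchDemo are hypothetical, and the pattern-matching switch assumes C# 7 or later.

```csharp
using System;

// Library side: an open base class with no enum to edit. (Hypothetical redesign.)
public abstract class OpenMessage { }

// The library would still own the message that ends a thread's main loop.
public sealed class StopThreadMessage : OpenMessage { }

// Application side: a new message type is just another subclass.
public sealed class SleepRequest : OpenMessage
{
    public readonly int DelayInMilliseconds;

    public SleepRequest(int delayInMilliseconds)
    {
        DelayInMilliseconds = delayInMilliseconds;
    }
}

public static class TypeDispatchDemo
{
    // A handler that switches on the message's runtime type instead of MsgType.
    public static string Describe(OpenMessage msg)
    {
        switch (msg)
        {
            case StopThreadMessage _:
                return "stop";
            case SleepRequest s:
                return "sleep " + s.DelayInMilliseconds;
            case null:
                return "time-out";   // mirrors the null passed to handlers on a time-out
            default:
                return "unknown";
        }
    }

    public static void Main()
    {
        Console.WriteLine(Describe(new SleepRequest(250)));     // prints "sleep 250"
        Console.WriteLine(Describe(new StopThreadMessage()));   // prints "stop"
    }
}
```

The cost is that the compiler no longer lists every message type in one place, so an unhandled type shows up at run time in the default branch and not as a missing enum case.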

I've included a WinForms app that both tests and demonstrates the library. It runs two tests.

Loop Test

In this test, the WinForms app creates three threads. The WinForms thread sends Message 1 to thread 1, which then sleeps for the specified time before passing the message on to thread 2. Thread 2 sleeps for the specified time and passes the message on to thread 3, which also sleeps for the specified time before finally sending the message back to the WinForms thread, completing the test.

Creating and Starting the Threads

The following code from Form1.btnStartSleepLoop_Click() initiates the test.

Example1 is a class that provides the custom message handler needed by the Managed Thread class. Note that worker3 is passed a reference to the WinForms mailbox, where it will send Message 1 when it completes. The managed thread is then created with a reference to the message handler implemented in worker3. Ditto for threads 2 and 1. Next, the WinForms thread wires in the ReceivedMessage event to receive the message from thread 3 when it completes. All the managed threads are then started, and thread 1 is sent Message 1 to start the test.

// Get a reference to the thread manager
threadMgr = ThreadManager.GetThreadManager();

// Create 3 threads for passing around the sleep message
// Create them in reverse order as thread 1 needs to know thread 2's mail box, etc.
Example1 worker3 = new Example1(
    "Thread3",
    threadMgr.SendMessage); // Thread 3 sends the message back to this thread
IThread thread3 = threadMgr.CreateThread(worker3.ThreadName, worker3.MessageHandler);
threads.Add(worker3.ThreadName, thread3);

Example1 worker2 = new Example1(
    "Thread2",
    thread3.SendMessage); // Thread 2 sends the message to thread 3
IThread thread2 = threadMgr.CreateThread(worker2.ThreadName, worker2.MessageHandler);
threads.Add(worker2.ThreadName, thread2);

Example1 worker1 = new Example1(
    "Thread1",
    thread2.SendMessage); // Thread 1 sends the message to thread 2
IThread thread1 = threadMgr.CreateThread(worker1.ThreadName, worker1.MessageHandler);
threads.Add(worker1.ThreadName, thread1);

// Wire in the message received event handler
threadMgr.ReceivedMessage += new Action<BaseMessage>(threadMgr_NewMessage);

// Start the threads running
threadMgr.StartAllThreads();

// Send a message to the 1st thread
Message1 msg = new Message1(Convert.ToInt32(txtDelay.Text) * 1000);
thread1.SendMessage(msg);

Example1 Class

This class holds a send-message delegate, which it uses to pass the message on once it has finished processing. A reference to MessageHandler() is passed to the Managed Thread, which then calls this method every time a message is received.

/// <summary>
/// Class to handle the thread messaging loop for Example 1.
/// This class is used to encapsulate all the objects maintained by the thread.
/// </summary>
public class Example1
{
    /// <summary>
    /// Name of this thread
    /// </summary>
    public readonly string ThreadName;

    /// <summary>
    /// Delegate used to send the message on to the next thread
    /// </summary>
    private Action<BaseMessage> sendMsg;

    /// <summary>
    /// Ctor
    /// </summary>
    public Example1(string threadName, Action<BaseMessage> sendMsg)
    {
        ThreadName = threadName;
        this.sendMsg = sendMsg;
    }

    /// <summary>
    /// Received message handler
    /// </summary>
    public void MessageHandler(BaseMessage msg)
    {
        switch (msg.MsgType)
        {
            case MessageType.Message1:
                {
                    Message1 msg1 = (Message1)msg;

                    // Sleep for the requested time
                    System.Threading.Thread.Sleep(msg1.DelayInMilliSeconds);

                    // And then send the message on to the next thread
                    sendMsg(msg1);
                }
                break;
        }
    }
}

When thread 3 completes, the WinForms thread receives the message in threadMgr_NewMessage(). The method loopTestUpdateForm() handles the Invoke required to get back on the UI thread and update the controls on the form.

/// <summary>
/// This form has gotten a message, i.e. from thread 3
/// </summary>
void threadMgr_NewMessage(BaseMessage msg)
{
    if (msg != null)
    {
        switch (msg.MsgType)
        {
            // Thread 3 has finished processing and sent the message on to us
            case MessageType.Message1:
                // Clean up
                threadMgr.ReceivedMessage -= threadMgr_NewMessage;
                threadMgr.StopAllThreads();
                threadMgr.ClearAllThreads();
                threads.Clear();
                loopTestUpdateForm();
                break;
        }
    }
}

Load and Sequence Test

This test is designed to stress the system and detect missing messages and messages received out of sequence. Message 2A is sent to the three threads with the requested number of Message 2Bs to be sent back. Each thread sends back the required number of messages as fast as possible in a loop, with an ascending and continuous sequence number starting from 0. This allows the WinForms thread to check that not only were the expected number of messages received, but that they arrived in order.
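The receiving side of this check is not shown in the article. A minimal sketch of how it could be done is below: it tracks the next expected sequence number for each sending thread, since messages from different threads may interleave. SequenceChecker and its members are hypothetical names, not taken from the demo's source.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical checker for the receiving side of the load and sequence test.
public sealed class SequenceChecker
{
    // Next sequence number expected from each sending thread, keyed by thread name.
    private readonly Dictionary<string, int> nextExpected = new Dictionary<string, int>();

    // Number of messages whose sequence number was not the one expected.
    public int OutOfOrderCount { get; private set; }

    // Record one received message, identified by its sender and sequence number.
    public void Record(string threadName, int sequenceNumber)
    {
        int expected;
        nextExpected.TryGetValue(threadName, out expected);   // stays 0 for a new sender
        if (sequenceNumber != expected)
            OutOfOrderCount++;
        nextExpected[threadName] = sequenceNumber + 1;
    }

    // True when every listed thread delivered 0..messagesPerThread-1 with no gaps.
    public bool IsComplete(IEnumerable<string> threadNames, int messagesPerThread)
    {
        if (OutOfOrderCount != 0)
            return false;
        foreach (string name in threadNames)
        {
            int next;
            nextExpected.TryGetValue(name, out next);
            if (next != messagesPerThread)
                return false;
        }
        return true;
    }
}

public static class SequenceCheckerDemo
{
    public static void Main()
    {
        var checker = new SequenceChecker();
        var names = new[] { "Thread1", "Thread2" };

        // Interleaved but individually ordered streams pass the check.
        checker.Record("Thread1", 0);
        checker.Record("Thread2", 0);
        checker.Record("Thread1", 1);
        checker.Record("Thread2", 1);
        Console.WriteLine(checker.IsComplete(names, 2));   // prints "True"

        // A skipped number (2 is missing) is counted as out of order.
        checker.Record("Thread1", 3);
        Console.WriteLine(checker.OutOfOrderCount);        // prints 1
    }
}
```

In the demo, Record would be called from the ReceivedMessage handler with the ThreadName and SequenceNumber fields of each Message2B.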

Starting the Test

Three threads are created as before, except that this time, Example2 objects implement the message handlers, and they all send messages back to the WinForms thread.

Example2 Class

/// <summary>
/// Class to handle Example 2, the load/stress and dropped message test
/// This class is used to encapsulate all the objects maintained by the thread.
/// </summary>
public class Example2
{
    /// <summary>
    /// Thread name
    /// </summary>
    public readonly string ThreadName;

    /// <summary>
    /// Ascending sequence number sent out in successive messages
    /// </summary>
    private int messageSequenceNumber;

    /// <summary>
    /// The number of messages to send
    /// </summary>
    private int numberOfMessagesToSend;

    /// <summary>
    /// Delegate used to send the sequence of messages
    /// back to the thread running the test
    /// </summary>
    private Action<BaseMessage> sendMsg;

    /// <summary>
    /// Ctor
    /// </summary>
    public Example2(string threadName, int numberOfMessagesToSend, 
                    Action<BaseMessage> sendMsg)
    {
        ThreadName = threadName;
        messageSequenceNumber = 0;
        this.numberOfMessagesToSend = numberOfMessagesToSend;
        this.sendMsg = sendMsg;
    }

    /// <summary>
    /// Received message handler
    /// </summary>
    public void MessageHandler(BaseMessage msg)
    {
        switch (msg.MsgType)
        {
            case MessageType.Message2A:
            {
                // Drop into a loop sending out all the requested messages back to back
                while (messageSequenceNumber < numberOfMessagesToSend)
                {
                    Message2B outboundMsg = 
                              new Message2B(messageSequenceNumber, ThreadName);
                    sendMsg(outboundMsg);
                    messageSequenceNumber++;
                }
            }
            break;
        }
    }
}

Points of Interest

I've implemented this messaging scheme on an ad-hoc basis many times for various customers, and finally decided to package it and make it more reusable for future projects.

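One important caveat: a message carries references, not copies, so if it refers to a mutable object, the sending and receiving threads can still access that object at the same time. A design that partitions data cleanly between threads avoids this problem, for example by keeping messages immutable (as the readonly fields in the message classes above do) or by never touching an object again once it has been sent.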
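As a small illustration of that caveat, the sketch below shows a message that takes a private copy of the sender's data when it is constructed, so the sender can keep reusing its buffer without affecting what the receiver sees. ReadingsMessage and SnapshotDemo are hypothetical names for this example; the class does not derive from BaseMessage only to keep the sample self-contained.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message carrying a batch of readings.
public sealed class ReadingsMessage
{
    // Private copy; nothing outside this class can modify it.
    private readonly int[] readings;

    public ReadingsMessage(IEnumerable<int> source)
    {
        // Copy on construction so later changes by the sender are not visible
        // to the thread that receives this message.
        readings = new List<int>(source).ToArray();
    }

    public int Count
    {
        get { return readings.Length; }
    }

    public int this[int index]
    {
        get { return readings[index]; }
    }
}

public static class SnapshotDemo
{
    public static void Main()
    {
        var buffer = new List<int> { 1, 2, 3 };
        var msg = new ReadingsMessage(buffer);

        // The sender keeps reusing its buffer after handing the message off.
        buffer.Clear();
        buffer.Add(99);

        Console.WriteLine(msg.Count);   // prints 3
        Console.WriteLine(msg[0]);      // prints 1
    }
}
```

Copying costs time and memory for large payloads, so handing over ownership (the sender drops its reference after sending) is the usual alternative when the data is big.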

History

  • 05/15/2010 - First version.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

WillSkou

United States

Last Updated 18 May 2010
Article Copyright 2010 by WillSkou
Everything else Copyright © CodeProject, 1999-2014