Click here to Skip to main content
15,868,141 members
Articles / General Programming / Threads
Tip/Trick

Simple Blocking Queue for Thread Communication and Inter-thread Invocation

Rate me:
Please Sign up or sign in to vote.
4.98/5 (20 votes)
23 Jan 2011CPOL4 min read 77.4K   35   34
A generic class to be used as a conveyor for data and tasks between threads

I want to provide this advice on thread communication, because I already gave several answers on the use of threads to CodeProject members. I post my Answers to Questions found in Question & Answers section of the site; and usually my answers are accepted very well. The only problem is that many questions cause me to reproduce nearly the same ideas over and over. I need to place some code and explanation in a permanent place and refer to it. At the same time, the techniques I offer may not fit the format of whole article. So, I think the form of Tips/Tricks will serve my purpose well.

Here is the most typical situation. A developer creates some UI, most typically with System.Windows.Forms; WFP is less usual at this time. Some UI action requires long time processing; this is where the handling of the UI event does not work well. In this case, some developers don't know what to do, and some use threads, but try to create a thread repeatedly, using Background worker, created a regular threads or a thread from thread pool. In simple cases this is enough, but in massive processing and more complex logics the problem is the total number of threads that becomes unpredictable. Of course, the extra cost of repeated creation and later abandoning of a thread can easily cause a performance hit, and more importantly, a problem of clean-up (and generally, of support).

 

In many such cases, I advice to create a fixed number of threads and permanently keep them running, posting some task to such thread when needed. What is relatively difficult to explain is how to keep a thread in a wait state (wasting zero CPU time) between tasks. It also takes some nerve to explain why spin wait is wrong.

 

Another problem which seems to be different is a desire to implement behavior similar to System.Windows.Forms.Control.Invoke or System.Windows.Threading.Dispatcher. There is a popular misconception about this class, caused by the method System.Windows.Threading.Dispatcher.GetCurrectDispatcher. It always works, but the Invoke method can only be called on the target thread if this thread is designed in a special way. In practice, such inter-thread processing really takes place if the target thread if the specially designed UI thread created in the UI cycle of Window.Form.Application or WPF Application. The problem is that the class Dispatcher is sealed. So, the question is how to provide similar functionality a custom thread.

 

The two groups of problems depicted above can be resolved using some queue encapsulated in the thread used as a target of the invocation. First, I made a simple blocking queue class to provide both data transport and synchronization between threads:

 

C#
using System;
using System.Threading;
using System.Collections.Generic;

public class DataQueue<ITEM> {

    public DataQueue() { this.IsBlocking = true; }
    public DataQueue(bool blocking) { this.IsBlocking = blocking; }

    public EventWaitHandle WaitHandle { get { return FWaitHandle; } }

    public void Close() {
        this.FWaitHandle.Set();
    } //Close

    public void Submit(ITEM item) {
        lock (LockObject) 
            Queue.Enqueue(item);
        FWaitHandle.Set();
    } //Submit

    public ITEM GetMessage() {
        if (IsBlocking)
            FWaitHandle.WaitOne();
        ITEM result = default(ITEM);
        lock (LockObject) {
            if (Queue.Count > 0)
                result = Queue.Dequeue();
            if (IsBlocking && (Queue.Count < 1))
                FWaitHandle.Reset();
            return result;
        } //lock
    } //GetMessage

    public void Clear() {
        lock (LockObject)
            Clear(Queue);
    } //Clear
    public void Clear(Func<ITEM, bool> removalCriterion) {
        lock (LockObject)
            Clear(Queue, removalCriterion);
    } //Clear

    #region implementation
    
    static void Clear(Queue<ITEM> queue) { queue.Clear(); }
    static void Clear(Queue<ITEM> queue, Func<ITEM, bool> removalCriterion) {
        if (removalCriterion == null) {
            queue.Clear();
            return;
        } //if
        Queue<ITEM> copy = new Queue<ITEM>();
        while (queue.Count > 0) {
            ITEM item = queue.Dequeue();
            if (!removalCriterion(item))
                copy.Enqueue(item);
        } //loop
        while (copy.Count > 0)
            queue.Enqueue(copy.Dequeue());
    } //Clear

    bool IsBlocking;
    Queue<ITEM> Queue = new Queue<ITEM>();
    
    //SA!!! important ManualReset.
    //See GetMessage for re-setting
    EventWaitHandle FWaitHandle = new EventWaitHandle(false, EventResetMode.ManualReset);
    Object LockObject = new object();

    #endregion implementation

} //DataQueue

 

 

 

 

I recommend using blocking operation nearly in all cases, otherwise the mechanism would behave like spin wait, causing unwanted abuse of the CPU.

 

 

 

Here is the simple sample of the target thread and the usage, a Console application:

 

 

 

C#
using System;
using System.Threading;

class QueuedThread {
    internal QueuedThread() { this.Thread = new Thread(Body); }
    internal void Start() { this.Thread.Start(); }
    internal void Abort() { //be extra careful!
        this.Queue.Close();
        this.Thread.Abort();
    } //Abort
    internal void PostMessage(string message) {
        this.Queue.Submit(message);
    } //void PostMessage
    #region implementaion
    void Body() {
        try {
            while (true) {
                string message = Queue.GetMessage();
                //safe way to exit, not alway possible
                if (message == null)
                    break;
                ProcessMessage(message);
            } //loop
        } catch (ThreadAbortException) {
            //Very important in case Abort is used
            //Provide last-will processing here
            //even if Abort is not a designed option
        } catch (Exception e) { //always needed
            Console.WriteLine("{0}: {1}", e.GetType().Name, e.Message);
        } //exception
    } //Body
    void ProcessMessage(string message) {
        Console.WriteLine("message Processed {0}", message);
    } //ProcessMessage
    Thread Thread;
    DataQueue<string> Queue = new DataQueue<string>();
    #endregion implementaion
} //class QueuedThread

class Program {

    void Run() {
        QueuedThread t = new QueuedThread();
        t.Start();
        t.PostMessage("first");
        Thread.Sleep(400);
        t.PostMessage("second");
        Thread.Sleep(400);
        t.PostMessage("third");
        Console.WriteLine();
        Console.WriteLine("Finished. Press any key...");
        Console.ReadKey(true);
        //safest way of exiting of the thread,
        //unfortunatelly, not applicable to 100% cases:
        t.PostMessage(null);
        //less safe way, need extra care in thread implementation,
        //safe enough for this sample though:
        //t.Abort();
    } //Run

    static void Main(string[] args) {
        new Program().Run();
    } //Main

} //class Program

 

 

I don't want to go deeper into the relatively complex issue related to acceptance of Abort method (see sample code). Sometimes, such discussions lead to flame wars. The final decision should be made by the developer who wants to use my advice (and, generally, any developer using threads). Anyway, when a data queue is used, Abort could easily be avoided using a special “termination” message (like an empty string in my simple example above).

 

 

Finally, one can also use delegates to implement processing tasks instead just of data elements:

 

The above may appear to be not flexible enough. The thread needs to know what to do with the data item; hence ProcessMessage is just one fixed method. Here is when inter-thread invocation similar to Dispatcher should come into play.

 

So, how about the Delegate invocation? Well, nothing prevents adding a delegate to the ITEM type. This is a just bit more complex. The first thing to understand is that a parameter (or parameters) for delegate invocation should be supplied as well. This is a minimalistic sample, again a Console application:

 

C#
using System;
using System.Threading;

class DelegateInvocationThread {

    class ActionWithParameter<PARAMETER> {
        internal ActionWithParameter(Action<PARAMETER> action, PARAMETER parameterValue) {
            this.Action = action;
            this.Parameter = parameterValue;
        } //ActionWithParameter
        internal static ActionWithParameter<PARAMETER> ExitAction { get { return FExitAction; } }
        internal void Invoke() {
            if (Action != null)
                Action(Parameter);
        } //Invoke
        internal bool ExitCondition { get { return Action == null || Parameter == null; } }
        Action<PARAMETER> Action;
        PARAMETER Parameter;
        static ActionWithParameter<PARAMETER> FExitAction =
            new ActionWithParameter<PARAMETER>(null, default(PARAMETER));
    } //ActionWithParameter

    internal DelegateInvocationThread() { this.Thread = new Thread(Body); }
    internal void Start() { this.Thread.Start(); }
    internal void Invoke(Action<string> action, string parameter) {
        Queue.Submit(new ActionWithParameter<string>(action, parameter));
    } //Invoke
    internal void InvokeExit() {
        Queue.Submit(ActionWithParameter<string>.ExitAction);
    } //InvokeExit()
    internal void Abort() { //be extra careful!
        this.Queue.Close();
        this.Thread.Abort();
    } //Abort
    
    #region implementaion
    void Body() {
        try {
            while (true) {
                ActionWithParameter<string> action = Queue.GetMessage();
                //safe way to exit, not alway possible
                if (action.ExitCondition)
                    break;
                action.Invoke();
            } //loop
        } catch (ThreadAbortException) {
            //Very important in case Abort is used
            //Provide last-will processing here
            //even if Abort is not a designed option
        } catch (Exception e) { //always needed
            Console.WriteLine("{0}: {1}", e.GetType().Name, e.Message);
        } //exception
    } //Body
    void ProcessMessage(string message) {
        Console.WriteLine("message Processed {0}", message);
    } //ProcessMessage
    Thread Thread;
    DataQueue<ActionWithParameter<string>> Queue = 
        new DataQueue<ActionWithParameter<string>>();
    #endregion implementaion
} //class DelegateInvocationThread

class Program {

    void Run() {
        DelegateInvocationThread t = new DelegateInvocationThread();
        t.Start();
        t.Invoke(
            delegate(string value) {
                Console.WriteLine(string.Format("Method #1, parameter: {0}", value));
            }, "first");
        Thread.Sleep(400);
        t.Invoke(
            delegate(string value) {
                Console.WriteLine(string.Format("Method #2, parameter: {0}", value));
            }, "second");
        Thread.Sleep(400);
        t.Invoke(
            delegate(string value) {
                Console.WriteLine(string.Format("Method #3, parameter: {0}", value));
            }, "third");
        Console.WriteLine();
        Console.WriteLine("Finished. Press any key...");
        Console.ReadKey(true);
        //safest way of exiting of the thread,
        //unfortunatelly, not applicable to 100% cases:
        t.InvokeExit();
        //less safe way, need extra care in thread implementation,
        //safe enough for this sample though:
        //t.Abort();
    } //Run

    static void Main(string[] args) {
        new Program().Run();
    } //Main

} //class Program

I've shown anonymous delegate syntax which can be compiled with C# v.2 or later. With C# v.3 lambda form of anonymous delegate can also be used:

 

 

C#
t.Invoke(
    new Action<string>((value) => {
        Console.WriteLine(string.Format("Method #3, parameter: {0}", value));
    }), "third");

 

 

This idea could further be used to combine more than one queue in the same target thread. The warning is: it is not possible to use more than one instance of DataQueue in the same thread: they will block each other, if blocking operation is used. The modifications should go inside the class DataQueue. At the same time, such generalization is easy using the techniques I offer.

Thank you,

 

 

— Sergey A Kryukov

 

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Architect
United States United States
Physics, physical and quantum optics, mathematics, computer science, control systems for manufacturing, diagnostics, testing, and research, theory of music, musical instruments… Contact me: https://www.SAKryukov.org

Comments and Discussions

 
Praisegreat contribution Pin
Southmountain9-Mar-22 9:39
Southmountain9-Mar-22 9:39 
GeneralRe: great contribution Pin
Sergey Alexandrovich Kryukov6-Dec-22 21:33
mvaSergey Alexandrovich Kryukov6-Dec-22 21:33 
GeneralNice reading Pin
Praveen Kumar Katiyar9-Jul-14 19:04
professionalPraveen Kumar Katiyar9-Jul-14 19:04 
GeneralRe: Nice reading Pin
Sergey Alexandrovich Kryukov9-Jul-14 20:26
mvaSergey Alexandrovich Kryukov9-Jul-14 20:26 
GeneralRe: Nice reading Pin
Praveen Kumar Katiyar11-Jul-14 4:13
professionalPraveen Kumar Katiyar11-Jul-14 4:13 
QuestionC# .net 2.0 WINCE Platform. Pin
mr colorbooks17-Sep-12 16:05
mr colorbooks17-Sep-12 16:05 
AnswerRe: C# .net 2.0 WINCE Platform. Pin
Sergey Alexandrovich Kryukov17-Sep-12 16:18
mvaSergey Alexandrovich Kryukov17-Sep-12 16:18 
GeneralRe: C# .net 2.0 WINCE Platform. Pin
mr colorbooks17-Sep-12 16:20
mr colorbooks17-Sep-12 16:20 
GeneralRe: C# .net 2.0 WINCE Platform. Pin
Sergey Alexandrovich Kryukov18-Sep-12 9:03
mvaSergey Alexandrovich Kryukov18-Sep-12 9:03 
Thank you for this information, but, as you can see, I did not make a statement about such compatibility. EventWaitHandle is one of the most important classes needed for multithreading anyway; if some platform does not support it, it's a big problem.
—SA
Sergey A Kryukov

GeneralRe: C# .net 2.0 WINCE Platform. Pin
mr colorbooks18-Sep-12 14:03
mr colorbooks18-Sep-12 14:03 
GeneralRe: C# .net 2.0 WINCE Platform. Pin
Sergey Alexandrovich Kryukov18-Sep-12 15:03
mvaSergey Alexandrovich Kryukov18-Sep-12 15:03 
GeneralMy vote of 5 Pin
hoernchenmeister6-Sep-12 2:16
hoernchenmeister6-Sep-12 2:16 
GeneralRe: My vote of 5 Pin
Sergey Alexandrovich Kryukov6-Sep-12 7:03
mvaSergey Alexandrovich Kryukov6-Sep-12 7:03 
QuestionVery good tip Pin
BillW335-Sep-12 8:07
professionalBillW335-Sep-12 8:07 
AnswerRe: Very good tip Pin
Sergey Alexandrovich Kryukov5-Sep-12 8:14
mvaSergey Alexandrovich Kryukov5-Sep-12 8:14 
GeneralRe: Very good tip Pin
BillW335-Sep-12 10:24
professionalBillW335-Sep-12 10:24 
GeneralRe: Very good tip Pin
Sergey Alexandrovich Kryukov5-Sep-12 12:48
mvaSergey Alexandrovich Kryukov5-Sep-12 12:48 
GeneralMy vote of 5 Pin
pasztorpisti23-Jul-12 12:17
pasztorpisti23-Jul-12 12:17 
GeneralRe: My vote of 5 Pin
Sergey Alexandrovich Kryukov23-Jul-12 13:47
mvaSergey Alexandrovich Kryukov23-Jul-12 13:47 
GeneralRe: Thank you, Indivara, I have something to think about anyway.... Pin
Sergey Alexandrovich Kryukov26-Jan-11 14:42
mvaSergey Alexandrovich Kryukov26-Jan-11 14:42 
GeneralRe: I didn't mean a visually appealing demo, just a downloadable... Pin
Indivara26-Jan-11 13:54
professionalIndivara26-Jan-11 13:54 
GeneralReason for my vote of 5 Clean, efficient and VERY simple! Pin
d.allen10125-Nov-11 1:45
d.allen10125-Nov-11 1:45 
GeneralRe: Thank you very much, Donald. --SA Pin
Sergey Alexandrovich Kryukov25-Nov-11 5:06
mvaSergey Alexandrovich Kryukov25-Nov-11 5:06 
GeneralReason for my vote of 5 Nice article - my 5 Pin
Espen Harlinn10-Jul-11 8:23
professionalEspen Harlinn10-Jul-11 8:23 
GeneralRe: Thank you, Espen. --SA Pin
Sergey Alexandrovich Kryukov10-Jul-11 8:49
mvaSergey Alexandrovich Kryukov10-Jul-11 8:49 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Praise Praise    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.