Introduction
In real-time systems that must respond to and process different tasks as
quickly as possible, and where those tasks can be processed concurrently, a
number of threads need to be waiting, willing, and able to perform the tasks
at hand. This can be achieved with a purpose-built scheme for each
specific job, or with a generic mechanism called a thread pool. The work
queue is a simple and elegant type of thread pool: it creates the
requested number of threads when it is created and manages a queue of
work items that implement the specific tasks. Each work item,
in its turn, is picked up by a free thread that processes it.
How to Use
First, a new type of work item needs to be defined in a
format that the work queue can process. To do this,
implement a work item class that inherits from the base
work item class, WorkItemBase. WorkItemBase
has two abstract functions that must be implemented. The first,
DoWork, is the method that a free thread executes when
the work item gets its turn in the queue. The second,
Abort, is called when the queue is destroyed before the work
item gets its turn. The specific work item should be implemented
as follows.
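class SpecificWorkItem : public WorkItemBase
{
    // Executed by a free pool thread when the item gets its turn in the queue.
    void DoWork(void* pThreadContext);
    // Called instead of DoWork if the queue is destroyed first.
    void Abort();
};

void SpecificWorkItem::DoWork(void* pThreadContext)
{
    // Task-specific processing goes here.
}

void SpecificWorkItem::Abort()
{
    // Task-specific cleanup goes here.
}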
Note that different types of work items can be processed in the same
work queue. To set up the work queue, first call the Create
function; its first parameter is the number of threads
and its second is an array of pointers to thread data structures.
Each structure can serve as a working surface for one of the threads. This
reduces the number of local variables declared in the DoWork
function and avoids a dynamic allocation for each item that is
processed, which speeds up processing.
CWorkQueue m_WorkQueue;
m_WorkQueue.Create(5);
As you can see in the code, the second parameter can be omitted; its
default value is NULL. Next, to insert a work item into the
queue, call InsertWorkItem with the newly created
work item. This function can be called asynchronously.
SpecificWorkItem* pWorkItem = new SpecificWorkItem();
m_WorkQueue.InsertWorkItem(pWorkItem);
When you have finished using the work queue, you must call the Destroy
function. This function waits until all threads have finished processing the
work items that have already been taken out of the queue, and then calls the
Abort function of each work item that is still waiting in
the queue.
m_WorkQueue.Destroy();
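The idea of a per-thread working surface, as described for the second parameter of Create, can be pictured with a small portable sketch. ScratchContext and ChecksumItem are hypothetical names invented for this illustration, and WorkItemBase is re-declared here only so that the fragment compiles on its own; the real base class ships with the article's source and may differ in detail.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>

// Stand-in for the article's base class, re-declared so the sketch compiles alone.
class WorkItemBase
{
public:
    virtual ~WorkItemBase() {}
    virtual void DoWork(void* pThreadContext) = 0;
    virtual void Abort() = 0;
};

// Hypothetical per-thread working surface: set up once per thread and
// reused by every work item that the thread processes.
struct ScratchContext
{
    char        buffer[256];
    std::size_t itemsProcessed;
};

// Hypothetical work item that does its work in the thread's scratch
// buffer instead of allocating a buffer for each item.
class ChecksumItem : public WorkItemBase
{
public:
    explicit ChecksumItem(const char* text)
        : m_text(text), m_sum(0), m_aborted(false) {}

    void DoWork(void* pThreadContext) override
    {
        ScratchContext* ctx = static_cast<ScratchContext*>(pThreadContext);
        assert(ctx != nullptr); // this particular item requires a context

        // Copy the input into the reusable buffer and always terminate it.
        std::strncpy(ctx->buffer, m_text, sizeof(ctx->buffer) - 1);
        ctx->buffer[sizeof(ctx->buffer) - 1] = '\0';

        for (const char* p = ctx->buffer; *p != '\0'; ++p)
            m_sum += static_cast<unsigned char>(*p);
        ++ctx->itemsProcessed;
    }

    void Abort() override { m_aborted = true; }

    unsigned Sum() const { return m_sum; }
    bool Aborted() const { return m_aborted; }

private:
    const char* m_text;
    unsigned    m_sum;
    bool        m_aborted;
};
```

With the article's queue, one such structure per thread would be handed over through the second argument of Create, and each thread would then pass its own structure to DoWork as pThreadContext. Because a context is only ever touched by the thread that owns it, no locking is needed inside DoWork for the scratch data.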
Implementation
The work queue is implemented using an STL queue of
work items, which is made safe for use in an asynchronous system by a
mutual exclusion lock; a semaphore that is initialized with a count of
zero; an abort event; and an array of threads that all run the same
worker function. This function waits on two synchronization
objects. The first is the abort event: if it is set, the thread returns
from the function. The second is the semaphore: if it is released, the thread
extracts the next item from the queue and calls its DoWork function. The
insert function releases the semaphore (by 1) after inserting the
work item, which allows one of the waiting threads to wake up and
process it. When the Destroy function is called, the abort
event is set and the function waits for all threads to terminate. It then
calls the Abort function of each work item that is left in the
queue.
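The worker loop described above can be sketched in portable C++. This is not the article's code: the original uses Win32 threads, a semaphore, and an abort event, whereas the sketch below swaps in std::thread, a std::mutex, and a std::condition_variable that stands in for both the semaphore and the event. MiniWorkQueue and FlagItem are hypothetical names, WorkItemBase is re-declared so the fragment compiles alone, and the thread context is simply passed as a null pointer.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Stand-in for the article's base class so the sketch compiles alone.
class WorkItemBase
{
public:
    virtual ~WorkItemBase() {}
    virtual void DoWork(void* pThreadContext) = 0;
    virtual void Abort() = 0;
};

// Hypothetical item that records which of the two callbacks ran.
class FlagItem : public WorkItemBase
{
public:
    FlagItem() : done(0), aborted(0) {}
    void DoWork(void*) override { ++done; }
    void Abort() override { ++aborted; }
    int done;
    int aborted;
};

// Hypothetical portable analogue of the work queue (items are not owned).
class MiniWorkQueue
{
public:
    MiniWorkQueue() : m_abort(false) {}
    ~MiniWorkQueue() { Destroy(); }

    void Create(std::size_t threadCount)
    {
        for (std::size_t i = 0; i < threadCount; ++i)
            m_threads.emplace_back(&MiniWorkQueue::ThreadFunc, this);
    }

    // May be called from any thread: the queue is guarded by m_lock.
    void InsertWorkItem(WorkItemBase* pItem)
    {
        assert(pItem != nullptr);
        {
            std::lock_guard<std::mutex> guard(m_lock);
            m_items.push(pItem);
        }
        m_wake.notify_one(); // plays the role of releasing the semaphore by 1
    }

    // Signals abort, waits for the workers, then aborts what is still queued.
    void Destroy()
    {
        {
            std::lock_guard<std::mutex> guard(m_lock);
            m_abort = true; // plays the role of setting the abort event
        }
        m_wake.notify_all();
        for (std::thread& t : m_threads)
            t.join();
        m_threads.clear();

        std::lock_guard<std::mutex> guard(m_lock);
        while (!m_items.empty())
        {
            m_items.front()->Abort();
            m_items.pop();
        }
    }

private:
    void ThreadFunc()
    {
        for (;;)
        {
            WorkItemBase* pItem = nullptr;
            {
                std::unique_lock<std::mutex> guard(m_lock);
                m_wake.wait(guard, [this] { return m_abort || !m_items.empty(); });
                if (m_abort)
                    return; // abort is checked first, as in the description
                pItem = m_items.front();
                m_items.pop();
            }
            pItem->DoWork(nullptr); // run the item outside the lock
        }
    }

    std::mutex                m_lock;
    std::condition_variable   m_wake;
    std::queue<WorkItemBase*> m_items;
    std::vector<std::thread>  m_threads;
    bool                      m_abort;
};
```

After Destroy returns, every inserted item has had exactly one of DoWork or Abort called on it, which is the property the description above relies on. The sketch cannot be re-created after Destroy, and it leaves ownership of the items to the caller; both are simplifications made for brevity.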
Demo project
The demo project is included merely to illustrate how the
work queue performs. To run it:
- choose the number of threads you want
- click the create button to create the work queue
- insert work items into the queue as you please by clicking the insert
item button
- destroy the work queue by clicking the destroy button
Enjoy.
Thanks to Hans Dietrich for the XListCtrl.
I downloaded the sample project and tested it; it worked well. I have one question about the sample.
Many Windows messages, such as WM_PAINT, are generated while the progress bar is drawn in DoWork(). But the worker thread doesn't have a message queue, and we are told not to process Windows messages in a worker thread.
Are any Windows messages processed in the code below?
m_pListCtrl->SetProgress(m_nItem, 0);
for (int i = 0; i < 100; i++)
{
    Sleep(100);
    m_pListCtrl->UpdateProgress(m_nItem, 0, i);
}
m_pListCtrl->DeleteProgress(m_nItem, 0);
m_pListCtrl->SetItemText(m_nItem, 0, _T("Complete"));
Very nice code and article! I give your work queue the highest score, 5! Thanks for your work!
I am using your class in a simple program that I use at home, but I have a problem when I try to cancel one specific thread. After creating 5 threads, I would like to know, if possible, how to cancel, say, just the third thread and let all the others continue with their work. Thanks.
Desva
Hello, I've got CWorkQueue working, but I'd like to let the user cancel the computation for a particular thread from the GUI (as opposed to cancelling all threads with CWorkQueue::Destroy). I can figure out how to store identifying information in each WorkItemBase-derived object, but how can I find and abort the correct thread from the GUI? It seems I'd have to add a method to CWorkQueue to search its list of threads, no?
ed
I'd like to clarify: could I simply save a pointer to the WorkItemBase-derived object and call its Abort method?
Thanks,
ed
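One way the saved-pointer idea could work is cooperative cancellation: the item's Abort sets a flag, and DoWork polls that flag between steps. The following is a minimal sketch under stated assumptions, not something taken from the article's source. CancellableItem is a hypothetical name, WorkItemBase is re-declared so the fragment compiles alone, and the sketch assumes the item stays alive for as long as the GUI holds a pointer to it.

```cpp
#include <atomic>
#include <cassert>

// Stand-in for the article's base class so the sketch compiles alone.
class WorkItemBase
{
public:
    virtual ~WorkItemBase() {}
    virtual void DoWork(void* pThreadContext) = 0;
    virtual void Abort() = 0;
};

// Hypothetical item whose long-running loop polls a cancel flag.
class CancellableItem : public WorkItemBase
{
public:
    explicit CancellableItem(int steps)
        : m_steps(steps), m_completed(0), m_cancel(false)
    {
        assert(steps >= 0);
    }

    void DoWork(void* /*pThreadContext*/) override
    {
        for (int i = 0; i < m_steps; ++i)
        {
            if (m_cancel.load())
                return;     // stop this item only; the pool thread stays alive
            ++m_completed;  // stands in for one unit of real work
        }
    }

    // Called by the queue for items still waiting at destruction time, or by
    // another thread (for example the GUI) to cancel just this item.
    void Abort() override { m_cancel.store(true); }

    int Completed() const { return m_completed.load(); }

private:
    int               m_steps;
    std::atomic<int>  m_completed;
    std::atomic<bool> m_cancel;
};
```

The thread itself is never cancelled here; it returns from DoWork early and goes back to waiting for the next item, so no change to the queue class is needed. If work items are deleted after processing, the saved pointer would dangle, so the lifetime of the item needs to be settled before relying on this.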
Thanks for your nice class. I have found it very helpful.
Recently, though, I discovered that this library leaks a handle. After running a job with the work queue, Task Manager shows that the handle count has increased by 1, and the handle is not released until the program ends.
Looking into it with WinDbg, one thread handle is not being released, even though no threads remain in the work queue. Has anyone else seen this?
The leak is in the demo part (CWQDemoDlg::OnDestroy()), where the thread handle hThreade is created but never closed. At the end of this method, change
        } // End of else clause.
    } // End of the always while loop.
    GetDlgItem(IDC_CREATE)->EnableWindow(TRUE);
    GetDlgItem(IDC_NUMBER_OF_THREADS_SPIN_SPIN)->EnableWindow(TRUE);
to
        } // End of else clause.
    } // End of the always while loop.
    CloseHandle(hThreade);
    GetDlgItem(IDC_CREATE)->EnableWindow(TRUE);
    GetDlgItem(IDC_NUMBER_OF_THREADS_SPIN_SPIN)->EnableWindow(TRUE);
This fixes the leak.
Regards,
Yves Tkaczyk
I'm working on an MDI project, instead of the dialog-based example. When I create an instance of my subclass of WorkItemBase, I pass a pointer to the mainframe object, rather than a pointer to the XListCtrl object used in the example code. However, when I try to call CMainFrame methods from the thread (e.g., to indicate progress), various assertions fail regarding the validity of the CMainFrame's status bar or other handles, such as m_hWnd. Should I be using custom messages rather than passed pointers to indicate progress in my CMainFrame? If so, why is it OK to pass a pointer to the XListCtrl object in the demo code?
Thanks,
ed
I wouldn't use a pointer to your GUI in a thread (bad things can happen). Instead (I'm not saying it is the best method), try posting a message to your GUI, like this (from your thread):
// String message allocator for posting messages between windows.
// The receiver owns the returned buffer and must free it with delete[].
static char* AllocBuffer(CString strMsg)
{
    int nLen = strMsg.GetLength() + 1;
    char* pBuffer = new char[nLen];
    ASSERT(pBuffer != NULL);
    strncpy(pBuffer, (const char*)strMsg, nLen);
    return pBuffer;
}

void SomeFunction(void)
{
    ...
    CString str = "Test Message";
    ::PostMessage(m_hWnd, UWM_MSG_UPDATE, (WPARAM)0, (LPARAM)AllocBuffer(str));
    ...
}
And have a function in your CMainFrame that reads it:
// in your MainFrm.h
LRESULT OnUpdate(WPARAM wParam, LPARAM lParam);

// in your MainFrm.cpp
LRESULT CMainFrame::OnUpdate(WPARAM wParam, LPARAM lParam)
{
    char* pMsg = (char*)lParam;
    CString csMsg = pMsg;
    delete[] pMsg; // the buffer was allocated with new char[] in AllocBuffer
    ....
    // do something with it...
    ....
    return 0;
}
That is how I do it.
Regards,
Dan
It is not accessing the CMainFrame itself that is inherently bad; it is accessing the MFC GUI objects that does not work from a secondary thread. The internal maps that map an HWND to a CWnd* are pretty thread-specific in MFC. You can safely post or send messages to a control's HWND, but not via the CWnd* or its derived classes (such as CButton). I typically create variables to store the data that the secondary thread will update, and then post messages that the CMainFrame retrieves, after which it reads the data.
I do not favor the method suggested elsewhere in this message thread, because posted messages are not guaranteed delivery, and your application will almost inevitably end up with memory leaks under any kind of heavy load.
I'm afraid I don't understand; you don't favor posting messages, yet you recommend posting "messages that the CMainFrame would retrieve". Would you please explain the difference? I've stored a pointer to the CMainFrame in my thread, and have used mainFrame->PostMessage() to handle communication between the thread and the GUI, but it seems from your post that this may lead to memory leaks.
Thanks for your advice,
ed
I am sorry I was not more clear.
I don't favor posting a message with data allocated by the sender that must be freed by the receiver, because a posted message is not guaranteed delivery. If the receiver does not get the message, the memory is lost and you have a memory leak.
It is better for the sender to update some queue that is shared between the sender and the receiver. If the receiver never processes the message, the message data can still be deleted eventually: when the queue is removed, or when it is cleaned up by the receiver or by the sender. At least the message data memory is not lost.
Let me know if this still does not make sense, and I will elaborate further.
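The shared-queue approach described in this post could look roughly like the following. This is a minimal sketch, assuming the progress data can be carried as strings; ProgressMailbox is a hypothetical name and nothing here comes from the article's source. The worker thread pushes data into the mailbox and then notifies the GUI separately (for instance with a posted message that carries no pointer), and the GUI thread drains whatever has accumulated.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical mailbox shared by a worker thread (sender) and the
// GUI thread (receiver). The mailbox owns the message data at all times.
class ProgressMailbox
{
public:
    // Sender side: store the data under the lock. Any notification to the
    // receiver is sent separately and carries no heap pointer.
    void Push(const std::string& text)
    {
        std::lock_guard<std::mutex> guard(m_lock);
        m_pending.push_back(text);
    }

    // Receiver side: take everything queued so far in one step.
    std::vector<std::string> Drain()
    {
        std::lock_guard<std::mutex> guard(m_lock);
        std::vector<std::string> out(m_pending.begin(), m_pending.end());
        m_pending.clear();
        return out;
    }

    std::size_t Pending() const
    {
        std::lock_guard<std::mutex> guard(m_lock);
        return m_pending.size();
    }

private:
    mutable std::mutex      m_lock;
    std::deque<std::string> m_pending;
};
```

If a notification is lost, the data simply waits in the mailbox until the next Drain call, and anything never drained is released when the mailbox is destroyed. That is the difference from passing a heap pointer in the message itself, where a lost message means a lost allocation.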
I'm using this fine class, but I have a problem with timeout support. For example, if a job runs for more than 2 minutes, I want to kill it, with its complete call stack destroyed.
Could you (all) help me with this?
Daniele
I have seen several code samples on threads and processes, but as you know, nobody who studies computer science can work alone without outside help. I am a beginner student, and I would like some simple, complete code about threads with a simulation, because I want to create complete programs that can help students in the future. E-mail: programer_2000@hotmail.com
If the threads in the pool are going to access an MFC object, they must be created with AfxBeginThread (as worker threads) instead of ::CreateThread or _beginthread, because MFC stores certain data per thread. The real problem is that using ::CreateThread does not cause an immediate failure; it all depends on what the thread does.
I don't have any links for a description of the problem, but you can Google for it.
"I don't want to achieve immortality through my work... I want to achieve it through not dying." Woody Allen
- CreateThread: safe if only making Win32 API calls; no C runtime functions and no MFC
- _beginthreadex: safe if making Win32 API calls and calling C runtime functions; no MFC
- AfxBeginThread: safe if making Win32 API calls, calling C runtime functions, and using MFC
You end up with uninitialized thread-specific variables and subsequent memory leaks if you do not start the threads with the proper functions.
The idea is elegant, you are right. Every good programmer should discover such a pattern. Now, about the performance: if you're not limited to Win9x platforms, you can (and should) use more sophisticated thread-management tools.
I/O completion ports. This kernel object acts similarly to what you've done, but its main goal is to minimize the number of context switches, so it is more efficient. Every thread in the pool waits for its turn by calling GetQueuedCompletionStatus(), whereas you post tasks (and the termination instruction at the end) with PostQueuedCompletionStatus().
QueueUserWorkItem. Honestly speaking, I've never used it myself, but as far as I know it is similar to I/O completion ports, except that it adjusts the number of threads in its pool automatically (it collects statistics), so it is believed to work even better. As for me, though, I would prefer to gather the statistics myself.
Hi.
I'm using this class, and it works very well.
However, when a work item in the queue uses an ADO connection (through a pointer), the memory of the application grows by 4 KB.
The variable is declared with:
_ConnectionPtr pConn;
And the constructor is:
STLogLatido::STLogLatido(CString m, _ConnectionPtr c, CProgressCtrl* pr)
{
    pConn = c;
    msg = m;
    m_prgbar = pr;
    m_prgbar->SetPos(m_prgbar->GetPos() + 1);
}
I create the work item with:
Queue.InsertWorkItem(new STLogLatido(m, pConn_Log, &m_barra));
I've posted the problem here: http://www.codeproject.com/script/comments/forums.asp?forumid=1647&select=814385&fr=51&msg=814385#xx813863xx
I need to resolve this problem. Please help.
Because of the assignment, you are increasing the reference count of the connection pointer beyond 1. So even if it is destroyed, it does not release its reference to the ADO object, and you probably create a new ADO object each time somewhere earlier in the code. So you are losing either exactly 4 KB once, for the single connection that is never released, or 4 KB per work item added, since you make a new connection, assign it, and then never release it. You need to either pass a pointer to the _ConnectionPtr, or decrement the reference count on the _ConnectionPtr passed by reference after its assignment.
I changed Sleep(100) to Sleep(1) in the source code, created the queue with 1 thread, and added many work items while it was running. Debug output:
The thread 0x271C has exited with code 0 (0x0).
Detected memory leaks!
Dumping objects ->
{99} normal block at 0x00432EA0, 8 bytes long.
 Data: <` > 60 FD 12 00 00 00 00 00
{98} normal block at 0x00431A70, 4 bytes long.
 Data: < > A4 0B 00 00
{97} normal block at 0x00432EE0, 48 bytes long.
 Data: < > CC CD CD CD 00 00 00 00 00 00 00 00 00 00 00 00
Object dump complete.
The thread 0x2730 has exited with code 0 (0x0).
The program 'F:\temp\Work_Queue_demo\WQDemo\Debug\WQDemo.exe' has exited with code 0 (0x0).
I compiled your project, started it in debug mode, created only one thread, and waited until the thread finished. Then I closed the application with the exit button. The following output was generated:
Detected memory leaks!
Dumping objects ->
{112} normal block at 0x002F5718, 8 bytes long.
 Data: <` > 60 FD 12 00 00 00 00 00
{111} normal block at 0x002F56D0, 4 bytes long.
 Data: < > A8 0B 00 00
{110} normal block at 0x002F5658, 48 bytes long.
 Data: < > CC CD CD CD 00 00 00 00 00 00 00 00 00 00 00 00
Object dump complete.
I tried VS 6.0 and VS 7.1 with the same result. Where is the problem?
P.S.
Congratulations, your work is very good. If I find the memory leak, I will use your idea in my current project.
Thank you,
Kurt Muellner
Fast Prototyping
Sorry, sorry, sorry.
It was my fault; your software works fine. It seems that I am faster at writing a complaint than at writing code correctly.
cu, kurt
Fast Prototyping
Hi,
This is really cool code. My question is: do IIS or other web servers work the same way? I am asking because I would like to know whether there are other ways to build high-load server applications.
Thanks in advance,
Vishal
Hi, I would like to know whether there is any way to tell when all the threads have finished their work. It could be done by sending a message to the dialog, but from where?
Thanks.
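One possible answer, sketched under assumptions rather than taken from the article's source: give every work item of a batch a pointer to a shared counter, and let the item that finishes last trigger the notification. CompletionTracker is a hypothetical helper, and the callback is whatever the application needs; in the demo it could post a message to the dialog's window handle.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical helper shared by all work items of one batch.
class CompletionTracker
{
public:
    CompletionTracker(int itemCount, std::function<void()> onAllDone)
        : m_remaining(itemCount), m_onAllDone(std::move(onAllDone))
    {
        assert(itemCount > 0);
    }

    // Each item calls this exactly once: at the end of DoWork, or in Abort
    // if the queue was destroyed before the item got its turn.
    void ItemFinished()
    {
        // fetch_sub returns the previous value, so 1 means this was the last item.
        if (m_remaining.fetch_sub(1) == 1)
            m_onAllDone();
    }

    int Remaining() const { return m_remaining.load(); }

private:
    std::atomic<int>      m_remaining;
    std::function<void()> m_onAllDone;
};
```

The callback runs on whichever pool thread finishes the last item, so it should only do something that is safe from a worker thread, such as posting a message, and leave the GUI update itself to the dialog's own thread.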