Introduction
"Oh my god not another threading article", I hear you cry. "Isn't
there enough threading articles on CodeProject already!?"
"Er, no," is my answer (now
that's a surprise isn't it).
So why bother? If you want a good example
of a thread pooling solution then read the C# article by Ami Bar
(Smart Thread Pool), and there are lots of other good threading articles throughout CodeProject, so why?
Well, most of these articles are written for C# (indeed most of
the articles here seem to be written for C# at the moment) and I wanted
to see a VB solution.
What does this set of wrapper classes do?
They are a group of classes that
together allow the developer to create their own class, which performs a task and
is inherited from a supplied base class. Each instance of this developer-created
class runs in its own thread and data is fed to it, one data item at a time
(as a passed parameter), from one or more input queues.
The class then outputs any resultant data via its output queues. These queues in turn
become input queues for other task instances. What this means is that you
can quite easily create a basic workflow. A piece of data moves from
one thread, which performs a unique job or task, to a second thread, then perhaps to a
third thread etc. All inter-thread movement is made by use of queues.
In the samples provided I have created such a "workflow". One task identifies
a list of files in a folder, and the description of each
file is then sent to a separate thread which zips the file. A description of the zipped
file is then sent to another thread which encrypts the file.
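The workflow idea can be sketched without the wrapper classes. What follows is only a minimal illustration and not the code from this article: it uses BlockingCollection from System.Collections.Concurrent (so it assumes .NET 4 or later) in place of my queue wrappers, the module and function names are made up, and the "zip" and "encrypt" steps just append a suffix to a file name rather than doing any real work.

```vb
Imports System
Imports System.Collections.Concurrent
Imports System.Collections.Generic
Imports System.Threading

Module WorkflowSketch

    ' Runs a three-step workflow (list -> "zip" -> "encrypt") in which each
    ' step after the first has its own thread and queues link the steps.
    Function RunWorkflow(ByVal fileNames As IEnumerable(Of String)) As List(Of String)
        Dim zipQ As New BlockingCollection(Of String)()
        Dim encryptQ As New BlockingCollection(Of String)()
        Dim results As New List(Of String)()

        ' Step 2: takes a file name and passes on the name of the "zipped" file.
        Dim zipThread As New Thread(
            Sub()
                For Each fileName As String In zipQ.GetConsumingEnumerable()
                    encryptQ.Add(fileName & ".zip")
                Next
                encryptQ.CompleteAdding()
            End Sub)

        ' Step 3: takes a zipped name and records the name of the "encrypted" file.
        Dim encryptThread As New Thread(
            Sub()
                For Each fileName As String In encryptQ.GetConsumingEnumerable()
                    results.Add(fileName & ".enc")
                Next
            End Sub)

        zipThread.Start()
        encryptThread.Start()

        ' Step 1: feed the first queue, then signal that no more data will arrive.
        For Each fileName As String In fileNames
            zipQ.Add(fileName)
        Next
        zipQ.CompleteAdding()

        ' results is only read after the thread that writes to it has ended.
        zipThread.Join()
        encryptThread.Join()
        Return results
    End Function

    Sub Main()
        For Each fileName As String In RunWorkflow(New String() {"a.txt", "b.txt"})
            Console.WriteLine(fileName)
        Next
    End Sub

End Module
```

Each stage only knows about the queue it reads from and the queue it writes to, which is the property the wrapper classes build on.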
Background
The example mentioned above was an actual
requirement at a site I worked on (the code written at the client site has not been
used for this article).
We had a large number of files created by a Siebel
system based in an office in France. These files (for business reasons) had to
be sent via an FTP to a server in Hong Kong over a slow connection. The data was
fairly confidential and had to be encrypted. The speed of the connection meant
we had to zip the files too. (I actually had another thread which sent the
encrypted files to an FTP server.)
So why didn't I just use the thread pool for this? After
all, each work item would be stateless and its thread could be reused after every
file was processed. Well, other work, apart from my own threads, uses the
thread pool and if I sent five or six hundred file requests then the pool would
queue the majority of the requests and all of these other tasks would be queued
too.
Also, at the same time I received another requirement
which needed threads to remember state (well, live connections really)
between different data queue items. Here I was prevented from using the thread
pool because when a work item completes its thread goes back to the pool and is reused
for another task, so I could not keep a connection alive on a thread of my own. Again,
I also didn't want to hog thread pool threads by holding on to them
after a piece of data was processed.
I am also a lazy person and once I have written the
queue/thread processing logic thingy bit I don't want to do it again and again.
This is why I love inheritance! By creating a wrapper around the threads
and making an inheritable base class I am free! Free I tell you! Free from
threads (yippee - stirring music begins) a new dawn rises!
*Ahem* Sorry about that, moving on...
Terminology and Class Relationship
The thread unit factory stores lots of instances of
thread units.
A thread unit is a holding place for a .NET thread and
its associated thread worker. The thread worker is able to interrogate
multiple input queues. Whenever data is passed to any input queue it is the
thread worker that reads it and sends it to an embedded task class (inherited
from the base task class).
As you can see from the diagram, in this paradigm queues have direction. They can be inbound or outbound and are used as input queues or
output queues respectively. When direction is assigned to a queue it is actually assigned
to a synchronized version of the queue, which is nearly thread safe
(I use SyncLock to make it fully thread safe). Therefore, it is possible for an
actual queue to be given multiple directions and be assigned to many tasks. Indeed it is
also possible for a task to have the same actual queue as an input queue
and as an output queue. For example, a folder name coming through an inbound
queue could cause the task to send a list of its subfolders outbound to the same
queue. These in turn enter the task through its inbound queue.
As well as having direction, queues also have priority so you can have a high priority inbound queue to
be used as a command queue and an inbound queue at a low priority for data.
Something like this would allow commands to be issued to the task before
any further data items are accepted (a reason why in the above example it may be
better to add each subfolder as an item on the inbound queue of a thread unit as
opposed to processing all subfolders in one pass through the task class).
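To make the priority idea concrete, here is a small sketch of a worker that always looks at a command queue before its data queue. It is an illustration under my own assumptions rather than the thread worker's real code: the names (NextItem, commandQ, dataQ, "PAUSE") are invented, plain strings stand in for queue items, and it is single threaded, so the synchronized queues and SyncLock that the real classes rely on are left out.

```vb
Imports System
Imports System.Collections.Generic

Module PrioritySketch

    ' Returns the next item to process, always preferring the high priority
    ' (command) queue over the low priority (data) queue.
    ' Returns Nothing when both queues are empty.
    Function NextItem(ByVal commandQ As Queue(Of String), ByVal dataQ As Queue(Of String)) As String
        If commandQ.Count > 0 Then Return commandQ.Dequeue()
        If dataQ.Count > 0 Then Return dataQ.Dequeue()
        Return Nothing
    End Function

    ' Replays a small scenario and returns the order in which items were handled.
    Function Demo() As List(Of String)
        Dim commandQ As New Queue(Of String)()
        Dim dataQ As New Queue(Of String)()
        Dim handled As New List(Of String)()

        dataQ.Enqueue("folder A")
        dataQ.Enqueue("folder B")
        dataQ.Enqueue("folder C")

        handled.Add(NextItem(commandQ, dataQ))   ' the first data item is taken
        commandQ.Enqueue("PAUSE")                ' a command arrives while data is still waiting

        Dim item As String = NextItem(commandQ, dataQ)
        While item IsNot Nothing
            handled.Add(item)
            item = NextItem(commandQ, dataQ)
        End While
        Return handled
    End Function

    Sub Main()
        ' Prints: folder A, PAUSE, folder B, folder C
        Console.WriteLine(String.Join(", ", Demo().ToArray()))
    End Sub

End Module
```

The command overtakes the two folders that were queued before it. Had all three folders been handled in a single pass through the task, the command would have had to wait until the end.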
How does the developer use the wrapper classes?
Create an application.
Add two files to the application.
These are SiebNet Core Classes.vb and SiebNet Core Factory Classes.vb.
In the application's own code, add the Imports statements:
Imports SiebNet.Threading.Core.Classes
Imports SiebNet.Threading.Core.Factories
If the application has its own root namespace (see the Project/Properties
menu) then you must change the Imports to have the root
namespace at the beginning of the namespace. So, for example, if the root namespace is
ExampleProgram then the lines would be:
Imports ExampleProgram.SiebNet.Threading.Core.Classes
Imports ExampleProgram.SiebNet.Threading.Core.Factories
If there is a root namespace then you must also amend the Imports on the
file SiebNet Core Factory Classes.vb to include the root namespace. If you
are feeling particularly keen you could instead just merge the two VB files
and their namespaces together. It's up to you.
Now define a new task class that is inherited from baseTask (we will look at
the base class in detail later).
In the new class, override the Perform
subroutine with the code needed to do the task. If there is to be an update to
a GUI control in this task then override the UpdateGui
subroutine.
Finally, if there are any live connections that need to be closed, or other activities that you wish to happen
just after the thread unit has been stopped by the thread unit factory, then
override the Finish subroutine.
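Imports System.Threading ' needed for Thread.Sleep

Namespace MyNamespace.NoGUI
    Friend Class taskFOO
        Inherits baseTask
        ' Called by the thread worker once for each queue item.
        Friend Overrides Sub Perform(ByRef Item As IisQItem)
            Thread.Sleep(200)
        End Sub
    End Class
End Namespace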
The above creates a task called taskFOO. This class does not interact with the GUI
in any way. Every time a queue item arrives from the queue this task sleeps for
200 milliseconds.
Namespace MyNamespace.GUI
    Friend Class taskFOO2
        Inherits baseTask
        Friend Overrides Sub Perform(ByRef Item As IisQItem)
            Dim i As Integer
            For i = 1 To 10
                PerformGuiUpdate(Str(i))
            Next
        End Sub
        Friend Overrides Sub UpdateGui(ByVal GUIUpdObj As Object)
            If Not _GuiControl Is Nothing Then
                If TypeOf _GuiControl Is TextBox Then
                    CType(_GuiControl, TextBox).Text = CType(GUIUpdObj, String)
                End If
            End If
        End Sub
    End Class
End Namespace
The above creates a task called taskFOO2 which, every time a
queue item arrives, counts to 10, updating an associated textbox with the
count as it goes.
Please note the namespace has been included. It is very important
that a namespace is used. This will become apparent later as the discussion turns to how a task is instantiated. For now just realize that all defined task classes must lie within a namespace.
Define all meaningful object(s) that will be passed down the queues
For a queue to be of use, "stuff" needs to be sent down it. Unfortunately not just any object
can be passed through the queue. The object must implement an interface called IisQItem,
which is defined as:
Interface IisQItem
    Inherits ICloneable
    Function QItemType() As Q_ItemType
    Function QItemSubType() As String
End Interface
An example of a queue item object is:
Friend Class qitemFOO
    Implements IisQItem
    Private _fooText As String
    Friend ReadOnly Property text() As String
        Get
            Return _fooText
        End Get
    End Property
    Friend Sub New(ByVal fooText As String)
        _fooText = fooText
    End Sub
    Public Function QItemSubType() As String _
            Implements SiebNet.Threading.Core.Classes.IisQItem.QItemSubType
        Return ("FooTextClass")
    End Function
    Public Function QItemType() As SiebNet.Threading.Core.Classes.Q_ItemType _
            Implements SiebNet.Threading.Core.Classes.IisQItem.QItemType
        Return Q_ItemType.Data
    End Function
    Public Function Clone() As Object Implements System.ICloneable.Clone
        Clone = New qitemFOO(_fooText)
    End Function
End Class
Rather than a piece of code having lots of "If TypeOf <object> Is <class> Then..."
statements, in certain circumstances it is easier to ask: is this a TypeOf IisQItem?
Does it have a QItemType of "Data"? You can then use a "Select
Case <QItemSubType>" structure for processing the data.
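A hedged sketch of that dispatch style follows. So that it compiles on its own it redeclares cut-down stand-ins (ItemKind and ISketchItem) instead of using the real Q_ItemType and IisQItem. Those names, the Command enum member, the FolderItem class and the "Folder"/"File" subtype strings are all assumptions made for the illustration.

```vb
Imports System

Module DispatchSketch

    ' Stand-ins for the article's Q_ItemType and IisQItem, redeclared here
    ' so that the sketch compiles on its own.
    Enum ItemKind
        Data
        Command
    End Enum

    Interface ISketchItem
        Function Kind() As ItemKind
        Function SubType() As String
    End Interface

    ' Hypothetical data item describing a folder.
    Class FolderItem
        Implements ISketchItem

        Public Function Kind() As ItemKind Implements ISketchItem.Kind
            Return ItemKind.Data
        End Function

        Public Function SubType() As String Implements ISketchItem.SubType
            Return "Folder"
        End Function
    End Class

    ' Decides what to do with an arbitrary object taken from a queue:
    ' one TypeOf test, one kind test, then a Select Case on the subtype.
    Function Describe(ByVal obj As Object) As String
        If Not TypeOf obj Is ISketchItem Then Return "not a queue item"

        Dim item As ISketchItem = DirectCast(obj, ISketchItem)
        If item.Kind() <> ItemKind.Data Then Return "not data"

        Select Case item.SubType()
            Case "Folder"
                Return "scan the folder"
            Case "File"
                Return "zip the file"
            Case Else
                Return "unknown data item"
        End Select
    End Function

    Sub Main()
        Console.WriteLine(Describe(New FolderItem()))   ' scan the folder
        Console.WriteLine(Describe("just a string"))    ' not a queue item
    End Sub

End Module
```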
The
cloning facility is required so that a complete copy of the object can
be made. This is useful when a task sends the same queue item out to multiple
outbound queues. The code in baseTask makes a separate copy of the object
for each outbound queue so that tasks which receive the same data don't share the same object reference,
possibly impacting each other.
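The effect of cloning per outbound queue can be shown in a few lines. This is a sketch under my own assumptions and not the baseTask code: FileItem, SendToAll, zipQ and logQ are invented names, and ordinary Queue(Of T) objects stand in for the queue wrappers.

```vb
Imports System
Imports System.Collections.Generic

Module CloneSketch

    ' Hypothetical queue item; only ICloneable matters for this sketch.
    Class FileItem
        Implements ICloneable

        Public FilePath As String

        Public Sub New(ByVal value As String)
            FilePath = value
        End Sub

        Public Function Clone() As Object Implements ICloneable.Clone
            Return New FileItem(FilePath)
        End Function
    End Class

    ' Enqueues a separate copy of the item on every outbound queue.
    Sub SendToAll(ByVal item As FileItem, ByVal outQueues As IEnumerable(Of Queue(Of FileItem)))
        For Each q As Queue(Of FileItem) In outQueues
            q.Enqueue(DirectCast(item.Clone(), FileItem))
        Next
    End Sub

    Sub Main()
        Dim zipQ As New Queue(Of FileItem)()
        Dim logQ As New Queue(Of FileItem)()
        SendToAll(New FileItem("C:\Subdir1\a.txt"), New Queue(Of FileItem)() {zipQ, logQ})

        Dim zipCopy As FileItem = zipQ.Dequeue()
        Dim logCopy As FileItem = logQ.Dequeue()

        ' Changing one copy leaves the other untouched.
        zipCopy.FilePath = "C:\Subdir1\a.zip"
        Console.WriteLine(logCopy.FilePath)       ' still C:\Subdir1\a.txt
        Console.WriteLine(zipCopy Is logCopy)     ' False
    End Sub

End Module
```

If the same reference had been enqueued twice, the change made by the zip consumer would have been visible to the logging consumer as well.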
Define a subroutine which builds the task instances and associated Queues
The new classes have been created. The application is ready to "rock".
Now we must create a subroutine which instantiates the various classes as well as creating
queues for those classes.
A lot of the object
creation work has been hidden from the developer by the thread unit factory
class but still a little work is required.
Suppose we have defined a simple
task (taskFoo) which has one inbound queue (FolderQ) and two outbound queues
(FileQ and FolderQ; the latter is the same actual queue as the inbound queue but
with a different direction). The queues can transport two types of item
classes, dataFileQItem and dataFolderQItem, both of which implement the interface IisQItem. A
subroutine to set up the classes for processing may look
like:
Public Class frmMain
    Private facThreadUnit As ThreadUnitFactory = New ThreadUnitFactory
    Private facQ As QFactory = New QFactory
    ....
    Private Sub btnStartThreadUnits_Click(ByVal sender As System.Object, _
            ByVal e As System.EventArgs) Handles btnStartThreadUnits.Click
        Dim thrUnit As baseThreadUnit
        ' Create the two actual queues.
        facQ.Add(New baseQ_Wrapper("FileQ"))
        facQ.Add(New baseQ_Wrapper("FolderQ"))
        thrUnit = facThreadUnit.CreateThreadUnit("taskFooKey", "NAMESPACE.taskFoo")
        ' FolderQ is used both inbound and outbound; FileQ is outbound only.
        thrUnit.addQ(facQ.Item("FolderQ"), Q_Priority.Low, Q_Direction.InBound, -1)
        thrUnit.addQ(facQ.Item("FileQ"), Q_Priority.Low, Q_Direction.OutBound, -1)
        thrUnit.addQ(facQ.Item("FolderQ"), Q_Priority.Low, Q_Direction.OutBound, -1)
        thrUnit.Start()
    End Sub
    ....
End Class
Define a subroutine which adds data to the queues
In this example let's assume that taskFoo
processes folders, therefore it needs a folder to begin its processing. The following would do this:
Private Sub Button1_Click(ByVal sender As System.Object, _
        ByVal e As System.EventArgs) Handles Button1.Click
    facQ.Item("FolderQ").Enqueue(New dataFolderQitem("C:\Subdir1"))
End Sub
Define a subroutine which shuts down the thread units
Private Sub Button2_Click(ByVal sender As System.Object, _
        ByVal e As System.EventArgs) Handles Button2.Click
    facThreadUnit.abortAllThreadUnits(True)
End Sub
The above shows how to abort all thread units
in the factory. Please notice the Boolean True at the end of the line.
This is telling the class to do a "clean"
abort, which is an attempt to get the thread unit to shut down in a clean way:
when it is not currently processing and is ready to read the next queue
item.
This is as opposed to a dirty abort, which forces the
thread unit to issue a Thread.Abort() command straightaway, irrespective of whether the task has finished working on
a data item or not.
What happens during a clean
abort is that the factory sends an abort request to each thread unit. The
thread unit forwards the request to its own internal thread worker, which then
attempts to add an abort command queue item to an input queue. If there is a
high priority queue then that is used, otherwise a normal one is selected.
When a task has completed processing
a queue item it ends its function call, and its parent thread worker waits for a new
queue item before calling the task instance again. If instead of a data item the
worker finds an abort request it doesn't call the task class; rather,
it drops out of its queue processing loop and ends its function. This
kills the background thread.
The factory keeps monitoring its thread
units and if after about four seconds, it finds that some are still alive then it
does a dirty abort on the remaining thread units. As a final act the factory
calls each thread unit's finish subroutine, which in turn calls the task's finish
subroutine. This allows logging out from connections and other such actions to be
done, rather than waiting for garbage collection to force the actions.
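The clean abort boils down to "put a stop command on the queue and wait a while". The sketch below shows that shape only; it is not the factory's code. The AbortCommand marker string, the RunAndStop function and the use of BlockingCollection (assuming .NET 4 or later) are my own stand-ins. The dirty fallback is deliberately left as a comment, because Thread.Abort is only supported on .NET Framework.

```vb
Imports System
Imports System.Collections.Concurrent
Imports System.Threading

Module CleanAbortSketch

    ' Hypothetical marker value standing in for the article's abort command item.
    Private Const AbortCommand As String = "<<ABORT>>"

    ' Starts a worker, feeds it some data, then asks it to stop cleanly.
    ' Returns True when the worker ended by itself within the timeout.
    Function RunAndStop(ByVal timeoutMs As Integer) As Boolean
        Dim inQ As New BlockingCollection(Of String)()
        Dim processed As Integer = 0

        Dim worker As New Thread(
            Sub()
                Do
                    Dim item As String = inQ.Take()       ' blocks until an item arrives
                    If item = AbortCommand Then Exit Do   ' drop out of the loop; the thread then ends
                    processed += 1                        ' stand-in for calling the task's Perform
                Loop
            End Sub)
        worker.IsBackground = True
        worker.Start()

        inQ.Add("file1")
        inQ.Add("file2")
        inQ.Add(AbortCommand)   ' the clean abort request travels down the queue like data

        ' Give the worker time to finish its current item and see the command.
        Dim stoppedCleanly As Boolean = worker.Join(timeoutMs)
        If stoppedCleanly Then
            Console.WriteLine("Items processed: " & processed.ToString())
        Else
            ' This is where a dirty abort would be issued as a last resort.
            Console.WriteLine("Worker still alive after the timeout")
        End If
        Return stoppedCleanly
    End Function

    Sub Main()
        Console.WriteLine("Stopped cleanly: " & RunAndStop(4000).ToString())
    End Sub

End Module
```

Because the command sits behind the two data items, both are fully processed before the loop ends, which is the whole point of asking nicely first.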
OK the job is now done - take a break, relax, have a coffee!
A trivial example
Welcome back!! Here is a trivial example:
This example uses the system to change the colour of a text box (Yawn!)
Create a new Windows application called ExampleColouredTextBox (this will be
the root namespace too).
In the form create three buttons and a text box (as in the
picture above). The names of the controls are BtnCreateThreadUnits, btnEnQueue, btnAbort and TextBox1.
Add the two VB files (SiebNet Core
Classes.vb and SiebNet Core Factory Classes.vb). Amend the import in the file
SiebNet Core Factory Classes.vb to be:
Imports ExampleColouredTextBox.SiebNet.Threading.Core.Classes
Now insert the following code in the main form VB:
Imports ExampleColouredTextBox.SiebNet.Threading.Core.Classes
Imports ExampleColouredTextBox.SiebNet.Threading.Core.Factories

Public Class Form1
    Friend facThreadUnit As ThreadUnitFactory = New ThreadUnitFactory
    Friend facQ As QFactory = New QFactory

    Private Sub BtnCreateThreadUnits_Click(ByVal sender As System.Object, _
            ByVal e As System.EventArgs) Handles BtnCreateThreadUnits.Click
        Dim thrUnit As baseThreadUnit
        facQ.Add(New baseQ_Wrapper("ColourQ"))
        thrUnit = facThreadUnit.CreateThreadUnit("taskColour1", _
            "ExampleColouredTextBox.exampleClasses.taskColour", TextBox1)
        ' ColourQ is both the inbound and the outbound queue of the task.
        thrUnit.addQ(facQ.Item("ColourQ"), Q_Priority.Low, Q_Direction.InBound, -1)
        thrUnit.addQ(facQ.Item("ColourQ"), Q_Priority.Low, Q_Direction.OutBound, 100)
        thrUnit.Start()
        MsgBox("Started")
    End Sub

    Private Sub btnEnQueue_Click(ByVal sender As System.Object, _
            ByVal e As System.EventArgs) Handles btnEnQueue.Click
        facQ.Item("ColourQ").Enqueue(New exampleClasses.dataCurrentColour(Color.Red))
    End Sub

    Private Sub btnAbort_Click(ByVal sender As System.Object, _
            ByVal e As System.EventArgs) Handles btnAbort.Click
        facThreadUnit.abortAThreadUnit("taskColour1", True)
        MsgBox("Stopped")
    End Sub
End Class
Namespace exampleClasses
    Friend Class dataCurrentColour
        Implements IisQItem
        Private _Colour As Color
        Friend Property Colour() As Color
            Get
                Return _Colour
            End Get
            Set(ByVal value As Color)
                _Colour = value
            End Set
        End Property
        Friend Sub New(ByVal Colour As Color)
            _Colour = Colour
        End Sub
        Public Function QItemSubType() As String _
                Implements SiebNet.Threading.Core.Classes.IisQItem.QItemSubType
            Return "curentColour"
        End Function
        Public Function QItemType() As SiebNet.Threading.Core.Classes.Q_ItemType _
                Implements SiebNet.Threading.Core.Classes.IisQItem.QItemType
            Return Q_ItemType.Data
        End Function
        Public Function Clone() As Object Implements System.ICloneable.Clone
            Clone = New dataCurrentColour(_Colour)
        End Function
    End Class

    Friend Class taskColour
        Inherits baseTask
        Friend Overrides Sub Perform(ByRef Item As IisQItem)
            If TypeOf (Item) Is dataCurrentColour Then
                With CType(Item, dataCurrentColour)
                    ' Show the current colour, then move the item on to the next colour.
                    PerformGuiUpdate(.Colour)
                    Select Case .Colour
                        Case Color.Red
                            .Colour = Color.Green
                        Case Color.Green
                            .Colour = Color.Blue
                        Case Color.Blue
                            .Colour = Color.Red
                    End Select
                    System.Threading.Thread.Sleep(2)
                End With
            End If
            ' Send the item back out on ColourQ, which is also the inbound queue.
            SendToOutQ(Item, False, "ColourQ")
        End Sub
        Friend Overrides Sub UpdateGui(ByVal GUIUpdObj As Object)
            If Not _GuiControl Is Nothing Then
                CType(_GuiControl, TextBox).BackColor = CType(GUIUpdObj, Color)
            End If
        End Sub
    End Class
End Namespace
Now build and run. Have fun.
Provided Examples
Some example applications that use the wrappers have been provided, each of
them dealing with a slightly different aspect of the threading solution. I
strongly suggest that you have a look at them. These
are:
- ExampleColouredTextBox: The above trivial example which shows basic
queue transportation where a task takes the colour passed to it and sets the
background colour of an associated textbox.
- ExampleRoutingEngine: This
solution provides a centralized routing task which moves items around a number
of tasks. Every other task in the application sends its output to the routing
task. The router examines an item and extracts its next routing entry which
determines what the destination is for that item.
The application creates
queue items that have a random colour and a random shape; there are four different
shapes and four different colours that can be selected.
Each item then has three
routing entries added to it (one for the colour, one for the shape and a final
routing to a final task). This means that the router will send each
item to three separate tasks. There are nine tasks in the system excluding the router
(one for each shape, one for each colour and the final task). Each task
increments the value of a label on the GUI, giving us a breakdown of the shapes and
colours produced by the randomizer.
- ExampleZipAndEncryptGUIUpdateViaQueues: This
example allows the user to select a folder where every file in that folder is
zipped and encrypted in a new destination folder. Optionally if there are any
subfolders then they too can be analyzed, zipped and encrypted.
There are three
main thread units here running tasks. The first task is an analyzer. It
identifies, from an item being passed to it that holds the source folder and
destination folder, a list of files that are to be zipped and optionally a list
of subfolders which are routed back to the analyzer as separate items.
The
list of files is sent as separate description items, via queues, to the zip
task, which, surprise surprise, zips the file. The description of the zipped file
is then sent to the encryption task which encrypts the file.
This process
actually updates lots of GUI controls. In this example, rather than have all three
tasks continually update the GUI, I have created additional tasks that
update progress bars, etc. These tasks are fed by additional queues from the three
main tasks. This means the updates appear slightly later than when they really
occurred but this isn't a major concern for this example.
- ExampleZipAndEncryptGUIUpdateInTasks: This
example allows the user to select a folder where every file in that folder is
zipped and encrypted in a new destination folder. Optionally if there are any
subfolders then they too can be analysed, zipped and encrypted.
There are three
main thread units here running tasks. The first task is an analyser. It
identifies, from an item being passed to it that holds the source folder and
destination folder, a list of files that are to be zipped and optionally a list
of subfolders which are routed back to the analyser as separate items.
The
list of files is sent as separate description items, via queues, to the zip
task, which, surprise surprise, zips the file. The description of the zipped file
is then sent to the encryption task which encrypts the file.
This process actually updates
lots of GUI controls. In this example all three of the main tasks continually
update the GUI. Each task has more than one GUI control to update. As the base
task class only allows one GUI control to be controlled in the class, there
is a slight issue. In this example we send references to other GUI controls via
queue items, which are gathered and used by the derived task classes. While this
may slow down the three tasks (they have to marshal each GUI update to the GUI
thread), this is mitigated by the fact that there are only three tasks running, whereas
the above example had more tasks running (one for each GUI control). It's a
juggle; you decide which way you would go.
Back to Base ics
As the majority of the developer's work should be to do with overriding the baseTask class, it makes sense to look at this parent class. So here it is in all its glory:
Friend Class baseTask
    Protected _colQWrappers As Collection = New Collection
    Protected _GuiControl As Control
    Private objLock As New Object
    Protected Delegate Sub GUIUpdate(ByVal Update As Object)

    Friend Property GuiControl() As Control
        Get
            SyncLock (objLock)
                Return _GuiControl
            End SyncLock
        End Get
        Set(ByVal value As Control)
            SyncLock (objLock)
                _GuiControl = value
            End SyncLock
        End Set
    End Property

    Friend ReadOnly Property colQWrappers() As Collection
        Get
            Return _colQWrappers
        End Get
    End Property

    Friend Sub New()
    End Sub

    Friend Sub addQ(ByVal Q As baseQ_Wrapper)
        _colQWrappers.Add(Q, Q.IdKey)
    End Sub

    Friend Overridable Sub Finish()
        _colQWrappers.Clear()
        _GuiControl = Nothing
    End Sub

    ' Sends a clone of QItem to the named queue and, if requested,
    ' to every other outbound queue as well.
    Friend Overridable Sub SendToOutQ(ByRef QItem As IisQItem, _
            ByVal bToAllOutQueues As Boolean, _
            Optional ByVal NamedQ As String = "")
        Dim q As baseQ_Wrapper
        If NamedQ <> "" Then
            If Not _colQWrappers.Item(NamedQ) Is Nothing Then
                q = _colQWrappers.Item(NamedQ)
                q.Enqueue(QItem.Clone)
            End If
        End If
        If bToAllOutQueues Then
            For Each q In _colQWrappers
                If q.IdKey <> NamedQ Then
                    q.Enqueue(QItem.Clone)
                End If
            Next
        End If
    End Sub

    Friend Overridable Sub Perform(ByRef Item As IisQItem)
    End Sub

    ' Runs UpdateGui on the GUI thread, marshalling the call when needed.
    Friend Overridable Sub PerformGuiUpdate(ByVal GUIUpdObj As Object)
        Dim delNew As GUIUpdate
        If _GuiControl Is Nothing Then Exit Sub
        SyncLock (objLock)
            If _GuiControl.InvokeRequired Then
                delNew = New GUIUpdate(AddressOf Me.UpdateGui)
                _GuiControl.Invoke(delNew, New Object() {GUIUpdObj})
            Else
                UpdateGui(GUIUpdObj)
            End If
        End SyncLock
    End Sub

    Friend Overridable Sub UpdateGui(ByVal GUIUpdObj As Object)
    End Sub
End Class
_colQWrappers holds the outbound queues associated with this task and its
parent thread unit. They are added to the collection by the addQ sub and data
is added to the queues by the SendToOutQ sub.
The GuiControl for the
task is set by its parent thread worker through the GuiControl property in the
task. When a GUI update is required the developer's code must call the
PerformGuiUpdate subroutine, passing it a piece of data to control the GUI update.
This sub identifies whether the current thread is the GUI thread (wouldn't that be
nice!) by checking the GUI control's InvokeRequired property. When it finds that
the code is running in a different thread, the task class creates a new
delegate based on the defined delegate GUIUpdate.
The delegate points to the subroutine UpdateGui in the task and is used by the Invoke
command on the GUI control. This Invoke takes the passed data and marshals it to the same
thread as the GUI before calling the subroutine UpdateGui (which must contain
instructions for updating the GUI) and passing that data. When completed the
Invoke ends and the task's thread continues. You may wish to use BeginInvoke
instead of Invoke. This is a perfectly reasonable choice, except that it could
lead to many pending updates being queued on the GUI thread. The choice is
yours.
You will notice that the New sub has no parameters. This
was a deliberate step in order to allow the code which uses reflection to pick up
the correct constructor. This code sits in the ThreadTaskFactory class and loads
the assembly into memory. The assembly is then interrogated in order to find and run the
constructor for the required class name. This is why the namespace is so
important: it allows the class to be identified within the assembly for reflection.
In order for this logic to work correctly any child of baseTask must have a New
constructor subroutine which has no parameters. If you need to pass parameter data then
send the data down an input queue instead.
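For readers who have not met this kind of reflection before, here is a small sketch of creating an object from a namespace-qualified name. It is not the factory's actual code: it uses the standard Assembly.CreateInstance(String) call, and the SketchTasks namespace, TaskFoo class and CreateByName function are invented for the example. TaskFoo is declared Public with a Public parameterless constructor because that is what this particular overload looks for.

```vb
Imports System
Imports System.Reflection

Namespace SketchTasks
    ' Hypothetical task class with the required parameterless constructor.
    Public Class TaskFoo
        Public Sub New()
        End Sub

        Public Function Describe() As String
            Return "TaskFoo created"
        End Function
    End Class
End Namespace

Module ReflectionSketch

    ' Creates an instance from a namespace-qualified class name, or
    ' returns Nothing when no such class exists in this assembly.
    Function CreateByName(ByVal fullName As String) As Object
        Dim asm As Assembly = Assembly.GetExecutingAssembly()
        Return asm.CreateInstance(fullName)
    End Function

    Sub Main()
        ' Ask the compiler for the full name so that the sketch works whatever
        ' root namespace the project has; a factory would be handed this
        ' name as a plain string instead.
        Dim fullName As String = GetType(SketchTasks.TaskFoo).FullName
        Console.WriteLine("Looking for: " & fullName)

        Dim created As Object = CreateByName(fullName)
        If created IsNot Nothing Then
            Console.WriteLine(DirectCast(created, SketchTasks.TaskFoo).Describe())
        End If

        ' A name that does not match any class simply yields Nothing.
        Console.WriteLine(CreateByName("No.Such.Class") Is Nothing)
    End Sub

End Module
```

Note how the full name printed by the sketch includes the root namespace when the project has one, which is the same reason the class name passed to the factory has to be fully qualified.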
There is a Finish sub which
you can override to perform actions such as logging off any connections. The Finish sub
is called by the thread unit factory as a last step after all the threads have
been aborted.
Finally, the developer must place their code in the Perform
subroutine of the inherited class. The code must handle a
single data item before exiting the subroutine. The thread worker will then handle
the input queue processing and possibly call the Perform subroutine again with
different data. The code should use the above routines where possible.
Conclusion
The wrapper classes presented here are no "free lunch". They require a little bit of thought and perhaps a mind shift towards queues. However, they do make threading, and the issues associated with threading (such as GUI updating), easier. And, in an age of multi-core processing, that may be the difference between your application running like a dog and running a great deal faster.
IT consultant with over 15 years' experience in the industry.
Currently working in the UK on Siebel projects and using .NET for external systems integration from/to Siebel.