|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Announcements
Want a new Job?
Chapters
Services
Feature Zones
|
0. IntroductionMicrosoft has provided the .NET framework various helpful thread synchronization primitives starting from monitors up to reader-writer locks. What is missing is the same on the inter-process level, as well as simple message passing mechanisms useful e.g. for service (client/server or SOA) and producer/consumer patterns on both thread and process level. I try to fill this gap with a simple self-contained framework for both inter-thread and inter-process synchronization and communication (IPC) using structures like semaphores, mailboxes, memory-mapped files, blocking channels and simple message flow controllers. The set of classes provided in this articles is also available as a library project (open source, BSD License) maintained on www.cdrnet.net/projects/threadmsg/. The ambition of this framework is as short as simple:
Note: I removed all the XML Comments of the code samples in the article to save space - check out the attached source code if you need more details about the methods and their arguments. 1. Starting with a SampleTo demonstrate how simple inter-process message passing could be, I will start with a small sample: A console application that can be started as either a reader or a writer, depending on the command line arguments. In a writer process you may enter some text and send it to a mailbox (return key), a reader displays all messages received from the mailbox. You may start as many writers and readers as you want, but every message will only be shown on exactly one reader. [Serializable]
struct Message
{
public string Text;
}
class Test
{
IMailBox mail;
public Test()
{
mail = new ProcessMailBox("TMProcessTest",1024);
}
public void RunWriter()
{
Console.WriteLine("Writer started");
Message msg;
while(true)
{
msg.Text = Console.ReadLine();
if(msg.Text.Equals("exit"))
break;
mail.Content = msg;
}
}
public void RunReader()
{
Console.WriteLine("Reader started");
while(true)
{
Message msg = (Message)mail.Content;
Console.WriteLine(msg.Text);
}
}
[STAThread]
static void Main(string[] args)
{
Test test = new Test();
if(args.Length > 0)
test.RunWriter();
else
test.RunReader();
}
}
Once a mailbox (here: However, what happens behind the scenes is a bit more complex: The messages
are transferred through one of the only remaining ways of sharing memory
between processes: memory mapped files (MMF), in our case virtual files
existing only in the system page file. Access to this file is synchronized
using two Win32 semaphores. The messages are binary serialized written to the
file, that's why the 2. Inter-Thread and Inter-Process Synchronization in the .NET WorldCommunication between threads and processes requires either shared memory or a built-in mechanism for transferring data into and out of the process/thread. In the case of shared memory, there is also a set of synchronization primitives needed to allow concurrent access. All threads in a single process share a common logical address space (the heap), but starting from Windows 2000 there's no way to share memory between processes. However, processes are allowed to read and write to the same file and the WinAPI provides various syscalls to simplify mapping files to the process' address space and to work with virtual files existing only as kernel objects ("sections") pointing to a memory block in the system page file. For both inter-thread shared heaps and inter-process shared files, concurrent access may result in data inconsistency. We discuss in short several mechanisms to ensure the orderly execution of cooperating processes or threads allowing data consistency to be maintained. 2.1 Thread Synchronization
The .NET Framework and C# offer very simple and straightforward thread
synchronization mechanisms using void Work1()
{
NonCriticalSection1();
Monitor.Enter(this);
try
{
CriticalSection();
}
finally
{
Monitor.Exit(this);
}
NonCriticalSection2();
}
void Work2()
{
NonCriticalSection1();
lock(this)
{
CriticalSection();
}
NonCriticalSection2();
}
Both 2.2 Inter-Thread Semaphores
One of the classic synchronization primitives (introduced by Edsger Dijkstra)
is the counting semaphore. Semaphores are objects with a counter and two
operations: public sealed class ThreadSemaphore : ISemaphore
{
private int counter;
private readonly int max;
public ThreadSemaphore() : this(0, int.Max) {}
public ThreadSemaphore(int initial) : this(initial, int.Max) {}
public ThreadSemaphore(int initial, int max)
{
this.counter = Math.Min(initial,max);
this.max = max;
}
public void Acquire()
{
lock(this)
{
counter--;
if(counter < 0 && !Monitor.Wait(this))
throw new SemaphoreFailedException();
}
}
public void Acquire(TimeSpan timeout)
{
lock(this)
{
counter--;
if(counter < 0 && !Monitor.Wait(this,timeout))
throw new SemaphoreFailedException();
}
}
public void Release()
{
lock(this)
{
if(counter >= max)
throw new SemaphoreFailedException();
if(counter < 0)
Monitor.Pulse(this);
counter++;
}
}
}
Semaphores are useful for more complex blocking scenarios like channels we
will discuss later. You could also use semaphores for mutual exclusion locking
of critical sections ( Please note that counting semaphores are potentially dangerous objects if not
used carefully. You are on the safe side however if you follow a basic rule: Never
call ThreadSemaphore s = new ThreadSemaphore(1);
void Work3()
{
NonCriticalSection1();
s.Acquire();
try
{
CriticalSection();
}
finally
{
s.Release();
}
NonCriticalSection2();
}
2.3 Inter-Process SemaphoresTo synchronize resource access between processes we need primitives like the ones discussed above for the process level. Unfortunately, there is no process level Monitor class available in the .NET Framework. However, the Win32 Api provides Semaphore Kernel Objects that may be used to synchronize access between processes. Robin Galloway-Lunn introduces how to map Win32 semaphores to the .NET world in "Using Win32 Semaphores in C#". Our implementation looks similar: [DllImport("kernel32",EntryPoint="CreateSemaphore",
SetLastError=true,CharSet=CharSet.Unicode)]
internal static extern uint CreateSemaphore(
SecurityAttributes auth, int initialCount,
int maximumCount, string name);
[DllImport("kernel32",EntryPoint="WaitForSingleObject",
SetLastError=true,CharSet=CharSet.Unicode)]
internal static extern uint WaitForSingleObject(
uint hHandle, uint dwMilliseconds);
[DllImport("kernel32",EntryPoint="ReleaseSemaphore",
SetLastError=true,CharSet=CharSet.Unicode)]
[return : MarshalAs( UnmanagedType.VariantBool )]
internal static extern bool ReleaseSemaphore(
uint hHandle, int lReleaseCount, out int lpPreviousCount);
[DllImport("kernel32",EntryPoint="CloseHandle",SetLastError=true,
CharSet=CharSet.Unicode)]
[return : MarshalAs( UnmanagedType.VariantBool )]
internal static extern bool CloseHandle(uint hHandle);
public class ProcessSemaphore : ISemaphore, IDisposable
{
private uint handle;
private readonly uint interruptReactionTime;
public ProcessSemaphore(string name) : this(
name,0,int.MaxValue,500) {}
public ProcessSemaphore(string name, int initial) : this(
name,initial,int.MaxValue,500) {}
public ProcessSemaphore(string name, int initial,
int max, int interruptReactionTime)
{
this.interruptReactionTime = (uint)interruptReactionTime;
this.handle = NTKernel.CreateSemaphore(null, initial, max, name);
if(handle == 0)
throw new SemaphoreFailedException();
}
public void Acquire()
{
while(true)
{ //looped 0.5s timeout to make NT-blocked threads interruptable.
uint res = NTKernel.WaitForSingleObject(handle,
interruptReactionTime);
try {System.Threading.Thread.Sleep(0);}
catch(System.Threading.ThreadInterruptedException e)
{
if(res == 0)
{ //Rollback
int previousCount;
NTKernel.ReleaseSemaphore(handle,1,out previousCount);
}
throw e;
}
if(res == 0)
return;
if(res != 258)
throw new SemaphoreFailedException();
}
}
public void Acquire(TimeSpan timeout)
{
uint milliseconds = (uint)timeout.TotalMilliseconds;
if(NTKernel.WaitForSingleObject(handle, milliseconds) != 0)
throw new SemaphoreFailedException();
}
public void Release()
{
int previousCount;
if(!NTKernel.ReleaseSemaphore(handle, 1, out previousCount))
throw new SemaphoreFailedException();
}
#region IDisposable Member
public void Dispose()
{
if(handle != 0)
{
if(NTKernel.CloseHandle(handle))
handle = 0;
}
}
#endregion
}
The important point is that the semaphore is named. This allows other processes to create a handle to the same semaphore just by entering the same name. To make blocked threads interruptible we use a (dirty) workaround using timeouts and Sleep(0). We need interrupt support to safely shutdown the threads. It's recommended however to release the semaphore until no more thread is blocked, allowing a clean application exit. You may also have noticed that both the inter-thread and the inter-process semaphore share the same interface. This pattern is achieved on all classes, leading to the abstraction mentioned in the introduction. Note however that for performance reasons you should NOT use inter-process implementations for inter-thread scenarios or inter-thread implementations for single thread scenarios. 3. Inter-Process Shared Memory: Memory Mapped FilesWe have seen how to synchronize access to shared resources for both threads
and processes. What is missing for transferring messages is the shared resource
itself. For threads this is just as easy as declaring a class member variable,
but for processes we need a technique called Memory Mapped Files (MMF) provided
by the Win32 API. Working with MMF is not much harder than working with the
Win32 Semaphores discussed above. What we need first is a handle of such a
mapped file using the [DllImport("Kernel32.dll",EntryPoint="CreateFileMapping",
SetLastError=true,CharSet=CharSet.Unicode)]
internal static extern IntPtr CreateFileMapping(uint hFile,
SecurityAttributes lpAttributes, uint flProtect,
uint dwMaximumSizeHigh, uint dwMaximumSizeLow, string lpName);
[DllImport("Kernel32.dll",EntryPoint="MapViewOfFile",
SetLastError=true,CharSet=CharSet.Unicode)]
internal static extern IntPtr MapViewOfFile(IntPtr hFileMappingObject,
uint dwDesiredAccess, uint dwFileOffsetHigh,
uint dwFileOffsetLow, uint dwNumberOfBytesToMap);
[DllImport("Kernel32.dll",EntryPoint="UnmapViewOfFile",
SetLastError=true,CharSet=CharSet.Unicode)]
[return : MarshalAs( UnmanagedType.VariantBool )]
internal static extern bool UnmapViewOfFile(IntPtr lpBaseAddress);
public static MemoryMappedFile CreateFile(string name,
FileAccess access, int size)
{
if(size < 0)
throw new ArgumentException("Size must not be negative","size");
IntPtr fileMapping = NTKernel.CreateFileMapping(0xFFFFFFFFu,null,
(uint)access,0,(uint)size,name);
if(fileMapping == IntPtr.Zero)
throw new MemoryMappingFailedException();
return new MemoryMappedFile(fileMapping,size,access);
}
We prefer virtual files directly in the system page file, so we provide -1
(0xFFFFFFFF) as the file handle to create our mapped file handle. We also
specify the required file size in bytes and a name to allow other
processes to access the same file concurrently. Having such a file,
we may map several parts (specified by offset and size in bytes) of this file
to our local address space. We do this with the public MemoryMappedFileView CreateView(int offset, int size,
MemoryMappedFileView.ViewAccess access)
{
if(this.access == FileAccess.ReadOnly && access ==
MemoryMappedFileView.ViewAccess.ReadWrite)
throw new ArgumentException(
"Only read access to views allowed on files without write access",
"access");
if(offset < 0)
throw new ArgumentException("Offset must not be negative","size");
if(size < 0)
throw new ArgumentException("Size must not be negative","size");
IntPtr mappedView = NTKernel.MapViewOfFile(fileMapping,
(uint)access,0,(uint)offset,(uint)size);
return new MemoryMappedFileView(mappedView,size,access);
}
In unsafe code, we could just take the returned pointer ( public byte ReadByte(int offset)
{
return Marshal.ReadByte(mappedView,offset);
}
public void WriteByte(byte data, int offset)
{
Marshal.WriteByte(mappedView,offset,data);
}
public int ReadInt32(int offset)
{
return Marshal.ReadInt32(mappedView,offset);
}
public void WriteInt32(int data, int offset)
{
Marshal.WriteInt32(mappedView,offset,data);
}
public void ReadBytes(byte[] data, int offset)
{
for(int i=0;i<data.Length;i++)
data[i] = Marshal.ReadByte(mappedView,offset+i);
}
public void WriteBytes(byte[] data, int offset)
{
for(int i=0;i<data.Length;i++)
Marshal.WriteByte(mappedView,offset+i,data[i]);
}
However, we want to write and read whole object trees to the file, so we need more advanced accessors with automatic binary serialization support: public object ReadDeserialize(int offset, int length)
{
byte[] binaryData = new byte[length];
ReadBytes(binaryData,offset);
System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter
= new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter();
System.IO.MemoryStream ms = new System.IO.MemoryStream(
binaryData,0,length,true,true);
object data = formatter.Deserialize(ms);
ms.Close();
return data;
}
public void WriteSerialize(object data, int offset, int length)
{
System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter
= new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter();
byte[] binaryData = new byte[length];
System.IO.MemoryStream ms = new System.IO.MemoryStream(
binaryData,0,length,true,true);
formatter.Serialize(ms,data);
ms.Flush();
ms.Close();
WriteBytes(binaryData,offset);
}
Please note that serialized size of the object should not exceed the view size. The serialized size is always bigger than the size of the object itself. I have not tried binding the memory stream directly to the mapped view instead of the byte array but that should work too, probably even with a small performance gain. 4. Mailbox: Passing Messages between Threads and ProcessesA mailbox has nothing to do with neither email nor NT Mailslots. It is a safe shared memory structure that can hold only one object. The content is read and written through a property. If the mailbox does not hold an object, a thread reading the content is blocked until another thread writes some content. If it already holds content, a thread trying to write to it is blocked until another thread reads the content first. The content can only be read once - its reference is automatically removed after reading. We've developed above all we need to build such mailboxes. 4.1 Inter-Thread MailboxA mailbox is very easy to build using two semaphores: one is signaled when the box is empty, the other when it is full. To read from the mailbox one first waits until the mailbox is full and signals the empty semaphore after reading. To write one needs to wait until it's empty and signals the full semaphore after writing. Note that the empty semaphore is signaled at the beginning. public sealed class ThreadMailBox : IMailBox
{
private object content;
private ThreadSemaphore empty, full;
public ThreadMailBox()
{
empty = new ThreadSemaphore(1,1);
full = new ThreadSemaphore(0,1);
}
public object Content
{
get
{
full.Acquire();
object item = content;
empty.Release();
return item;
}
set
{
empty.Acquire();
content = value;
full.Release();
}
}
}
4.2 Inter-Process MailboxThe inter-process version is nearly as simple as the inter-thread
implementation. The only difference is that we now use inter-process semaphores
and that we read and write to a memory mapped file instead of a simple class
member variable. As serialization could fail, we provide a small rollback
exception handler to undo any changes made to the mailbox state. There are many
possible error sources (invalid handles, access denied, file size,
public sealed class ProcessMailBox : IMailBox, IDisposable
{
private MemoryMappedFile file;
private MemoryMappedFileView view;
private ProcessSemaphore empty, full;
public ProcessMailBox(string name,int size)
{
empty = new ProcessSemaphore(name+".EmptySemaphore.MailBox",1,1);
full = new ProcessSemaphore(name+".FullSemaphore.MailBox",0,1);
file = MemoryMappedFile.CreateFile(name+".MemoryMappedFile.MailBox",
MemoryMappedFile.FileAccess.ReadWrite,size);
view = file.CreateView(0,size,
MemoryMappedFileView.ViewAccess.ReadWrite);
}
public object Content
{
get
{
full.Acquire();
object item;
try {item = view.ReadDeserialize();}
catch(Exception e)
{ //Rollback
full.Release();
throw e;
}
empty.Release();
return item;
}
set
{
empty.Acquire();
try {view.WriteSerialize(value);}
catch(Exception e)
{ //Rollback
empty.Release();
throw e;
}
full.Release();
}
}
#region IDisposable Member
public void Dispose()
{
view.Dispose();
file.Dispose();
empty.Dispose();
full.Dispose();
}
#endregion
}
Now we have all the tools needed for the IPC message-passing sample in the
beginning of the article. You may want to scroll back to the sample as it
demonstrates how the 5. Channels: Queued Message TransferAn important point of mailboxes is that they can hold only one object at once. If a worker in a long processing chain (connected with mailboxes) needs a bit more time than usual for a special command, the whole chain is blocked immediately. Often it's more favorable to have buffered message-passing channels where you can pick out incoming message whenever you've got time left without blocking the sender. Such buffering is provided by channels, an alternative to the simpler mailboxes. Again, we will discuss both an inter-thread and an inter-process implementation. 5.1 ReliabilityAnother important difference between mailboxes and channels is that channels
have some reliability features and for example automatically dump messages
failed to send to the queue (because of a thread interrupt while waiting for a
lock) to an internal dump container. This means that channel-processing threads
can safely be shutdown without loosing any messages. This is maintained by two
abstract classes, 5.2 Inter-Thread ChannelThe inter-thread channel is based on the mailbox but uses a synchronized queue instead of a variable as a message buffer. Thanks to the counting semaphore model the channel blocks receiving if the queue is empty and blocks sending if the queue is full. You cannot run into any enqueue/dequeue failures. We achieve this by initializing the empty semaphore with the channel size and the full semaphore with zero. If a thread sending a message is interrupted while being blocked in the empty semaphore we copy the message to the dump container and let the exception propagate. No dumping is required in the receive method as you won't loose any message when being interrupted there. Note that a thread can only be interrupted while being blocked, that is when calling Aquire() on a semaphore. public sealed class ThreadChannel : ThreadReliability, IChannel
{
private Queue queue;
private ThreadSemaphore empty, full;
public ThreadChannel(int size)
{
queue = Queue.Synchronized(new Queue(size));
empty = new ThreadSemaphore(size,size);
full = new ThreadSemaphore(0,size);
}
public void Send(object item)
{
try {empty.Acquire();}
catch(System.Threading.ThreadInterruptedException e)
{
DumpItem(item);
throw e;
}
queue.Enqueue(item);
full.Release();
}
public void Send(object item, TimeSpan timeout)
{
try {empty.Acquire(timeout);}
...
}
public object Receive()
{
full.Acquire();
object item = queue.Dequeue();
empty.Release();
return item;
}
public object Receive(TimeSpan timeout)
{
full.Acquire(timeout);
...
}
protected override void DumpStructure()
{
lock(queue.SyncRoot)
{
foreach(object item in queue)
DumpItem(item);
queue.Clear();
}
}
}
5.3 Inter-Process Channel
Building an inter-process channel is a bit harder as you first need a way to
provide a buffer. Possible solutions could be to use an inter-process mailbox
and queue the send or receive methods depending on the required behavior. To
avoid several drawbacks of this solution we'll implement a queue directly in
the memory-mapped file instead. A first class, public sealed class ProcessChannel : ProcessReliability, IChannel, IDisposable
{
private MemoryMappedFile file;
private MemoryMappedFileView view;
private MemoryMappedQueue queue;
private ProcessSemaphore empty, full, mutex;
public ProcessChannel( int size, string name, int maxBytesPerEntry)
{
int fileSize = 64+size*maxBytesPerEntry;
empty = new ProcessSemaphore(name+".EmptySemaphore.Channel",size,size);
full = new ProcessSemaphore(name+".FullSemaphore.Channel",0,size);
mutex = new ProcessSemaphore(name+".MutexSemaphore.Channel",1,1);
file = MemoryMappedFile.CreateFile(name+".MemoryMappedFile.Channel",
MemoryMappedFile.FileAccess.ReadWrite,fileSize);
view = file.CreateView(0,fileSize,
MemoryMappedFileView.ViewAccess.ReadWrite);
queue = new MemoryMappedQueue(view,size,maxBytesPerEntry,true,0);
if(queue.Length < size || queue.BytesPerEntry < maxBytesPerEntry)
throw new MemoryMappedArrayFailedException();
}
public void Send(object item)
{
try {empty.Acquire();}
catch(System.Threading.ThreadInterruptedException e)
{
DumpItemSynchronized(item);
throw e;
}
try {mutex.Acquire();}
catch(System.Threading.ThreadInterruptedException e)
{
DumpItemSynchronized(item);
empty.Release();
throw e;
}
queue.Enqueue();
try {queue.WriteSerialize(item,0);}
catch(Exception e)
{
queue.RollbackEnqueue();
mutex.Release();
empty.Release();
throw e;
}
mutex.Release();
full.Release();
}
public void Send(object item, TimeSpan timeout)
{
try {empty.Acquire(timeout);}
...
}
public object Receive()
{
full.Acquire();
mutex.Acquire();
object item;
queue.Dequeue();
try {item = queue.ReadDeserialize(0);}
catch(Exception e)
{
queue.RollbackDequeue();
mutex.Release();
full.Release();
throw e;
}
mutex.Release();
empty.Release();
return item;
}
public object Receive(TimeSpan timeout)
{
full.Acquire(timeout);
...
}
protected override void DumpStructure()
{
mutex.Acquire();
byte[][] dmp = queue.DumpClearAll();
for(int i=0;i<dmp.Length;i++)
DumpItemSynchronized(dmp[i]);
mutex.Release();
}
#region IDisposable Member
public void Dispose()
{
view.Dispose();
file.Dispose();
empty.Dispose();
full.Dispose();
mutex.Dispose();
}
#endregion
}
6. Message RoutingWe've discussed how to synchronize resource access and how to pass messages
between threads and processes using mailboxes or channels. When working with
blocking channels, you may run into problems e.g. when you need to listen to
more than one channel in the same thread. To solve such situations, there are
some small class modules available: A channel forwarder, a multiplexer and
demultiplexer and a channel event gateway. You may define your own channel
processors the same way using the simple 6.1 Channel ForwarderA channel forwarder does nothing more but listening on a channel and forwarding received messages to another channel. If needed, the forwarder may put each received message into an envelope marked with a constant number before forwarding (this feature is used in the multiplexer, see below). public class ChannelForwarder : SingleRunnable
{
private IChannel source, target;
private readonly int envelope;
public ChannelForwarder(IChannel source,
IChannel target, bool autoStart, bool waitOnStop)
: base(true,autoStart,waitOnStop)
{
this.source = source;
this.target = target;
this.envelope = -1;
}
public ChannelForwarder(IChannel source, IChannel target,
int envelope, bool autoStart, bool waitOnStop)
: base(true,autoStart,waitOnStop)
{
this.source = source;
this.target = target;
this.envelope = envelope;
}
protected override void Run()
{ //NOTE: IChannel.Send is interrupt save and
//automatically dumps the argument.
if(envelope == -1)
while(running)
target.Send(source.Receive());
else
{
MessageEnvelope env;
env.ID = envelope;
while(running)
{
env.Message = source.Receive();
target.Send(env);
}
}
}
}
6.2 Channel Multiplexer and DemultiplexerA multiplexer listens on several input channels and forwards every received message (in an envelope to identify the input channel) to a common output channel. This may be used to listen to multiple channels at once. A demultiplexer on the other hand listens on a common input channel and forwards them to one of several output channels depending on the message envelope. public class ChannelMultiplexer : MultiRunnable
{
private ChannelForwarder[] forwarders;
public ChannelMultiplexer(IChannel[] channels, int[] ids,
IChannel output, bool autoStart, bool waitOnStop)
{
int count = channels.Length;
if(count != ids.Length)
throw new ArgumentException("Channel and ID count mismatch.","ids");
forwarders = new ChannelForwarder[count];
for(int i=0;i<count;i++)
forwarders[i] = new ChannelForwarder(channels[i],
output,ids[i],autoStart,waitOnStop);
SetRunnables((SingleRunnable[])forwarders);
}
}
public class ChannelDemultiplexer : SingleRunnable
{
private HybridDictionary dictionary;
private IChannel input;
public ChannelDemultiplexer(IChannel[] channels, int[] ids,
IChannel input, bool autoStart, bool waitOnStop)
: base(true,autoStart,waitOnStop)
{
this.input = input;
int count = channels.Length;
if(count != ids.Length)
throw new ArgumentException("Channel and ID count mismatch.","ids");
dictionary = new HybridDictionary(count,true);
for(int i=0;i<count;i++)
dictionary.add(ids[i],channels[i]);
}
protected override void Run()
{ //NOTE: IChannel.Send is interrupt save and
//automatically dumps the argument.
while(running)
{
MessageEnvelope env = (MessageEnvelope)input.Receive();
IChannel channel = (IChannel)dictionary[env.ID];
channel.send(env.Message);
}
}
}
6.3 Channel Event GatewayThe channel event gateway receives messages from a channel and fires an event
for each message received. This class may be useful for event-oriented
applications like GUIs, or to initialize minor activities using the system
public class ChannelEventGateway : SingleRunnable
{
private IChannel source;
public event MessageReceivedEventHandler MessageReceived;
public ChannelEventGateway(IChannel source, bool autoStart,
bool waitOnStop) : base(true,autoStart,waitOnStop)
{
this.source = source;
}
protected override void Run()
{
while(running)
{
object c = source.Receive();
MessageReceivedEventHandler handler = MessageReceived;
if(handler != null)
handler(this,new MessageReceivedEventArgs(c));
}
}
}
7. The Pizza Drive-in DemoThat's it; we have discussed the most important structures and techniques of the framework (others classes like the Rendezvous and Barrier implementations are ignored in this article). We end this article the same way we began it: with a demonstration. This time we have a look at a small pizza drive-in simulation. The screen shot at the top of this page shows this simulation in action: four parallel processes talking to each other. The diagram below shows how data/messages flow between the four processes using inter-process channels and inside of the processes using faster inter-thread channels and mailboxes.
To set the ball rolling, a customer orders a pizza and something to drink. He
does this with a method call in the customer interface that posts an To run this demo, open 4 command shells (cmd.exe) and start as many cooks as you want using "PizzaDemo.exe cook", the backend using "PizzaDemo.exe backend" and the facade process with the customer interface using "PizzaDemo.exe facade" (replace 'PizzaDemo' with the name of your assembly). Note that some threads (like the cooks) sleep some seconds each time to increase reality. Press return to stop and exit a process. If you press enter while still in action you'll see that some messages are dumped in the dump report at the end. In a real world application, the dump container would be stored to disk. The demo uses several mechanisms introduced in this article. For example, the
cashier loads a 8. ConclusionWe've discussed how service oriented architecture could be built and how inter-process communication and synchronization could be implemented in C#. However, this is not the only solution for such problems. For example, using that many threads could be a serious problem in bigger projects. Completely missing are transaction support and alternative channel/mailbox implementations like named pipes and TCP sockets. There may also be some flaws in the architecture, please let me know. 9. References
10. History
| |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||