Click here to Skip to main content
15,881,757 members
Articles / Programming Languages / C#

.NET Interprocess Communication Revisited

Rate me:
Please Sign up or sign in to vote.
4.96/5 (30 votes)
4 Jan 2010CPOL4 min read 121.4K   2.3K   111  
The XDMessaging 2.0 library provides an easy-to-use, zero configuration alternative to existing IPC implementations.
/*=============================================================================
*
*	(C) Copyright 2007, Michael Carlisle (mike.carlisle@thecodeking.co.uk)
*
*   http://www.TheCodeKing.co.uk
*  
*	All rights reserved.
*	The code and information is provided "as-is" without waranty of any kind,
*	either expresed or implied.
*
*-----------------------------------------------------------------------------
*	History:
*		11/02/2007	Michael Carlisle				Version 1.0
*       08/09/2007  Michael Carlisle                Version 1.1
*       12/12/2009  Michael Carlisle                Version 2.0
 *                  Added XDIOStream implementation which can be used from Windows Services.
*=============================================================================
*/
using System;
using System.Collections.Generic;
using System.Text;
using System.IO;
using System.Threading;
using System.Diagnostics;
using System.Windows.Forms;

namespace TheCodeKing.Net.Messaging.Concrete.IOStream
{
    /// <summary>
    /// A concrete implementation of IXDListener and IXDBroadcast which can be used to send and recieve messages across
    /// appDomain and process boundaries using file IO streams to a shared directory. Applications may leverage this 
    /// instance to register listeners on pseudo 'channels', and receive messages sent from all processes using this
    /// IXDBroadcast implementation within the same machine.
    /// </summary>
    internal class XDIOStream : IXDBroadcast, IXDListener
    {
        // Flag as to whether dispose has been called
        private bool disposed = false;
        /// <summary>
        /// Unique mutex key to synchronize the clean up tasks across processes.
        /// </summary>
        private const string MutexCleanUpKey = "TheCodeKing.Net.XDServices.IOStream.Cleaner";
        /// <summary>
        /// Get a list of charactors that must be stripped from a channel name folder.
        /// </summary>
        private static readonly char[] invalidChannelChars = Path.GetInvalidFileNameChars();
        /// <summary>
        /// The timeout period after which messages are deleted. 
        /// </summary>
        private const int fileTimeoutMilliseconds = 5000;
        /// <summary>
        /// The temporary folder where messages will be stored.
        /// </summary>
        private readonly string tempFolder;
        /// <summary>
        /// A list of FileSystemWatcher instances used for each registered channel.
        /// </summary>
        private Dictionary<string, FileSystemWatcher> watcherList;
        /// <summary>
        /// A lock object used to ensure changes to watcherList are thread-safe.
        /// </summary>
        private object lockObj = new object();

        /// <summary>
        /// The constructor used to create a cnew instance of XDIOStream. Instances are created using
        /// factory methods on the XDListener or XDBroadcast classes.
        /// </summary>
        internal XDIOStream()
        {
            this.tempFolder = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.CommonApplicationData), "XDMessaging");
            this.watcherList = new Dictionary<string, FileSystemWatcher>(StringComparer.InvariantCultureIgnoreCase);
        }

        /// <summary>
        /// The implementation of IXDBroadcast, used to broadcast a new message to other processes. This creates a unique
        /// file on the filesystem. The temporary files are cleaned up after a pre-defined timeout. 
        /// </summary>
        /// <param name="channelName"></param>
        /// <param name="message"></param>
        public void SendToChannel(string channelName, string message)
        {
            if (string.IsNullOrEmpty(channelName))
            {
                throw new ArgumentNullException(channelName, "The channel name must be defined");
            }
            if (message == null)
            {
                throw new ArgumentNullException(message, "The messsage packet cannot be null");
            }
            // create temporary name
            string fileName = Guid.NewGuid().ToString();
            string folder = GetChannelDirectory(channelName);
            string filePath = Path.Combine(folder, string.Concat(fileName, ".msg"));
            // write the message to the temp file, which will trigger listeners in other processes
            using (StreamWriter writer = File.CreateText(filePath))
            {
                // write out the channel name and message, this allows for invalid
                // charators in the channel name.
                writer.Write(string.Concat(channelName, ":", message));
                writer.Flush();
            }
            // return as fast as we can, leaving a clean up task
            ThreadPool.QueueUserWorkItem(CleanUp, new FileInfo(filePath).Directory);
        }
        /// <summary>
        /// This method is called within a seperate thread and deletes messages that are older than
        /// the pre-defined expiry time.
        /// </summary>
        /// <param name="state"></param>
        private void CleanUp(object state)
        {
            DirectoryInfo directory = state as DirectoryInfo;
            // use a mutex to ensure only one listener system wide is running
            bool createdNew = true;
            using (Mutex mutex = new Mutex(true, MutexCleanUpKey, out createdNew))
            {
                if (createdNew)
                {
                    try
                    {
                        // wait for the specified timeout before attempting to clean directory
                        Thread.Sleep(fileTimeoutMilliseconds);
                        // check directory not deleted, don't use cached version (directory.Exists)
                        if (Directory.Exists(directory.FullName))
                        {
                            foreach (FileInfo file in directory.GetFiles("*.msg"))
                            {
                                // attempt to clean up all expired messages in the channel directory
                                if (file.CreationTimeUtc <= DateTime.UtcNow.AddMilliseconds(-fileTimeoutMilliseconds))
                                {
                                    if (File.Exists(file.FullName))
                                    {
                                        try
                                        {
                                            file.Delete();
                                        }
                                        catch (IOException) { } // the file could have been deleted by another broadcaster, retry later.
                                        catch (UnauthorizedAccessException) { } // if the file is still in use retry again later.
                                    }
                                }
                            }
                        }
                    }
                    catch (IOException) { } // the file could have been deleted by another broadcaster, retry later.
                    catch (UnauthorizedAccessException) { } // if the file is still in use retry again later.
                }
            }
            if (createdNew)
            {
                // after mutex release add an additional thread, in case we're the last out to finalize cleanup
                ThreadPool.QueueUserWorkItem(CleanUp, directory);
            }
        }

        /// <summary>
        /// The MessageReceived event used to broadcast the message to attached instances within the current appDomain.
        /// </summary>
        public event XDListener.XDMessageHandler MessageReceived;

        /// <summary>
        /// Sets up a new FileSystemWatcher so that messages can be received on a particular 'channel'.
        /// </summary>
        /// <param name="channelName"></param>
        public void RegisterChannel(string channelName)
        {
            if (string.IsNullOrEmpty(channelName))
            {
                throw new ArgumentNullException(channelName, "The channel name cannot be null or empty.");
            }
            if (disposed)
            {
                throw new ObjectDisposedException("IXDListener", "This instance has been disposed.");
            }
            FileSystemWatcher watcher = EnsureWatcher(channelName);
            watcher.EnableRaisingEvents = true;
        }
        /// <summary>
        /// Disables any FileSystemWatcher for a particular channel so that messages are no longer received.
        /// </summary>
        /// <param name="channelName"></param>
        public void UnRegisterChannel(string channelName)
        {
            if (string.IsNullOrEmpty(channelName))
            {
                throw new ArgumentNullException(channelName, "The channel name cannot be null or empty.");
            }
            if (disposed)
            {
                throw new ObjectDisposedException("IXDListener", "This instance has been disposed.");
            }
            FileSystemWatcher watcher = EnsureWatcher(channelName);
            watcher.EnableRaisingEvents = false;
        }

        /// <summary>
        /// Provides a thread safe method to lookup/create a instance of FileSystemWatcher for a particular channel.
        /// </summary>
        /// <param name="channelName"></param>
        /// <returns></returns>
        private FileSystemWatcher EnsureWatcher(string channelName)
        {
            FileSystemWatcher watcher = null;
            // try to get a reference to the watcher used for the current watcher
            if (!watcherList.TryGetValue(channelName, out watcher))
            {
                // if no watcher then lock the list
                lock (lockObj)
                {
                    // whilst locked double check if the item has been added since the lock was applied
                    if (!watcherList.TryGetValue(channelName, out watcher))
                    {
                        // create a new watcher for the given channel, by default this is not enabled.
                        string folder = GetChannelDirectory(channelName);
                        watcher = new FileSystemWatcher(folder, "*.msg");
                        watcher.NotifyFilter = NotifyFilters.CreationTime | NotifyFilters.LastWrite;
                        watcher.Changed += new FileSystemEventHandler(OnMessageReceived);
                        watcherList.Add(channelName, watcher);
                    }
                }
            }
            return watcher;
        }

        /// <summary>
        /// A helper method used to determine the temporary directory location used for
        /// a particular channel. The directory is created if it does not exist.
        /// </summary>
        /// <param name="channelName"></param>
        /// <returns></returns>
        private string GetChannelDirectory(string channelName)
        {
            string folder = null;
            try
            {
                string channelKey = GetChannelKey(channelName);
                folder = Path.Combine(tempFolder, channelKey);
                if (!Directory.Exists(folder))
                {
                    Directory.CreateDirectory(folder);
                }
                return folder;
            }
            catch (PathTooLongException e)
            {
                throw new ArgumentException(string.Format("Unable to bind to channel as the name '{0}' is too long." +
                    " Try a shorter channel name.", channelName), e);
            }
            catch (UnauthorizedAccessException ue)
            {
                throw new UnauthorizedAccessException(string.Format("Unable to bind to channel '{0}' as access is denied." +
                    " Ensure the process has read/write access to the directory '{1}'.", channelName, folder), ue);
            }
            catch (IOException ie)
            {
                throw new IOException(string.Format("There was an unexpected IO error binding to channel '{0}'." +
                    " Ensure the process is unable to read/write to directory '{1}'.", channelName, folder), ie);
            }
        }

        /// <summary>
        /// The FileSystemWatcher event that is triggered when a new file is created in the channel temporary
        /// directory. This dispatches the MessageReceived event.
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void OnMessageReceived(object sender, FileSystemEventArgs e)
        {
            // if a new file is added to the channel directory
            if (e.ChangeType == WatcherChangeTypes.Changed)
            {
                try 
                {
                    // check if file exists
                    if (File.Exists(e.FullPath))
                    {
                        string rawmessage = null;
                        // try to load the file in shared access mode
                        using (FileStream stream = File.Open(e.FullPath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
                        {
                            using (StreamReader reader = new StreamReader(stream))
                            {
                                rawmessage = reader.ReadToEnd();
                            }
                        }
                        // if the message contains valid data
                        if (!string.IsNullOrEmpty(rawmessage) && rawmessage.Contains(":"))
                        {
                            // extract the channel name and message data
                            string[] parts = rawmessage.Split(new[]{':'}, 2);
                            string message = parts[1];
                            string channel = parts[0];
                            if (MessageReceived != null)
                            {
                                using (DataGram dataGram = new DataGram(channel, message))
                                {
                                    // dispatch the message received event
                                     MessageReceived(this, new XDMessageEventArgs(dataGram));
                                }
                            }
                        }
                    }
                }
                catch (FileNotFoundException) 
                {
                    // if for any reason the file was deleted before the message could be read from the file,
                    // then can safely ignore this message
                }
                catch (UnauthorizedAccessException ue)
                {
                    throw new UnauthorizedAccessException(string.Format("Unable to bind to channel as access is denied." +
                        " Ensure the process has read/write access to the directory '{0}'.", e.FullPath), ue);
                }
                catch (IOException ie)
                {
                    throw new IOException(string.Format("There was an unexpected IO error binding to a channel." +
                        " Ensure the process is unable to read/write to directory '{0}'.", e.FullPath), ie);
                }
            }
        }
        /// <summary>
        /// Gets a channel key string associated with the channel name. This is used as the 
        /// directory name in the temporary directory, and we therefore strip out any invalid characters.
        /// </summary>
        /// <param name="channelName">The channel name for which a channel key is required.</param>
        /// <returns>The string channel key.</returns>
        internal static string GetChannelKey(string channelName)
        {
            foreach (char c in invalidChannelChars)
            {
                if (channelName.Contains(c.ToString()))
                {
                    channelName = channelName.Replace(c, '_');
                }
            }
            return channelName;
        }

        /// <summary>
        /// Deconstructor, cleans unmanaged resources only
        /// </summary>
        ~XDIOStream()
        {
            Dispose(false);
        }

        /// <summary>
        /// Dispose implementation which ensures all FileSystemWatchers
        /// are shut down and handlers detatched.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }
        /// <summary>
        /// Dispose implementation, which ensures the native window is destroyed
        /// </summary>
        private void Dispose(bool disposeManaged)
        {
            if (!disposed)
            {
                disposed = true;
                if (disposeManaged)
                {
                    if (MessageReceived != null)
                    {
                        // remove all handlers
                        Delegate[] del = MessageReceived.GetInvocationList();
                        foreach (TheCodeKing.Net.Messaging.XDListener.XDMessageHandler msg in del)
                        {
                            MessageReceived -= msg;
                        }
                    }
                    if (watcherList != null)
                    {
                        // shut down watchers
                        foreach (FileSystemWatcher watcher in watcherList.Values)
                        {
                            watcher.EnableRaisingEvents = false;
                            watcher.Changed -= new FileSystemEventHandler(OnMessageReceived);
                            watcher.Dispose();
                        }
                        watcherList.Clear();
                        watcherList = null;
                    }
                }
            }
        }
    }
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Architect
United Kingdom United Kingdom
Mike Carlisle - Technical Architect with over 20 years experience in a wide range of technologies.

@TheCodeKing

Comments and Discussions