Click here to Skip to main content
15,897,891 members
Articles / Programming Languages / C#

.NET Interprocess Communication Revisited

Rate me:
Please Sign up or sign in to vote.
4.96/5 (30 votes)
4 Jan 2010 | CPOL | 4 min read | 122.4K | 2.3K | 111
The XDMessaging 2.0 library provides an easy-to-use, zero configuration alternative to existing IPC implementations.
/*=============================================================================
*
*	(C) Copyright 2007, Michael Carlisle (mike.carlisle@thecodeking.co.uk)
*
*   http://www.TheCodeKing.co.uk
*  
*	All rights reserved.
*	The code and information is provided "as-is" without waranty of any kind,
*	either expressed or implied.
*
*=============================================================================
*/
using System;
using System.Collections.Generic;
using System.Text;
using System.IO;
using System.Threading;
using System.Diagnostics;
using System.Windows.Forms;
using TheCodeKing.Net.Messaging.Concrete.MultiBroadcast;

namespace TheCodeKing.Net.Messaging.Concrete.IOStream
{
    /// <summary>
    /// A concrete implementation of IXDBroadcast which can be used to send messages across
    /// appDomain and process boundaries using file IO streams to a shared directory. Instances
    /// of XDIOStreamListener can be used to receive the messages in another process.
    /// </summary>
    internal sealed class XDIOStreamBroadcast : IXDBroadcast
    {
        /// <summary>
        /// Unique mutex key to synchronize the clean up tasks across processes.
        /// The channel directory name is appended to form the full mutex name.
        /// </summary>
        private const string mutexCleanUpKey = @"Global\XDIOStreamBroadcast.Cleanup";
        /// <summary>
        /// The characters that must be replaced in a channel name before it can be
        /// used as a folder name.
        /// </summary>
        private static readonly char[] invalidChannelChars = Path.GetInvalidFileNameChars();
        /// <summary>
        /// The temporary folder where messages will be stored.
        /// </summary>
        private static readonly string temporaryFolder;
        /// <summary>
        /// The timeout period after which messages are deleted. 
        /// </summary>
        private const int fileTimeoutMilliseconds = 5000;

        /// <summary>
        /// Static constructor gets the path to the temporary directory, an "XDMessaging"
        /// folder beneath the common application data folder.
        /// </summary>
        static XDIOStreamBroadcast()
        {
            temporaryFolder = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.CommonApplicationData), "XDMessaging");
        }

        /// <summary>
        /// Default constructor.
        /// </summary>
        internal XDIOStreamBroadcast()
        {
        }

        /// <summary>
        /// The implementation of IXDBroadcast, used to broadcast a new message to other processes. This creates a unique
        /// file on the filesystem. The temporary files are cleaned up after a pre-defined timeout. 
        /// </summary>
        /// <param name="channelName">The channel to broadcast on. Must be non-empty and must not contain ':'.</param>
        /// <param name="message">The message to send. May be empty but not null.</param>
        /// <exception cref="ArgumentNullException">The channel name is null or empty, or the message is null.</exception>
        /// <exception cref="ArgumentException">The channel name contains the ':' character.</exception>
        public void SendToChannel(string channelName, string message)
        {
            if (string.IsNullOrEmpty(channelName))
            {
                throw new ArgumentNullException("channelName", "The channel name must be defined");
            }
            if (message == null)
            {
                throw new ArgumentNullException("message", "The messsage packet cannot be null");
            }
            // ':' separates the channel name from the message in the file contents written
            // below, so it cannot be allowed inside the channel name itself.
            if (channelName.IndexOf(':') >= 0)
            {
                throw new ArgumentException("The channel name may not contain the ':' character.", "channelName");
            }
            // create temporary name
            string fileName = Guid.NewGuid().ToString();
            string folder = GetChannelDirectory(channelName);
            string filePath = Path.Combine(folder, string.Concat(fileName, ".msg"));
            // write the message to the temp file, which will trigger listeners in other processes
            using (StreamWriter writer = File.CreateText(filePath))
            {
                // write out the channel name and message, this allows for invalid
                // characters in the channel name.
                writer.Write(string.Concat(channelName, ":", message));
                writer.Flush();
            }
            // return as fast as we can, leaving a clean up task
            ThreadPool.QueueUserWorkItem(CleanUpMessages, new FileInfo(filePath).Directory);
        }

        /// <summary>
        /// This method is called within a separate thread and deletes messages that are older than
        /// the pre-defined expiry time. A named mutex ensures that only one clean up task per
        /// channel directory runs at a time.
        /// </summary>
        /// <param name="state">The <see cref="DirectoryInfo"/> of the channel directory to clean.</param>
        private void CleanUpMessages(object state)
        {
            DirectoryInfo directory = state as DirectoryInfo;
            if (directory == null)
            {
                // nothing to clean; avoid a NullReferenceException on the pool thread
                return;
            }

            // use a mutex to ensure only one clean up task system wide is running
            bool createdNew;
            string mutexName = string.Concat(mutexCleanUpKey, ".", directory.Name);
            Mutex mutex;
            try
            {
                mutex = new Mutex(true, mutexName, out createdNew);
            }
            // Clean up is best-effort. If the named mutex cannot be opened by this process,
            // skip this pass rather than letting the exception escape the pool thread.
            catch (UnauthorizedAccessException) { return; }
            catch (WaitHandleCannotBeOpenedException) { return; }

            using (mutex)
            {
                // if this thread owns the Mutex then clean up otherwise exit.
                if (createdNew)
                {
                    try
                    {
                        // wait for the specified timeout before attempting to clean directory
                        try
                        {
                            Thread.Sleep(fileTimeoutMilliseconds);
                        }
                        catch (ThreadInterruptedException) { }
                        CleanUpMessages(directory);
                    }
                    finally
                    {
                        // always release the mutex, even if the clean up failed unexpectedly,
                        // so that later clean up tasks are not blocked
                        mutex.ReleaseMutex();
                    }
                }
            }
            if (createdNew)
            {
                // after mutex release add an additional thread for cleanup in case we're the last out
                // and there are now additional files to clean
                ThreadPool.QueueUserWorkItem(CleanUpMessages, directory);
            }
        }

        /// <summary>
        /// Helper method to delete messages from the given directory older
        /// than the specified timeout. IO and access errors are ignored so that
        /// the deletion can be retried by a later clean up pass.
        /// </summary>
        /// <param name="directory">The channel directory containing the *.msg files.</param>
        private static void CleanUpMessages(DirectoryInfo directory)
        {
            try
            {
                // check directory not deleted, don't use cached version (directory.Exists)
                if (!Directory.Exists(directory.FullName))
                {
                    return;
                }
                // compute the expiry threshold once for the whole pass
                DateTime expiryUtc = DateTime.UtcNow.AddMilliseconds(-fileTimeoutMilliseconds);
                foreach (FileInfo file in directory.GetFiles("*.msg"))
                {
                    // attempt to clean up all expired messages in the channel directory
                    if (file.CreationTimeUtc <= expiryUtc && File.Exists(file.FullName))
                    {
                        try
                        {
                            file.Delete();
                        }
                        catch (IOException) { } // the file could have been deleted by another broadcaster, retry later.
                        catch (UnauthorizedAccessException) { } // if the file is still in use retry again later.
                    }
                }
            }
            catch (IOException) { } // the file could have been deleted by another broadcaster, retry later.
            catch (UnauthorizedAccessException) { } // if the file is still in use retry again later.
        }

        /// <summary>
        /// A helper method used to determine the temporary directory location used for
        /// a particular channel. The directory is created if it does not exist.
        /// </summary>
        /// <param name="channelName">The channel name for which the directory is required.</param>
        /// <returns>The full path of the channel directory.</returns>
        /// <exception cref="ArgumentException">The resulting path is too long.</exception>
        /// <exception cref="UnauthorizedAccessException">Access to the directory is denied.</exception>
        /// <exception cref="IOException">An IO error occurred creating the directory.</exception>
        internal static string GetChannelDirectory(string channelName)
        {
            string folder = null;
            try
            {
                string channelKey = GetChannelKey(channelName);
                folder = Path.Combine(temporaryFolder, channelKey);
                if (!Directory.Exists(folder))
                {
                    Directory.CreateDirectory(folder);
                }
                return folder;
            }
            catch (PathTooLongException e)
            {
                throw new ArgumentException(string.Format("Unable to bind to channel as the name '{0}' is too long." +
                    " Try a shorter channel name.", channelName), e);
            }
            catch (UnauthorizedAccessException ue)
            {
                throw new UnauthorizedAccessException(string.Format("Unable to bind to channel '{0}' as access is denied." +
                    " Ensure the process has read/write access to the directory '{1}'.", channelName, folder), ue);
            }
            catch (IOException ie)
            {
                throw new IOException(string.Format("There was an unexpected IO error binding to channel '{0}'." +
                    " Ensure the process is able to read/write to directory '{1}'.", channelName, folder), ie);
            }
        }

        /// <summary>
        /// Gets a channel key string associated with the channel name. This is used as the 
        /// directory name in the temporary directory, and we therefore replace any characters
        /// that are invalid in a file name with an underscore.
        /// </summary>
        /// <param name="channelName">The channel name for which a channel key is required.</param>
        /// <returns>The string channel key.</returns>
        /// <exception cref="ArgumentNullException">The channel name is null.</exception>
        internal static string GetChannelKey(string channelName)
        {
            if (channelName == null)
            {
                throw new ArgumentNullException("channelName");
            }
            foreach (char c in invalidChannelChars)
            {
                // Replace leaves the string unchanged when the character is absent,
                // so no separate Contains check is needed
                channelName = channelName.Replace(c, '_');
            }
            return channelName;
        }
    }
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Architect
United Kingdom
Mike Carlisle - Technical Architect with over 20 years' experience in a wide range of technologies.

@TheCodeKing

Comments and Discussions