Click here to Skip to main content
15,881,424 members
Articles / Programming Languages / C#

Topic-based Publish/Subscribe design pattern implementation in C# - Part I (Using socket programming)

Rate me:
Please Sign up or sign in to vote.
4.94/5 (28 votes)
23 Mar 2009CPOL16 min read 132.7K   3.2K   81  
Implementation of a topic based Publish Subscribe design pattern using socket programming, and a proprietary messaging protocol.
using System;
using System.Collections.Generic;
using System.Text;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Windows.Forms;

namespace PubSubServer
{
    class PublisherService
    {

        public void StartPublisherService()
        {
            Thread th = new Thread(new ThreadStart(HostPublisherService));
            th.IsBackground = true;
            th.Start();
        }

        private void HostPublisherService()
        {
            IPAddress ipV4 = IPAddress.Parse("127.0.0.1");// ReturnMachineIP(); if you need machine ip then use this method.            
            IPEndPoint localEP = new IPEndPoint(ipV4, 10002);
            Socket server = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
            server.Bind(localEP);

            StartListening(server);

        }

        private static IPAddress ReturnMachineIP()
        {
            String hostName = Dns.GetHostName();
            IPHostEntry ipEntry = Dns.GetHostEntry(hostName);
            IPAddress[] addr = ipEntry.AddressList;
            IPAddress ipV4 = null;
            foreach (IPAddress item in addr)
            {
                if (item.AddressFamily == AddressFamily.InterNetwork)
                {
                    ipV4 = item;
                    break;
                }

            }
            if (ipV4 == null)
            {
                MessageBox.Show("You have no IP of Version 4.Server can not run witout it");
                Application.Exit();
            }
            return ipV4;
        }

        private static void StartListening(Socket server)
        {
            EndPoint remoteEP = new IPEndPoint(IPAddress.Any, 0);
            int recv = 0;
            byte[] data = new byte[1024];
            while (true)
            {
                try
                {
                    recv = 0;
                    data = new byte[1024];
                    recv = server.ReceiveFrom(data, ref remoteEP);
                    string messageSendFromClient = Encoding.ASCII.GetString(data, 0, recv);
                    string[] messageParts = messageSendFromClient.Split(",".ToCharArray());
                    String command = messageParts[0];
                    String topicName = messageParts[1];
                    if (!string.IsNullOrEmpty(command))
                    {
                        if (messageParts[0] == "Publish")
                        {
                            if (!string.IsNullOrEmpty(topicName))
                            {
                                List<string> eventParts = new List<string>(messageParts);
                                eventParts.RemoveRange(0, 1);
                                string message = MakeCommaSeparatedString(eventParts);
                                List<EndPoint> subscriberListForThisTopic = Filter.GetSubscribers(topicName);
                                WorkerThreadParameters workerThreadParameters = new WorkerThreadParameters();
                                workerThreadParameters.Server = server;
                                workerThreadParameters.Message = message;
                                workerThreadParameters.SubscriberListForThisTopic = subscriberListForThisTopic;

                                ThreadPool.QueueUserWorkItem(new WaitCallback(Publish), workerThreadParameters);


                            }

                        }
                    }
                }
                catch
                { }

            }
        }

        public static void Publish(object stateInfo)
        {
            WorkerThreadParameters workerThreadParameters = (WorkerThreadParameters)stateInfo;
            Socket server = workerThreadParameters.Server;
            string message = workerThreadParameters.Message;
            List<EndPoint> subscriberListForThisTopic = workerThreadParameters.SubscriberListForThisTopic;
            int messagelength = message.Length;

            if (subscriberListForThisTopic != null)
            {
                foreach (EndPoint endPoint in subscriberListForThisTopic)
                {
                    server.SendTo(Encoding.ASCII.GetBytes(message), messagelength, SocketFlags.None, endPoint);

                }
            }
        }

        private static string MakeCommaSeparatedString(List<string> eventParts)
        {
            string message = string.Empty;
            foreach (string item in eventParts)
            {
                message = message + item + ",";

            }
            if (message.Length != 0)
            {
                message = message.Remove(message.Length - 1, 1);
            }
            return message;
        }
    }
    class WorkerThreadParameters
    {
        Socket _server;

        public Socket Server
        {
            get { return _server; }
            set { _server = value; }
        }
        string _message;

        public string Message
        {
            get { return _message; }
            set { _message = value; }
        }
        List<EndPoint> _subscriberListForThisTopic;

        public List<EndPoint> SubscriberListForThisTopic
        {
            get { return _subscriberListForThisTopic; }
            set { _subscriberListForThisTopic = value; }
        }
    }
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer (Senior) CP
Australia Australia
I am an Independent Contractor in Brisbane, Australia. For me, programming is a passion first, a hobby second, and a career third.

My Blog: http://weblogs.asp.net/razan/






Comments and Discussions