Click here to Skip to main content
15,881,877 members
Please Sign up or sign in to vote.
0.00/5 (No votes)
See more:
I have a WCF service (using a DuplexChannelFactory) in a publisher/subscriber scenario. The WCF service runs inside a Windows Service that receives data and then pushes that data to clients. My problem is that the ServiceHost tends to crash when it can no longer push the data because of a client fault (Communication Object Aborted), or during debugging, etc. I am trying to just send and FORGET, but the duplex contract seems to want responses regardless of IsOneWay=true, even though ALL of my methods have void return types!
The Channel and Behaviors are all done programmatically so I can encapsulate them and use over and over again.
Here is a basic sample - sorry it is a lot of code.

Is there a better way to accomplish this? I am open to it (no third-party libraries, though).

C#
using System;
using System.Collections.Generic;
using System.Net.Security;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.ServiceModel.Description;

namespace WCF.TEST
{
    /// <summary>
    /// Factory methods for the bindings and metadata behavior shared by the
    /// publishing service (<c>WCFSERVER</c>) and its clients (<c>WCFCLIENT</c>).
    /// </summary>
    public class WCFBASE
    {
        /// <summary>
        /// Builds the net.tcp binding used for the publish/subscribe endpoint:
        /// Windows transport security with sign-and-encrypt protection, message
        /// and reader quotas raised to <see cref="Int32.MaxValue"/>, and reliable
        /// sessions switched off.
        /// </summary>
        /// <returns>A newly configured <see cref="NetTcpBinding"/>.</returns>
        public NetTcpBinding SetNetTCPBinding()
        {
            NetTcpBinding binding = new NetTcpBinding(SecurityMode.Transport);
            binding.Security.Message.ClientCredentialType = MessageCredentialType.Windows;
            binding.Security.Mode = SecurityMode.Transport;
            binding.Security.Transport.ClientCredentialType = TcpClientCredentialType.Windows;
            binding.Security.Transport.ProtectionLevel = ProtectionLevel.EncryptAndSign;
            binding.ListenBacklog = 1000;
            binding.MaxConnections = 30;
            binding.MaxBufferSize = Int32.MaxValue;
            binding.MaxReceivedMessageSize = Int32.MaxValue;
            binding.ReaderQuotas.MaxStringContentLength = Int32.MaxValue;
            binding.ReaderQuotas.MaxArrayLength = Int32.MaxValue;
            binding.ReaderQuotas.MaxNameTableCharCount = Int32.MaxValue;

            // Keep the send timeout finite. With TimeSpan.MaxValue a one-way send to
            // a peer that has stopped reading can block the sender indefinitely;
            // a bounded timeout turns that into a TimeoutException the caller can
            // handle (for example by dropping the subscriber).
            binding.SendTimeout = TimeSpan.FromMinutes(1);

            binding.ReceiveTimeout = TimeSpan.FromSeconds(10 * 60);

            // Reliable sessions are disabled; the remaining reliable-session values
            // are set as in the original configuration.
            binding.ReliableSession.Ordered = true;
            binding.ReliableSession.Enabled = false;
            binding.ReliableSession.InactivityTimeout = TimeSpan.MaxValue;

            return binding;
        }

        /// <summary>
        /// Builds the metadata-exchange (MEX) TCP binding with three-second
        /// open, close and send timeouts.
        /// </summary>
        /// <returns>The configured MEX <see cref="Binding"/>.</returns>
        public Binding SetMexTCPBinding()
        {
            Binding mexBinding = MetadataExchangeBindings.CreateMexTcpBinding();
            mexBinding.Name = "MexTcpBindingEndpoint";
            mexBinding.Namespace = "http://sportit.org/mex";
            mexBinding.OpenTimeout = TimeSpan.FromSeconds(3);
            mexBinding.CloseTimeout = TimeSpan.FromSeconds(3);
            mexBinding.SendTimeout = TimeSpan.FromSeconds(3);
            return mexBinding;
        }

        /// <summary>
        /// Creates the service metadata behavior with HTTP and HTTPS GET
        /// retrieval disabled.
        /// </summary>
        /// <param name="EndPointAddress">
        /// Address stored in <c>HttpGetUrl</c>; when null or empty,
        /// <c>http://localhost:9600/</c> is used.
        /// </param>
        /// <returns>The configured <see cref="ServiceMetadataBehavior"/>.</returns>
        /// <exception cref="UriFormatException">
        /// <paramref name="EndPointAddress"/> is not a valid URI.
        /// </exception>
        public ServiceMetadataBehavior SetServiceMetaBehavior(string EndPointAddress)
        {
            if (string.IsNullOrEmpty(EndPointAddress))
            {
                EndPointAddress = "http://localhost:9600/";
            }

            ServiceMetadataBehavior smb = new ServiceMetadataBehavior();
            smb.HttpGetEnabled = false;
            smb.HttpGetUrl = new Uri(EndPointAddress);
            smb.HttpsGetEnabled = false;
            return smb;
        }

    }

    /// <summary>
    /// Duplex publisher service. Clients register their callback channel through
    /// <see cref="Subscribe"/> and deregister through <see cref="Unsubscribe"/>;
    /// registered channels are kept in the static <see cref="subscribers"/> list.
    /// The same class also self-hosts the service via <see cref="Connect"/>.
    /// </summary>
    [ServiceBehavior(InstanceContextMode=InstanceContextMode.PerSession)]
    public class WCFSERVER : WCFBASE, WCF.TEST.IPublisher
    {
        // Static on purpose: the subscriber list is static while the service is
        // PerSession (one WCFSERVER instance per session), so a per-instance lock
        // object would not serialize access to the shared list.
        private static readonly object wcfPublishLocker = new object();

        private static List<IDataCallback> _subscribers = new List<IDataCallback>();

        /// <summary>
        /// Collection of Subscribers
        /// </summary>
        public static List<IDataCallback> subscribers
        {
            get
            {
                return _subscribers;
            }
            set
            {
                _subscribers = value;
            }
        }

        private ServiceHost _host = null;

        /// <summary>
        /// The WCF Service Host
        /// </summary>
        public ServiceHost host
        {
            get
            {
                return _host;
            }
            set
            {
                _host = value;
            }
        }

        public WCFSERVER()
        {
        }

        /// <summary>
        /// Registers the calling client's callback channel as a subscriber and
        /// starts watching the channel so it is removed again once it faults or
        /// closes. Any failure is reported through <see cref="ServerException"/>.
        /// </summary>
        public void Subscribe()
        {
            try
            {
                IDataCallback callback = OperationContext.Current.GetCallbackChannel<IDataCallback>();
                bool added = false;

                lock (wcfPublishLocker)
                {
                    if (!subscribers.Contains(callback))
                    {
                        subscribers.Add(callback);
                        added = true;
                    }
                }

                // Attach the watchers only once per channel.
                if (added)
                {
                    WatchCallbackChannel(callback);
                }
            }
            catch (Exception e)
            {
                Utility.EventArgs<Exception> exEvent = new Utility.EventArgs<Exception>(e);
                OnServerException(exEvent);
            }
        }

        /// <summary>
        /// Removes the calling client's callback channel from the subscriber list.
        /// Any failure is reported through <see cref="ServerException"/>.
        /// </summary>
        public void Unsubscribe()
        {
            try
            {
                IDataCallback callback = OperationContext.Current.GetCallbackChannel<IDataCallback>();
                RemoveSubscriber(callback);
            }
            catch (Exception ex)
            {
                Utility.EventArgs<Exception> exEvent = new Utility.EventArgs<Exception>(ex);
                OnServerException(exEvent);
            }
        }

        /// <summary>
        /// Removes a callback from the shared list under the shared lock.
        /// Removing a callback that is not registered is a no-op.
        /// </summary>
        private static void RemoveSubscriber(IDataCallback callback)
        {
            lock (wcfPublishLocker)
            {
                subscribers.Remove(callback);
            }
        }

        /// <summary>
        /// Drops the subscriber as soon as its channel faults or closes, so later
        /// publishes never touch a dead channel.
        /// </summary>
        private static void WatchCallbackChannel(IDataCallback callback)
        {
            ICommunicationObject channel = callback as ICommunicationObject;
            if (channel == null)
            {
                return;
            }

            EventHandler drop = delegate { RemoveSubscriber(callback); };
            channel.Faulted += drop;
            channel.Closed += drop;

            // The channel may already have left the Opened state before the
            // handlers were attached, in which case the events will not fire.
            if (channel.State != CommunicationState.Opened)
            {
                RemoveSubscriber(callback);
            }
        }

        /// <summary>
        /// Creates the service host on <c>net.tcp://localhost:9600</c>, adds the
        /// publisher endpoint and the MEX endpoint, and opens the host. A failure
        /// while opening is reported through <see cref="ServerException"/> and the
        /// host is aborted.
        /// </summary>
        public void Connect()
        {
            host = new ServiceHost(typeof(WCFSERVER), new Uri("net.tcp://localhost:9600"));

            host.AddServiceEndpoint(typeof(IPublisher), SetNetTCPBinding(), "ISubscriber");

            EndpointAddress mexeap = new EndpointAddress(new Uri("net.tcp://localhost:9620/mex"));
            ServiceMetadataEndpoint smea = new ServiceMetadataEndpoint(SetMexTCPBinding(), mexeap);

            host.Description.Behaviors.Add(SetServiceMetaBehavior("http://localhost:9620/mex"));

            host.AddServiceEndpoint(smea);
            host.Description.Endpoints[0].Contract.SessionMode = SessionMode.Required;

            try
            {
                host.Open();
            }
            catch (Exception e)
            {
                // A host that failed to open cannot be opened again; abort it so
                // it does not linger in a faulted state.
                host.Abort();

                Utility.EventArgs<Exception> exEvent = new Utility.EventArgs<Exception>(e);
                OnServerException(exEvent);
            }
        }

        /// <summary>
        /// Raised when a service operation or opening the host throws.
        /// </summary>
        public event EventHandler<Utility.EventArgs<Exception>> ServerException;

        /// <summary>
        /// Raises <see cref="ServerException"/> if anyone is listening.
        /// </summary>
        /// <param name="e">Event data wrapping the exception to report.</param>
        protected virtual void OnServerException(Utility.EventArgs<Exception> e)
        {
            // Copy to a local so a concurrent unsubscribe cannot null the delegate
            // between the check and the call.
            EventHandler<Utility.EventArgs<Exception>> eh = ServerException;
            if (eh != null)
                eh(this, e);
        }

    }

    /// <summary>
    /// Service-side contract of the publish/subscribe pair. A session is
    /// required, and callbacks to the client go through
    /// <see cref="IDataCallback"/>.
    /// </summary>
    [ServiceContract(
        SessionMode = SessionMode.Required,
        CallbackContract = typeof(IDataCallback))]
    public interface IPublisher
    {
        /// <summary>One-way request to register the caller for callbacks.</summary>
        [OperationContract(IsOneWay = true)]
        void Subscribe();

        /// <summary>One-way request to deregister the caller.</summary>
        [OperationContract(IsOneWay = true)]
        void Unsubscribe();
    }

    /// <summary>
    /// Callback contract implemented by subscribers; it is named as the
    /// callback contract of <see cref="IPublisher"/>.
    /// </summary>
    [ServiceContract]
    public interface IDataCallback
    {
        /// <summary>One-way notification carrying a list of products.</summary>
        /// <param name="productList">The products delivered with the notification.</param>
        [OperationContract(IsOneWay = true)]
        void OnProductDataChanged(List<PRODUCT> productList);
    }

    /// <summary>
    /// Product payload sent to subscribers. Declared as an explicit data
    /// contract because the type has no public parameterless constructor and
    /// therefore cannot be serialized as a plain (unattributed) type.
    /// </summary>
    [System.Runtime.Serialization.DataContract]
    public class PRODUCT
    {
        /// <summary>Product name.</summary>
        [System.Runtime.Serialization.DataMember]
        public string Name { get; private set; }

        /// <summary>Product code.</summary>
        [System.Runtime.Serialization.DataMember]
        public string Code { get; private set; }

        /// <summary>Creates a product with the given name and code.</summary>
        /// <param name="name">Value stored in <see cref="Name"/>.</param>
        /// <param name="code">Value stored in <see cref="Code"/>.</param>
        public PRODUCT(string name, string code)
        {
            this.Name = name;
            this.Code = code;
        }

    }

    /// <summary>
    /// Subscriber client. Opens a duplex channel to the publisher, subscribes,
    /// and re-raises incoming product notifications as
    /// <see cref="OnProductsListChanged"/>. Failures are reported through
    /// <see cref="ClientException"/>.
    /// </summary>
    public class WCFCLIENT : WCFBASE, IDataCallback
    {
        IPublisher publisher = null;

        DuplexChannelFactory<IPublisher> netPipeFactory;

        /// <summary>
        /// The duplex channel factory created by <see cref="Connect"/>.
        /// </summary>
        public DuplexChannelFactory<IPublisher> NetPipeFactory
        {
            get
            {
                return netPipeFactory;
            }
            set
            {
                netPipeFactory = value;
            }
        }

        /// <summary>
        /// Unsubscribes (when a channel exists) and releases the channel and the
        /// factory. Safe to call when <see cref="Connect"/> was never called or
        /// failed. A failing unsubscribe is reported through
        /// <see cref="ClientException"/> and does not prevent the cleanup.
        /// </summary>
        public void Disconnect()
        {
            IPublisher channel = publisher;
            publisher = null;

            if (channel != null)
            {
                try
                {
                    channel.Unsubscribe();
                }
                catch (Exception e)
                {
                    Utility.EventArgs<Exception> exEvent = new Utility.EventArgs<Exception>(e);
                    OnClientException(exEvent);
                }

                CloseOrAbort(channel as ICommunicationObject);
            }

            CloseOrAbort(NetPipeFactory);
        }

        /// <summary>
        /// Closes a communication object gracefully, falling back to Abort when
        /// it is already faulted or when closing fails.
        /// </summary>
        private static void CloseOrAbort(ICommunicationObject resource)
        {
            if (resource == null)
            {
                return;
            }

            try
            {
                if (resource.State == CommunicationState.Faulted)
                {
                    // Close() throws on a faulted object; Abort() is the only way out.
                    resource.Abort();
                }
                else
                {
                    resource.Close();
                }
            }
            catch (CommunicationException)
            {
                resource.Abort();
            }
            catch (TimeoutException)
            {
                resource.Abort();
            }
        }

        /// <summary>
        /// Creates the duplex channel to
        /// <c>net.tcp://localhost:9600/ISubscriber</c> and subscribes.
        /// </summary>
        /// <returns>
        /// <c>true</c> when the subscribe call was sent; <c>false</c> when creating
        /// the channel or subscribing threw, in which case the exception is
        /// reported through <see cref="ClientException"/> and the factory is aborted.
        /// </returns>
        public bool Connect()
        {
            DuplexChannelFactory<IPublisher> factory = new DuplexChannelFactory<IPublisher>(
                new InstanceContext(this),
                SetNetTCPBinding(),
                new EndpointAddress("net.tcp://localhost:9600/ISubscriber"));
            NetPipeFactory = factory;

            try
            {
                publisher = factory.CreateChannel();
                publisher.Subscribe();
                return true;
            }
            catch (Exception e)
            {
                // Do not keep a reference to a channel that never became usable,
                // and release the factory instead of leaking it.
                publisher = null;
                factory.Abort();

                Utility.EventArgs<Exception> exEvent = new Utility.EventArgs<Exception>(e);
                OnClientException(exEvent);
                return false;
            }
        }

        /// <summary>
        /// Raised when connecting, disconnecting, or handling a notification throws.
        /// </summary>
        public event EventHandler<Utility.EventArgs<Exception>> ClientException;

        /// <summary>
        /// Raises <see cref="ClientException"/> if anyone is listening.
        /// </summary>
        /// <param name="e">Event data wrapping the exception to report.</param>
        protected virtual void OnClientException(Utility.EventArgs<Exception> e)
        {
            EventHandler<Utility.EventArgs<Exception>> eh = ClientException;
            if (eh != null)
                eh(this, e);
        }

        /// <summary>
        /// Raised with the product list each time the publisher calls
        /// <see cref="OnProductDataChanged"/>.
        /// </summary>
        public event EventHandler<Utility.EventArgs<List<PRODUCT>>> OnProductsListChanged;

        /// <summary>
        /// Raises <see cref="OnProductsListChanged"/> if anyone is listening.
        /// </summary>
        /// <param name="e">Event data wrapping the received product list.</param>
        protected virtual void OnProductListChanged(Utility.EventArgs<List<PRODUCT>> e)
        {
            EventHandler<Utility.EventArgs<List<PRODUCT>>> eh = OnProductsListChanged;
            if (eh != null)
                eh(this, e);
        }

        /// <summary>
        /// Callback invoked by the publisher. Forwards the list to
        /// <see cref="OnProductsListChanged"/>; an exception thrown by a handler
        /// is reported through <see cref="ClientException"/> instead of
        /// propagating out of the callback.
        /// </summary>
        /// <param name="productList">The products received from the publisher.</param>
        public void OnProductDataChanged(List<PRODUCT> productList)
        {
            try
            {
                Utility.EventArgs<List<PRODUCT>> bea = new Utility.EventArgs<List<PRODUCT>>(productList);
                OnProductListChanged(bea);
            }
            catch (Exception e)
            {
                Utility.EventArgs<Exception> exEvent = new Utility.EventArgs<Exception>(e);
                OnClientException(exEvent);
            }
        }
    }
Posted
Comments
johannesnestler 11-Aug-15 11:02am    
I just had a quick look at your code; I think you just have to re-create the channel if it is in the Faulted state.

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)



CodeProject, 20 Bay Street, 11th Floor Toronto, Ontario, Canada M5J 2N8 +1 (416) 849-8900