
Complete Managed Media Aggregation Part 2: Making things easier

11 Apr 2013
We are going to look at some real life problems and then try to determine the best way to solve them with the principles set forth in the previous article.

Introduction

In the last part of this article, we addressed some of the fundamental principles in design paradigms and introduced some new and hopefully useful concepts.

We also had a lot of banter, which was somewhat of a tangent and probably not as easygoing as originally intended, but hopefully still interesting enough to keep you thinking and reading.

We are going to keep the banter to a minimum this time and look at some real life problems, then try to determine the best way to solve them with the principles set forth in the previous article. Just like last time, please remember that the ideas and code presented in this article are not complete; they are only enough to reflect the concepts I am attempting to portray.

Now without further delay, let's take an easy real-world example, which for the sake of this argument is going to be “Socket” communication.

Typically, when you had to perform socket operations you needed to do things in a particular way (usually synchronously), and any deviation from this pattern was highly scrutinized.

Then came the BeginXXX methods which helped to alleviate some of this by allowing the “ThreadPool” to help out by performing all the waiting necessary for the signals to occur.

These methods were helpful but put pressure on the “ThreadPool” when used heavily, which could slow down everything else in the process that depends on it.

Then the XXXXAsync methods came around, which helped out even more because they use I/O completion ports. They are not much different from the BeginXXX methods, other than that the SocketAsyncEventArgs object they take has a single event, “Completed”, which is designed to allow a single core entry point to encapsulate all operations on a particular socket: connect, send, receive and disconnect.

The synchronous polling is performed away from the thread which is waiting on the “Completed” event to fire.
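The single entry point described above can be sketched roughly as follows. This is only a minimal illustration and not code from the library: ReceiveLoop and its members are hypothetical names, while Socket.ReceiveAsync, SetBuffer, Completed, LastOperation, SocketError and BytesTransferred are the real framework members. The point to notice is the return value of ReceiveAsync: when it is false the operation finished synchronously and “Completed” is not raised, so the result has to be handled inline.

```csharp
using System;
using System.Net.Sockets;

// Sketch only: ReceiveLoop is a hypothetical helper, not part of the library.
public sealed class ReceiveLoop
{
    private readonly Socket socket;
    private readonly SocketAsyncEventArgs args = new SocketAsyncEventArgs();

    // Total bytes seen so far; only one receive is outstanding at any time.
    public long BytesReceived { get; private set; }

    public ReceiveLoop(Socket socket, int bufferSize)
    {
        this.socket = socket;
        args.SetBuffer(new byte[bufferSize], 0, bufferSize);
        // One entry point for every operation started with this args object.
        args.Completed += OnCompleted;
    }

    public void Start()
    {
        // ReceiveAsync returns false when the operation completed synchronously.
        // In that case Completed is NOT raised, so the result is handled here.
        while (!socket.ReceiveAsync(args))
        {
            if (!Handle(args)) return;
        }
    }

    private void OnCompleted(object sender, SocketAsyncEventArgs e)
    {
        // Raised on a completion thread, not on the thread which called Start.
        if (Handle(e)) Start();
    }

    // Returns true when another receive should be posted.
    private bool Handle(SocketAsyncEventArgs e)
    {
        if (e.LastOperation != SocketAsyncOperation.Receive) return false;
        if (e.SocketError != SocketError.Success || e.BytesTransferred == 0) return false;
        BytesReceived += e.BytesTransferred;
        return true;
    }
}
```

The same args object is reused for every receive, which is what makes pooling these objects worthwhile.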

Now that we are familiar with the various Socket methods, including the SocketAsyncEventArgs object and how it works, we will try to solve a real world problem with them.

How do you keep track of data statistics on a socket? Should this even be done at the application layer? What if it has to be, for reasons such as latency?

Enter SocketStatistics...

It is a class which is designed to work with the XXXXAsync methods but contains overloads for any other version.

Based on my previous article you will see I have included another class, MediaCounter, which provides a counter with virtually unlimited precision.

The reason for this is that a counter is agnostic of operation (send or receive), and thus a SocketStatistics will need to have both Send and Receive counters.

Rather than just making all the fields twice I have broken down the logic into smaller units...
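To make the idea concrete, here is a small sketch of a direction-agnostic counter and a statistics class which simply holds two of them. MediaCounter itself is not listed in this article, so DirectionlessCounter and TwoWayStatistics are hypothetical stand-ins; the member names (Value, BigValue, Increments, InitialOperation, LastOperation) mirror the ones SocketStatistics reads further down, but the implementation shown is an assumption.

```csharp
using System;
using System.Numerics;

// Hypothetical stand-in for MediaCounter; the implementation is an assumption.
public sealed class DirectionlessCounter
{
    private readonly object sync = new object();
    private BigInteger total = BigInteger.Zero;

    public ulong Increments { get; private set; }
    public DateTime InitialOperation { get; private set; }
    public DateTime LastOperation { get; private set; }

    // The running total without an upper bound.
    public BigInteger BigValue { get { lock (sync) return total; } }

    // The low 64 bits of the running total, for callers which want a plain number.
    public ulong Value { get { lock (sync) return (ulong)(total & ulong.MaxValue); } }

    // The counter does not know whether the bytes were sent or received.
    public void Increment(ulong amount)
    {
        lock (sync)
        {
            DateTime now = DateTime.UtcNow;
            if (Increments == 0) InitialOperation = now;
            LastOperation = now;
            total += amount;
            ++Increments;
        }
    }
}

// The statistics class only decides which counter an operation belongs to.
public sealed class TwoWayStatistics
{
    public readonly DirectionlessCounter SendCounter = new DirectionlessCounter();
    public readonly DirectionlessCounter ReceiveCounter = new DirectionlessCounter();
}
```

Because all of the bookkeeping lives in the counter, adding a third direction or a per-packet-type counter later means adding another field rather than another set of fields.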

You will also see MediaSocketArgs which is the base class for all SocketAsyncEventArgs based communication in the library. It is primarily used in the “MediaSocketPool” which has also been included.

There is nothing too special here except a proper “Reset” method and the ability to store the original buffer, offset and count so that they can be restored when “Reset” is called.

The interesting thing about the MediaSocketPool is the way it encapsulates the SocketAsyncEventArgs pattern by providing a single public method for all operations performed on the pool.

You also never attach to the Completed event; the pool handles all of this for you. You only have to notify the pool, by calling a single method, OnPoolWorkerCompleted, if the XXXXAsync method indicates it will not raise the event (that is, when it returns false).

Another interesting feature is the ability to “hook” into the MediaSocketPool for all operations: when any MediaSocketArgs is finished it will automatically be passed to all “hooks” in the hooks table.

This allows you to truly fire and forget, something I have not seen even in the most advanced pooling implementations.
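The hooks table boils down to a collection of (sender, handler) pairs which is walked every time a worker completes. The sketch below shows that idea in isolation; HookTable is a hypothetical name and the generic argument is only there to keep the example self-contained, whereas the pool listed later uses MediaSocketArgs directly.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical, simplified version of the pool's hooks table.
public sealed class HookTable<TArgs> where TArgs : EventArgs
{
    private readonly ConcurrentBag<Tuple<object, EventHandler<TArgs>>> hooks =
        new ConcurrentBag<Tuple<object, EventHandler<TArgs>>>();

    // A null sender means "report the table's owner as the sender".
    public void AddHook(object sender, EventHandler<TArgs> handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");
        hooks.Add(Tuple.Create(sender, handler));
    }

    // Called once per completed operation; every hook sees every completion.
    public void Complete(object defaultSender, TArgs e)
    {
        foreach (Tuple<object, EventHandler<TArgs>> hook in hooks)
            hook.Item2(hook.Item1 ?? defaultSender, e);
    }
}
```

A caller registers a handler once and from then on receives every completed operation without attaching to any individual args object.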

When operations are performed in the default configuration, the pool holds a fixed number of workers, each with its own block of the buffer, so they can access the buffer concurrently without any contention whatsoever.

If a send operation is performed and there is a worker available he will be used; otherwise a new worker will be created if this is allowed by the options given to the “MediaSocketPool”.

After an operation completes, the worker is checked for a PoolId, which represents the MediaSocketPool it was created for. If the options given to the MediaSocketPool allow new workers to be integrated into the pool, then the PoolId of a dynamically created worker is set to the PoolId of the MediaSocketPool; if not, a new Guid is used to ensure the worker is never returned to the pool.
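The PoolId check can be illustrated without any sockets at all. The following is a sketch under stated assumptions: Worker and WorkerPool are hypothetical types, and the adoptNewWorkers flag stands in for whatever option the real MediaSocketPool uses to decide whether dynamically created workers may join the pool.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical worker: only the parts needed to show the PoolId check.
public sealed class Worker : IDisposable
{
    public readonly Guid PoolId;
    public bool Disposed { get; private set; }
    public Worker(Guid poolId) { PoolId = poolId; }
    public void Dispose() { Disposed = true; }
}

public sealed class WorkerPool
{
    public readonly Guid PoolId = Guid.NewGuid();
    private readonly ConcurrentStack<Worker> idle = new ConcurrentStack<Worker>();
    private readonly bool adoptNewWorkers; // assumed option, not from the library

    public WorkerPool(int initialWorkers, bool adoptNewWorkers)
    {
        this.adoptNewWorkers = adoptNewWorkers;
        for (int i = 0; i < initialWorkers; ++i) idle.Push(new Worker(PoolId));
    }

    public int IdleCount { get { return idle.Count; } }

    public Worker Rent()
    {
        Worker worker;
        if (idle.TryPop(out worker)) return worker;
        // Created on demand: stamped with the pool's id only if it may join the pool.
        return new Worker(adoptNewWorkers ? PoolId : Guid.NewGuid());
    }

    public void Return(Worker worker)
    {
        // A foreign id can never match, so such a worker is never pooled.
        if (worker.PoolId == PoolId) idle.Push(worker);
        else worker.Dispose();
    }
}
```

Using an identifier rather than a flag means a worker handed to the wrong pool is disposed instead of silently adopted.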

When a worker is created this way the Buffer is not provided to the worker, and a user must provide their own buffer to send or receive into by using the “SetBuffer” method on the “MediaSocketPool”.

The reason for this is that new workers are disposed and not returned to the pool. One immediate improvement I can think of is a polling mechanism which leaves the workers available for a certain amount of time before disposing them, in case send rates are high, but adding that functionality would be trivial at this point.

Now back to the SocketStatistics...

I have already explained everything about those but what about an additional aspect... say you want to keep track of different types of packets on the same socket... what if each packet carries a different amount of overhead? What if they are sent in combined fashions?

All of this usually can be a nightmare especially when keeping track of those pesky statistics.

This is where EncapsulatedSocketStatistics comes into play.

It adds a dictionary of <Type, SocketStatistics>, which allows you to keep track of each different type of packet.

I personally have developed the IPacket interface to allow multiple kinds of packets to fit under a single facade while still allowing them to be subcategorized by the actual type of packet they implement.

I have further integrated IPacket by using it in the MediaSocketPool when packets are sent or received.

It works by looking at the BytesTransferred property and using that to increment the SocketStatistics counters under the hood, while also taking into account what type of packet it was, based on the Transferred property of the MediaSocketArgs, which has been engineered as an IEnumerable<IPacket>.

This is important because it gives you a lot of information: the packet objects which were sent as well as their binary representation in bytes, which is automatically partitioned into the OutgoingData stack.
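The per-type bookkeeping described above could look roughly like the sketch below. The real IPacket interface is not listed in this article, so IPacketSketch and its Length property are assumptions, as are the two sample packet classes; bytesTransferred plays the role of BytesTransferred and transferred plays the role of the Transferred property.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Assumed shape of a packet; the article's IPacket may differ.
public interface IPacketSketch { int Length { get; } }

// Two hypothetical packet types with different sizes.
public sealed class DataPacketSketch : IPacketSketch
{
    public int Length { get; private set; }
    public DataPacketSketch(int length) { Length = length; }
}

public sealed class ControlPacketSketch : IPacketSketch
{
    public int Length { get; private set; }
    public ControlPacketSketch(int length) { Length = length; }
}

public sealed class ByteAndPacketCount
{
    public long Bytes;
    public long Packets;
}

public sealed class PerTypeStatistics
{
    private readonly ConcurrentDictionary<Type, ByteAndPacketCount> byType =
        new ConcurrentDictionary<Type, ByteAndPacketCount>();
    private long totalBytes;

    public long TotalBytes { get { return Interlocked.Read(ref totalBytes); } }

    public ByteAndPacketCount this[Type type] { get { return byType[type]; } }

    // One call per completed operation: the socket total uses the raw byte
    // count, the per-type counters use the packets which made up the transfer.
    public void Record(int bytesTransferred, IEnumerable<IPacketSketch> transferred)
    {
        Interlocked.Add(ref totalBytes, bytesTransferred);
        foreach (IPacketSketch packet in transferred)
        {
            ByteAndPacketCount count = byType.GetOrAdd(packet.GetType(), _ => new ByteAndPacketCount());
            Interlocked.Add(ref count.Bytes, packet.Length);
            Interlocked.Increment(ref count.Packets);
        }
    }
}
```

Keying on the runtime type means a combined send of several packet kinds is split into the right buckets without the caller doing any extra work.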

Overall the classes are engineered to work together and use the pieces put in place by the smaller units of code. 

The IPacket interface is light. Keep in mind that interface calls are still dispatched at run time, so they are not inherently cheaper than the virtual calls which will probably be present in your packet class implementations; what the interface buys you is a single facade for otherwise unrelated packet types.

Well now we have certainly covered a lot of topics very quickly!

I had much less banter than usual and hopefully have again presented some great design patterns for you to enjoy!

All of these and more will be present in the new version of Managed Media Aggregation, which is coming along very nicely!

All of these articles represent the breakdown of existing concepts employed in the code currently to make it more efficient and scalable while also allowing more things to be done with it.

Each piece of each class has been engineered to allow for the most flexibility and the best performance where possible, but the cornerstone of the libraries presented is their unique patterns and encapsulations, which could not be found elsewhere until now.

Hopefully you will find them useful and they will keep you coming back for the next articles!

Thanks again for reading and at this time I will bring this article to a close!

Join me next time for other interesting goodies and related code! 

Here is the MediaSocketArgs class, which encapsulates the SocketAsyncEventArgs pattern.

 #region Socket Classes

    #region MediaSocketArgs

    /// <summary>
    /// This class is not generic on purpose.
    /// The way the Completed event is declared you would have to check with "is" anyway...
    /// </summary>
    public class MediaSocketArgs : System.Net.Sockets.SocketAsyncEventArgs, IDisposable
    {
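        //Uniquely identifies this worker; the pool uses it as the key when addressing workers,
        //so it must differ per instance (left unassigned it would always be Guid.Empty)
        protected internal readonly Guid Id = Guid.NewGuid();

        //Identifies the pool this worker belongs to
        protected internal readonly Guid PoolId;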

        /// <summary>
        /// The MediaTransportContext which dispatched this MediaSocketArgs
        /// </summary>
        public MediaTransportContext TransportContext { get; internal protected set; }

        /// <summary>
        /// Property which is used when sending or receiving packets at the TransportLayer.
        /// </summary>
        public IEnumerable<IPacket> Transferred { get; set; }

        /// <summary>
        /// The offset into the buffer with which this MediaSocketArgs was originally created.
        /// </summary>
        /// <remarks>
        /// If this MediaSocketArgs was used in a Send operation it is more efficient to use the SetBuffer method rather than copy data.
        /// After data is sent using this worker the original offset and buffer can be restored by calling Reset.
        /// </remarks>
        public readonly int OrigionalOffset;

        /// <summary>
        /// The count with which this MediaSocketArgs was originally created.
        /// </summary>
        public readonly int OrigionalCount;

        /// <summary>
        /// The buffer with which this MediaSocketArgs was originally created.
        /// </summary>
        public readonly byte[] OrigionalBuffer;

        /// <summary>
        /// Creates a new MediaSocketArgs for use by a <see cref="System.Net.Sockets.Socket"/>
        /// </summary>
        /// <param name="buffer">The buffer to use for data transfer for this MediaSocketArgs.</param>
        /// <param name="offset">The offset into the given buffer.</param>
        /// <param name="count">The count of the buffer to use for data transfer taking offset into account.</param>
        /// <param name="poolId">The identifier of the pool which owns this MediaSocketArgs; a new Guid is used when none is given.</param>
        public MediaSocketArgs(byte[] buffer, int offset, int count, Guid? poolId = null)
            :base()
        {
            SetBuffer(buffer, offset, count);
            OrigionalBuffer = buffer;
            OrigionalOffset = offset;
            OrigionalCount = count;
            PoolId = poolId ?? Guid.NewGuid();
        }

        /// <summary>
        /// Proxies a MediaSocketArgs but does not set the original buffer
        /// </summary>
        /// <param name="other"></param>
        internal MediaSocketArgs(MediaSocketArgs other) : this(null, 0, 0) { SetBuffer(other.Buffer, other.Offset, other.Count); }

        public MediaSocketArgs() : this(null, 0, 0) { }

        ~MediaSocketArgs() { SetBuffer(null, 0, 0); Dispose(); }

        /// <summary>
        /// Handles resetting the buffer, offset and count of an originally pooled MediaSocketArgs
        /// </summary>
        /// <remarks>Only sending should cause the buffer to change. UserToken is left unmodified.</remarks>
        public void Reset()
        {
            if (Buffer != OrigionalBuffer || Offset != OrigionalOffset || Count != OrigionalCount)
            {
                SetBuffer(OrigionalBuffer, OrigionalOffset, OrigionalCount);
            }
        }

        public virtual void Dispose(bool disposing = true)
        {
            if (disposing)
            {
                SetBuffer(null, 0, 0);
                base.Dispose();
            }
        }

    }

    #endregion

Here is the SocketStatistics class, which helps you keep track of send and receive rates as well as packet rates when used correctly with the MediaSocketPool.

    #region SocketStatistics

    /// <summary>
    /// A class which is capable of keeping track of send and receive counters because that work can be tedious.
    /// </summary>
    /// <remarks>
    /// The work is performed ONLY when using the Socket.XXXXAsync methods.
    /// </remarks>
    public class SocketStatistics : IEnumerable<MediaCounter>
    {
        #region Fields

        /// <summary>
        /// When the statistics were created
        /// </summary>
        public readonly DateTime Created = DateTime.UtcNow;

        /// <summary>
        /// The additional amount added to any increment
        /// </summary>
        public readonly byte Overhead;

        /// <summary>
        /// Uniquely identifies the statistics
        /// </summary>
        public Guid Id;

        /// <summary>
        /// A <see cref="MediaCounter"/> for each type of operation.
        /// </summary>
        public readonly MediaCounter SendCounter, ReceiveCounter;

        #endregion

        #region Properties

        public ulong BytesSent { get { return SendCounter.Value; } }

        public ulong BytesReceived { get { return ReceiveCounter.Value; } }

        public ulong PacketsSent { get { return SendCounter.Increments; } }

        public ulong PacketsReceived { get { return ReceiveCounter.Increments; } }

        public DateTime InitalSendOperation { get { return SendCounter.InitialOperation; } }

        public DateTime InitialReceiveOperation { get { return ReceiveCounter.InitialOperation; } }

        public DateTime LastSendOperation { get { return SendCounter.LastOperation; } }

        public DateTime LastReceiveOperation { get { return ReceiveCounter.LastOperation; } }

        public BigInteger TotalSentBytes { get { return SendCounter.BigValue; } }

        public BigInteger TotalReceivedBytes { get { return ReceiveCounter.BigValue; } }

        /// <summary>
        /// Gets a value indicating if the SocketStatistics have sent or received any bytes
        /// </summary>
        /// <remarks>
        /// Requires Initial Send or Receive operation as well as a counter value >

Here is the EncapsulatedSocketStatistics class which helps you keep counters per specific System.Type of IPacket

    #region EncapsulatedSocketStatistics

    /// <summary>
    /// Encapsulates a Dictionary of <see cref="Media.SocketStatistics"/> by <see cref="System.Type"/>.
    /// </summary>
    public class EncapsulatedSocketStatistics :
        IEnumerable<Type>, IEnumerable<SocketStatistics>, IEnumerable<KeyValuePair<Type,SocketStatistics>>
    {
        internal protected System.Collections.Concurrent.ConcurrentDictionary<Type, SocketStatistics> TypedStatistics = new System.Collections.Concurrent.ConcurrentDictionary<Type, SocketStatistics>();

        /// <summary>
        /// Gets or Sets the <see cref="Media.SocketStatistics"/> associated with the given <see cref="System.Type"/>.
        /// </summary>
        /// <param name="index">The type to retrieve, associate or update</param>
        /// <returns>The <see cref="Media.SocketStatistics"/> associated with the given <see cref="System.Type"/>.</returns>
        /// <remarks>
        /// If a key for a given <see cref="System.Type"/> already exists the associated <see cref="Media.SocketStatistics"/> is combined with it.
        /// </remarks>
        public SocketStatistics this[Type index]
        {
            get { return TypedStatistics[index]; }
            internal protected set { Add(index, value); }
        }

        /// <summary>
        /// Handles what is returned to a 
        /// </summary>
        /// <param name="type"></param>
        /// <param name="value"></param>
        /// <returns></returns>
        internal protected virtual SocketStatistics UpdateExistingSocketStatistics(Type type, SocketStatistics value) { Utility.BreakIfAttached(); return TypedStatistics[type]; }

        /// <summary>
        /// Adds or updates the given <see cref="Media.SocketStatistics"/> in the underlying EncapsulatedSocketStatistics.
        /// </summary>
        /// <param name="index">The <see cref="System.Type"/> to be associated with the given <see cref="Media.SocketStatistics"/></param>
        /// <param name="value">The <see cref="Media.SocketStatistics"/> to add to the underlying EncapsulatedSocketStatistics</param>
        public void Add(Type index, SocketStatistics value) { TypedStatistics.AddOrUpdate(index, value, UpdateExistingSocketStatistics); }

        public IEnumerator<Type> GetEnumerator() { return TypedStatistics.Keys.GetEnumerator(); }

        IEnumerator<SocketStatistics> IEnumerable<SocketStatistics>.GetEnumerator() { return TypedStatistics.Values.GetEnumerator(); }

        IEnumerator<KeyValuePair<Type, SocketStatistics>> IEnumerable<KeyValuePair<Type, SocketStatistics>>.GetEnumerator() { return TypedStatistics.GetEnumerator(); }

        System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() { return TypedStatistics.GetEnumerator(); }
    }

    #endregion

Here is the MediaSocketPool itself!

    #region MediaSocketPool

    /// <summary>
    /// Provides a reusable class for working with <see cref="MediaSocketArgs"/>.
    /// </summary>
    public class MediaSocketPool : 
        IEnumerable<Media.Common.MediaSocketPool.EventProxy>, //It can be enumerated by the Hooks currently hooked
        IDisposable
    {
        #region Statics

        public const int DefaultBlockSize = 1024;

        #endregion

        #region Nested Types

        public class MediaSocketPoolExcpetion : MediaException { }

        /// <summary>
        /// Used to provide a mechanism to call events from another object
        /// </summary>
        internal class EventProxy
        {
            internal readonly object Sender;
            internal readonly EventHandler<MediaSocketArgs> EventHandler;
            public EventProxy(object sender, EventHandler<MediaSocketArgs> eventHandler)
            {
                Sender = sender;
                EventHandler = eventHandler;
            }
        }

        internal class DataChunk
        {
            internal readonly IEnumerable<IPacket> Source;
            internal ArraySegment<byte> Data;
            public DataChunk(IEnumerable<IPacket> source, ArraySegment<byte> data)
            {
                Source = source;
                Data = data;
            }
        }

        #endregion

        #region Fields

        bool m_Disposing;

        /// <summary>
        /// The Identifier of this MediaSocketPool
        /// </summary>
        public readonly Guid PoolId = Guid.NewGuid();

        /// <summary>
        /// Gets the known size of a block in the buffer
        /// </summary>
        public readonly int BlockSize;

        //Gets the known size of the SocketBuffer 
        internal int m_BufferSize;

        /// <summary>
        /// The buffer into which raw binary data can be received.
        /// When used with the default BufferSize the buffer fits roughly 3 packets at <see cref="RtpPacket.MaxPacketSize"/>
        /// </summary>
        internal byte[] SocketBuffer;

        /// <summary>
        /// The <see cref="System.Collections.Concurrent.ConcurrentStack"/> of SocketAsyncEventArgs which can be used to perform Send or Receive operations.
        /// </summary>
        internal System.Collections.Concurrent.ConcurrentStack<MediaSocketArgs> SocketWorkers;

        /// <summary>
        /// Data which has been provided to the pool to be sent at a time which is possible
        /// </summary>
        internal System.Collections.Concurrent.ConcurrentStack<DataChunk> OutgoingData;

        /// <summary>
        /// A place to address all used workers so IDisposable can dispose immediately
        /// </summary>
        internal System.Collections.Concurrent.ConcurrentDictionary<Guid, MediaSocketArgs> AddressedWorkers = new System.Collections.Concurrent.ConcurrentDictionary<Guid, MediaSocketArgs>();

        //For used workers

        /// <summary>
        /// Indicates the amount of Senders or Receivers available if set to a non 0 value in the Constructor.
        /// </summary>
        internal int SendersAvailable = -1, ReceiversAvailable = -1;

        #endregion

        #region Properties

        /// <summary>
        /// Indicates the total amount of bytes which are queued to be sent from the MediaSocketPool
        /// </summary>
        public int OutgoingDataSize { get { return OutgoingData.Sum(d => d.Data.Count); } }

        /// <summary>
        /// Indicates the size in bytes of the MediaSocketPool's buffer
        /// </summary>
        public int BufferSize { get { return m_BufferSize; } }

        /// <summary>
        /// The minimum amount of SocketWorkers that must be available for each send operation
        /// </summary>
        /// <remarks>
        /// 0 By default, -1 indicates never use a worker
        /// </remarks>
        public int MaxSenders { get; internal protected set; }

        /// <summary>
        /// The minimum amount of SocketWorkers that must be available for each receive operation.
        /// </summary>
        /// <remarks>
        /// 0 By default, -1 indicates never use a worker
        /// </remarks>
        public int MaxReceivers { get; internal protected set; }

        /// <summary>
        /// Gets a value indicating if any socket operation can be performed by a worker
        /// </summary>
        public bool WorkersExhusted { get { return SocketWorkers == null || SocketWorkers.Count == 0; } }

        /// <summary>
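        /// Gets a value indicating if a pooled worker can be used for a send operation
        /// </summary>
        /// <remarks>
        /// As written this can never be true: it is false when MaxSenders is -1 or SocketWorkers is not null, and it throws when SocketWorkers is null.
        /// SocketWorkers != null was probably intended, in which case the !tryGetWorker test in TryDequeOrCreateWorker would have to be inverted as well.
        /// </remarks>
        public bool SendingWorkersAvailable { get { return MaxSenders != -1 && SocketWorkers == null && SocketWorkers.Count > MaxSenders; } }

        /// <summary>
        /// Gets a value indicating if a pooled worker can be used for a receive operation
        /// </summary>
        /// <remarks>The same caveat as for SendingWorkersAvailable applies.</remarks>
        public bool ReceivingWorkersAvailable { get { return MaxReceivers != -1 && SocketWorkers == null && SocketWorkers.Count > MaxReceivers; } }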

        /// <summary>
        /// Allows post event processing on this MediaSocketPool per each MediaSocketArgs dispatched on a Async method.
        /// </summary>
        internal System.Collections.Concurrent.ConcurrentBag<EventProxy> Hooks = new System.Collections.Concurrent.ConcurrentBag<EventProxy>();

        #endregion

        #region Events

        /// <summary>
        /// The pattern which is presented for handling any async event from a MediaSocketPool
        /// </summary>
        /// <param name="sender">Specifically not object because Hooks are available</param>
        /// <param name="e">The completed worker</param>
        public delegate void PoolWorkerCompletedHandler(MediaSocketPool sender, MediaSocketArgs e);

        /// <summary>
        /// The event fired on a system thread for each worker action completed.
        /// </summary>
        public event PoolWorkerCompletedHandler PoolWorkerCompleted;

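        public void OnPoolWorkerCompleted(MediaSocketArgs e)
        {
            //Copy the delegate first so an unsubscribe on another thread cannot cause a NullReferenceException
            PoolWorkerCompletedHandler handler = PoolWorkerCompleted;
            if (!m_Disposing && handler != null) handler(this, e);
        }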

        /// <summary>
        /// The main entry point for a SocketAsyncEventArgs object after an asynchronous socket operation.
        /// Each MediaSocketPool encapsulates the SocketAsyncEventArgs pattern by providing Hooks.
        /// </summary>
        /// <param name="sender">The boxed object which raised the event, which will be checked for variance against the MediaSocketPool type</param>
        /// <param name="e">The SocketAsyncEventArgs which will be checked for variance against the MediaSocketArgs type</param>
        static void CoreSocketAsyncEventArgsCompleted(object sender, SocketAsyncEventArgs e)
        {
            if (sender is MediaSocketPool && e is MediaSocketArgs)
            {
                MediaSocketPool pool = (MediaSocketPool)sender;
                MediaSocketArgs arg = (MediaSocketArgs)e;
                if (!pool.m_Disposing)
                {
                    //Raise the event.
                    pool.OnPoolWorkerCompleted(arg); //Thought about proxying the arg here if the last operation was a receive etc...
                    //Put the worker back immediately
                    pool.EnqueWorker(arg);
                }
            }
        }

        /// <summary>
        /// Dispatches the MediaSocketArgs worker to notify each EventProxy created as a result of hooking the MediaSocketPool
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        static void MediaSocketPoolWorkerCompleted(MediaSocketPool sender, MediaSocketArgs e)
        {
            if (!sender.m_Disposing)
                foreach (EventProxy proxy in sender.Hooks)
                    proxy.EventHandler(proxy.Sender ?? sender, e);
        }

        #endregion

        #region Constructors / Destructor

        /// <summary>
        /// Creates and Partitions the SocketBuffer into Sectors and assigns a MediaSocketArg for each Sector in the SocketBuffer.
        /// </summary>
        /// <param name="blockSize">The size in bytes of each sector of the buffer</param>
        /// <param name="sendingWorkers">0 Indicates a Sender will never be available</param>
        /// <param name="receivingWokers">0 Indicates a Receiver will never be available</param>
        public MediaSocketPool(int blockSize = DefaultBlockSize, int sendingWorkers = -1, int receivingWokers = -1)
        {
            BlockSize = blockSize;
            MaxSenders = sendingWorkers;
            MaxReceivers = receivingWokers;
            PoolWorkerCompleted += MediaSocketPoolWorkerCompleted;
            Initialize();
        }

        ~MediaSocketPool() { Dispose(true); }

        #endregion

        #region Methods

        public void AddHook(object sender, EventHandler<MediaSocketArgs> handler)
        {
            if (!m_Disposing) Hooks.Add(new EventProxy(sender, handler));
        }

        /// <summary>
        /// Removes all hooks from the MediaSocketPool which match the given criteria
        /// </summary>
        /// <param name="handler">The hook to remove</param>
        /// <param name="sender">The optional sender of the hook</param>
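        public void RemoveHooks(EventHandler<MediaSocketArgs> handler, object sender = null)
        {
            //Hooks which do not match are collected here and added back after the loop;
            //adding them back inside the loop would make TryTake return them again forever
            var keep = new System.Collections.Generic.List<EventProxy>();
            EventProxy hook;
            //While there is a hook
            while (Hooks.TryTake(out hook))
            {
                //If the hook matches the handler (AND the sender, when one was given) drop it
                if (hook.EventHandler == handler && (sender == null || hook.Sender == sender)) continue;
                keep.Add(hook);
            }
            //Add the remaining hooks back
            foreach (EventProxy kept in keep) Hooks.Add(kept);
        }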

        /// <summary>
        /// Initializes the SocketBuffer and the SocketWorkers, or grows the SocketBuffer by one block if it was already initialized.
        /// See http://msdn.microsoft.com/en-us/library/system.net.sockets.socketasynceventargs.aspx
        /// Throws a <see cref="System.OutOfMemoryException"/> if no additional memory can be acquired.
        /// </summary>
        [System.Runtime.CompilerServices.MethodImplAttribute(System.Runtime.CompilerServices.MethodImplOptions.Synchronized)]
        internal protected void Initialize()
        {
            try
            {
                MediaSocketArgs arg;

                //Sanity
                if (MaxSenders < -1) MaxSenders = -1;

                if (MaxReceivers < -1) MaxReceivers = -1;

                int availble = Math.Abs(MaxSenders + MaxReceivers);

                //If we are not insane
                if (availble > 0)
                {
                    //If the MediaSocketPool was not previous initialized
                    if (SocketWorkers == null) SocketWorkers = new System.Collections.Concurrent.ConcurrentStack<MediaSocketArgs>();

                    //Prepare a place for Outgoing data to be stored when no workers are available to send
                    if (OutgoingData == null) OutgoingData = new System.Collections.Concurrent.ConcurrentStack<DataChunk>();

                    //Set the BufferSize and Create or Resize the SocketBuffer
                    if (SocketBuffer == null) SocketBuffer = new byte[m_BufferSize = BlockSize * BlockSize];
                    else Array.Resize<byte>(ref SocketBuffer, m_BufferSize = (SocketBuffer.Length + BlockSize));

                    //Starting at 0
                    int offset = 0;

                    //While there are args already in the Queue
                    while (SocketWorkers.TryPop(out arg) && --availble >= 0)
                    {
                        //Should be unnecessary
                        arg.Reset();
                        offset += arg.Count;
                        //Ensure not double attached
                        arg.Completed -= CoreSocketAsyncEventArgsCompleted;
                        SocketWorkers.Push(arg);
                    }

                    //While the default amount of args have not been initialized
                    int maxOffset = SocketBuffer.Length;
                    while (offset < maxOffset)
                    {
                        arg = new MediaSocketArgs(SocketBuffer, offset, BlockSize);
                        offset += BlockSize;
                        arg.Completed += CoreSocketAsyncEventArgsCompleted;
                        SocketWorkers.Push(arg);
                    }
                }
            }
            catch { throw; }
        }

        internal bool AddressWorker(MediaSocketArgs worker)
        {
            if (worker == null) return false;
            worker.Completed += CoreSocketAsyncEventArgsCompleted;
            return AddressedWorkers.TryAdd(worker.Id, worker);
        }

        internal bool TryDequeOrCreateWorker(bool tryGetWorker, ref int counter, out MediaSocketArgs worker)
        {
            if (m_Disposing)
            {
                worker = null;
                return false;
            }

            if (!tryGetWorker)
            {
                //If the worker can be attained
                if (SocketWorkers.TryPop(out worker))
                {
                    //If track is being kept keep it; counter refers to SendersAvailable or ReceiversAvailable depending on the caller
                    if (counter > 0) --counter;
                    //Indicate success
                    return AddressWorker(worker);
                }
                return false;
            }
            else if (counter == -1)
            {
                worker = new MediaSocketArgs(null, 0, 0);
                return AddressWorker(worker);
            }

            worker = null;
            return false;
        }

        internal void EnqueWorker(MediaSocketArgs worker)
        {
            //Event is removed immediately to prevent people from firing it on the pool again
            worker.Completed -= CoreSocketAsyncEventArgsCompleted;

            //If we are not already disposing and the worker was removed from the AddressedWorkers
            if (!m_Disposing && AddressedWorkers.TryRemove(worker.Id, out worker))
            {
                //If the worker was not allocated on the fly return him to the pool
                if (worker.PoolId == PoolId)
                {
                    //Reset the buffer, offset and position
                    worker.Reset();
                    //Use the worker again
                    SocketWorkers.Push(worker);
                    //If track is being kept keep it
                    if (worker.LastOperation == SocketAsyncOperation.Send && SendersAvailable > -1) ++SendersAvailable;
                    else if (worker.LastOperation == SocketAsyncOperation.Receive && ReceiversAvailable > -1) ++ReceiversAvailable;
                }
                else
                {
                    //Otherwise dispose the worker
                    worker.Dispose();
                }
            }
        }

        /// <summary>
        /// Gets a worker which can be used outside the MediaSocketPool.
        /// </summary>
        /// <param name="type">The type of operation to be performed</param>
        /// <param name="worker">The worker to assign</param>
        /// <returns>True if the worker was assigned</returns>
        /// <remarks>Remember to call OnPollOperationCompleted if XXXXAsync methods indicate they will not raise an event.</remarks>
        public bool TryGetWorker(SocketAsyncOperation type, out MediaSocketArgs worker)
        {
            if (m_Disposing)
            {
                worker = null;
                return false;
            }

            //Determine which worker type to get
            switch (type)
            {
                case SocketAsyncOperation.SendPackets:
                case SocketAsyncOperation.SendTo:
                case SocketAsyncOperation.Send:
                    {
                        return TryDequeOrCreateWorker(SendingWorkersAvailable, ref SendersAvailable, out worker);
                    }
                case SocketAsyncOperation.ReceiveMessageFrom:
                case SocketAsyncOperation.ReceiveFrom:
                case SocketAsyncOperation.Receive:
                    {
                        return TryDequeOrCreateWorker(ReceivingWorkersAvailable, ref ReceiversAvailable, out worker);
                    }
                case SocketAsyncOperation.Accept:
                case SocketAsyncOperation.Connect:
                case SocketAsyncOperation.Disconnect:
                    {
                        //Not really supported as of yet but this causes a worker to try to be obtained
                        return TryDequeOrCreateWorker(true, ref ReceiversAvailable, out worker);
                    }
                case SocketAsyncOperation.None:
                default:
                    {
                        worker = null;
                        return false;
                    }
            }
        }

        /// <summary>
        /// Adds outgoing data to the MediaSocketPool to be sent when possible.
        /// Data which is larger than the BlockSize of the Pool will be broken up into multiple pieces.
        /// </summary>
        /// <param name="data">The data to send</param>
        /// <param name="offset">The offset into the data to start sending</param>
        /// <param name="count">The optional maximum amount of data to send where -1 indicates to use the Length of data.</param>
        //public void EnqueOutgoingData(byte[] data, int offset, int count = -1)
        //{
        //    if (data == null) return;
        //    int dataLength = data.Length;
        //    if (dataLength == 0 || offset < 0) return;
        //    if (count < 0) count = dataLength;
        //    if (offset + count > dataLength) throw new ArgumentOutOfRangeException();
        //    int pieceOffset = offset,
        //    pieceSize = Math.Min(BlockSize, count);
        //    while (count > 0)
        //    {
        //        //Add a piece of the data to the OutgoingData stack
        //        OutgoingData.Push(new ArraySegment<byte>(data, pieceOffset, count));
        //        //Move the offset for the next piece
        //        pieceOffset += pieceSize;
        //        //Subtract the count from the size of the piece created
        //        count -= pieceSize;
        //    }
        //}

        public void EnqueOutgoingData(IEnumerable<IPacket> packets)
        {
            if (packets == null) return;
            //For each packet given
            foreach (IPacket packet in packets)
            {
                //Get the binary data of the packet
                byte[] stack = packet.ToBytes().ToArray();
                //Each packet starts at offset 0
                int pieceOffset = 0;
                //While there is more data in the packet
                while (pieceOffset < stack.Length)
                {
                    //The final piece of a packet may be smaller than the BlockSize
                    int pieceSize = Math.Min(BlockSize, stack.Length - pieceOffset);
                    //Add a piece of the data to the OutgoingData stack with the packet set as the outgoing packet if this is the first piece
                    //Subsequent pieces will have the packet set to null because the Counters for statistics need an IPacket to increment correctly
                    //Packets which are larger than the BlockSize will need to be split up on multiple workers...
                    //When the send is complete the Transferred property is used to increment the counters.
                    //If there are no IPackets in the Transferred property then just the BytesTransferred from the MediaSocketArgs is used.
                    OutgoingData.Push(new DataChunk(pieceOffset == 0 ? packet.Yield() : null, new ArraySegment<byte>(stack, pieceOffset, pieceSize)));
                    //Move the offset for the next piece
                    pieceOffset += pieceSize;
                }
            }
        }

        /// <summary>
        /// Sends data from the OutgoingData stack.
        /// </summary>
        /// <param name="sendingSocket">The socket to send the data on</param>
        [System.Runtime.CompilerServices.MethodImplAttribute(System.Runtime.CompilerServices.MethodImplOptions.Synchronized)]        
        internal void SendOutgoingData(Socket sendingSocket)
        {
            MediaSocketArgs worker;
            DataChunk chunk;
            //If there is data to be sent and we can get a worker
            if (OutgoingData.TryPeek(out chunk) && TryGetWorker(SocketAsyncOperation.Send, out worker))
            {
                //Create a place to store the buffers
                List<ArraySegment<byte>> segments = new List<ArraySegment<byte>>();

                IEnumerable<IPacket> toTransfer = Enumerable.Empty<IPacket>();

                //Peek pop depending on size
                while (OutgoingData.TryPeek(out chunk) && //If there is data to be sent 
                        OutgoingData.TryPop(out chunk))//Pop the segment out                    
                {
                    //If the source had a packet concatenate the packet to the toTransfer enumerable
                    //Concat returns a new sequence so the result must be assigned back
                    if (chunk.Source != null) toTransfer = toTransfer.Concat(chunk.Source);
                    segments.Add(chunk.Data);//Add the segment to the list
                }

                //Set the state of the operation to the packets which were used to create the DataChunks.
                worker.Transferred = toTransfer;

                //Assign the data to the worker
                worker.BufferList = segments;

                //Send the data using the socket
                if (!sendingSocket.SendAsync(worker)) OnPoolWorkerCompleted(worker);
            }
        }
        
        #endregion

        #region IDisposable

        public void Dispose() { Dispose(true); }

        [System.Runtime.CompilerServices.MethodImplAttribute(System.Runtime.CompilerServices.MethodImplOptions.Synchronized)]
        void Dispose(bool disposing)
        {
            if (m_Disposing = disposing)
            {
                //All events at the root are taken away
                PoolWorkerCompleted -= MediaSocketPoolWorkerCompleted;

                //The MediaSocketArgs 
                MediaSocketArgs worker;

                foreach (Guid key in AddressedWorkers.Keys.ToArray())
                {
                    //There should be a key for each worker
                    if (AddressedWorkers.TryGetValue(key, out worker))
                    {
                        //If we could not remove the worker break if attached
                        if (!AddressedWorkers.TryRemove(key, out worker) || worker == null) Utility.BreakIfAttached();
                        else worker.Dispose();
                    }
                }

                //Dispose remaining unused workers
                while (SocketWorkers.TryPop(out worker)) worker.Dispose();

                //Remove all hooks
                EventProxy hook;
                while (Hooks.TryTake(out hook)) { hook = null; }
            }
        }

        void IDisposable.Dispose() { Dispose(true); }

        #endregion
        
        #region IEnumerator

        IEnumerator<EventProxy> IEnumerable<EventProxy>.GetEnumerator() { return Hooks.GetEnumerator(); }

        System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() { return Hooks.GetEnumerator(); }

        #endregion
    }

    #endregion

    #endregion
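
The splitting performed by EnqueOutgoingData can be looked at in isolation. What follows is only a minimal sketch and not the code from the library: the ChunkingSketch class and its Split method are names made up for this illustration, and a plain List is assumed in place of the OutgoingData stack.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration only, it is not part of the MediaSocketPool
public static class ChunkingSketch
{
    // Splits data into consecutive segments of at most blockSize bytes.
    // Nothing is copied, each ArraySegment is a view over the original array.
    public static List<ArraySegment<byte>> Split(byte[] data, int blockSize)
    {
        if (data == null) throw new ArgumentNullException("data");
        if (blockSize <= 0) throw new ArgumentOutOfRangeException("blockSize");

        List<ArraySegment<byte>> pieces = new List<ArraySegment<byte>>();
        int offset = 0;
        while (offset < data.Length)
        {
            // The final piece may be smaller than blockSize
            int size = Math.Min(blockSize, data.Length - offset);
            pieces.Add(new ArraySegment<byte>(data, offset, size));
            // Move the offset for the next piece
            offset += size;
        }
        return pieces;
    }
}
```

The thing to notice is that the size of each piece is taken from what remains in the array, so the last segment never runs past the end of the buffer.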

And here is the IPacket interface along with the abstract MediaPacket class which implements it:

    #region Packet Classes

    #region IPacket

    /// <summary>
    /// An interface to encapsulate binary data which usually traverses an ethernet line.
    /// </summary>
    public interface IPacket
    {
        /// <summary>
        /// Encapsulates the object as binary data
        /// </summary>
        /// <returns>The binary representation of the IPacket</returns>
        IEnumerable<byte> ToBytes();
        
        /// <summary>
        /// The length of the IPacket in bytes
        /// </summary>
        int Length { get; }
        
        /// <summary>
        /// The DateTime(UTC) in which the IPacket was Sent
        /// </summary>
        /// <remarks>
        /// To determine if the IPacket was received use (!IPacket.Sent.HasValue)
        /// </remarks>
        DateTime? Sent { get; set; }

        /// <summary>
        /// The DateTime(UTC) in which the IPacket was Created
        /// </summary>        
        DateTime Created { get; set; }

        /// <summary>
        /// An optional parameter which identifies the IPacket on a broad spectrum.
        /// This can indicate Audio, Video, or something totally different depending on what the underlying MediaTransportContext utilizes it for.
        /// </summary>
        byte? Channel { get; set; }
    }

    #endregion
    
    #region MediaPacket

    public abstract class MediaPacket : IPacket
    {
        /// <summary>
        /// The length of the packet in bytes including any application layer or transport layer overhead.
        /// </summary>
        public abstract int Length { get; }

        /// <summary>
        /// The channel to send the MediaPacket on or the channel it was received from
        /// </summary>
        public byte? Channel { get; set; }

        /// <summary>
        /// Indicates the Date and Time the MediaPacket instance was created
        /// </summary>
        /// <remarks>
        /// Usually provided in Utc, always call ToUniversalTime or ToLocalTime to be sure.
        /// </remarks>
        public DateTime Created { get; set; }

        /// <summary>
        /// Indicates the Date and Time the MediaPacket was sent.
        /// </summary>
        /// <remarks>
        /// Usually provided in Utc, always call ToUniversalTime or ToLocalTime to be sure.
        /// </remarks>
        public DateTime? Sent { get; set; }

        /// <summary>
        /// Get a value which represents the Payload of the MediaPacket
        /// </summary>
        public abstract byte[] Payload { get; internal protected set; }

        /// <summary>
        /// Gets a binary representation of the MediaPacket.
        /// </summary>
        /// <returns>A sequence of bytes whose count is <see cref="MediaPacket.Length"/></returns>
        public abstract IEnumerable<byte> ToBytes();

        /// <summary>
        /// Creates a new MediaPacket
        /// </summary>
        /// <param name="channel">The optional channel to be associated with the <see cref="MediaPacket.Channel"/></param>
        /// <remarks>Sets Created to DateTime.UtcNow</remarks>
        public MediaPacket(byte? channel = null)
        {
            Created = DateTime.UtcNow;

            if (channel.HasValue) Channel = channel;
        }
    }

    #endregion

    #endregion
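
To give an idea of how little a concrete packet needs, here is a rough sketch of one. It is not a class from the library: ISketchPacket is a trimmed stand-in for IPacket so that the example compiles on its own, and RawPacket along with its WasSent property are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;

// Trimmed, hypothetical stand-in for the IPacket interface
public interface ISketchPacket
{
    IEnumerable<byte> ToBytes();
    int Length { get; }
    DateTime Created { get; }
    DateTime? Sent { get; set; }
    byte? Channel { get; set; }
}

// A packet which is nothing more than its payload
public sealed class RawPacket : ISketchPacket
{
    readonly byte[] m_Payload;

    public RawPacket(byte[] payload, byte? channel = null)
    {
        if (payload == null) throw new ArgumentNullException("payload");
        m_Payload = payload;
        Channel = channel;
        // Created is kept in UTC just like MediaPacket does
        Created = DateTime.UtcNow;
    }

    public int Length { get { return m_Payload.Length; } }

    public byte? Channel { get; set; }

    public DateTime Created { get; private set; }

    // Stays null until something stamps the packet as sent
    public DateTime? Sent { get; set; }

    public IEnumerable<byte> ToBytes() { return m_Payload; }

    // A packet which was never stamped is treated as received
    public bool WasSent { get { return Sent.HasValue; } }
}
```

Whatever performs the send would set Sent once the operation completes, which is what allows the check from the remarks on IPacket.Sent to work.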

And finally the MediaCounter class, which is the smallest unit presented here and is responsible for a majority of the work involved!

    #region MediaCounter

    public class MediaCounter
    {
        #region Fields

        /// <summary>
        /// When the statistics were created
        /// </summary>
        public readonly DateTime Created = DateTime.UtcNow;

        /// <summary>
        /// The additional amount added to any increment
        /// </summary>
        public readonly byte Overhead;

        /// <summary>
        /// Uniquely identifies the statistics
        /// </summary>
        public Guid Id;

        internal DateTime //Samplepoints
            m_InitialOperation = DateTime.MinValue, //Send
            m_LastOperation = DateTime.MinValue;

        internal ulong //largest built-in unsigned integer type
            m_Value, m_Cycles, m_Incremented;

        #endregion

        #region Properties

        public bool Cycled { get { return m_Cycles > 0; } }

        public bool Incremented { get { return m_Incremented > 0; } }

        public ulong Cycles { get { return m_Cycles; } }

        public ulong Increments { get { return m_Incremented; } }

        public ulong Value { get { return m_Value; } }

        public DateTime InitialOperation { get { return m_InitialOperation; } }

        public DateTime LastOperation { get { return m_LastOperation; } }

        /// <summary>
        /// Gets the total counted value including every overflow cycle of the underlying ulong
        /// </summary>
        /// <remarks>
        /// Each cycle represents 2^64 (ulong.MaxValue + 1) counted units
        /// </remarks>
        public BigInteger BigValue
        {
            get
            {
                //Convert before multiplying so the arithmetic cannot wrap around as a ulong
                if (m_Cycles > 0) return new BigInteger(m_Cycles) * (BigInteger.One + ulong.MaxValue) + m_Value;
                return m_Value;
            }
        }

        /// <summary>
        /// Gets the total Overhead which was added across all increments.
        /// </summary>
        public BigInteger CalculatedOverhead { get { return (BigInteger)m_Incremented * Overhead; } }

        public bool PerformedOperations { get { return (m_InitialOperation != DateTime.MinValue); } }

        #endregion

        #region Methods

        [System.Runtime.CompilerServices.MethodImplAttribute(System.Runtime.CompilerServices.MethodImplOptions.Synchronized)]
        public void Increment(ulong amount, bool includeOverhead = true, bool incrementIncrements = true)
        {
            DateTime now = DateTime.UtcNow;

            if (m_InitialOperation == DateTime.MinValue) m_InitialOperation = now;

            //The last operation is always the most recent one
            m_LastOperation = now;

            unchecked//Ensure OverflowException doesn't happen
            {
                //We performed an operation *and we are keeping track*
                if (incrementIncrements) ++m_Incremented;
                ulong check = m_Value;//Check the value for overflow
                if ((m_Value += amount + 
                    (includeOverhead ? (ulong)Overhead : 0)) < check) //Include overhead?
                    ++m_Cycles; //Indicate overflow occurred
            }
        }

        public void Increment(int amount, bool includeOverhead = true, bool incrementIncrements = true) { Increment((ulong)amount, includeOverhead, incrementIncrements); }

        public double Rate(double divider, DateTime? start = null, DateTime? end = null) { return CalculateRate(ref m_Value, ref divider, start, end); }

        public double IncrementRate(double divider, DateTime? start = null, DateTime? end = null) { return CalculateRate(ref m_Incremented, ref divider, start, end); }

        internal double CalculateRate(ref ulong value, ref double divider, DateTime? start = null, DateTime? end = null)
        {
            if (divider == 0) return 0;
            if (!start.HasValue && m_InitialOperation == DateTime.MinValue) start = Created;
            if (!end.HasValue && m_LastOperation == DateTime.MinValue) end = DateTime.UtcNow;
            start = start ?? m_InitialOperation;
            end = end ?? m_LastOperation;
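            //The interval runs from start to end
            double seconds = (end.Value - start.Value).TotalSeconds;
            //Avoid dividing by a zero or negative interval
            if (seconds <= 0) return 0;
            return value / divider / seconds;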
        }

        [System.Runtime.CompilerServices.MethodImplAttribute(System.Runtime.CompilerServices.MethodImplOptions.Synchronized)]
        public void ResetIncrements() { m_Incremented = 0; }

        [System.Runtime.CompilerServices.MethodImplAttribute(System.Runtime.CompilerServices.MethodImplOptions.Synchronized)]
        public void ResetCycles() { m_Cycles = 0; }

        [System.Runtime.CompilerServices.MethodImplAttribute(System.Runtime.CompilerServices.MethodImplOptions.Synchronized)]
        public void ResetValue() { m_Value = 0; }

        [System.Runtime.CompilerServices.MethodImplAttribute(System.Runtime.CompilerServices.MethodImplOptions.Synchronized)]
        public void ResetOperationTimes() { m_InitialOperation = m_LastOperation = DateTime.MinValue; }

        [System.Runtime.CompilerServices.MethodImplAttribute(System.Runtime.CompilerServices.MethodImplOptions.Synchronized)]
        public void Reset()
        {
            ResetOperationTimes();
            ResetIncrements();
            ResetCycles();
            ResetValue();
        }

        #region Constructors

        public MediaCounter(byte overhead = 0)
        {
            Id = Guid.NewGuid();
            Overhead = overhead;
        }

        public MediaCounter(int overhead = 0)
            : this((byte)overhead) { }

        public MediaCounter(MediaCounter other, bool copyOverhead = true)
        {
            //The copy is a new counter so it gets its own Id
            Id = Guid.NewGuid();
            m_Value = other.m_Value;
            m_Cycles = other.m_Cycles;
            if (copyOverhead) Overhead = other.Overhead;
        }

        #endregion

        #endregion

        #region Operators

        public static MediaCounter operator +(MediaCounter a, MediaCounter b)
        {
            a.m_Value += b.m_Value;
            return a;
        }

        public static MediaCounter operator -(MediaCounter a, MediaCounter b)
        {
            a.m_Value -= b.m_Value;
            return a;
        }

        public static MediaCounter operator *(MediaCounter a, MediaCounter b)
        {
            a.m_Value *= b.m_Value;
            return a;
        }

        public static MediaCounter operator /(MediaCounter a, MediaCounter b)
        {
            a.m_Value /= b.m_Value;
            return a;
        }

        public static implicit operator BigInteger(MediaCounter counter) { return counter.BigValue; }

        public static implicit operator ulong(MediaCounter counter) { return counter.m_Value; }

        public static implicit operator long(MediaCounter counter) { return (long)counter.m_Value; }

        public static implicit operator int(MediaCounter counter) { return (int)counter.m_Value; }

        public static implicit operator uint(MediaCounter counter) { return (uint)counter.m_Value; }

        public static implicit operator short(MediaCounter counter) { return (short)counter.m_Value; }

        public static implicit operator ushort(MediaCounter counter) { return (ushort)counter.m_Value; }

        #endregion
    }

    #endregion
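
The overflow handling is the part of the MediaCounter most worth a closer look. The sketch below reduces it to the bare idea under a few assumptions of my own: WrappingCounterSketch, Add and Total are names invented for this example, and thread safety, overhead and timestamps are left out on purpose.

```csharp
using System;
using System.Numerics;

// Hypothetical reduction of the counter idea, for illustration only
public sealed class WrappingCounterSketch
{
    ulong m_Value, m_Cycles;

    public ulong Value { get { return m_Value; } }

    public ulong Cycles { get { return m_Cycles; } }

    public void Add(ulong amount)
    {
        unchecked // Let the addition wrap instead of throwing
        {
            ulong before = m_Value;
            m_Value += amount;
            // An unsigned sum which is smaller than the starting value has wrapped
            if (m_Value < before) ++m_Cycles;
        }
    }

    // Each cycle stands for 2^64 (ulong.MaxValue + 1) counted units.
    // The multiplication is done on BigInteger so it cannot wrap as well.
    public BigInteger Total
    {
        get { return new BigInteger(m_Cycles) * (BigInteger.One + ulong.MaxValue) + m_Value; }
    }
}
```

For example, adding ulong.MaxValue and then 2 leaves Value at 1 with a single cycle recorded, and Total reports 18446744073709551617.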

As previously indicated, you will find these classes and much more at Managed Media Aggregation when RC2 Flawless is released in the coming weeks!

Enjoy these concepts until then!

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

jfriedman
Software Developer (Senior) ASTI Transportation Inc.
United States
Livin in a lonely world, caught the midnight train going anywhere... Only thing is it was a runaway train... and it ain't ever goin back...
 

Last Updated 12 Apr 2013
Article Copyright 2013 by jfriedman
Everything else Copyright © CodeProject, 1999-2014