Click here to Skip to main content
15,896,269 members
Articles / Programming Languages / C#

Data-controlled Processes Application Design

Rate me:
Please Sign up or sign in to vote.
4.65/5 (7 votes)
23 Nov 2009CPOL10 min read 30.3K   376   34  
A ready-to-use process manager is provided.
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

namespace DataDriven
{
    /// <summary>
    /// Manages processes which implement the IDataProcess interface. The manager is created in the deactivated state;
    /// call <see cref="Activate"/> to start distributing data to processes. Data can be enqueued while the manager is
    /// inactive too; it is distributed once the manager is activated.
    /// </summary>
    /// <remarks>
    /// Distribution runs on a dedicated thread named "LoopProcess", and every process execution runs on a thread of its
    /// own. The <see cref="Output"/> event is raised on the distribution thread.
    /// </remarks>
    public class DataProcessManager : IEnumerable<IDataProcess>
    {
        // Upper bound for one idle wait; PriorityEnqueue and Deactivate normally wake the distribution thread earlier.
        private const int IdleWaitMilliseconds = 100;

        private readonly List<ProcessContainer> processes = new List<ProcessContainer>();
        // Last = end of queue (where to enqueue), First = start of queue (where to dequeue from).
        // Also the lock (and wait/pulse monitor) guarding the queue and the dataArrived flag.
        private readonly LinkedList<DataContainer> waitingData = new LinkedList<DataContainer>();
        // Guards ProcessContainer.ThreadCount: incremented on the distribution thread, decremented on process threads.
        private readonly object processCounterSync = new object();
        private Thread loopProcessesThread;
        private volatile bool active;
        // Running threads: the distribution thread (while it runs) plus every process thread.
        private volatile int threadCount;
        private volatile bool scheduling;
        // Set by PriorityEnqueue, cleared when the distribution thread finds the queue empty; only touched under the queue lock.
        private bool dataArrived;
        // Internal limit that also counts the distribution thread (hence "+ 1"); null means unlimited.
        private int? threadLimit = 2 * Environment.ProcessorCount + 1;

        /// <summary>
        /// Gets or sets the maximum number of threads used for processing, excluding the manager's own thread used to
        /// distribute input data; <c>null</c> means no limit. By default it is twice the number of available processors.
        /// If many processes accept the same type of input data (and in some other circumstances) more threads may be created.
        /// </summary>
        /// <exception cref="InvalidOperationException">The manager is active.</exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// The value is negative, or too low for the number of threads that are still running.
        /// </exception>
        public int? ThreadLimit
        {
            // The stored limit includes the distribution thread; report only processing threads so get/set round-trip.
            get { return threadLimit - 1; }
            set
            {
                if (active)
                    throw new InvalidOperationException("Cannot set a thread limit while manager is active.");
                int? limit = value + 1; // DPM thread.
                if (limit < 1)
                    throw new ArgumentOutOfRangeException("value",
                                                          "DPM needs at least one thread to work. To deactivate DPM use the Deactivate method.");
                if (limit < threadCount)
                    throw new ArgumentOutOfRangeException("value",
                        "Cannot set a thread limit to less value than number of currently running threads. Consider setting a thread limit before activating the manager.");
                threadLimit = limit;
            }
        }

        /// <summary>
        /// Gets a value indicating whether the queue is empty, the distribution thread is not scheduling,
        /// and no process thread is running.
        /// </summary>
        public bool IsEverythingDone
        {
            get
            {
                int queued;
                lock (waitingData)
                    queued = waitingData.Count;
                // While active, the distribution thread itself is included in threadCount.
                return queued == 0 && threadCount == (active ? 1 : 0) && !scheduling;
            }
        }

        #region IEnumerable<IDataProcess> Members

        /// <summary>
        /// Enumerates a snapshot of the processes currently in the pool.
        /// </summary>
        public IEnumerator<IDataProcess> GetEnumerator()
        {
            return processes.ConvertAll(dpc => dpc.Process).GetEnumerator();
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }

        #endregion

        /// <summary>
        /// Fires (on the distribution thread) for data that no eligible process in the pool would accept as input.
        /// </summary>
        public event EventHandler<OutputEventArgs> Output;

        /// <summary>
        /// Starts distributing input data on a dedicated thread; data enqueued while inactive goes first.
        /// Calling this method while the manager is already active has no effect.
        /// </summary>
        public void Activate()
        {
            // A second distribution thread would enumerate the pool and update the counters concurrently with the first.
            if (active)
                return;
            active = true;
            loopProcessesThread = new Thread(LoopProcesses)
                                  {
                                      Name = "LoopProcess"
                                  };
            loopProcessesThread.Start();
        }

        /// <summary>
        /// Stops sending input data to processes. All upcoming input will be enqueued.
        /// However, existing process threads will not be stopped.
        /// Blocks until the distribution thread has exited, unless called from that thread (e.g. from an
        /// <see cref="Output"/> handler). Has no effect if the manager has never been activated.
        /// </summary>
        public void Deactivate()
        {
            active = false;
            Thread loopThread = loopProcessesThread;
            if (loopThread == null)
                return;
            // Wake the distribution thread if it is idle so it notices the flag and exits.
            lock (waitingData)
                Monitor.PulseAll(waitingData);
            // Joining the current thread would never return.
            if (Thread.CurrentThread != loopThread)
                loopThread.Join();
        }

        private void InvokeOutput(OutputEventArgs args)
        {
            EventHandler<OutputEventArgs> handler = Output;
            if (handler != null) handler(this, args);
        }

        /// <summary>
        /// Adds a process to the pool with default options.
        /// </summary>
        /// <param name="dataProcess">The process to add.</param>
        /// <exception cref="InvalidOperationException">The manager is active.</exception>
        public void Add(IDataProcess dataProcess)
        {
            Add(new AddProcessParams(dataProcess));
        }

        /// <summary>
        /// Adds a group of processes to the pool.
        /// </summary>
        /// <param name="processGroup">List of processes to add.</param>
        /// <exception cref="InvalidOperationException">The manager is active.</exception>
        public void Add(DataProcessSet processGroup)
        {
            // Same rule as the single-process overload: the distribution thread enumerates the pool while active.
            if (active)
                throw new InvalidOperationException("Deactivate the manager before modyfing the process pool.");
            foreach (AddProcessParams process in processGroup)
                processes.Add(GetContainer(process));
        }

        /// <summary>
        /// Adds a process to the pool using the given options (e.g. its own thread limit).
        /// </summary>
        /// <param name="addParams">Options for a new added process.</param>
        /// <exception cref="InvalidOperationException">The manager is active.</exception>
        public void Add(AddProcessParams addParams)
        {
            if (active)
                throw new InvalidOperationException("Deactivate the manager before modyfing the process pool.");
            processes.Add(GetContainer(addParams));
        }

        private static ProcessContainer GetContainer(AddProcessParams addParams)
        {
            return new ProcessContainer
                   {
                       Process = addParams.Process,
                       ThreadLimit = addParams.ThreadLimit,
                       SingleRun = addParams.SingleRun,
                       Requirements = addParams.InputRequirements,
                       AllowRecursion = addParams.AllowRecursion
                   };
        }

        /// <summary>
        /// Resets status of processes which had been set to "single run" so they will be able to execute again.
        /// </summary>
        public void Reset()
        {
            lock (processes)
                foreach (ProcessContainer processContainer in processes)
                    processContainer.WasExecuted = false;
        }

        /// <summary>
        /// Enqueues input data for distribution between processes.
        /// Distribution is performed on the manager's thread, so control returns immediately to the caller.
        /// </summary>
        /// <param name="param">Settings for sending data, like priority.</param>
        /// <param name="data">Available input data objects.</param>
        /// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
        public void SendData(SendDataParams param, IEnumerable data)
        {
            if (data == null)
                throw new ArgumentNullException("data");
            foreach (object o in data)
                SendData(new DataContainer
                         {
                             Data = o,
                             SendParams = param
                         });
        }

        /// <summary>
        /// Enqueues input data objects with default send settings.
        /// </summary>
        public void SendData(params object[] data)
        {
            SendData((IEnumerable)data);
        }

        /// <summary>
        /// Enqueues input data objects with the given send settings.
        /// </summary>
        public void SendData(SendDataParams param, params object[] data)
        {
            SendData(param, (IEnumerable) data);
        }

        /// <summary>
        /// Enqueues input data objects with default send settings.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
        public void SendData(IEnumerable data)
        {
            if (data == null)
                throw new ArgumentNullException("data");
            foreach (var o in data)
                SendData(new DataContainer
                {
                    Data = o
                });
        }

        private void SendData(DataContainer data)
        {
            // Always lock: process threads keep producing output after Deactivate (and Activate may run concurrently),
            // so an unlocked "inactive" path would corrupt the queue.
            lock (waitingData)
                PriorityEnqueue(data);
        }

        /// <summary>
        /// Inserts data so the queue stays ordered by ascending SendParams.Priority (the head is dequeued first);
        /// items with equal priority keep their arrival order. Caller must hold the lock on waitingData.
        /// </summary>
        private void PriorityEnqueue(DataContainer data)
        {
            // Walk from the tail towards the head past every item with a greater priority value.
            LinkedListNode<DataContainer> node = waitingData.Last;
            while (node != null && data.SendParams.Priority < node.Value.SendParams.Priority)
                node = node.Previous;
            if (node == null)
                waitingData.AddFirst(data); // Empty queue, or data precedes every queued item.
            else
                waitingData.AddAfter(node, data);
            dataArrived = true;
            Monitor.Pulse(waitingData); // Wake the distribution thread if it is idle.
        }

        bool CanStartNewThread
        {
            get { return threadLimit == null || threadCount < threadLimit; }
        }

        /// <summary>
        /// Blocks the distribution thread until data is enqueued or the manager is deactivated.
        /// </summary>
        private void Idle()
        {
            lock (waitingData)
            {
                // The timeout is only a safety net; PriorityEnqueue and Deactivate pulse the monitor.
                while (!dataArrived && active)
                    Monitor.Wait(waitingData, IdleWaitMilliseconds);
            }
        }

        /// <summary>
        /// Body of the distribution thread: drains the queue, then idles until more data arrives, until deactivated.
        /// </summary>
        private void LoopProcesses()
        {
            IncrementThreadCount(); // The distribution thread counts towards the thread limit.
            try
            {
                while (active)
                {
                    scheduling = true;
                    DataContainer data;
                    while (active && TryDequeue(out data))
                    {
                        DistributeData(data);
                        Thread.Sleep(0);
                    }
                    scheduling = false;
                    Idle();
                }
            }
            finally
            {
                DecrementThreadCount();
            }
        }

        /// <summary>
        /// Removes the head of the queue; when the queue is empty, clears dataArrived and returns false.
        /// </summary>
        private bool TryDequeue(out DataContainer data)
        {
            lock (waitingData)
            {
                LinkedListNode<DataContainer> head = waitingData.First;
                if (head == null)
                {
                    // Cleared under the same lock PriorityEnqueue sets it under, so no arrival is missed.
                    dataArrived = false;
                    data = default(DataContainer);
                    return false;
                }
                waitingData.RemoveFirst();
                data = head.Value;
                return true;
            }
        }

        /// <summary>
        /// Offers one data item to every eligible process, starting a thread for each process whose input requirements
        /// become satisfied. Re-enqueues the item if a thread limit blocks it and raises Output if no process wants it.
        /// </summary>
        private void DistributeData(DataContainer data)
        {
            bool anyValid = false;
            bool anyExecuted = false;
            bool enqueue = false;
            var threadsToStart = new List<KeyValuePair<Thread, StartProcessParams>>();

            foreach (ProcessContainer processContainer in processes)
            {
                bool excluded = (!processContainer.AllowRecursion && data.Track.Contains(processContainer)) ||
                                (processContainer.SingleRun && processContainer.WasExecuted);
                if (excluded || !processContainer.AvailibleData.WouldNeed(data, processContainer.Requirements))
                    continue;

                bool limitReached = !CanStartNewThread ||
                                    (processContainer.ThreadLimit != null &&
                                     processContainer.ThreadCount >= processContainer.ThreadLimit);
                // Put the data back at the end of the queue when a limit blocks it, unless it ignores limits
                // or has already been handed to an executing process (prevents executing the same data twice).
                if (limitReached && !data.SendParams.IgnoreThreadLimit && !anyExecuted)
                    enqueue = true;

                if (!enqueue)
                {
                    processContainer.AvailibleData.AddInput(data);
                    if (processContainer.Requirements.IsSatisfiedBy(processContainer.AvailibleData))
                    {
                        threadsToStart.Add(CreateThread(processContainer));
                        anyExecuted = true;
                    }
                }

                anyValid = true;
            }

            if (enqueue)
                lock (waitingData)
                    PriorityEnqueue(data);
            if (!anyValid)
                InvokeOutput(new OutputEventArgs(data.Data, data.Source == null ? null : data.Source.Process));
            foreach (KeyValuePair<Thread, StartProcessParams> pair in threadsToStart)
                pair.Key.Start(pair.Value);
        }

        /// <summary>
        /// Prepares (but does not start) a thread that runs the process on its collected input, and counts it immediately.
        /// </summary>
        private KeyValuePair<Thread, StartProcessParams> CreateThread(ProcessContainer processContainer)
        {
            var startProcessParams = new StartProcessParams
                                     {
                                         Input = processContainer.AvailibleData,
                                         Process = processContainer
                                     };
            // Start collecting data from the beginning.
            processContainer.NewData();
            var thread = new Thread(StartProcess)
                         {
                             Name = processContainer.Process.ToString()
                         };

            // Count the thread now rather than inside StartProcess, so limit checks for the next data already see it.
            lock (processCounterSync)
                processContainer.ThreadCount++;
            IncrementThreadCount();

            processContainer.WasExecuted = true;
            return new KeyValuePair<Thread, StartProcessParams>(thread, startProcessParams);
        }

        /// <summary>
        /// Process thread body: runs the process and enqueues every output item, then releases the thread counters.
        /// </summary>
        private void StartProcess(object startProcessParams)
        {
            var args = (StartProcessParams)startProcessParams;
            try
            {
                object[] output = args.Process.Process.Do(args.Input);
                if (output == null)
                    return;

                foreach (object item in output)
                {
                    DataContainer container = args.Input.FindContainer(item);
                    if (container != null)
                    {
                        // Unchanged input data: reuse its container to keep the process trace.
                        container.Track.Add(args.Process);
                    }
                    else
                    {
                        container = new DataContainer
                                    {
                                        Data = item,
                                        Source = args.Process
                                    };
                    }
                    SendData(container);
                }
            }
            finally
            {
                lock (processCounterSync)
                    args.Process.ThreadCount--;
                DecrementThreadCount();
            }
        }

#pragma warning disable 420 // Interlocked operations on a volatile field are still atomic and fenced.
        private void IncrementThreadCount()
        {
            Interlocked.Increment(ref threadCount);
        }

        private void DecrementThreadCount()
        {
            Interlocked.Decrement(ref threadCount);
        }
#pragma warning restore 420

        #region Nested types

        /// <summary>
        /// Data produced that no process accepted, together with the process that produced it (null for external input).
        /// </summary>
        public class OutputEventArgs : EventArgs
        {
            private readonly object output;
            private readonly IDataProcess process;

            public OutputEventArgs(object output, IDataProcess process)
            {
                this.output = output;
                this.process = process;
            }

            public object Output
            {
                get { return output; }
            }

            public IDataProcess Process
            {
                get { return process; }
            }
        }

        private class StartProcessParams
        {
            public InputSet Input;
            public ProcessContainer Process;
        }

        #endregion
    }


    /// <summary>
    /// Queue-style helpers for <see cref="LinkedList{T}"/>: items are appended at the tail and removed from the head.
    /// </summary>
    static class ListExtensions
    {
        /// <summary>Appends <paramref name="item"/> at the tail of the list.</summary>
        public static void Enqueue<T>(this LinkedList<T> list, T item)
        {
            list.AddLast(item);
        }

        /// <summary>Removes the head of the list and returns its value.</summary>
        /// <exception cref="InvalidOperationException">The list is empty.</exception>
        public static T Dequeue<T>(this LinkedList<T> list)
        {
            LinkedListNode<T> head = list.First;
            if (head == null)
                throw new InvalidOperationException("List is empty.");
            list.RemoveFirst();
            return head.Value;
        }
    }
}

namespace System.Runtime.CompilerServices
{
    /// <summary>
    /// Presumably a polyfill so that extension methods compile when targeting .NET 2.0, which lacks this type
    /// (it ships in System.Core from .NET 3.5 on; remove this declaration when targeting 3.5 or later).
    /// The usage targets mirror the framework definition, because the compiler marks not only extension methods
    /// but also their declaring static classes and the assembly with this attribute.
    /// </summary>
    [AttributeUsage(AttributeTargets.Assembly | AttributeTargets.Class | AttributeTargets.Method, AllowMultiple = false, Inherited = false)]
    public class ExtensionAttribute : Attribute
    {
    }
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer
Poland
My name is Jacek. Currently, I am a Java/Kotlin developer. I like C# and Monty Python's sense of humour.

Comments and Discussions