|
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
namespace DataDriven
{
/// <summary>
/// Manages processes which implement the IDataProcess interface. The manager is created in a deactivated state; call the Activate method
/// to start executing processes. Data can be enqueued while the manager is inactive, too.
/// Reference:
/// </summary>
public class DataProcessManager : IEnumerable<IDataProcess>
{
// Containers of all registered processes; enumerated by the scheduling loop for every dequeued item.
private readonly List<ProcessContainer> processes = new List<ProcessContainer>();
// Pending input. Last = end of queue (where to enqueue), First = start of queue (where to dequeue from).
// The list instance is also used as the lock object that guards the queue.
private readonly LinkedList<DataContainer> waitingData = new LinkedList<DataContainer>();
//private readonly List<ProcessInputRequirements> InputRequirements = new List<ProcessInputRequirements>();
// Thread running LoopProcesses; created by Activate and joined by Deactivate.
private Thread loopProcessesThread;
// True between Activate and Deactivate; polled by the scheduling loop.
private volatile bool active;
// Number of live threads owned by the manager: the scheduling thread itself plus one per started process thread.
// NOTE(review): it is changed with ++/-- from the scheduling thread and from process threads; 'volatile' does not
// make read-modify-write atomic, so updates can be lost. Consider Interlocked at every site.
private volatile int threadCount = 0;
// True while the scheduling loop is draining the queue (set and cleared in LoopProcesses).
private volatile bool sheduling;
// Set by PriorityEnqueue whenever an item is queued; polled by Idle.
private volatile bool dataArrived;
// Overall thread limit INCLUDING the scheduling thread (hence the "+ 1"); null means "no limit" (see CanStartNewThread).
private int? threadLimit = 2 * Environment.ProcessorCount + 1;
/// <summary>
/// Gets or sets the maximum number of threads used for processing, excluding the manager's own thread
/// that distributes input data. <c>null</c> means that no limit is applied. By default it is twice the
/// number of available processors. Data sent with the IgnoreThreadLimit flag, or data that has already
/// started one process, is scheduled even when the limit is reached, so more threads may exist.
/// </summary>
/// <exception cref="InvalidOperationException">The setter is used while the manager is active.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// The new limit is negative or lower than the number of threads that are currently running.
/// </exception>
public int? ThreadLimit
{
    // The field also counts the manager's thread, so take it off again to return
    // the same number the caller set. The lifted operator keeps null as null.
    get { return threadLimit - 1; }
    set
    {
        // Internally the manager's thread is part of the limit. A null value stays null.
        int? internalLimit = value + 1;
        if (active)
            throw new InvalidOperationException("Cannot set a thread limit while manager is active.");
        if (internalLimit < 1)
            throw new ArgumentOutOfRangeException("value",
                "DPM needs at least one thread to work. To deactivate DPM use the Deactivate method.");
        if (internalLimit < threadCount)
            throw new ArgumentOutOfRangeException("value",
                "Cannot set a thread limit to less value than number of currently running threads. Consider setting a thread limit before activating the manager.");
        threadLimit = internalLimit;
    }
}
/// <summary>
/// Gets a value indicating whether the queue is empty, no process thread is running
/// (only the manager's own thread, if active) and the scheduling loop is not busy.
/// </summary>
public bool IsEverythingDone
{
    get
    {
        if (waitingData.Count != 0)
            return false;

        int runningThreads = threadCount;
        // While active, the manager's own thread is counted as well.
        int expectedThreads = active ? 1 : 0;
        if (runningThreads != expectedThreads)
            return false;

        return !sheduling;
    }
}
#region IEnumerable<IDataProcess> Members
/// <summary>
/// Returns an enumerator over a copy of the registered processes, taken at the time of the call.
/// </summary>
public IEnumerator<IDataProcess> GetEnumerator()
{
    int count = processes.Count;
    var snapshot = new List<IDataProcess>(count);
    for (int i = 0; i < count; i++)
        snapshot.Add(processes[i].Process);
    return snapshot.GetEnumerator();
}
/// <summary>
/// Non-generic enumeration; forwards to the generic enumerator.
/// </summary>
IEnumerator IEnumerable.GetEnumerator()
{
    IEnumerator<IDataProcess> typed = GetEnumerator();
    return typed;
}
#endregion
/// <summary>
/// Raised by the scheduling loop when a dequeued data item was not wanted as input by any
/// registered process that was eligible for it. The event arguments carry the data object and
/// the process that produced it (null when the item has no source).
/// Handlers are invoked on the manager's scheduling thread.
/// </summary>
public event EventHandler<OutputEventArgs> Output;
/// <summary>
/// Starts distributing input data on a dedicated thread. Items that were enqueued while the
/// manager was inactive are processed first. Calling this method on an already active
/// manager does nothing.
/// </summary>
public void Activate()
{
    // A second call would start another scheduling thread and lose the reference to the
    // first one, so it could never be joined again.
    if (active)
        return;

    active = true;
    loopProcessesThread = new Thread(LoopProcesses)
    {
        Name = "LoopProcess"
    };
    loopProcessesThread.Start();
}
/// <summary>
/// Stops sending input data to processes and waits for the scheduling thread to finish.
/// All upcoming input will be enqueued. However, existing process threads will not be stopped.
/// Safe to call when the manager was never activated, and from an Output event handler.
/// </summary>
public void Deactivate()
{
    active = false;

    Thread scheduler = loopProcessesThread;
    if (scheduler == null)
        return; // Activate was never called; there is nothing to wait for.

    // Raise the flag that Idle waits on so that a parked scheduler wakes up and re-checks 'active'.
    dataArrived = true;

    // Output handlers run on the scheduling thread; joining it from itself would never return.
    if (scheduler != Thread.CurrentThread)
        scheduler.Join();
}
/// <summary>
/// Raises the Output event with the given arguments if anyone is subscribed.
/// </summary>
private void InvokeOutput(OutputEventArgs args)
{
    // Copy the delegate first so that an unsubscribe on another thread cannot null it mid-call.
    EventHandler<OutputEventArgs> subscribers = Output;
    if (subscribers == null)
        return;

    subscribers(this, args);
}
/// <summary>
/// Adds a single process to the pool, wrapped in an AddProcessParams created from the process alone.
/// </summary>
/// <param name="dataProcess">The process to add.</param>
public void Add(IDataProcess dataProcess)
{
    var addParams = new AddProcessParams(dataProcess);
    Add(addParams);
}
/// <summary>
/// Adds a group of processes to the pool.
/// </summary>
/// <param name="processGroup">The processes to add, each with its own options.</param>
/// <exception cref="ArgumentNullException"><paramref name="processGroup"/> is null.</exception>
/// <exception cref="InvalidOperationException">The manager is active.</exception>
public void Add(DataProcessSet processGroup)
{
    if (processGroup == null)
        throw new ArgumentNullException("processGroup");
    // Same rule as for adding a single process: the scheduling thread enumerates the pool
    // without a lock, so the pool must not change while the manager is active.
    if (active)
        throw new InvalidOperationException("Deactivate the manager before modifying the process pool.");

    foreach (AddProcessParams process in processGroup)
    {
        ProcessContainer container = GetContainer(process);
        processes.Add(container);
    }
}
/// <summary>
/// Adds one process to the pool using the supplied options (thread limit, single run,
/// input requirements, recursion).
/// </summary>
/// <param name="addParams">The process together with its options.</param>
/// <exception cref="InvalidOperationException">The manager is active.</exception>
public void Add(AddProcessParams addParams)
{
    if (active)
    {
        throw new InvalidOperationException("Deactivate the manager before modyfing the process pool.");
    }

    processes.Add(GetContainer(addParams));
}
/// <summary>
/// Builds the internal container for a process by copying the options from the add parameters.
/// </summary>
private static ProcessContainer GetContainer(AddProcessParams addParams)
{
    var container = new ProcessContainer();
    container.Process = addParams.Process;
    container.ThreadLimit = addParams.ThreadLimit;
    container.SingleRun = addParams.SingleRun;
    container.Requirements = addParams.InputRequirements;
    container.AllowRecursion = addParams.AllowRecursion;
    return container;
}
/// <summary>
/// Clears the "was executed" mark on every process, so that processes registered as
/// "single run" are allowed to execute again.
/// </summary>
public void Reset()
{
    lock (processes)
    {
        foreach (ProcessContainer container in processes)
        {
            container.WasExecuted = false;
        }
    }
}
/// <summary>
/// Queues every object of <paramref name="data"/> as input, all with the same send settings.
/// The objects are only queued here; they are handed to processes by the manager's own thread.
/// </summary>
/// <param name="param">Settings for sending data, like priority.</param>
/// <param name="data">Available input data objects.</param>
public void SendData(SendDataParams param, IEnumerable data)
{
    foreach (object item in data)
    {
        var container = new DataContainer();
        container.Data = item;
        container.SendParams = param;
        SendData(container);
    }
}
/// <summary>
/// Queues the given objects as input, using the default send settings.
/// </summary>
/// <param name="data">Available input data objects.</param>
public void SendData(params object[] data)
{
    // Typed as IEnumerable so that the sequence overload is chosen instead of this one again.
    IEnumerable items = data;
    SendData(items);
}
/// <summary>
/// Queues the given objects as input, all with the same send settings.
/// </summary>
/// <param name="param">Settings for sending data, like priority.</param>
/// <param name="data">Available input data objects.</param>
public void SendData(SendDataParams param, params object[] data)
{
    // Typed as IEnumerable so that the sequence overload is chosen instead of this one again.
    IEnumerable items = data;
    SendData(param, items);
}
/// <summary>
/// Queues every object of <paramref name="data"/> as input. No send settings are assigned,
/// so each item keeps whatever a new DataContainer starts with.
/// </summary>
/// <param name="data">Available input data objects.</param>
public void SendData(IEnumerable data)
{
    foreach (object item in data)
    {
        var container = new DataContainer();
        container.Data = item;
        SendData(container);
    }
}
/// <summary>
/// Puts one container into the waiting queue according to its priority.
/// </summary>
/// <param name="data">The container to queue.</param>
private void SendData(DataContainer data)
{
    // Always take the lock, even while the manager is inactive: process threads are not
    // stopped by Deactivate and still queue their output through this method, possibly
    // at the same time as a caller on another thread. LinkedList is not safe for that.
    lock (waitingData)
        PriorityEnqueue(data);
}
/// <summary>
/// Inserts a container into the waiting queue. Items are kept ordered by
/// <c>SendParams.Priority</c>: smaller values sit closer to the front and are dequeued first;
/// items with an equal value keep their arrival order. Callers are expected to hold the queue lock
/// when other threads may touch the queue.
/// </summary>
/// <param name="data">The container to insert.</param>
void PriorityEnqueue(DataContainer data)
{
    // Walk from the tail towards the head, past every item that must come after the new one.
    // Walking until node == null (not node.Previous == null) lets the new item pass the
    // first node too; otherwise it could never become the head of a non-empty queue.
    LinkedListNode<DataContainer> node = waitingData.Last;
    while (node != null && data.SendParams.Priority < node.Value.SendParams.Priority)
        node = node.Previous;

    if (node == null)
        waitingData.AddFirst(data); // empty queue, or the new item precedes everything
    else
        waitingData.AddAfter(node, data);

    dataArrived = true;
}
/// <summary>
/// Gets a value indicating whether the overall thread limit still allows another thread.
/// Always true when no limit is set.
/// </summary>
bool CanStartNewThread
{
    get
    {
        if (threadLimit == null)
            return true;

        return threadCount < threadLimit;
    }
}
/// <summary>
/// Parks the scheduling thread until new data is queued or the manager is deactivated.
/// </summary>
private void Idle()
{
    // Also watch 'active': without it a deactivated manager would wait here until the next
    // item arrives. Sleep(1) is used because Sleep(0) only yields to threads that are ready
    // to run and otherwise returns at once, which keeps a core busy while idle.
    while (active && !dataArrived)
        Thread.Sleep(1);

    // Consume the signal so that the next call really waits. Clearing it here is safe:
    // the caller checks the queue again after this method returns, so an item queued
    // just before the reset is still found.
    dataArrived = false;
}
/// <summary>
/// Body of the scheduling thread. While the manager is active it repeatedly drains the waiting
/// queue: each dequeued item is offered to every registered process, threads are created for
/// processes whose requirements become satisfied, and items nobody wants are reported through
/// the Output event. When the queue is empty the thread parks in Idle.
/// </summary>
private void LoopProcesses()
{
// A lot of overhead here, but it was necessary to avoid uncomfortable situations.
// The scheduling thread counts itself in the overall thread count.
threadCount++;
while (active)
{
// Marks the manager as busy for IsEverythingDone while the queue is being drained.
sheduling = true;
while (active)
{
DataContainer availibleData;
lock (waitingData)
{
// Here is an actual WHILE condition (needs to be inside a lock block).
if (waitingData.Count == 0)
break;
availibleData = waitingData.Dequeue();
}
// anyValid: at least one eligible process wanted this item.
bool anyValid = false;
// anyExecuted: at least one thread was created for this item (set by CreateThread).
bool anyExecuted = false;
// enqueue: the item must go back to the queue because a thread limit was hit.
bool enqueue = false;
// Threads are collected first and started only after all processes have been visited.
var threadsToStart = new Dictionary<Thread, StartProcessParams>();
// NOTE(review): 'processes' is enumerated without a lock; this relies on the pool not being modified while active.
foreach (ProcessContainer processContainer in processes)
{
// Skip a process that already handled this item (unless recursion is allowed)
// and a single-run process that has already been executed.
if (!((!processContainer.AllowRecursion && availibleData.Track.Contains(processContainer)) ||
(processContainer.SingleRun && processContainer.WasExecuted)))
{
if (processContainer.AvailibleData.WouldNeed(availibleData, processContainer.Requirements))
{
if ((!CanStartNewThread || (processContainer.ThreadLimit != null &&
processContainer.ThreadCount >= processContainer.ThreadLimit))
&& !availibleData.SendParams.IgnoreThreadLimit)
{
// If a process has reached its thread limit or the overall limit has been reached
// and the data does NOT have the IgnoreThreadLimit flag,
// then put the data back into the queue.
// Once a thread was created for this item the limit is ignored for the remaining
// processes, which is how the overall limit can be exceeded.
if (!anyExecuted) // prevent executing the same data more than once.
enqueue = true;
}
// NOTE(review): 'enqueue' stays true for the rest of this pass, so later processes do not
// receive the item now. An earlier process may already have stored it via AddInput without
// being started; it will be offered the re-queued item again - confirm that WouldNeed/AddInput
// cope with such a repeat.
if (!enqueue)
{
processContainer.AvailibleData.AddInput(availibleData);
if (processContainer.Requirements.IsSatisfiedBy(processContainer.AvailibleData))
{
var threadAndParams = CreateThread(processContainer, out anyExecuted);
threadsToStart.Add(threadAndParams.Key, threadAndParams.Value);
}
}
anyValid = true;
}
}
} // END foreach process
if (enqueue)
lock (waitingData)
PriorityEnqueue(availibleData);
// Nobody wanted the item: hand it to the Output subscribers together with its producer, if any.
if (!anyValid)
InvokeOutput(new OutputEventArgs(availibleData.Data, availibleData.Source == null
? null
: availibleData.Source.Process));
foreach (KeyValuePair<Thread, StartProcessParams> pair in threadsToStart)
{
pair.Key.Start(pair.Value);
}
threadsToStart.Clear();
// Give other threads a chance to run between two items.
Thread.Sleep(0);
} // END while(active)
sheduling = false;
Idle(); // Wait for new data; reached only when the queue was found empty or the manager was deactivated.
} // END while(active)
threadCount--;
}
/// <summary>
/// Prepares (but does not start) a thread that will run the given process on the input it has
/// collected so far, and updates the thread counters and the "was executed" mark.
/// </summary>
/// <param name="processContainer">The process whose requirements are satisfied.</param>
/// <param name="anyExecuted">Always set to true.</param>
/// <returns>The new thread paired with the argument object to pass to its Start method.</returns>
private KeyValuePair<Thread, StartProcessParams> CreateThread(ProcessContainer processContainer, out bool anyExecuted)
{
    // Take the collected input before NewData() is called; presumably NewData() makes the
    // container start collecting from scratch, so the order of these statements matters.
    var launchArgs = new StartProcessParams();
    launchArgs.Input = processContainer.AvailibleData;
    launchArgs.Process = processContainer;
    processContainer.NewData();

    var worker = new Thread(StartProcess);
    worker.Name = processContainer.Process.ToString();

    // Count the thread now rather than inside StartProcess, so that the limits already
    // see it before it actually starts running.
    processContainer.ThreadCount++;
    threadCount++;

    processContainer.WasExecuted = true;
    anyExecuted = true;
    return new KeyValuePair<Thread, StartProcessParams>(worker, launchArgs);
}
/// <summary>
/// Thread entry point: runs one process on its collected input and queues every object it
/// returns as new input data.
/// </summary>
/// <param name="startProcessParams">A StartProcessParams instance with the process and its input.</param>
private void StartProcess(object startProcessParams)
{
    var args = (StartProcessParams)startProcessParams;
    try
    {
        object[] output = args.Process.Process.Do(args.Input);
        if (output == null)
            return;

        foreach (object item in output)
        {
            // If some data remain unchanged, reuse the previous container to keep the
            // process trace; look it up once instead of twice.
            DataContainer container = args.Input.FindContainer(item);
            if (container != null)
                container.Track.Add(args.Process);
            else
                container = new DataContainer
                {
                    Data = item,
                    Source = args.Process
                };
            SendData(container);
        }
    }
    finally
    {
        // Release the thread slots even when the process throws; otherwise the counters
        // would stay raised and the thread limits would never free up again.
        args.Process.ThreadCount--;
        threadCount--;
    }
}
#region Nested types
/// <summary>
/// Arguments of the Output event: the data object and the process it came from.
/// </summary>
public class OutputEventArgs : EventArgs
{
    public OutputEventArgs(object output, IDataProcess process)
    {
        Output = output;
        Process = process;
    }

    /// <summary>Gets the data object passed to the constructor.</summary>
    public object Output { get; private set; }

    /// <summary>Gets the process passed to the constructor; may be null.</summary>
    public IDataProcess Process { get; private set; }
}
/// <summary>
/// Argument object handed to a process thread when it is started.
/// </summary>
private class StartProcessParams
{
    /// <summary>Container of the process to run.</summary>
    public ProcessContainer Process;

    /// <summary>Input collected for this run.</summary>
    public InputSet Input;
}
#endregion
}
/// <summary>
/// Queue-style helpers for LinkedList: items enter at the tail and leave at the head.
/// </summary>
static class ListExtensions
{
    /// <summary>Appends an item at the end of the list.</summary>
    public static void Enqueue<T>(this LinkedList<T> list, T item)
    {
        list.AddLast(item);
    }

    /// <summary>Removes the first item of the list and returns it.</summary>
    /// <exception cref="InvalidOperationException">The list is empty.</exception>
    public static T Dequeue<T>(this LinkedList<T> list)
    {
        LinkedListNode<T> head = list.First;
        if (head == null)
            throw new InvalidOperationException("List is empty.");

        T value = head.Value;
        list.RemoveFirst();
        return value;
    }
}
}
namespace System.Runtime.CompilerServices
{
    /// <summary>
    /// Local stand-in for the framework's ExtensionAttribute, presumably declared so that the
    /// extension methods in this file compile without a reference to System.Core.
    /// The allowed targets mirror the framework's declaration: the compiler marks not only an
    /// extension method but also its containing class and the assembly with this attribute.
    /// </summary>
    [AttributeUsage(AttributeTargets.Assembly | AttributeTargets.Class | AttributeTargets.Method,
        AllowMultiple = false, Inherited = false)]
    public class ExtensionAttribute : Attribute
    {
    }
}
|
By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.
If a file you wish to view isn't highlighted, and is a text file (not binary), please
let us know and we'll add colourisation support for it.
My name is Jacek. Currently, I am a Java/Kotlin developer. I like C# and Monty Python's sense of humour.