using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Clifton.Tools.Data;
// Known issues with the PFX: http://blogs.msdn.com/pfxteam/archive/2007/11/29/6558570.aspx
// eek! Some of these bugs are rather surprising. I guess Microsoft doesn't practice "fix the bug before adding a new feature".
// How to cancel a task in PFX: http://blogs.msdn.com/pedram/archive/2008/01/02/how-to-cancel-a-task-in-parallel-fx.aspx
// First look at Parallel FX and self-replicating tasks: http://blogs.msdn.com/pedram/archive/2007/12/04/first-look-at-parallel-fx-and-self-replicating-tasks.aspx
namespace Clifton.Concurrent
{
public delegate void WaitCallback();
public class TaskMessageManager
{
internal static TaskMessageManager defaultTMM=new TaskMessageManager();
protected Dictionary<string, TaskMessageQueue> messageQueueMap;
protected Dictionary<Task, TaskMessageQueue> taskQueueMap;
public static TaskMessageManager Default
{
get {return defaultTMM;}
}
public bool HasTasks
{
get { return taskQueueMap.Count > 0; }
}
/// <summary>
/// Returns a Task instance given the task name, or null if not found.
/// </summary>
public Task GetTaskByName(string taskName)
{
Task ret=null;
if (String.IsNullOrEmpty(taskName))
{
throw new ArgumentNullException("The task name cannot be null or empty.");
}
lock (taskQueueMap)
{
foreach (Task t in taskQueueMap.Keys)
{
if (t.Name == taskName)
{
ret = t;
break;
}
}
}
return ret;
}
/// <summary>
/// Waits indefinitely until all tasks managed by the TMM have been completed.
/// </summary>
public void Wait()
{
while (HasTasks) { Thread.Sleep(10); }
}
public void Wait(WaitCallback callback)
{
if (callback == null)
{
throw new ArgumentNullException("Callback cannot be null");
}
while (HasTasks) { callback(); }
}
/// <summary>
/// Wait indefinitely until the queue is empty.
/// </summary>
/// <param name="queueName"></param>
public void Wait(string queueName)
{
if (String.IsNullOrEmpty(queueName))
{
throw new ArgumentNullException("Queue name cannot be null or empty.");
}
TaskMessageQueue tqm;
lock (taskQueueMap)
{
if (!messageQueueMap.ContainsKey(queueName))
{
throw new ArgumentException("The queue " + queueName + " is not part of the queue collection.");
}
tqm = messageQueueMap[queueName];
}
while (tqm.NumMessages > 0) { Thread.Sleep(10); }
//Trace.WriteLine("!tmm:" + queueName + " is empty.");
}
public void Wait(string queueName, WaitCallback callback)
{
if (String.IsNullOrEmpty(queueName))
{
throw new ArgumentNullException("Queue name cannot be null or empty.");
}
if (callback == null)
{
throw new ArgumentNullException("Callback cannot be null");
}
TaskMessageQueue tqm;
lock (taskQueueMap)
{
if (!messageQueueMap.ContainsKey(queueName))
{
throw new ArgumentException("The queue " + queueName + " is not part of the queue collection.");
}
tqm = messageQueueMap[queueName];
}
while (tqm.NumMessages > 0) { callback(); }
//Trace.WriteLine("!tmm:" + queueName + " is empty.");
}
/// <summary>
/// Registers a task with the message queue. The task cannot have already been
/// previous registered. The queue name is required to inform the TMM as to which
/// queue the task is listening.
/// </summary>
public void RegisterTask(Task task, string queueName)
{
if (String.IsNullOrEmpty(queueName))
{
throw new ArgumentNullException("The queue name cannot be null or empty.");
}
if (String.IsNullOrEmpty(task.Name))
{
throw new TaskMessageManagerException("The task must have a name.");
}
if (taskQueueMap.ContainsKey(task))
{
// It appears that the TaskManager is re-using task instances? Is this part of the thread pooling?
// For example, single stepping through the Clone method resulting in this exception when
// self replication was enabled.
throw new TaskMessageManagerException("The task " + task.Name + " is already registered.");
}
Debug.WriteLine("!tmm:Registering task: " + task.Name);
// Register the task in a thread safe manner into the task-queue map.
lock (taskQueueMap)
{
if (!messageQueueMap.ContainsKey(queueName))
{
// register a new task map.
messageQueueMap[queueName] = new TaskMessageQueue(queueName);
}
// Add the task to parties interested in the queue.
messageQueueMap[queueName].AddTask(task);
lock (taskQueueMap)
{
// We need to remember the queue to which the task is connected.
taskQueueMap[task] = messageQueueMap[queueName];
}
}
// Wire up the completed event so we can remove the task from our list when it's completed.
task.Completed += new EventHandler(OnTaskCompleted);
}
/// <summary>
/// Returns true if the task is registered, false otherwise.
/// </summary>
public bool IsRegistered(Task task)
{
if (String.IsNullOrEmpty(task.Name))
{
throw new TaskMessageManagerException("The task must have a name.");
}
bool ret;
lock (taskQueueMap)
{
ret = taskQueueMap.ContainsKey(task);
}
return ret;
}
/// <summary>
/// Posts a message to a queue, which will be consumed by the first task listening
/// on that queue that is released.
/// </summary>
public void PostMessage(string queueName, IMessage taskMessage)
{
PostMessage(null, queueName, taskMessage);
}
/// <summary>
/// Posts a message to a queue, which will be consumed by the first task listening
/// on that queue that is released.
/// </summary>
public void PostMessage(Task fromTask, string queueName, IMessage taskMessage)
{
lock (taskQueueMap)
{
if (!messageQueueMap.ContainsKey(queueName))
{
throw new ArgumentException("The queue name " + queueName + " doesn't exist.");
}
IMessage msg = CloneMessage(taskMessage);
PostMessage(messageQueueMap[queueName], new TaskMessage(fromTask, msg));
}
}
/// <summary>
/// Posts a message to the task's message queue.
/// </summary>
/// TODO: We may want this to post only to this task, even if the message queue is
/// shared by multiple tasks.
public void PostMessage(Task toTask, IMessage taskMessage)
{
IMessage msg = CloneMessage(taskMessage);
PostMessage(taskQueueMap[toTask], new TaskMessage(null, msg));
}
/// <summary>
/// Posts a message to the task's message queue.
/// </summary>
/// TODO: We may want this to post only to this task, even if the message queue is
/// shared by multiple tasks.
public void PostMessage(Task fromTask, Task toTask, IMessage taskMessage)
{
IMessage msg = CloneMessage(taskMessage);
PostMessage(taskQueueMap[toTask], new TaskMessage(fromTask, msg));
}
/// <summary>
/// Returns the next message that is in this task's message queue, waiting forever or
/// until the task is cancelled.
/// </summary>
public TaskMessage GetMessage()
{
return GetMessage(Task.Current, -1);
}
/// <summary>
/// Returns the next message that is in this task's message queue, waiting for the specified
/// timeout. If timed out, the TMM returns NoMessage.
/// </summary>
public TaskMessage GetMessage(int timeout)
{
return GetMessage(Task.Current, timeout);
}
/// <summary>
/// Clones the message. Will throw an exception if the message is not serializable.
/// </summary>
/// TODO: For now, we are cloning the message as opposed to sending a true
/// serialization of the message, then deserializing when we receive it. This
/// is satisfactory for an initial test of this architecture, but of course it
/// doesn't support sending messages across application boundaries.
protected IMessage CloneMessage(IMessage msg)
{
IMessage ret;
if (msg is IClonableMessage)
{
ret = ((IClonableMessage)msg).DeepClone();
}
else
{
ret = Cloner.CloneObject<IMessage>(msg);
}
return ret;
}
/// <summary>
/// Sends a serialized message to the target task. Calling this method
/// assumes that the IMessage instance has been serialized/cloned. This
/// ensures that the target task gets its own copy of the message and cannot
/// in any way reference the objects in the message from the sender.
/// </summary>
/// <param name="toTask"></param>
/// <param name="message"></param>
protected void PostMessage(TaskMessageQueue tqm, TaskMessage message)
{
Debug.WriteLine("!tmm:PostMessage " + tqm.Name + " -> " + message.Message.ToString());
lock (taskQueueMap)
{
// If it's stop message, queue up enough messages for the current number of listeners.
// This ensures that all listening tasks get the message (unless of course the task does
// something strange, like get a message twice).
if (message.Message is StopMessage)
{
for (int i = 0; i < tqm.NumTasks; i++)
{
tqm.Enqueue(message);
}
}
else
{
// Otherwise post one instance of the message.
tqm.Enqueue(message);
}
}
}
// The problem with this code is that it does all this testing and
// setup each time the task calls into it.
// Can this be changed so that the setup is done once for the task
// in question?
protected TaskMessage GetMessage(Task task, int timeout)
{
Debug.WriteLine("!tmm:GetMessage " + task.Name);
TaskMessageQueue tmQueue=null;
// We still need to honor tasks being cancelled, so even if the
// timeout is infinite (-1), internally, we don't block forever.
// Access the message queue map in a thread safe manner.
lock (taskQueueMap)
{
if (!taskQueueMap.ContainsKey(task))
{
throw new TaskMessageManagerException("The target task is not registered with the message manager.");
}
tmQueue = taskQueueMap[task];
}
TaskMessage tm = null;
Stopwatch startWaiting = new Stopwatch();
startWaiting.Start();
while (tm == null)
{
long maxTimeout = GetRemainingTimeToWait(startWaiting, timeout);
// The task timeout has been exceeded with no messages.
// Return to the task with a NoMessage message.
if (maxTimeout <= 0)
{
Debug.WriteLine("!tmm:Returning NoMessage");
tm = new TaskMessage(null, NoMessage.Default);
}
else
{
// Gets a message or null, in which case we continue waiting.
// This is a thread-safe call into our queue manager.
tm = tmQueue.Dequeue(maxTimeout);
/* -- Because of problems with interaction between multiple
* threads and "smart" termination of unecessary tasks, this
* code has been removed for the moment.
if (tm == null)
{
// If the task has nothing to do, and we've got additional
// tasks running on this same message queue, then stop this
// task. We don't need this task with no messages to process
// and other tasks listening for messages. This dynamically
// adjusts the number of tasks replicated to work on a particular
// message queue.
// Does this end up fighting with the TaskManager's
// task replcation algorithm? Should the task itself replicate?
// Should the message manager replicate the task instead?
lock (taskMessageQueueMap)
{
// this task can stop if there's another task that will
// be processing messages. But does the TaskManager
// clone the task when our task becomes busy again???
if (taskMessageQueueMap[task.Name].NumTasks > 1)
{
// Remove the task here, so that another task that
// times out with no messages doesn't remove itself
// as well (because the count of NumTasks is still
// >1 if we do this only at A (below).
taskMessageQueueMap[task.Name].RemoveTask(task);
Debug.WriteLine("!tmm:Sending StopMessage");
tm = new TaskMessage(null, StopMessage.Default);
}
}
}
*/
}
// But first, we check if the task has been cancelled.
ThrowIfCanceled(task);
}
return tm;
}
protected void OnTaskCompleted(object sender, EventArgs e)
{
// Obviously, the task will not be in the GetMessage loop.
Task task = (Task)sender;
//Trace.WriteLine("!tmm:Task Completed: " + task.Name);
try
{
// Remove the task in a thread safe manner.
lock (taskQueueMap)
{
// Remove the task from the queue list of task listeners.
// With SelfReplicating set as the task creation option,
// this throws a key not found exception if we set a breakpoint in
// the Clone routine and single step through that code, letting the
// tasks go through the process of checking for messages and
// reducing to 1 task, as there are no messages queued yet.
// Does the TaskManager create another task automatically?
// *** A
// We still have to do this here in case the task terminated
// normally. There probably is still an issue here, that
// normally terminating task and one waiting for messages
// will also terminate itself before this task has been
// removed.
taskQueueMap[task].RemoveTask(task);
// If there are no more listening tasks, remove the task from the map.
if (!taskQueueMap[task].HasListeners)
{
//Trace.WriteLine("!tmm:Tasks for " + taskQueueMap[task].Name + " are all completed.");
// This has the potential side effect of throwing an exception if the application
// thinks that the queue is still around after all tasks have terminated. This will
// be resolved when the queue is automatically created when messages are posted to it,
// or when we support TMM instances and use a single queue per TMM architecture.
messageQueueMap.Remove(taskQueueMap[task].Name);
}
taskQueueMap.Remove(task);
}
}
catch (Exception ex)
{
Debug.WriteLine("!tmm error:" + ex.Message);
}
}
protected long GetRemainingTimeToWait(Stopwatch startWaiting, int timeout)
{
// by default, on an infinite wait, we timeout every 1/4 second
// to check for thread cancel messages.
long ret = 250;
if (timeout != -1)
{
// This is the remaining time to wait that the task has requested.
ret = timeout - startWaiting.ElapsedMilliseconds;
}
return ret;
}
protected void ThrowIfCanceled(Task task)
{
if (task.IsCanceled)
{
Debug.WriteLine("!tmm:Cancelling: " + task.Name);
throw new TaskCanceledException(task);
}
}
protected TaskMessageManager()
{
messageQueueMap = new Dictionary<string, TaskMessageQueue>();
taskQueueMap = new Dictionary<Task, TaskMessageQueue>();
}
}
}
// what we need:
// message queues
// messages queues specific to each thread, by task name
// ability to add a message to a particular task
// ability for a task to get its next "job" (a message)
// the message manager is responsible for queue synchronization
// the message manager is responsible for task blocking
// a task can request to block infinitely or for a specific timeout
// when timing out, if there are no messages, the task gets a "no message" message returned.
// another built in message is "stop"
// messages are cloned, so that what's put in the queue is dereferenced from the sender.
// how would we simulate a thread-specific heap?
// This "known correctness bug" with PFX: "Tasks blocked for significant periods of time may cause runaway thread
// injection and out of memory conditions when the default policy is used. "
// is probably a show stopper because we will potentially be blocking for a long period of time if there are no messages
// to process.
// However, in the face of these obstacles, let us continue, with the faith that we won't encounter these issues and
// that shorty the TPL will be fixed.
// Another problem -- the task starts executing right away, which causes a synchronization problem when managing
// the task in a separate collection from that of the TaskManager. To work around this problem, we are going to
// have the task register itself. This means we are going, for the moment, to work with only a single instance
// of the task manager.
// The features of the Thread class ought to be exposed in the Task class, namely the ability to create the thread
// but not immediately run it, and also the ability to suspend and resume a thread. I suppose this would get into
// conflict with the TaskManager and perhaps goes beyond the design goals of the TaskManager.