|
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Clifton.Concurrent
{
public class TaskMessageQueue
{
protected Queue<TaskMessage> msgQueue;
protected List<Task> taskList;
protected Semaphore sem;
protected string name;
/// <summary>
/// Returns true if the queue has listening tasks.
/// </summary>
public bool HasListeners
{
get { return taskList.Count > 0; }
}
/// <summary>
/// Returns the number of listening tasks.
/// </summary>
public int NumTasks
{
get { return taskList.Count; }
}
public int NumMessages
{
get { return msgQueue.Count; }
}
/// <summary>
/// Returns the queue name.
/// </summary>
public string Name
{
get { return name; }
}
protected TaskMessageQueue()
{
}
/// <summary>
/// Constructor for the message queue.
/// </summary>
/// <param name="name"></param>
public TaskMessageQueue(string name)
{
if (String.IsNullOrEmpty(name))
{
throw new ArgumentNullException("Name cannot be null or empty.");
}
msgQueue = new Queue<TaskMessage>();
taskList = new List<Task>();
sem = new Semaphore(0, Int32.MaxValue);
this.name = name;
}
/// <summary>
/// Adds a task to the task list associated with this queue.
/// </summary>
public void AddTask(Task task)
{
if (taskList.Contains(task))
{
throw new TaskMessageManagerException("Task already associated with this queue.");
}
taskList.Add(task);
}
/// <summary>
/// Removes a task associated with this queue.
/// </summary>
public void RemoveTask(Task task)
{
if (!taskList.Contains(task))
{
throw new TaskMessageManagerException("The task is not associated with this queue.");
}
taskList.Remove(task);
}
/// <summary>
/// Enqueues the message and releases the semaphore for one count, releasing one waiting task.
/// </summary>
/// <param name="msg"></param>
public void Enqueue(TaskMessage msg)
{
// Add the message in a thread safe manner.
lock (msgQueue)
{
msgQueue.Enqueue(msg);
}
// Release any task waiting for a message.
sem.Release();
}
/// <summary>
/// Dequeues the message, incrementing the semaphore by one count. This routine
/// will wait the specifiied timeout amount and return null if the wait expired
/// before a message was dequeued.
/// </summary>
public TaskMessage Dequeue(long timeout)
{
if (timeout > Int32.MaxValue)
{
throw new ArgumentException("Timeout cannot exceed Int32.MaxValue.");
}
TaskMessage ret = null;
bool released = sem.WaitOne((int)timeout, false);
// If there are messages queued...
if (released)
{
// Get the message in a thread safe manner.
lock (msgQueue)
{
ret = msgQueue.Dequeue();
}
}
// Otherwise return null, indicating timeout.
return ret;
}
}
}
|
By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.
If a file you wish to view isn't highlighted, and is a text file (not binary), please
let us know and we'll add colourisation support for it.