using System;
using System.Collections.Generic;
using System.Threading;
using Pegasus.Diagnostics;
using Pegasus.Threading;
namespace Pegasus.Workflow.Service.DefaultServices
{
/// <summary>
/// Implements the message service using the Pegasus ProcessQueue class.
/// Each message is queued and one worker thread dispatched the incomming messages.
/// </summary>
public class QueuedMessageService : WorkflowMessageService, IDispatchTimerTask
{
// Local Instance Values
private Dictionary<int, TimerTask> m_taskTable = new Dictionary<int, TimerTask>();
private ProcessQueue m_queue = new ProcessQueue();
/// <summary>
/// Initializes a new instance of the <see cref="QueuedMessageService"/> class.
/// </summary>
public QueuedMessageService()
{
m_queue.RegisterCallback( new WaitCallback( DispatchMessageFromQueue ) );
}
/// <summary>
/// Starts this instance of the service.
/// </summary>
/// <param name="workflowService"></param>
public override void Start( WorkflowService workflowService )
{
base.Start( workflowService );
m_queue.Start();
}
/// <summary>
/// Stops this instance of the service.
/// </summary>
public override void Stop()
{
base.Stop();
m_queue.Stop();
}
/// <summary>
/// Dispatches the timer event that was fired from the timer task.
/// </summary>
/// <param name="task">The task.</param>
/// <param name="parameters">The parameters.</param>
void IDispatchTimerTask.DispatchTimerTask( TimerTask task, WorkflowEventParameters parameters )
{
// Check Parameters
ParamCode.AssertNotNull( task, "task" );
ParamCode.AssertNotNull( parameters, "parameters" );
// Remove the task from our table.
lock( m_taskTable )
{
m_taskTable.Remove( task.GetHashCode() );
}
// Queue the message to be executed
m_queue.Enqueue( parameters );
}
/// <summary>
/// Processes the workflow start message.
/// </summary>
/// <param name="workflowId">The workflow id.</param>
/// <returns>
/// True if the message was process, False if the message should be recycled and processed later.
/// </returns>
protected override bool ProcessWorkflowStartMessage( int workflowId )
{
// Check Parameters
ParamCode.AssertRange( workflowId, 1, int.MaxValue, "workflowId" );
RestartParameters parameters = new RestartParameters();
parameters.WorkflowId = workflowId;
parameters.StartState = null;
m_queue.Enqueue( parameters );
return true;
}
/// <summary>
/// Processes the workflow start message.
/// </summary>
/// <param name="workflowId">The workflow id.</param>
/// <param name="startState">The start state.</param>
/// <returns>
/// True if the message was process, False if the message should be recycled and processed later.
/// </returns>
protected override bool ProcessWorkflowRestartMessage( int workflowId, string startState )
{
// Check Parameters
ParamCode.AssertRange( workflowId, 1, int.MaxValue, "workflowId" );
ParamCode.AssertNotEmpty( startState, "startState" );
RestartParameters parameters = new RestartParameters();
parameters.WorkflowId = workflowId;
parameters.StartState = startState;
m_queue.Enqueue( parameters );
return true;
}
/// <summary>
/// Processes the data exchange message.
/// </summary>
/// <param name="message">The message.</param>
/// <returns>
/// True if the message was process, False if the message should be recycled and processed later.
/// </returns>
protected override bool ProcessDataExchangeMessage( WorkflowMessageService.DataExchangeMessage message )
{
// Check Parameters
ParamCode.AssertNotNull( message, "message" );
m_queue.Enqueue( message );
return true;
}
/// <summary>
/// Processes the workflow event message.
/// </summary>
/// <param name="message">The message.</param>
/// <param name="sender">The sender.</param>
/// <param name="args">The <see cref="T:Pegasus.Workflow.Service2.WorkflowEventArgs"/> instance containing the event data.</param>
/// <returns>
/// True if the message was process, False if the message should be recycled and processed later.
/// </returns>
protected override bool ProcessWorkflowEventMessage( WorkflowMessageService.WorkflowEventMessage message, object sender, WorkflowEventArgs args )
{
// Check Parameters
ParamCode.AssertNotNull( message, "message" );
// sender can be null
ParamCode.AssertNotNull( args, "args" );
WorkflowEventParameters parameters = new WorkflowEventParameters();
parameters.Message = message;
parameters.Sender = sender;
parameters.Args = args;
m_queue.Enqueue( parameters );
return true;
}
/// <summary>
/// Processes the register workflow event message.
/// </summary>
/// <param name="time">The time.</param>
/// <param name="message">The message.</param>
/// <param name="sender">The sender.</param>
/// <param name="args">The <see cref="T:Pegasus.Workflow.Service2.WorkflowEventArgs"/> instance containing the event data.</param>
/// <returns></returns>
protected override int ProcessRegisterWorkflowEventMessage( DateTime time, WorkflowMessageService.WorkflowEventMessage message, object sender, WorkflowEventArgs args )
{
// Check Parameters
// time can be any valid date time
ParamCode.AssertNotNull( message, "message" );
// Sender can be null
ParamCode.AssertNotNull( args, "args" );
WorkflowEventParameters parameters = new WorkflowEventParameters();
parameters.Message = message;
parameters.Sender = sender;
parameters.Args = args;
TimerTask task = new TimerTask( this, time, parameters );
int messageId = task.GetHashCode();
m_taskTable.Add( messageId, task );
AppScheduler2.AppScheduler.ScheduleTask( task );
return messageId;
}
/// <summary>
/// Processes the unregister workflow event message.
/// </summary>
/// <param name="messageId">The message id.</param>
protected override void ProcessReleaseWorkflowEventMessage( int messageId )
{
// Check Parameters
// messageId is any value int
if( m_taskTable.ContainsKey( messageId ) )
{
TimerTask task = m_taskTable[ messageId ];
m_taskTable.Remove( messageId );
AppScheduler2.AppScheduler.UnscheduleTask( task );
}
}
/// <summary>
/// Dispatches the message from queue.
/// </summary>
/// <param name="parameter">The parameter.</param>
public void DispatchMessageFromQueue( object parameter )
{
// Check Parameters
ParamCode.AssertNotNull( parameter, "parameter" );
string typeName = parameter.GetType().Name;
switch( typeName )
{
case "RestartParameters":
RestartParameters restartParams = (RestartParameters) parameter;
if( restartParams.StartState == null )
{
DispatchWorkflowStartMessage( restartParams.WorkflowId );
}
else
{
DispatchWorkflowRestartMessage( restartParams.WorkflowId, restartParams.StartState );
}
break;
case "DataExchangeMessage":
case "WorkflowMessageService.DataExchangeMessage":
DispatchDataExchangeMessage( (WorkflowMessageService.DataExchangeMessage) parameter );
break;
case "WorkflowEventParameters":
WorkflowEventParameters eventParams = (WorkflowEventParameters) parameter;
DispatchWorkflowEventMessage( eventParams.Message, eventParams.Sender, eventParams.Args );
break;
default:
throw new ArgumentException( string.Format( "Unknown type {0} in DispatchMessageFromQueue() method", typeName ) );
}
}
}
}