using System;
using System.Collections.Generic;
using System.Threading;
using Pegasus.Diagnostics;
using Pegasus.Threading;
namespace Pegasus.Workflow.Service.DefaultServices
{
/// <summary>
/// Implements the message service using the Pegasus AppThreadPool class.
/// Each message is dispatched to a worker thread.
/// </summary>
public class ThreadedMessageService : WorkflowMessageService, IDispatchTimerTask
{
// Local Instance Values
private Dictionary<int, TimerTask> m_taskTable = new Dictionary<int, TimerTask>();
private AppThreadPool m_threadPool;
/// <summary>
/// Initializes a new instance of the <see cref="T:ThreadedMessageService"/> class.
/// </summary>
public ThreadedMessageService()
{
m_threadPool = new AppThreadPool();
m_threadPool.UnhandledException += new AppThreadPool.UnhandledExceptionHandler( ThreadPool_UnhandledException );
}
/// <summary>
/// Gets the underlying thread pool.
/// </summary>
/// <value>The thread pool.</value>
public AppThreadPool ThreadPool
{
get
{
return m_threadPool;
}
}
/// <summary>
/// Starts this instance of the service.
/// </summary>
/// <param name="workflowService"></param>
public override void Start( WorkflowService workflowService )
{
base.Start( workflowService );
}
/// <summary>
/// Stops this instance of the service.
/// </summary>
public override void Stop()
{
base.Stop();
// Stop all pending tasks
foreach( TimerTask task in m_taskTable.Values )
{
AppScheduler2.AppScheduler.UnscheduleTask( task );
}
m_taskTable.Clear();
m_threadPool.JoinWorkers();
}
/// <summary>
/// Dispatches the timer event that was fired from the timer task.
/// </summary>
/// <param name="task">The task.</param>
/// <param name="parameters">The parameters.</param>
void IDispatchTimerTask.DispatchTimerTask( TimerTask task, WorkflowEventParameters parameters )
{
// Check Parameters
ParamCode.AssertNotNull( task, "task" );
ParamCode.AssertNotNull( parameters, "parameters" );
// Remove the task from our table.
lock( m_taskTable )
{
m_taskTable.Remove( task.GetHashCode() );
}
// Queue the message to be executed
m_threadPool.QueueUserWorkItem( new WaitCallback( ThreadWorkflowEventMessage ), parameters );
}
/// <summary>
/// Processes the workflow start message.
/// </summary>
/// <param name="workflowId">The workflow id.</param>
/// <returns>
/// True if the message was process, False if the message should be recycled and processed later.
/// </returns>
protected override bool ProcessWorkflowStartMessage( int workflowId )
{
// Check Parameters
ParamCode.AssertRange( workflowId, 1, int.MaxValue, "workflowId" );
m_threadPool.QueueUserWorkItem( new WaitCallback( ThreadStartMessage ), workflowId );
return true;
}
/// <summary>
/// Processes the workflow start message.
/// </summary>
/// <param name="workflowId">The workflow id.</param>
/// <param name="startState">The start state.</param>
/// <returns>
/// True if the message was process, False if the message should be recycled and processed later.
/// </returns>
protected override bool ProcessWorkflowRestartMessage( int workflowId, string startState )
{
// Check Parameters
ParamCode.AssertRange( workflowId, 1, int.MaxValue, "workflowId" );
ParamCode.AssertNotEmpty( startState, "startState" );
RestartParameters parameters = new RestartParameters();
parameters.WorkflowId = workflowId;
parameters.StartState = startState;
m_threadPool.QueueUserWorkItem( new WaitCallback( ThreadRestartMessage ), parameters );
return true;
}
/// <summary>
/// Processes the data exchange message.
/// </summary>
/// <param name="message">The message.</param>
/// <returns>
/// True if the message was process, False if the message should be recycled and processed later.
/// </returns>
protected override bool ProcessDataExchangeMessage( DataExchangeMessage message )
{
// Check Parameters
ParamCode.AssertNotNull( message, "message" );
m_threadPool.QueueUserWorkItem( new WaitCallback( ThreadDataExchangeMessage ), message );
return true;
}
/// <summary>
/// Processes the workflow event message.
/// </summary>
/// <param name="message">The message.</param>
/// <param name="sender">The sender.</param>
/// <param name="args">The <see cref="T:Pegasus.Workflow.Service2.WorkflowEventArgs"/> instance containing the event data.</param>
/// <returns>
/// True if the message was process, False if the message should be recycled and processed later.
/// </returns>
protected override bool ProcessWorkflowEventMessage( WorkflowMessageService.WorkflowEventMessage message, object sender, WorkflowEventArgs args )
{
// Check Parameters
ParamCode.AssertNotNull( message, "message" );
// sender can be null
ParamCode.AssertNotNull( args, "args" );
WorkflowEventParameters parameters = new WorkflowEventParameters();
parameters.Message = message;
parameters.Sender = sender;
parameters.Args = args;
m_threadPool.QueueUserWorkItem( new WaitCallback( ThreadWorkflowEventMessage ), parameters );
return true;
}
/// <summary>
/// Processes the register workflow event message.
/// </summary>
/// <param name="time">The time.</param>
/// <param name="message">The message.</param>
/// <param name="sender">The sender.</param>
/// <param name="args">The <see cref="T:Pegasus.Workflow.Service2.WorkflowEventArgs"/> instance containing the event data.</param>
/// <returns></returns>
protected override int ProcessRegisterWorkflowEventMessage( DateTime time, WorkflowEventMessage message, object sender, WorkflowEventArgs args )
{
// Check Parameters
// time can be any valid date time
ParamCode.AssertNotNull( message, "message" );
// sender can be null
ParamCode.AssertNotNull( args, "args" );
WorkflowEventParameters parameters = new WorkflowEventParameters();
parameters.Message = message;
parameters.Sender = sender;
parameters.Args = args;
TimerTask task = new TimerTask( this, time, parameters );
int messageId = task.GetHashCode();
m_taskTable.Add( messageId, task );
AppScheduler2.AppScheduler.ScheduleTask( task );
return messageId;
}
/// <summary>
/// Processes the unregister workflow event message.
/// </summary>
/// <param name="messageId">The message id.</param>
protected override void ProcessReleaseWorkflowEventMessage( int messageId )
{
// Check Parameters
// messageId is any value int
if( m_taskTable.ContainsKey( messageId ) )
{
TimerTask task = m_taskTable[ messageId ];
m_taskTable.Remove( messageId );
AppScheduler2.AppScheduler.UnscheduleTask( task );
}
}
/// <summary>
/// Process unhandled exception from the thread pool.
/// </summary>
/// <param name="callback">The callback.</param>
/// <param name="state">The state.</param>
/// <param name="e">The e.</param>
private void ThreadPool_UnhandledException( WaitCallback callback, object state, Exception e )
{
// Check Parameters
// callback could be null
// state could be null
ParamCode.AssertNotNull( e, "e" );
WorkflowService.FireUnhandledException( this, e );
}
/// <summary>
/// ThreadProc to process the workflow start message.
/// </summary>
/// <param name="parameter">The parameter.</param>
private void ThreadStartMessage( object parameter )
{
// Check Parameters
ParamCode.AssertNotNull( parameter, "parameter" );
ParamCode.Assert( parameter is int, "parameter", "Parameter is wrong type, expected WorkflowId (int), received {0}", parameter.GetType() );
DispatchWorkflowStartMessage( (int) parameter );
}
/// <summary>
/// ThreadProc to process the workflow restart message.
/// </summary>
/// <param name="parameter">The parameter.</param>
private void ThreadRestartMessage( object parameter )
{
// Check Parameters
ParamCode.AssertNotNull( parameter, "parameter" );
ParamCode.Assert( parameter is RestartParameters, "parameter", "Parameter is wrong type, expected RestartParameters, received {0}", parameter.GetType() );
RestartParameters restartParams = (RestartParameters) parameter;
DispatchWorkflowRestartMessage( restartParams.WorkflowId, restartParams.StartState );
}
/// <summary>
/// Threads the data exchange message.
/// </summary>
/// <param name="parameter">The parameter.</param>
private void ThreadDataExchangeMessage( object parameter )
{
// Check Parameters
ParamCode.AssertNotNull( parameter, "parameter" );
ParamCode.Assert( parameter is WorkflowMessageService.DataExchangeMessage, "parameter", "Parameter is wrong type, expected WorkflowMessageService.DataExchangeMessage, received {0}", parameter.GetType() );
DispatchDataExchangeMessage( (WorkflowMessageService.DataExchangeMessage) parameter );
}
/// <summary>
/// Threads the workflow event message.
/// </summary>
/// <param name="parameter">The parameter.</param>
private void ThreadWorkflowEventMessage( object parameter )
{
// Check Parameters
ParamCode.AssertNotNull( parameter, "parameter" );
ParamCode.Assert( parameter is WorkflowEventParameters, "parameter", "Parameter is wrong type, expected WorkflowEventParameters, received {0}", parameter.GetType() );
WorkflowEventParameters parameters = (WorkflowEventParameters) parameter;
DispatchWorkflowEventMessage( parameters.Message, parameters.Sender, parameters.Args );
}
}
}