Hey guys,
In our office we are working on a large data-processing project.
The purpose of the project is to receive a large number of text files via FTP,
run certain validations on each file, and at the end of the process insert all the data into SQL Server.
Each file can contain millions of records.
Each record in a file contains fields located at fixed, predetermined positions,
and each field is validated against a business model.
If a particular field fails validation, we add the error to a DataTable, and at the end of the run the accumulated data is saved to SQL Server using a bulk insert.
Here is the structure of the DataTable:
dtErrors = new DataTable();
dtErrors.Columns.Add(TblErrors_Consts.IdRow, typeof(int));
dtErrors.Columns.Add(TblErrors_Consts.OperatorId, typeof(int));
dtErrors.Columns.Add(TblErrors_Consts.FileTypeId, typeof(int));
dtErrors.Columns.Add(TblErrors_Consts.FileName, typeof(string));
dtErrors.Columns.Add(TblErrors_Consts.ErrorRow, typeof(int));
dtErrors.Columns.Add(TblErrors_Consts.ErrorCodeId, typeof(int));
dtErrors.Columns.Add(TblErrors_Consts.AddDate, typeof(DateTime));
And this is the method that is responsible for inserting a record into the DataTable:
public void AddError(FileDetailsBE fd, int ErrorCode, int lineCounter = 0)
{
    MethodBase m = MethodBase.GetCurrentMethod();
    try
    {
        DataRow dr = dtErrors.NewRow();
        dr[TblErrors_Consts.OperatorId] = fd.ParentSafe.OperatorId;
        dr[TblErrors_Consts.FileTypeId] = fd.FileTypeId;
        dr[TblErrors_Consts.FileName] = fd.FileName;
        dr[TblErrors_Consts.ErrorRow] = lineCounter;
        dr[TblErrors_Consts.ErrorCodeId] = ErrorCode;
        dr[TblErrors_Consts.AddDate] = CurrentDateTime;
        dtErrors.Rows.Add(dr);
    }
    catch (Exception ex)
    {
        GP.JobBE.Errors.Add(new JobErrorBE(ex, m));
    }
    finally
    {
        Console.ForegroundColor = ConsoleColor.Green;
    }
}
During our QA tests we found that when every record in a file is bad, performance drops drastically at the point of adding the rows to the temporary DataTable (before we even reach the database insert stage).
For that reason we decided to parse all the records in a file with Parallel.ForEach, and of course we then ran into a problem inserting new rows, because DataTable is not thread-safe.
Even wrapping the insert in a lock did not solve it.
My question is this: how can all the rows be processed in parallel while the failed records still end up in the DataTable?
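To make the locking attempt concrete, here is a minimal self-contained sketch of what I mean (the name gate and the single ErrorRow column are mine, for illustration only). It is correct, but it serializes all the workers, which may be why it does not help when every record produces an error:

```csharp
using System;
using System.Data;
using System.Threading.Tasks;

class LockDemo
{
    static void Main()
    {
        var dtErrors = new DataTable();
        dtErrors.Columns.Add("ErrorRow", typeof(int));

        // Plain object used only for synchronization.
        object gate = new object();

        Parallel.For(0, 10000, i =>
        {
            // DataTable is not thread-safe, so both NewRow and Rows.Add
            // must run under the lock - which means the workers queue up
            // here and the loop is effectively single-threaded again.
            lock (gate)
            {
                DataRow dr = dtErrors.NewRow();
                dr["ErrorRow"] = i;
                dtErrors.Rows.Add(dr);
            }
        });

        Console.WriteLine(dtErrors.Rows.Count); // prints 10000
    }
}
```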
This is the code section that loops over all the records (DATA is another DataTable that holds all the records read from the text file):
Parallel.ForEach(DATA.AsEnumerable(), row =>
{
    LineCounter++;
    if (LineCounter % divider == 0)
    {
        SaveDataTablesToDB(fd);
    }
    try
    {
        line = row[0].ToString();
        if (line.Trim().Length == 0)
        {
            return;
        }
        eventCode = line.Substring(0, 3);
        if (eventCode != ImportManager_Consts.Event999)
        {
            EventBE eventTableBE = GetRelevantTable(fd, eventCode);
            if (eventTableBE != null)
            {
                if (line.Trim().Length != eventTableBE.CharactersQty)
                {
                    fd.Errors.AddError(fd, (int)ErrorCodes.IllegalEventLength, LineCounter);
                    return;
                }
                ProcessLine(fd, line, eventTableBE, LineCounter);
            }
        }
    }
    catch (Exception ex)
    {
        ConsoleUtils.WriteErrorLine_NoObject(string.Format(ImportManager_Consts.LineCounter, LineCounter.ToString()), GP.Info);
        ConsoleUtils.WriteErrorLine_NoObject(string.Format(ImportManager_Consts.ProcessLine, line), GP.Info);
        GP.JobBE.Errors.Add(new JobErrorBE(ex, m));
        fd.Errors.AddError(fd, (int)ErrorCodes.GeneralError, LineCounter);
    }
});
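One thing I noticed while isolating this: LineCounter, line, and eventCode appear to be fields shared by all the worker threads, which is a data race independent of the DataTable problem (LineCounter++ is not atomic, and one thread's line can be overwritten by another's). A minimal self-contained sketch of the safe pattern, using Interlocked for the counter and lambda-local variables for the per-record state:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class CounterDemo
{
    static void Main()
    {
        int lineCounter = 0;

        Parallel.For(0, 100000, i =>
        {
            // ++ on a shared field is not atomic; Interlocked.Increment is,
            // and it returns this thread's own value of the counter.
            int currentLine = Interlocked.Increment(ref lineCounter);

            // Variables declared inside the lambda are per-thread, so a
            // local 'line' here (instead of a shared field) cannot be
            // overwritten by another worker.
            string line = currentLine.ToString();
            if (line.Length == 0)
            {
                return;
            }
        });

        Console.WriteLine(lineCounter); // prints 100000
    }
}
```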
What I have tried:
I replaced the DataTable with a List that holds a business entity and added the errors to that List.
After all records have been processed, the items in the List are copied into the DataTable and then bulk-inserted into SQL Server.
I discovered that many records go missing and the data is not stored reliably at all (presumably because List<T> is not thread-safe either).
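What I was hoping to get from the List approach might work with a thread-safe collection instead: collect the errors into a ConcurrentBag<T> during the parallel pass, then copy them into the DataTable on a single thread before the existing bulk insert. A minimal self-contained sketch (ErrorEntryBE here is a stripped-down stand-in for our real business entity, and the column names are illustrative):

```csharp
using System;
using System.Collections.Concurrent;
using System.Data;
using System.Threading.Tasks;

// Stripped-down stand-in for the error business entity.
class ErrorEntryBE
{
    public int ErrorRow;
    public int ErrorCodeId;
    public DateTime AddDate;
}

class BagDemo
{
    static void Main()
    {
        // Thread-safe bag: Parallel.ForEach workers can Add without locking.
        var errors = new ConcurrentBag<ErrorEntryBE>();

        Parallel.For(0, 1000, i =>
        {
            // Simulate "every record is bad": each iteration records an error.
            errors.Add(new ErrorEntryBE
            {
                ErrorRow = i,
                ErrorCodeId = 1,
                AddDate = DateTime.Now
            });
        });

        // Single-threaded copy into the DataTable; the usual bulk insert
        // then runs unchanged on dtErrors.
        var dtErrors = new DataTable();
        dtErrors.Columns.Add("ErrorRow", typeof(int));
        dtErrors.Columns.Add("ErrorCodeId", typeof(int));
        dtErrors.Columns.Add("AddDate", typeof(DateTime));

        foreach (var e in errors)
        {
            DataRow dr = dtErrors.NewRow();
            dr["ErrorRow"] = e.ErrorRow;
            dr["ErrorCodeId"] = e.ErrorCodeId;
            dr["AddDate"] = e.AddDate;
            dtErrors.Rows.Add(dr);
        }

        Console.WriteLine(dtErrors.Rows.Count); // prints 1000
    }
}
```

The key point is that only the collection touched by multiple threads needs to be concurrent; the DataTable itself is only ever used from one thread.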