How to Use TPL Dataflow for Reading Files and Inserting to Database

Dec 8, 2017

CPOL

Introduction

A very common scenario in applications is to read a number of files (e.g., .doc and .pdf files) and insert them into a database. This article explains and shows how to use TPL Dataflow to create a pipeline for this kind of work.

In my opinion, TPL Dataflow is a very useful library: it makes the producer-consumer pattern very easy to implement and removes the need for most explicit synchronization primitives.
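
To illustrate that claim, here is a minimal producer-consumer sketch (separate from the article's pipeline, so the class and message types are mine): an ActionBlock consumes posted messages with no hand-written locks, queues, or events.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

class ProducerConsumerDemo
{
    static async Task Main()
    {
        // Consumer: Dataflow queues messages internally, so no manual locking is needed.
        var consumer = new ActionBlock<int>(n => Console.WriteLine($"Consumed {n}"));

        // Producer: just post messages into the block.
        for (var i = 0; i < 10; i++)
        {
            consumer.Post(i);
        }

        consumer.Complete();       // signal that no more input is coming
        await consumer.Completion; // wait for the consumer to drain
    }
}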

As there are few samples on this topic, I decided to put this one together.

Using the Code

I have omitted a few methods, such as BulkInsertResumes, which takes an IEnumerable&lt;Resume&gt; as input and returns a BulkImportResumeResult. BulkImportResumeResult is a class with two List properties: one holds the resumes that were inserted into SQL, and the other holds the resumes that failed during insertion.
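
For readers who want to compile the sample, a minimal sketch of those omitted types could look like the following. The Resume properties match the ones used in the pipeline below; the property name FailedResumes and the body of BulkInsertResumes are my assumptions, since the article does not show them.

public class Resume
{
    public string Name { get; set; }
    public string Url { get; set; }
    public string DataInPlainText { get; set; }
    public DateTime DateUploaded { get; set; }
}

public class BulkImportResumeResult
{
    // Resumes successfully inserted into SQL.
    public List<Resume> ImportedResumes { get; set; } = new List<Resume>();

    // Resumes that failed during the bulk insert (property name assumed).
    public List<Resume> FailedResumes { get; set; } = new List<Resume>();
}

// Stub only: the real method performs a SQL bulk insert (e.g., via SqlBulkCopy)
// and partitions the input into imported and failed resumes.
private BulkImportResumeResult BulkInsertResumes(IEnumerable<Resume> resumes)
{
    throw new NotImplementedException();
}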

The Dataflow_BulkImportResumes method takes an IEnumerable&lt;string&gt; as input; the caller passes one or more folders in which the .doc and .pdf files are located.

This method creates a pipeline using TPL Dataflow. The first block is a TransformMany block which takes a string (a folder) as input and returns multiple file paths as strings. The first block is linked to two blocks, a PDF block and a Word block, for reading .pdf and .doc files. Since I wanted to bulk insert into SQL, the PDF and Word blocks are connected to a batch block with a batch size of 50. The batch block connects to insertToSqlBlock, a Transform block that performs the insert into SQL and returns a BulkImportResumeResult as output. That block is connected to the last block, which performs indexing using Lucene.Net.

// Requires: using System; using System.Collections.Generic; using System.IO;
// using System.Linq; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow;
public async Task Dataflow_BulkImportResumes(IEnumerable<string> folders)
{
    // First block: expand each folder into the distinct .doc/.pdf files it contains.
    var firstBlock = new TransformManyBlock<string, string>(input =>
    {
        var files = new List<string>();
        files.AddRange(Directory.EnumerateFiles(input, "*.doc", SearchOption.AllDirectories));
        files.AddRange(Directory.EnumerateFiles(input, "*.pdf", SearchOption.AllDirectories));

        // Drop Word temp files (e.g., "~$resume.doc") left behind by open documents.
        files.RemoveAll(x =>
            Path.GetFileName(x).StartsWith("~") &&
            (x.EndsWith("doc", StringComparison.OrdinalIgnoreCase) ||
             x.EndsWith("docx", StringComparison.OrdinalIgnoreCase)));

        return files.Distinct().ToList();
    });

    // Reads a PDF file into a Resume.
    var pdfBlock = new TransformBlock<string, Resume>(file =>
    {
        var resume = new Resume();
        resume.Name = Path.GetFileNameWithoutExtension(file);
        resume.Url = Path.GetFileName(file);
        resume.DataInPlainText = PDFDocumentHelper.Read(file);
        resume.DateUploaded = File.GetLastWriteTime(file).Date;
        return resume;
    });

    // Reads a Word file into a Resume; Word files are read in parallel.
    var wordBlock = new TransformBlock<string, Resume>(file =>
    {
        var resume = new Resume();
        resume.Name = Path.GetFileNameWithoutExtension(file);
        resume.Url = Path.GetFileName(file);
        resume.DataInPlainText = WordDocumentHelper.Read(file);
        resume.DateUploaded = File.GetLastWriteTime(file).Date;
        return resume;
    }, new ExecutionDataflowBlockOptions
       { MaxDegreeOfParallelism = Environment.ProcessorCount });

    // Group resumes into batches of 50 so they can be bulk inserted in one round trip.
    var batchBlock = new BatchBlock<Resume>(50);

    var insertToSqlBlock = new TransformBlock<IEnumerable<Resume>, BulkImportResumeResult>(x =>
    {
        return BulkInsertResumes(x);
    });

    var importResult = new List<BulkImportResumeResult>();
    var lastBlock = new ActionBlock<BulkImportResumeResult>(x =>
    {
        // Build the search index over the inserted resumes using Lucene.Net.
        luceneService.BuildIndex(x.ImportedResumes);
        importResult.Add(x);
    });

    var options = new DataflowLinkOptions { PropagateCompletion = true };
    firstBlock.LinkTo(pdfBlock, options,
        x => x.EndsWith(".pdf", StringComparison.OrdinalIgnoreCase));
    firstBlock.LinkTo(wordBlock, options,
        x => x.EndsWith(".doc", StringComparison.OrdinalIgnoreCase) ||
             x.EndsWith(".docx", StringComparison.OrdinalIgnoreCase));

    // Deliberately no PropagateCompletion here: batchBlock has two sources and
    // must not complete until BOTH pdfBlock and wordBlock have finished.
    pdfBlock.LinkTo(batchBlock);
    wordBlock.LinkTo(batchBlock);

    batchBlock.LinkTo(insertToSqlBlock, options);
    insertToSqlBlock.LinkTo(lastBlock, options);

    // Log faults in the reader blocks; attach these before awaiting the pipeline.
    wordBlock.Completion.ContinueWith(t =>
    {
        logService.Info("word block faulted");
    }, TaskContinuationOptions.OnlyOnFaulted);

    pdfBlock.Completion.ContinueWith(t =>
    {
        logService.Info("pdf block faulted");
    }, TaskContinuationOptions.OnlyOnFaulted);

    foreach (var item in folders)
    {
        firstBlock.Post(item);
    }

    firstBlock.Complete();

    // Complete batchBlock only after both reader blocks finish (successfully or
    // faulted), so buffered batches can drain through the rest of the pipeline.
    await Task.WhenAll(pdfBlock.Completion, wordBlock.Completion)
        .ContinueWith(x => batchBlock.Complete());

    await lastBlock.Completion;
}
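
To show how the pipeline is kicked off, here is a minimal usage sketch; the containing class name ResumeImporter and the folder paths are hypothetical, since the article does not show the surrounding type.

// ResumeImporter is an assumed name for the class hosting Dataflow_BulkImportResumes.
var importer = new ResumeImporter();

// Process one or more folders of .doc/.pdf files through the pipeline.
await importer.Dataflow_BulkImportResumes(new[]
{
    @"C:\Resumes\Incoming",
    @"C:\Resumes\Archive"
});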

Suggestions and improvements are most welcome.