I am using https://docs.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-2.1&tabs=visual-studio as a reference to create a background service that processes queued work items.
I have an ImportService.cs class that receives a CSV file from an HTTP request. I then want to add that file to a queue, which processes the CSV file and writes the results to the database. This is my service class, which holds an IBackgroundTaskQueue instance:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using CsvHelper;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Services.Services
{
/// <summary>
/// Receives uploaded import files, stores them in Azure Blob Storage and
/// hands follow-up work to the background task queue.
/// </summary>
public class ImportService : BaseService, IImportService
{
    private const string AZURE_BLOB_CONTAINER = "blobcontainer";

    private readonly IUploadDataRepository _uploadDataRepository;
    private readonly ConfigurationSettings _configurationSettings;

    /// <summary>Queue that receives work items to be executed outside the current call.</summary>
    public IBackgroundTaskQueue Queue { get; }

    /// <summary>
    /// Resolves the repository and configuration from <paramref name="services"/>
    /// and stores the background queue.
    /// </summary>
    /// <param name="services">Service provider used to resolve dependencies.</param>
    /// <param name="queue">Background queue that work items are pushed to.</param>
    /// <exception cref="ArgumentNullException"><paramref name="queue"/> is null.</exception>
    public ImportService(IServiceProvider services, IBackgroundTaskQueue queue) : base(services)
    {
        _uploadDataRepository = services.GetUploadDataRepository();
        _configurationSettings = services.GetService<ConfigurationSettings>();

        // Fail at construction time rather than after the blob has already been uploaded.
        Queue = queue ?? throw new ArgumentNullException(nameof(queue));
    }

    /// <summary>
    /// Uploads <paramref name="file"/> to the blob container, queues a background
    /// work item and records the upload through the repository.
    /// </summary>
    /// <param name="file">The uploaded file from the HTTP request.</param>
    /// <param name="userId">Id of the uploading user; becomes the blob name prefix.</param>
    /// <param name="type">Not used by the code visible in this method.</param>
    /// <exception cref="ArgumentNullException"><paramref name="file"/> is null.</exception>
    public async Task UploadToBlobStorage(IFormFile file, int userId, Type type)
    {
        if (file == null)
        {
            throw new ArgumentNullException(nameof(file));
        }

        // GetFileFormat / GetTemporaryPath are defined outside this excerpt.
        // NOTE(review): the temporary file is not deleted here - confirm that it is cleaned up elsewhere.
        var fileFormat = GetFileFormat(file);
        var tempFilePath = await GetTemporaryPath(file);

        // A culture-formatted DateTime.Now contains '/', ':' and spaces; '/' would
        // create virtual directories in the blob name and the text would vary with
        // the server culture. Use a fixed, sortable, invariant UTC timestamp instead.
        var timestamp = DateTime.UtcNow.ToString("yyyyMMdd'T'HHmmssfff'Z'", CultureInfo.InvariantCulture);
        var fileName = $"{userId.ToString(CultureInfo.InvariantCulture)}-{timestamp}.{fileFormat}";

        string storageConnectionString = _configurationSettings.ConnectionStrings.BlobStorageConnection;
        CloudStorageAccount account = CloudStorageAccount.Parse(storageConnectionString);
        var blobClient = account.CreateCloudBlobClient();
        var blobContainer = blobClient.GetContainerReference(AZURE_BLOB_CONTAINER);
        await blobContainer.CreateIfNotExistsAsync();

        // NOTE(review): this makes every blob in the container anonymously readable.
        // Confirm that public read access to uploaded user files is intended.
        await blobContainer.SetPermissionsAsync(new BlobContainerPermissions { PublicAccess = BlobContainerPublicAccessType.Blob });

        CloudBlockBlob blockBlob = blobContainer.GetBlockBlobReference(fileName);
        using (var fileStream = File.OpenRead(tempFilePath))
        {
            await blockBlob.UploadFromStreamAsync(fileStream);
        }

        // The delegate is executed later by whatever dequeues from the queue, not by
        // this call. It performs no awaits, so it returns a completed task instead of
        // being declared async. Real processing would go here and should capture only
        // plain values (for example fileName) rather than the request's IFormFile.
        Queue.QueueBackgroundWorkItem(token =>
        {
            Console.WriteLine("ITEM QUEUED PROCESS IT??");
            return Task.CompletedTask;
        });

        // NOTE(review): `uploadData` is not declared anywhere in the visible method;
        // presumably its construction was omitted from this excerpt - confirm.
        await _uploadDataRepository.Add(uploadData);
    }
}
Below are the classes I created from the Microsoft example:
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Services.Services.Contracts {
/// <summary>
/// Contract for a queue of background work items, where each item is a delegate
/// that takes a <see cref="CancellationToken"/> and returns a <see cref="Task"/>.
/// </summary>
public interface IBackgroundTaskQueue {
/// <summary>Adds a work item to the queue.</summary>
/// <param name="workItem">Delegate to be handed out later by <see cref="DequeueAsync"/>.</param>
void QueueBackgroundWorkItem(Func < CancellationToken, Task > workItem);
/// <summary>Asynchronously obtains a work item from the queue.</summary>
/// <param name="cancellationToken">Token passed to the dequeue operation.</param>
/// <returns>A task whose result is a queued work item delegate.</returns>
Task < Func < CancellationToken, Task >> DequeueAsync(CancellationToken cancellationToken);
}
}
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace Services.Services {
    /// <summary>
    /// Background service that repeatedly takes work items from an
    /// <see cref="IBackgroundTaskQueue"/> and executes them one at a time.
    /// </summary>
    /// <remarks>
    /// The class is abstract with a protected constructor, so it cannot be
    /// instantiated directly; a concrete subclass is needed for the loop in
    /// <see cref="ExecuteAsync"/> to run.
    /// NOTE(review): confirm that such a subclass exists and is registered as a hosted service.
    /// </remarks>
    public abstract class QueuedHostedService: BackgroundService {
        private readonly ILogger _logger;

        private IBackgroundTaskQueue TaskQueue { get; }

        /// <summary>Stores the queue to consume and creates the logger.</summary>
        /// <param name="taskQueue">Queue that work items are dequeued from.</param>
        /// <param name="loggerFactory">Factory used to create this service's logger.</param>
        protected QueuedHostedService(IBackgroundTaskQueue taskQueue, ILoggerFactory loggerFactory) {
            TaskQueue = taskQueue;
            _logger = loggerFactory.CreateLogger<QueuedHostedService>();
            Console.WriteLine("QueuedHostedService initialized");
        }

        /// <summary>
        /// Dequeues and awaits work items until <paramref name="cancellationToken"/> is cancelled.
        /// An exception thrown by a work item is logged and does not stop the loop.
        /// </summary>
        /// <param name="cancellationToken">Signals that the service should stop; also passed to each work item.</param>
        protected override async Task ExecuteAsync(CancellationToken cancellationToken) {
            _logger.LogInformation("Queued Hosted Service is starting.");

            while (!cancellationToken.IsCancellationRequested) {
                Func<CancellationToken, Task> workItem;
                try {
                    workItem = await TaskQueue.DequeueAsync(cancellationToken);
                } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) {
                    // Cancellation while waiting for work is the normal shutdown path,
                    // not a failure: leave the loop instead of letting it escape.
                    break;
                }

                if (workItem == null) {
                    // Nothing was actually handed out; wait for the next item instead
                    // of invoking a null delegate and logging a misleading error.
                    continue;
                }

                try {
                    await workItem(cancellationToken);
                } catch (Exception ex) {
                    _logger.LogError(ex, "Error occurred executing {WorkItem}.", nameof(workItem));
                }
            }

            _logger.LogInformation("Queued Hosted Service is stopping.");
        }

        // Placeholder: not called from anywhere in this class.
        private void DoWork(object state) {
            Console.WriteLine("PROCCESS FILEE???");
        }
    }
}
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Services.Services {
    /// <summary>
    /// In-memory work item queue: delegates are stored in a concurrent queue and a
    /// semaphore, released once per enqueued item, lets consumers wait asynchronously.
    /// </summary>
    public class BackgroundTaskQueue: IBackgroundTaskQueue {
        private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);
        private readonly ConcurrentQueue<Func<CancellationToken, Task>> _workItems =
            new ConcurrentQueue<Func<CancellationToken, Task>>();

        /// <summary>Enqueues a work item and releases the semaphore once.</summary>
        /// <param name="workItem">Delegate to store in the queue.</param>
        /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is null.</exception>
        public void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem) {
            _workItems.Enqueue(workItem ?? throw new ArgumentNullException(nameof(workItem)));
            _signal.Release();
        }

        /// <summary>Waits on the semaphore, then attempts to take an item from the queue.</summary>
        /// <param name="cancellationToken">Token passed to the semaphore wait.</param>
        /// <returns>The dequeued delegate, or null if the dequeue attempt yields nothing.</returns>
        public async Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken) {
            await _signal.WaitAsync(cancellationToken);
            return _workItems.TryDequeue(out var next) ? next : null;
        }
    }
}
What I have tried:
My question is: where should that file be processed? In ImportService, or in QueuedHostedService? If it belongs in QueuedHostedService, how should I pass the file to it and access it there? What would be the best practice for that? I wanted to create a DoWork() function in QueuedHostedService that processes the file, but I'm not sure how. Or should the processing be done in the ImportService class?