Posted 1 Mar 2019
Licenced CPOL

Compact Framework for Tasks Flow Control and Parallel Computing

Igor Ladnik, 29 Mar 2019
This article presents a compact framework for managing operations flow in various control, simulation and testing applications.

Introduction

Task (operation) flow control plays an important role in many software projects, and its reliable and efficient implementation may be vital for project success. Several years ago, I presented an article with a similar scope entitled Tiny Framework for Parallel Computing. Although the main approach of that work remains unchanged, some concepts have evolved, and the design is markedly different. The implementation of the major types has been simplified, and important new features have been added to the infrastructure. I intended this article to be self-contained and independent of its predecessor, so some concepts are reiterated. The subject of this framework is the control of processes consisting of separate operations performed either sequentially or in parallel. Examples of such processes are machine control, complex software testing, financial flows, etc. The code is written in C# and targets .NET Core, but it can be easily ported to other languages. It was tested on Windows and Linux (Ubuntu 18.04).

Main Concept

The two main entities of the presented framework are the command and the processor (which is why I call it the Processor-Command Framework, or PCF for short). A command is any class that implements the ICommand interface. The main method of this interface is Do(), which performs the useful work of the command. The processor comprises the types responsible for properly queuing commands and executing their Do() methods. These methods may be called either sequentially or in parallel, depending on how the commands were queued. The processor performs everything that Do() execution requires, such as error handling and logging.
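The split between the two entities can be illustrated with a minimal sketch. It is not the framework's code: ICommand is reduced here to its Do() method, and PrintCommand and MiniProcessor are hypothetical names introduced only for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified stand-in for the framework's ICommand.
public interface ICommand
{
    void Do();
}

// A command knows only about its own domain activity.
public class PrintCommand : ICommand
{
    private readonly string _text;
    private readonly List<string> _output;

    public PrintCommand(string text, List<string> output)
    {
        _text = text;
        _output = output;
    }

    public void Do() => _output.Add(_text);
}

// The processor knows nothing about what a command does;
// it only queues commands and runs them with error handling.
public class MiniProcessor
{
    private readonly Queue<ICommand> _queue = new Queue<ICommand>();

    public List<Exception> Errors { get; } = new List<Exception>();

    public void Enqueue(params ICommand[] commands)
    {
        foreach (var command in commands)
            _queue.Enqueue(command);
    }

    // Runs all queued commands one after another.
    public void Run()
    {
        while (_queue.Count > 0)
        {
            var command = _queue.Dequeue();
            try
            {
                command.Do();
            }
            catch (Exception e)
            {
                // A failing command must not stop the processor.
                Errors.Add(e);
            }
        }
    }
}
```

New kinds of commands can be added without touching MiniProcessor, which is the property the rest of the article builds on.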

The main idea of this framework is a clear and strict separation between command and processor. This approach has several advantages, both technical and organizational:

  • High flexibility in command implementation. Assemblies (DLLs) containing commands can be loaded at runtime.
  • Processor-related code is relatively stable. Once written, it will seldom change, if at all, which considerably reduces the amount of testing.
  • While processor development requires multi-threading and synchronization, writing command code is generally (though not always!) simpler but requires more knowledge of the activity domain. So in many cases, commands may be developed by technical support and QA staff.
  • The framework is applicable to task flows in various fields. Its activity domain is defined by its set of commands, which are generally loaded dynamically at runtime.

Design

Relationships between main elements of the framework are depicted below:

Infrastructure Components

The following table shows infrastructure components and their brief description:

Component                  Description
AsyncLockLib               Synchronization types for async-await methods
CommandLib                 Command and State related interfaces and types
CommandsAssemblyLoaderLib  Dynamic commands DLL loader
ConfigurationLib           Application configuration support
LogInterfaceLib            ILog interface
LogLib                     Logger implementation based on NLog
ProcessorLib               Processor and queues related types
SerializationLib           Objects serialization support
SignalRBaseHubClientLib    Base class for SignalR hub clients
SignalRBaseHubServerLib    Base class for SignalR hub providing object streaming
StreamingDataProviderLib   Interfaces and base class for streaming data provider
TcpHelperLib               Component for TCP communication

Command

Within this framework, every action is a command. A command can be a domain-related procedure, such as taking measurements or interacting with data storage. A command may also call a remote service. This can be done either in blocking mode, when the remote method call returns the service's response, or asynchronously, when the response is received later by another object. The asynchronous call is discussed below in the State section. A command can also switch an operation mode, create a new command, or even create a new processor. Logging in this framework also operates as a sequence of commands (please see details below).

All commands derive from the abstract base class Command, which implements the interface ICommand with its main method Do() described above. To be executed, a command must first be enqueued with an appropriate processor. For the sake of flexibility, commands may be placed in separate assemblies loaded dynamically at runtime. Such loading is carried out by the CommandsAssemblyLoader class of the CommandsAssemblyLoaderLib infrastructure component. This class also implements the interface ICommandFactory, whose public method CreateCommand() creates a command from the loaded assembly.

When an assembly containing commands is loaded dynamically, the constructors of its commands are not available to the calling code. So a command may be created in three different ways. The first is reflection with the Activator.CreateInstance() method. This approach requires no additional code but is slow and therefore unsuitable for frequently created commands. The second is the Expression.Lambda() approach implemented in the static class CommandCreator of the CommandLib infrastructure component. This approach is much faster than direct reflection but still less efficient than a direct constructor call. So when a command type needs to be instantiated often, it is useful to implement a class CommandFactory : ICommandFactory whose CreateCommand() method calls the constructor of the required command.

The type CommandsAssemblyLoader creates commands as follows. First, after loading a commands assembly, its method Load() tries to find in it a command factory implementing the interface ICommandFactory. If such a type is found, it is instantiated once with reflection and kept in the CommandsAssemblyLoader instance. On command creation, the method CreateCommand() tries to use this command factory (if found). If the command factory was not found or failed to create the required command, the Expression.Lambda() technique is used.
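The three creation routes and the fallback order can be sketched as follows. This is only an illustration under stated assumptions: the signature of CreateCommand(), the type HelloCommand and the helper class CommandCreation are hypothetical and simplified (commands with parameterless constructors only), not the actual CommandCreator or CommandsAssemblyLoader code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq.Expressions;

public interface ICommand
{
    void Do();
}

public class HelloCommand : ICommand
{
    public bool Done { get; private set; }
    public void Do() => Done = true;
}

// Assumed shape of the factory interface (the real signature may differ).
public interface ICommandFactory
{
    // Returns null when the factory does not know the requested type.
    ICommand CreateCommand(Type commandType);
}

// Way 3: a factory that calls constructors directly.
public class CommandFactory : ICommandFactory
{
    public ICommand CreateCommand(Type commandType) =>
        commandType == typeof(HelloCommand) ? new HelloCommand() : null;
}

public static class CommandCreation
{
    private static readonly ConcurrentDictionary<Type, Func<ICommand>> _creators =
        new ConcurrentDictionary<Type, Func<ICommand>>();

    // Way 1: plain reflection; no extra code, but slow on every call.
    public static ICommand ViaActivator(Type type) =>
        (ICommand)Activator.CreateInstance(type);

    // Way 2: compile "() => (ICommand)new T()" once per type and reuse the delegate.
    // Expression.New(type) requires a public parameterless constructor.
    public static ICommand ViaCompiledLambda(Type type) =>
        _creators.GetOrAdd(type, CompileCreator)();

    private static Func<ICommand> CompileCreator(Type type) =>
        Expression.Lambda<Func<ICommand>>(
            Expression.Convert(Expression.New(type), typeof(ICommand))).Compile();

    // Factory first; compiled lambda when there is no factory or it returns null.
    public static ICommand Create(Type type, ICommandFactory factory) =>
        factory?.CreateCommand(type) ?? ViaCompiledLambda(type);
}
```

Caching the compiled delegate matters here: compiling an expression is itself expensive, so the gain over Activator.CreateInstance() appears only when the same delegate is reused for many instantiations.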

Processor

The ProcessorLib component is responsible for running commands. It provides the environment for command queueing and execution. Commands are queued with the Processor and ProcessorQueue types according to their priorities. Processor.ProcessAsync() executes the Command.Do() methods:

private Task<ParallelLoopResult> ProcessAsync(params ICommand[] commands)
{
    if (commands == null)
        return null;

    return Task.Run(() =>
    {
        return Parallel.ForEach(commands, command =>
        {
            if (command == null || command.IsProcessed || command.Err != null)
                return;

            var commandDescription = 
                $"COMMAND Type: \"{command.GetType()}\", " +
                $"Id:\"{command.Id}\", Priority: {command.Priority} ";

            _log?.Debug($"{_logPrefix}{commandDescription} - BEGIN.");

            try
            {
                command.Do();
                command.IsProcessed = true;

                _log?.Debug($"{_logPrefix}{commandDescription} - END.");
            }
            catch (Exception e)
            {
                command.Err = new Error { Ex = e };

                _log?.Error($"{_logPrefix}{commandDescription} produced the following exception: ",e);

                try
                {
                    _actionOnException?.Invoke(command, e);
                }
                catch (Exception ex)
                {
                    var msg = $"{_logPrefix}Exception in exception handler for " +
                              $"command \"{command.Id}\".";
                    _log?.Fatal(msg, ex);
                    throw new Exception(msg, ex);
                }
            }
            finally
            {
                if (command.IsProcessed)
                {
                    var now = DateTime.Now;
                    command.ProcessingLag = now - command.TimeStamp;
                    command.TimeStamp = now;

                    SetMaxProcessingLag(command.GetType(), command.ProcessingLag);
                }
                
                _log?.Debug($"{_logPrefix}{command}");
            }
        });
    });
}

In its constructor, the Processor class takes a logger implementation (if required), which avoids a circular reference with the LogLib component, and optionally an action to perform on an exception raised while executing the Command.Do() method. Processor's methods:

public void Enqueue(params ICommand[] commands)

and:

public void EnqueueParallel(params ICommand[] commands)

allow the caller to enqueue an array of commands for either sequential or parallel execution.

Note: Commands enqueued for sequential execution by the same Enqueue() call are executed according to their individual priorities. However, commands enqueued for parallel execution by the same EnqueueParallel() call are executed together, according to the highest priority among them.
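The difference described in the note can be modeled with a small sketch. It is not the ProcessorQueue implementation: the types Cmd, QueueItem and PriorityQueueSketch are hypothetical, and the sketch assumes that a larger number means a higher priority and that equal priorities keep their enqueueing order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class Cmd
{
    public string Id { get; }
    public int Priority { get; }

    public Cmd(string id, int priority)
    {
        Id = id;
        Priority = priority;
    }
}

// One queue entry: a single command, or a group to be run in parallel.
public class QueueItem
{
    public IReadOnlyList<Cmd> Commands { get; }
    public bool IsParallel { get; }
    public int Priority { get; }

    public QueueItem(bool isParallel, params Cmd[] commands)
    {
        Commands = commands;
        IsParallel = isParallel;
        // A parallel group is scheduled by the highest priority among its members.
        Priority = commands.Max(c => c.Priority);
    }
}

public class PriorityQueueSketch
{
    private readonly List<QueueItem> _items = new List<QueueItem>();

    // Sequential: every command becomes its own entry with its own priority.
    public void Enqueue(params Cmd[] commands)
    {
        foreach (var command in commands)
            _items.Add(new QueueItem(false, command));
    }

    // Parallel: the whole array becomes one entry.
    public void EnqueueParallel(params Cmd[] commands)
    {
        if (commands.Length > 0)
            _items.Add(new QueueItem(true, commands));
    }

    // Highest priority first. OrderByDescending is a stable sort,
    // so entries with equal priority keep their enqueueing order.
    public IEnumerable<QueueItem> InExecutionOrder() =>
        _items.OrderByDescending(item => item.Priority);
}
```

For example, after Enqueue(S1 with priority 1, S3 with priority 3) and EnqueueParallel(P0 with priority 0, P2 with priority 2), this model yields S3 first, then the group P0 and P2 together (scheduled with priority 2), then S1.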

State

Class State belongs to the CommandLib component. As its name suggests, it maintains the state of the system. It is a singleton with a properties dictionary protected for multi-threaded access:

private static readonly ConcurrentDictionary<string, object> _cdctProperties =
            new ConcurrentDictionary<string, object>();

A command takes data from State for processing and puts some of its output there for use by subsequent commands. State also contains handlers for receiving and processing asynchronous messages (events) and streaming data from services. These handler objects are created by the commands that establish connections to the services. Upon receiving asynchronous messages from services, the handlers normally enqueue additional commands to process those messages. This is shown in the discussion of the software sample below.
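How commands might exchange data through such a dictionary can be sketched as below. Only the ConcurrentDictionary<string, object> field comes from the article; the class name StateSketch and the methods Set(), Get<T>() and Remove() are hypothetical and may not match the real State API.

```csharp
using System.Collections.Concurrent;

public static class StateSketch
{
    private static readonly ConcurrentDictionary<string, object> _properties =
        new ConcurrentDictionary<string, object>();

    // Adds the property or overwrites an existing one.
    public static void Set(string name, object value) => _properties[name] = value;

    // Returns default(T) when the property is missing or has another type.
    public static T Get<T>(string name) =>
        _properties.TryGetValue(name, out var value) && value is T typed
            ? typed
            : default(T);

    public static bool Remove(string name) => _properties.TryRemove(name, out _);
}
```

One command could then call StateSketch.Set("tcpClient", client) after connecting, and a later command could retrieve the same object with StateSketch.Get<T>("tcpClient"), without either command referencing the other.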

Useful Features

Configuration

The ConfigurationLib infrastructure component supports reading configuration from a dedicated JSON file. Class Configuration takes the path to this file as a constructor argument and provides handy methods for reading data from the file. By default, the JSON configuration file is named according to the pattern <Application name>.config.json.

Serialization

The SerializationLib infrastructure component provides methods for binary serialization using Stream (static class SerializationBin) and a collection of useful extension methods for byte and JSON serialization.

Logging

Logging is provided by two components, LogInterfaceLib and LogLib. The former defines the interface ILog, whereas the latter provides its implementation with the type Log : ILog. This implementation is based on the well-known NLog product and uses the configuration file <Application name>.nlog.config. Internally, the logger uses the Processor-Command paradigm with its own dedicated processor and the command class LogCommand : Command.

Communication

It is important to provide convenient components for communication between commands and services. The presented infrastructure supports out-of-the-box communication via TCP sockets and SignalR.

TCP Sockets

Component TcpHelperLib provides infrastructure for TCP socket communication, including remote method calls and data streaming. It is configurable with JSON using the Configuration type discussed above. The main public type for server and client alike is TcpHelper. The TCP communication mechanism implemented in this component is described in detail in my Code Project article, TCP Socket Off-the-shelf - Revisited with Async-Await and .NET Core.

SignalR

SignalR is a library that lets the user organize duplex communication over WebSockets (preferred when available), with fallback to other transports such as Server-Sent Events or HTTP long polling. Component SignalRBaseHubServerLib provides the SignalR server infrastructure, whereas component SignalRBaseHubClientLib serves the client side. The SignalR communication technique used here is described in detail (with very minor changes) in my Code Project article, Simple SignalR Data Streaming Infrastructure Out-of-the-Box.

Data Streaming

Both communication techniques discussed above provide the user with infrastructure for data streaming. Streaming here means a procedure in which a client registers with a server and subscribes to data that the server sends asynchronously. The server has one or more data providers responsible for generating data. As soon as a new piece of data has been generated, the data provider sends it to all clients subscribed to this data. The infrastructure component StreamingDataProviderLib provides the generic base class StreamingDataProvider<T> for user streaming data providers. This class is parameterized with the data type to be streamed. Invoking the setter of StreamingDataProvider<T>.Current triggers sending the new data to subscribers. During this procedure, subscribers are checked for validity, and invalid ones are purged from the subscribers list.
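The setter-triggered delivery with purging of invalid subscribers can be sketched as follows. This is a simplified, hypothetical model and not the actual StreamingDataProvider<T>: the class name StreamingProviderSketch<T>, the Subscribe() method and the convention that a subscriber is a delegate returning false when it is no longer valid are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;

public class StreamingProviderSketch<T>
{
    private readonly object _sync = new object();
    private readonly List<Func<T, bool>> _subscribers = new List<Func<T, bool>>();
    private T _current;

    // A subscriber returns false (or throws) when it is no longer valid.
    public void Subscribe(Func<T, bool> subscriber)
    {
        lock (_sync)
            _subscribers.Add(subscriber);
    }

    public int SubscriberCount
    {
        get { lock (_sync) return _subscribers.Count; }
    }

    public T Current
    {
        get { lock (_sync) return _current; }
        set
        {
            lock (_sync)
            {
                _current = value;
                // Send to every subscriber and drop those that failed to receive.
                // Subscribers are called under the lock only to keep the sketch short.
                _subscribers.RemoveAll(subscriber => !TrySend(subscriber, value));
            }
        }
    }

    private static bool TrySend(Func<T, bool> subscriber, T data)
    {
        try
        {
            return subscriber(data);
        }
        catch (Exception)
        {
            return false; // a throwing subscriber is treated as invalid
        }
    }
}
```

A derived provider would only have to assign Current whenever it generates a new value; delivery and subscriber housekeeping stay in the base class.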

Synchronization

Since multi-threading in most places is based on the async-await paradigm, an adequate synchronization mechanism is required. The infrastructure component AsyncLockLib provides the synchronization classes AsyncLock and AsyncAutoResetEvent, both borrowed from external sources.
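The ordinary lock statement cannot contain an await, which is why a dedicated type is needed. One common way to build an awaitable lock is on top of SemaphoreSlim, as in the sketch below. It is not the AsyncLock class of AsyncLockLib, whose implementation may differ; AsyncLockSketch and AsyncLockDemo are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class AsyncLockSketch
{
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);

    // Usage: using (await asyncLock.LockAsync()) { ... code with awaits ... }
    public async Task<IDisposable> LockAsync()
    {
        await _semaphore.WaitAsync().ConfigureAwait(false);
        return new Releaser(_semaphore);
    }

    private sealed class Releaser : IDisposable
    {
        private SemaphoreSlim _semaphore;

        public Releaser(SemaphoreSlim semaphore)
        {
            _semaphore = semaphore;
        }

        public void Dispose()
        {
            // Exchange makes a repeated Dispose() harmless.
            Interlocked.Exchange(ref _semaphore, null)?.Release();
        }
    }
}

public static class AsyncLockDemo
{
    // Several tasks increment a shared counter; each one yields inside
    // the critical section, so updates would be lost without the lock.
    public static async Task<int> RunAsync(int tasks, int iterations)
    {
        var asyncLock = new AsyncLockSketch();
        var counter = 0;

        async Task Worker()
        {
            for (var i = 0; i < iterations; i++)
            {
                using (await asyncLock.LockAsync())
                {
                    var snapshot = counter;
                    await Task.Yield();
                    counter = snapshot + 1;
                }
            }
        }

        var workers = new Task[tasks];
        for (var i = 0; i < tasks; i++)
            workers[i] = Task.Run(() => Worker());

        await Task.WhenAll(workers);
        return counter;
    }
}
```

Unlike Monitor, SemaphoreSlim has no thread affinity, so the lock may be released on a different thread than the one that acquired it, which is exactly what happens after an await.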

Machine

The MachineLib component is not part of the framework infrastructure, but its only type, Machine, provides a useful (although not mandatory) superstructure above Processor. In its current implementation, the constructor of Machine loads the basic configuration, creates the main processor, and enqueues the first command.

Test Samples

Discussion

The Tests folder contains two console test applications. The CommanderTest application includes three command classes. Its Main() method enqueues TestCommands with different priorities, in a variety of combinations, with a processor. The commands SuspendCommand and ResumeCommand are used to suspend the processor and then resume it. Please note that the suspending command is executed by the main processor. The Do() method of SuspendCommand creates a new processor to execute ResumeCommand later and puts this processor into State. To resume the main processor, ResumeCommand is enqueued with the resume processor. This command takes the main processor from State and calls its Resume() method. Of course, in this simple case, we could call the main processor's Resume() method directly, without an additional processor and command, but in more complex cases the additional processor and command may be more appropriate.

It is interesting to analyze the sequence of command execution and then to play with different combinations of command queueing. All commands are enqueued almost immediately, since the methods Enqueue() and EnqueueParallel() of class Processor in turn call the Enqueue() method of the internal class ProcessorQueue. The latter enqueues the commands, calls the method Run(), which executes on a thread-pool thread, and returns immediately. However, the highest-priority command enqueued by the first call to Processor.Enqueue() is executed first (in the test, this is command S-12). The rest of the commands are executed according to their priorities and enqueueing sequence. The expected output of the test is provided as a comment at the end of the file ProgramCommanderTest.cs.

The second test is the MainApp application. It works together with the TcpSvc and SignalRSvc services. Its Main() method creates an instance of the Machine type. The constructor of class Machine reads the JSON configuration file MainApp.config.json. From the Machines array, it chooses the machine with the given "id": "Machine1". According to the configuration, the constructor loads two external command assemblies (DLLs), GeneralCommandsLib.dll and StreamHandlersCommandsLib.dll, from the directory $(OutputDir)/CommandLibs and creates MainProcessor. Finally, the Machine constructor enqueues InitCommand with MainProcessor.

Note: Dynamically loaded assemblies may reference other assemblies, in particular infrastructure ones. So it is recommended that the main application reference all infrastructure assemblies.

The Do() method of InitCommand enqueues the commands CreateTcpClientCommand and CreateSignalRClientCommand in parallel with MainProcessor. Each of them creates the appropriate communication client object to connect to the TcpSvc or SignalRSvc service, respectively. The Do() methods of both commands call their respective asynchronous methods async void ConnectAsync() and return immediately. This way, a lengthy connection procedure, possibly with numerous retries, does not block the MainProcessor execution flow (in fact, because of this design, it does not matter whether these commands are enqueued sequentially or in parallel). Once the connections to the services have been established, the client objects are placed in State.
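The non-blocking connection pattern can be sketched as below. The sketch is hypothetical and not the code of CreateTcpClientCommand or CreateSignalRClientCommand: the class ConnectCommandSketch, the tryConnect delegate, the fixed retry delay and the Completion task (added only so that the outcome can be observed) are all assumptions.

```csharp
using System;
using System.Threading.Tasks;

public class ConnectCommandSketch
{
    private readonly Func<Task<bool>> _tryConnect;
    private readonly int _maxAttempts;
    private readonly TaskCompletionSource<bool> _completed =
        new TaskCompletionSource<bool>();

    public ConnectCommandSketch(Func<Task<bool>> tryConnect, int maxAttempts)
    {
        _tryConnect = tryConnect;
        _maxAttempts = maxAttempts;
    }

    // Number of connection attempts made so far.
    public int Attempts { get; private set; }

    // Completes with true on success, false when all attempts failed.
    public Task<bool> Completion => _completed.Task;

    // Returns as soon as ConnectAsync() reaches its first incomplete await,
    // so the processor that called Do() is not blocked by the retries.
    public void Do() => ConnectAsync();

    private async void ConnectAsync()
    {
        try
        {
            for (var attempt = 1; attempt <= _maxAttempts; attempt++)
            {
                Attempts = attempt;
                if (await _tryConnect().ConfigureAwait(false))
                {
                    // The real commands would place the client object in State here.
                    _completed.TrySetResult(true);
                    return;
                }

                await Task.Delay(10).ConfigureAwait(false); // pause before retrying
            }

            _completed.TrySetResult(false);
        }
        catch (Exception e)
        {
            // An exception escaping an async void method cannot be caught by the
            // caller, so it is captured here instead of being rethrown.
            _completed.TrySetException(e);
        }
    }
}
```

The try/catch inside ConnectAsync() is the important design point: with async void there is no task for the caller to observe, so every failure has to be handled inside the method itself.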

TCP and SignalR connection objects, although inherently different, operate in much the same way from the user's perspective. They allow duplex remote method calls and enable data streaming from service to client. To emphasize this similarity, both the TcpSvc and SignalRSvc services use the same data provider component, DtoProviderLib. It streams the Dto object defined in the ModelLib component. Both TCP and SignalR communications are configurable in the files <Application name>.config.json. For TCP, the constructor of the TcpHelper class reads the appropriate section of the configuration file on both server and client. For SignalR, the constructor of the infrastructure class HubClient reads the client configuration section, and the service that creates the hub (in our case, SignalRSvc) reads the server configuration section. The SignalR configuration provides the boolean parameter "isSecureProtocol", which defines whether an HTTP ("isSecureProtocol": false) or HTTPS ("isSecureProtocol": true) connection is created.

How to Run?

The source code may be loaded into Visual Studio 2017 with the PCF.sln solution and built. The output directory for MainApp is $(OutputDir) = $(SolutionDir)/_bin/netcoreapp<version>. Assemblies to be loaded at runtime are placed in the directory $(OutputDir)/CommandLibs, according to the configuration in the file MainApp.config.json. After a successful build, you may run the services TcpSvc.dll and SignalRSvc.dll and the application MainApp.dll either from Visual Studio or from a console in their respective output directories with the standard dotnet command. According to their NLog configuration files <DLL name>.nlog.config, the applications log to their respective consoles and also to the files $(OutputDir)/Log/<DLL name>-<Date YYYY-MM-DD>.txt.

The demo for this article is also a collection of all the files required to deploy and run the software on Linux. This collection is produced by running the _publish.cmd files for the services and MainApp. Those files contain the following command:

dotnet publish -o publish -c Release

This command produces a publish directory containing all the files required to run the application (for a reason unknown to me, the configuration file MainApp.config.json is not automatically included and must be copied manually). The contents of the publish directories for TcpSvc, SignalRSvc and MainApp should be copied to a single demo folder. In addition, the directory CommandLibs, containing the release versions of the assemblies that the main application loads at runtime, should be placed in the same folder. Then the services and the main application may be started from the demo folder with the ordinary dotnet command.

To run the demo on Linux, first install the appropriate version of .NET Core (currently version 2.2) in the Linux environment. The installation procedure for Ubuntu 18.04 x64 is described in the linked instructions. I tested TcpSvc, SignalRSvc and MainApp on Ubuntu 18.04 installed in Oracle VirtualBox, using the MobaXterm application to deploy and run the software.

Conclusions

This article presents the extensible Processor-Command Framework (PCF) for operations flow control. The framework provides a flexible mechanism for command queueing and execution while enforcing a clear separation between processor and commands. The processor part is stable, while commands provide flexibility by implementing a variety of operations. PCF also offers out-of-the-box infrastructure for configuration, logging, TCP and SignalR communication, etc. This compact and simple-to-use framework is applicable to activity domains with operations flow and can be used in conjunction with Actor model products and message brokers.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

Igor Ladnik
Software Developer (Senior)
Israel


  • Nov 2010: Code Project Contests - Windows Azure Apps - Winner
  • Feb 2011: Code Project Contests - Windows Azure Apps - Grand Prize Winner



Article Copyright 2019 by Igor Ladnik