Data-controlled processes application design

By Jacek Gajek

A ready-to-use process manager provided.
C# 3.0, Windows, .NET 2.0, Architect, Dev
Posted: 3 Nov 2009

Introduction

We've got used to understanding a program as a flow of instructions.

Program {
    init
    wait for user input
    OnUserRequest {
        process the request
        send commands to a database
        process 1 on data 1
        process 2 on data 1
        process 1 on data 2
        join processing results
        present the results
    }
    wait until the user asks for something else
    OnUserRequest {
        process 1 on data 1
        process 3 on data 1
        process 1 on data 2
        process 2 on data 2
        join processing results
        update a database
    }
    user has had enough
    exit
}

somewhere here data is flying around:
data 1
data 2

The instructions are most important here. If we suspend a program, we know which instruction is currently being executed. The data has secondary importance, and we need additional development tools to determine what information is currently being processed. Moreover, all operations (like "process n on data m") are performed sequentially, and a developer has to put in extra effort to parallelize them. In my opinion, this should happen automatically: if a process is able to operate on a specific kind of data, it should just do so in a separate thread and signal when it is done. Why should process 1 on data 2 have to wait for process 1 on data 1 if they do not depend on each other? And if process 1 on data 1 is repeated on the second user request, why do we need complicated syntax to do it, such as instantiating an object of a class which contains the appropriate methods?
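To make that "extra effort" concrete, here is a minimal sketch of running two independent steps in parallel by hand. It uses the Task Parallel Library from later .NET versions, not the .NET 2.0 this article targets, and the names ManualParallel, Process1, and RunBoth are made up for illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class ManualParallel
{
    // Hypothetical stand-in for "process 1": squares its input.
    public static double Process1(double data)
    {
        return data * data;
    }

    // Runs process 1 on two independent inputs concurrently and joins the results.
    public static double RunBoth(double data1, double data2)
    {
        Task<double> first = Task.Run(() => Process1(data1));
        Task<double> second = Task.Run(() => Process1(data2));
        Task.WaitAll(first, second);          // explicit join written by the developer
        return first.Result + second.Result;  // "join processing results"
    }

    public static void Main()
    {
        Console.WriteLine(RunBoth(3, 4));
    }
}
```

Even in this tiny case, the developer has to decide what may run concurrently and where to join. That is the bookkeeping the approach described here tries to move out of the program.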

The idea is to let processes float around and concentrate on data, which is the most important actor in today's applications.

Program {
    (...)
    send data 1
    send data 2
    (...)
}

Processes {
    process 1
    process 2
    process 3
}

Let's send data to a Data Process Manager (DPM) which automatically starts a chain of suitable processes. Then, we can say to a user, "Please wait a moment until all processing is complete". No direct messing with threads anymore!

Using the Code

To apply a data process design pattern, you are supposed to split your program into independent processes which can perform atomic operations on data. Programmatically speaking, you have to create a class implementing the IDataProcess interface per process.

To give a practical example of how to use the data process pattern, we will check the Pythagorean trigonometric identity:

    sin²x + cos²x = 1

As said above, the equation must be divided into independent processes. Here they are:

  1. Sin² evaluator
  2. Cos² evaluator
  3. Adder

After implementation, we will have to somehow arrange them to execute in the correct order. Since everything is controlled by data and all processes may be running concurrently, how do we ensure that an adder adds sin²x + cos²x, and not, for example, cos²x + x? How do we ensure that the adder process receives exactly two numbers?

Implementation of processes

The definition of a process consists of two parts:

  1. Input data specification. How many inputs are there and what requirements do they have to meet?
  2. Process body. A method which performs an operation on input data and produces output.

These two members are defined in the IDataProcess interface, which we are going to implement right now.

class Sin2Process : IDataProcess
{
    public object[] Do(InputSet input)
    {
        var number = input.SingleData<Number>();
        number.X = Math.Sin(number.X);
        number.X *= number.X;
        return new[] { number };
    }

    public ProcessInputRequirements GetRequirements()
    {
        return new ProcessInputRequirements(o => o.Data is Number);
    }
}

The Do method receives a set of input objects, which may come from different processes as long as they meet both the explicitly defined and the implied requirements. The first line acquires an object of type Number. Next, we perform the mathematical operations and return an array containing a single element: the modified number. But wait: why do we pass an instance of a Number wrapper class instead of a double? Because this process modifies the input data in place and does not create new instances. If a new instance were created, the Data Process Manager (DPM) could not determine the data's "trace", which is basically the list of processes that have operated on it. Remember the rule:

Never create a new instance of data if not necessary.

Thus, data which travels from one process to another should be of a reference type (a class). That is, neither a struct nor a primitive type. You can modify contents of passed objects, but do not create new ones unless it is indispensable.
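The difference between modifying an object and replacing it can be checked with reference equality. The sketch below assumes a shape for the Number wrapper (the article does not show its definition), and IdentityDemo with its two methods is purely illustrative.

```csharp
using System;

// Assumed shape of the Number wrapper; the article does not show its definition.
public class Number
{
    public double X;
    public override string ToString() { return X.ToString(); }
}

public static class IdentityDemo
{
    // Mutates the instance it was given, so the caller's reference stays valid.
    public static Number SquareInPlace(Number n)
    {
        n.X *= n.X;
        return n;
    }

    // Allocates a new instance, so identity-based tracking loses the link.
    public static Number SquareAsCopy(Number n)
    {
        return new Number { X = n.X * n.X };
    }

    public static void Main()
    {
        var original = new Number { X = 3 };
        // Same object comes back from the in-place version.
        Console.WriteLine(ReferenceEquals(original, SquareInPlace(original)));
        // A different object comes back from the copying version.
        Console.WriteLine(ReferenceEquals(original, SquareAsCopy(original)));
    }
}
```

A double passed as object would be boxed into a fresh object on each conversion, which is presumably why a value type cannot carry a trace from one process to the next.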

The GetRequirements method is more intriguing. It returns a ProcessInputRequirements instance, which tells the DPM when it can call the Do method. There are various types of restrictions you can add. For now, let's stay with the syntax above: we define a lambda expression which acts as a predicate deciding whether a piece of data can be accepted.

We can define a Cos2Process in a similar manner.

class Cos2Process : IDataProcess
{
    public object[] Do(InputSet input)
    {
        var number = input.SingleData<Number>();
        number.X = Math.Cos(number.X);
        number.X *= number.X;
        return new[] { number };
    }

    public ProcessInputRequirements GetRequirements()
    {
        return new ProcessInputRequirements(o => o.Data is Number);
    }
}

The addition process is next in line:

class AddProcess : IDataProcess
{
    public object[] Do(InputSet input)
    {
        Number[] numbers = input.AllData<Number>();
        double sum = numbers[0].X + numbers[1].X;
        return new[] { new Number { X = sum } };
    }

    public ProcessInputRequirements GetRequirements()
    {
        return new ProcessInputRequirements(a => a.Data is Number,
                                            b => b.Data is Number);
    }
}

As you can see, we can tell DPM that a process needs more than one input by separating multiple lambda expressions with commas. It can also be done this way:

return new ProcessInputRequirements
{
    {
        new PredicateRequirement
        {
            Predicate = a => a.Data is Number
        },
        2
    }
};
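One way to picture how a manager could decide that a multi-input process is ready is a collector with one slot per predicate. The sketch below is not the DPM's implementation. InputCollector and Offer are hypothetical names, and the rule that the same instance cannot fill two slots is an assumption.

```csharp
using System;

// Hypothetical helper: one slot per predicate, filled as matching data arrives.
public class InputCollector
{
    private readonly Func<object, bool>[] predicates;
    private readonly object[] slots;
    private int filled;

    public InputCollector(params Func<object, bool>[] predicates)
    {
        this.predicates = predicates;
        this.slots = new object[predicates.Length];
    }

    // Offers one piece of data. Returns the complete input set once every
    // slot is filled, otherwise null. The same instance is never stored twice.
    public object[] Offer(object data)
    {
        foreach (object held in slots)
            if (ReferenceEquals(held, data)) return null; // same instance again

        for (int i = 0; i < slots.Length; i++)
        {
            if (slots[i] == null && predicates[i](data))
            {
                slots[i] = data;
                filled++;
                break;
            }
        }
        if (filled < slots.Length) return null;   // still waiting for inputs

        object[] complete = (object[])slots.Clone();
        Array.Clear(slots, 0, slots.Length);      // reset for the next round
        filled = 0;
        return complete;
    }
}

public static class CollectorDemo
{
    public static void Main()
    {
        var collector = new InputCollector(a => a is string, b => b is string);
        string first = "sin", second = "cos";
        Console.WriteLine(collector.Offer(first) == null);   // one slot filled, waiting
        Console.WriteLine(collector.Offer(first) == null);   // same instance, ignored
        Console.WriteLine(collector.Offer(second) != null);  // both slots filled
    }
}
```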

Well, that's it. Now we have to add the defined processes to a DPM and send test data.

Using the Data Process Manager (DPM)

The DPM is an engine which stores all process instances and distributes incoming data among them. It checks all constraints, manages the number of threads, queues incoming data, and tracks objects to give you control over the process flow.

First, we have to instantiate the DPM and add processes using a collection initializer.

var manager = new DataProcessManager
{
  new Sin2Process(),
  new Cos2Process(),
  new AddProcess()
};

This will add all three processes. Now, we have to handle output data.

// Fires when output is produced that no process can accept as input.
manager.Output += manager_Output;

static void manager_Output(object sender, DataProcessManager.OutputEventArgs e)
{
    Console.WriteLine("Output: {0}", e.Output);
}

Now, we send test data and activate the manager.

manager.SendData(new Number { X = 123d });

// You need to activate the manager to start processing.
// Although you can still send data while the manager is activated,
// doing so produces unnecessary locking overhead
// on each SendData call. Activating is more expensive than locking, though.
manager.Activate();

Finally, we would like to deactivate the manager when everything is done. As it should be in a very-simple-to-use library, there is an IsEverythingDone property...

while (!manager.IsEverythingDone)
   Thread.Sleep(100);
manager.Deactivate();
Console.ReadKey();

Now, compile and run. Of course, it didn't work.

What exactly happened:

  1. The data of type Number is sent.
  2. The DPM executes the Sin and Cos processes. It also adds the number to an individual list of data available to the Add process. Data is accumulated there until all required inputs have been collected.
  3. The Sin and Cos processes start in parallel, and each returns a modified number, which is still the same instance of the Number class.
  4. Since we still have only one instance of data, the Add process will never get two different inputs, and thus will not be executed.
  5. There are no processes which can use available data. The Output event is fired.

And the output is printed:

Output: 2,50373221144008E-05

Which in this case is equal to sin²(cos²x) or cos²(sin²x), depending on the threading scenario.
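Because both processes modify the same instance, the final value depends on which one runs last. The sketch below removes the threads and simply compares the two sequential orderings. OrderDemo, Sin2, and Cos2 are illustrative helpers, not part of the library.

```csharp
using System;

public static class OrderDemo
{
    public static double Sin2(double x) { double s = Math.Sin(x); return s * s; }
    public static double Cos2(double x) { double c = Math.Cos(x); return c * c; }

    public static void Main()
    {
        double x = 123;
        Console.WriteLine(Sin2(Cos2(x))); // Cos ran first, then Sin
        Console.WriteLine(Cos2(Sin2(x))); // Sin ran first, then Cos
    }
}
```

The two lines print different values. With real threads, the individual statements of the two Do methods could also interleave, which may produce yet other results.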

Thinking rationally, we would expect an infinite loop: since Sin produces a Number, both Sin and Cos should be re-run. Why did that not happen? Because the DPM does not allow recursion by default. You can override this setting for an individual process:

var manager = new DataProcessManager
{
  new AddProcessParams(new Sin2Process())
  {
      AllowRecursion = true
  },
  new Cos2Process(),
  new AddProcess()
};

Now, no output is printed because there is always a process which can accept the available data. And so, sin²(sin²(...sin²(cos²(sin² x))...)) is calculated. Note that there is no traditional procedural recursion, so a stack overflow will never happen. However, we must always watch out for unexpected recursion problems.
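The reason no stack overflow can occur is that each result is handed back as new input instead of being passed to a nested call. A minimal single-threaded sketch of that idea follows. The queue-based loop and the name QueueRecursionDemo are assumptions, and the round limit exists only so that the example terminates.

```csharp
using System;
using System.Collections.Generic;

public static class QueueRecursionDemo
{
    // Feeds each output back into a queue instead of calling itself,
    // so the call stack stays flat however many rounds are run.
    public static double Iterate(double start, int rounds)
    {
        var pending = new Queue<double>();
        pending.Enqueue(start);
        double last = start;
        for (int i = 0; i < rounds; i++)
        {
            double x = pending.Dequeue();
            double s = Math.Sin(x);
            last = s * s;            // the "Sin2" step
            pending.Enqueue(last);   // the output becomes the next input
        }
        return last;
    }

    public static void Main()
    {
        Console.WriteLine(Iterate(123, 1000000));
    }
}
```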

OK, back to our Pythagorean problem. How do we solve it? First of all, we need two instances of a Number class. So, we add another process, a Number Generator, which accepts a double.

class NumberGeneratorProcess : IDataProcess
{
    public object[] Do(InputSet input)
    {
        var number = input.SingleData<double>();
        return new[] { new Number { X = number } };
    }

    public ProcessInputRequirements GetRequirements()
    {
        return new ProcessInputRequirements(obj => obj.Data is double);
    }
}

First, we try a naive solution:

var manager = new DataProcessManager
{
  new NumberGeneratorProcess(),
  new NumberGeneratorProcess(),
  new Sin2Process(),
  new Cos2Process(),
  new AddProcess()
};

Of course, the DPM does not magically know what should be executed first. The Add process creates a new instance of data, so it is captured by the Sin and Cos processes again. At some point, there is not enough data to perform an addition, and the program ends.

Output: 1
Output: 1
Output: 0,905604165244399
Output: 0,905604165244399
Output: 0,992729192480268
Output: 0,992729192480268

Sequences, groups, and sets

I have provided an automatic way to control data flow by arranging processes into groups which have various properties. The complete and working solution of the problem above is shown here:

var manager = new DataProcessManager
{
  new DataProcessSequence
  {
      new DataProcessLooseSet
      {
          new DataProcessSequence
          {
              new NumberGeneratorProcess(),
              new Sin2Process()
          },
          new DataProcessSequence
          {
              new NumberGeneratorProcess(),
              new Cos2Process()
          }
      },
      new AverageSumProcess(),
      new AverageProcess()
  }
};

I think it's quite optimal. The complex and difficult processes of calculating sin and cos are executed in parallel. At the same time, the Add process waits until it has data from both of them. Output:

Output: 0,999896776901505
Output: 0,999534958029976
Output: 0,999724379719487
Output: 1

Because we added an AverageProcess, we get a more and more accurate result with each iteration. In the end, every sin²x has its cos²x, and we get the trigonometric identity. Note: rounding errors may happen.

During object initialization, all this tree-like structure is flattened to a simple list. However, additional constraints are added. In this specific case, it will be:

  • Number Generator - no restriction
  • Sin2Process/Cos2Process - the previous process must be a Number Generator; more precisely, the concrete instance of that process which, in the listing above, stands directly above the Sin2Process or Cos2Process.
  • Add Process - the previous process for each input must be either the Sin2 process or the Cos2 process. Note that the initializer "intelligently" determines that the only possible "endings" of the LooseSet before Add are Sin and Cos, because they are the finish points of the sequences.

There are three structures provided:

  • DataProcessSequence - each process (excluding the first one) can operate on data coming from the previous one. However, if the previous "process" of process A is a set of processes, then A can operate on data which comes from any of the possible endings of that set. I say "endings" instead of just "any of them" because sometimes, as in the example above, it is possible to narrow down the list of possible finish points.
  • DataProcessParallel - a process cannot use input coming from other processes in the same set.
  • DataProcessLooseSet - virtually does nothing, but it still stands as a set, so if put in a sequence, then it's treated as a single process.

You can create your own structures by implementing the DataProcessSet abstract class.
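The flattening described above can be sketched with a small tree of nodes, each of which reports its possible endings. This is not the library's code. Node, Leaf, Sequence, and LooseSet below are simplified, hypothetical stand-ins that only track process names, and an empty predecessor list stands for "no restriction".

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public abstract class Node
{
    public abstract List<string> Endings();
    public abstract void Flatten(List<string> previous, Dictionary<string, List<string>> result);
}

public class Leaf : Node
{
    private readonly string name;
    public Leaf(string name) { this.name = name; }
    public override List<string> Endings() { return new List<string> { name }; }
    public override void Flatten(List<string> previous, Dictionary<string, List<string>> result)
    {
        result[name] = previous; // an empty list means "no restriction"
    }
}

public class Sequence : Node
{
    private readonly Node[] children;
    public Sequence(params Node[] children) { this.children = children; }
    public override List<string> Endings() { return children[children.Length - 1].Endings(); }
    public override void Flatten(List<string> previous, Dictionary<string, List<string>> result)
    {
        foreach (Node child in children)
        {
            child.Flatten(previous, result);
            previous = child.Endings(); // the next child may only follow these
        }
    }
}

public class LooseSet : Node
{
    private readonly Node[] children;
    public LooseSet(params Node[] children) { this.children = children; }
    public override List<string> Endings() { return children.SelectMany(c => c.Endings()).ToList(); }
    public override void Flatten(List<string> previous, Dictionary<string, List<string>> result)
    {
        foreach (Node child in children) child.Flatten(previous, result);
    }
}

public static class FlattenDemo
{
    // Builds a tree shaped like the listing above and flattens it.
    public static Dictionary<string, List<string>> Build()
    {
        Node tree = new Sequence(
            new LooseSet(
                new Sequence(new Leaf("Generator1"), new Leaf("Sin2")),
                new Sequence(new Leaf("Generator2"), new Leaf("Cos2"))),
            new Leaf("Add"));
        var result = new Dictionary<string, List<string>>();
        tree.Flatten(new List<string>(), result);
        return result;
    }

    public static void Main()
    {
        foreach (var entry in Build())
            Console.WriteLine("{0} <- [{1}]", entry.Key, string.Join(", ", entry.Value));
    }
}
```

In this sketch, Add ends up restricted to Sin2 and Cos2 rather than to all four processes in the set, which mirrors the "endings" narrowing described above.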

Points of Interest

Performance

The DP design pattern is strongly related to parallelism. The DPM starts each process in a separate thread, so for a process programmer it is essential to distribute work among threads properly, taking the target machine's configuration into account. The DPM is very inefficient on single-core machines; in fact, it is slow. There is a lot of overhead related to managing data flow, and it cannot be eliminated if we want a practical solution that spares us from thinking about every detail. In the following example, we will modify the sin-cos problem to operate on ranges of numbers instead of single numbers. The processes will now exchange Range objects, and the NumberGenerator will accept a RangeSpecification.

class RangeSpecification
{
    public double LowerBound = 0;
    public double UpperBound = 1000;
    public double Step = 0.00001;
    // Warning: takes about 2 GB of memory

    public int Count
    {
        get { return (int)((UpperBound - LowerBound) / Step); }
    }
}

class Range
{
    public double[] Array;
    public override string ToString()
    {
        return string.Format("{0} numbers", Array.Length);
    }
}

I assume that you can imagine the rest of the code, but from a performance point of view, I would like to draw your attention to an important optimization used in NumberGeneratorProcess:

class NumberGeneratorProcess : IDataProcess
{
    internal static readonly int RangeCount = 2 * Environment.ProcessorCount;

    public object[] Do(InputSet input)
    {
        var spec = input.SingleData<RangeSpecification>();
        
        var ranges = new Range[RangeCount];

        for (int i = 0; i < RangeCount; i++)
        {
            ranges[i] = new Range { Array = new double[spec.Count / RangeCount] };
            double x = spec.LowerBound + spec.Count * i * spec.Step / RangeCount;
            for (int j = 0; j < ranges[i].Array.Length; j++, x += spec.Step)
                ranges[i].Array[j] = x;
        }
        return ranges;
    }

    public ProcessInputRequirements GetRequirements()
    {
        return new ProcessInputRequirements(obj => obj.Data is RangeSpecification);
    }
}

What exactly was done? Instead of sending a single Range object, we send a number of them. What is the result? My quad-core processor uses only about 20-40% of its power. By default, the DPM limits the number of threads to 2*ProcessorCount + 1, which here means 8 threads for processes and 1 exclusively for the DPM. Since not all processes can run in parallel, 4*ProcessorCount threads are needed in the code above. As a result, the Cos processes aren't actually running in parallel: the available threads are all taken by the Sin processes, because they come first in the list of processes. Now, if we add a line:

manager.ThreadLimit = 4 * Environment.ProcessorCount;

or remove the "*2" from NumberGeneratorProcess:

internal static readonly int RangeCount = Environment.ProcessorCount;

then CPU utilization is 100%. That is, all four cores run at full power. You should always fit the number of threads to the thread limit; you never want one process to wait for another. Although the DPM tries to do its best, it is always better to set the thread limit manually if you know something more about the processes you use.
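The effect of a thread limit can be reproduced outside the DPM with a counting semaphore. The sketch below is a common way to cap concurrency in modern .NET and is not how the DPM is implemented. ThreadLimitDemo and RunLimited are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ThreadLimitDemo
{
    // Runs `jobs` short tasks but lets at most `limit` of them work at once.
    // Returns the highest number of jobs observed working simultaneously.
    public static int RunLimited(int jobs, int limit)
    {
        var gate = new SemaphoreSlim(limit);
        var padlock = new object();
        int running = 0, peak = 0;
        var tasks = new Task[jobs];

        for (int i = 0; i < jobs; i++)
        {
            tasks[i] = Task.Run(() =>
            {
                gate.Wait();                  // blocks while `limit` jobs are active
                try
                {
                    lock (padlock) { running++; if (running > peak) peak = running; }
                    Thread.Sleep(10);         // stand-in for real work
                    lock (padlock) { running--; }
                }
                finally { gate.Release(); }
            });
        }
        Task.WaitAll(tasks);
        return peak;
    }

    public static void Main()
    {
        int limit = Environment.ProcessorCount;
        // Four times as many jobs as the limit: the extra jobs have to wait.
        Console.WriteLine(RunLimited(4 * limit, limit) <= limit);
    }
}
```

When there are more jobs than the limit allows, the surplus jobs simply queue up behind the gate, which is the waiting that the advice above aims to avoid.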

Remarks

  • To increase performance and responsiveness, the Data Process Manager (DPM) limits the number of threads to twice the number of available CPU cores. You can change or turn off thread limiting using the DataProcessManager.ThreadLimit property. The limit does not include the manager's thread.
  • You can also adjust the individual thread limits for each process by setting the ThreadLimit in the AddProcessParams parameter of the Add method.
  • More threads than CPUs => longer waiting, more results arriving at the same time.
  • The number of threads equal to the CPU count => optimal performance; results come in packs of the same size as the CPU count.
  • Fewer threads than CPUs => less or no improvement from parallelization.
  • You do not have to stick to a single DataProcessManager instance. If you have groups of unrelated processes, spread them among several managers. Each manager fires its own Output event. Although OutputEventArgs has a Source property, splitting a big switch block into separate methods is recommended. Keep in mind, however, that a DPM is quite a heavy object.
  • You cannot add new processes while a DPM is active. Use the Deactivate method before adding a new process. Even when the DPM is deactivated, you can neither modify nor remove processes that have already been added.
  • Changing what the GetRequirements method returns later on will not do anything: it is called only once, when the process is added to the DPM.
  • Please use the message board at the bottom of this page to report bugs, suggest other solutions, or share interesting remarks.

History

  • 2009-11-02 - Original version posted.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

Jacek Gajek


My name is Jacek Gajek. I study computer science at the University of Technology in Wrocław. I like C#. I like cats. I like Monty Python's sense of humour.
Location: Poland

Last Updated: 3 Nov 2009
Editor: Smitha Vijayan
Copyright 2009 by Jacek Gajek