Introduction to Dataflow Programming
What is dataflow programming all about? In classical imperative programming, a program is basically a set of operations working on mutable state, which effectively hides the data paths. Without proper synchronization, this style can introduce non-determinism under concurrent execution (multithreading). Dataflow programming is more like a series of workers or robots on an assembly line who execute only when input material arrives: program execution depends on the actual data, or on data availability to be precise. A program that communicates only through single-assignment dataflow variables is deterministic, because a reader can never observe a value before it is bound or see it change afterwards.
Let’s introduce the dataflow variable, one of the main concepts of dataflow programming. A dataflow variable has two possible states: bound (a value has been assigned) or unbound (no value has been assigned yet). Whenever a thread tries to read an unbound dataflow variable, it blocks until some other thread binds the variable. A dataflow variable can be bound only once; subsequent attempts to bind it fail. Dataflow variables can also serve as building blocks for blocking queues and streams, and the actor model can be implemented on top of such blocking queues.
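As a rough illustration of how a blocking stream can be built from single-assignment cells, here is a minimal sketch. It is not the implementation presented later in this article: it uses the standard TaskCompletionSource<T> as a stand-in for a dataflow variable, and the names StreamCell and DataflowStream are hypothetical.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// One stream cell: an element plus a single-assignment link to the next cell.
// TaskCompletionSource<T> is only a stand-in for a dataflow variable here.
public sealed class StreamCell<T>
{
    public readonly T Head;
    public readonly TaskCompletionSource<StreamCell<T>> Tail =
        new TaskCompletionSource<StreamCell<T>>();

    public StreamCell(T head) { Head = head; }
}

// Hypothetical name: a blocking stream whose tail is always an unbound cell link.
public sealed class DataflowStream<T>
{
    private readonly TaskCompletionSource<StreamCell<T>> first =
        new TaskCompletionSource<StreamCell<T>>();
    private TaskCompletionSource<StreamCell<T>> unboundTail;
    private readonly object writeLock = new object();

    public DataflowStream() { unboundTail = first; }

    // Binds the current unbound tail to a new cell; the new cell's tail is unbound.
    public void Append(T item)
    {
        var cell = new StreamCell<T>(item);
        lock (writeLock)
        {
            unboundTail.SetResult(cell); // throws if the stream was already closed
            unboundTail = cell.Tail;
        }
    }

    // Binds the tail to null, which marks the end of the stream.
    public void Close()
    {
        lock (writeLock) { unboundTail.SetResult(null); }
    }

    // Each step blocks on Task.Result until the next link has been bound.
    public IEnumerable<T> Read()
    {
        for (var cell = first.Task.Result; cell != null; cell = cell.Tail.Task.Result)
            yield return cell.Head;
    }
}
```

A reader that enumerates Read() simply waits whenever it catches up with the writer, and several readers can traverse the same stream independently because bound cells never change.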
You can get more information on dataflow programming from this Wikipedia article. There is also a nice article in the Groovy GPars guide.
Overview of the Article
This article presents basic implementations of a dataflow variable in both C# and F#. It also demonstrates examples of dataflow programming in C# using futures. Dataflow programming works best in languages that follow the principles of the declarative model. C# is an imperative language, so programming in a dataflow style requires developers to be self-disciplined. Surprisingly, F#, although considered a functional programming language and therefore one that follows the declarative paradigm, also lets developers program imperatively (via the mutable keyword). Adding dataflow variables to C# and F# does not automatically make them dataflow programming languages, because the necessary syntactic sugar and language support are still missing.
Clojure is one of the most popular modern languages that enable dataflow programming; it supports the style through promises. It is also possible to do dataflow programming in other popular languages such as Groovy, Scala, and Ruby using open-source libraries like GPars for Groovy, but none of those languages provides syntactic support for dataflow variables. As a genuine dataflow programming language, I would single out Oz, which treats all variables as dataflow variables: a reader trying to read an unbound (uninitialized) variable blocks until the variable is bound (initialized). On one hand, this saves us from the famous NullReferenceException; on the other hand, it can introduce program hangs.
First, I will present the implementations in C# and F#, and later I will dig into the thread synchronization details.
Dataflow Variables in C#
Let’s start with a simple example of how to use a dataflow variable in C#.
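var variable = new DataflowVariable<int>();
variable.Bind(42);              // illustrative value; usually bound by another thread
int result = 1000 + variable;   // the implicit conversion reads the bound value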
C# is not very extensible when it comes to operator overloading (as you will see later in the F# implementation), and this is the reason we use the Bind method here. It is really a matter of taste whether to use operators or plain properties and functions when working with dataflow variables, but in my opinion, operators look more natural. What I love about C# is implicit conversion operators. Now the code itself:
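public class DataflowVariable<T>
{
    private readonly object syncLock = new object();
    private volatile bool isInitialized = false;
    private volatile object value;
    private T Value
    {
        get
        {
            if (!isInitialized)   // lock-free fast path once the variable is bound
                lock (syncLock) { while (!isInitialized) System.Threading.Monitor.Wait(syncLock); }
            return (T)value;
        }
        set
        {
            lock (syncLock)
            {
                if (isInitialized)
                    throw new System.InvalidOperationException
                        ("Dataflow variable can be set only once.");
                this.value = value;
                isInitialized = true;
                System.Threading.Monitor.PulseAll(syncLock);   // wake all blocked readers
            }
        }
    }
    public void Bind(T newValue) { this.Value = newValue; }
    public static implicit operator T(DataflowVariable<T> myVar) { return myVar.Value; }
}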
Dataflow Variables in F#
Let’s start with a simple example of how to use a dataflow variable in F#.
let myVar = new DataflowVariable<int>()
myVar <~ 42                      // illustrative value; usually bound by another thread
let result = 1000 + !!myVar
Here we use the operator (<~) to bind the dataflow variable and the operator (!!) to read its value. Now the code itself:
type public DataflowVariable<'T> () =
    [<VolatileField>]
    let mutable value : option<'T> = None
    let syncLock = new System.Object()

    member private this.Value
        with get() : 'T =
            match value with
            | Some(initializedVal) -> initializedVal
            | None ->
                lock syncLock (fun () ->
                    // Monitor.Wait releases the lock until a writer pulses it.
                    while (value.IsNone) do
                        System.Threading.Monitor.Wait(syncLock) |> ignore
                    value.Value)
        and set(newVal : 'T) =
            lock syncLock (fun () ->
                match value with
                | Some(_) -> invalidOp "Dataflow variable can be set only once."
                | None ->
                    value <- Some(newVal)
                    System.Threading.Monitor.PulseAll(syncLock))

    static member public (<~) (var:DataflowVariable<'T>, initValue:'T) =
        var.Value <- initValue

    static member public (!!) (var:DataflowVariable<'T>) : 'T =
        var.Value
You may have noticed the [<VolatileField>] attribute. According to the rather sparse documentation, this attribute effectively replaces the volatile keyword in C#, but I haven’t performed thorough testing to verify it. What? F# has no keyword for volatile fields? And this is as it should be. Volatile fields belong to the domain of imperative programming, and F#, being first of all a functional programming language (an implementation of the declarative model), tries to avoid shared state (remember the mutable keyword?). F# does not support overloading of implicit conversion operators, which is why we need some kind of dereferencing prefix operator (!!). The F# implementation is more elegant because we use the Option type here and thus do not have to deal with an isInitialized field as in the C# implementation.
Implementation Details and Some Thoughts on Thread Synchronization
For synchronization in both implementations, I have used volatile fields in conjunction with a simple Monitor.Wait/Monitor.Pulse pattern. You can get more information regarding Monitor.Wait and Monitor.Pulse in this very nice article by Joe Albahari. Volatile fields are used here to prevent instruction reordering and ensure CPU cache synchronization. As an option, instead of a volatile field we could use the Thread.VolatileRead method (we do not also need Thread.VolatileWrite because the actual write is done within the lock statement, which effectively prevents reordering and flushes and invalidates the CPU cache, and anyway Thread.VolatileWrite only flushes the CPU cache but does not invalidate it). Basically, the static VolatileRead and VolatileWrite methods in the Thread class read or write a variable while enforcing (technically, a superset of) the guarantees made by the volatile keyword.
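The alternative without volatile fields might look roughly like the sketch below. It rests on several assumptions: the class name ExplicitVolatileVariable and the public Read method are hypothetical, and the newer System.Threading.Volatile class is swapped in for the Thread.VolatileRead method discussed above. As a conservative choice, the flag is also published with Volatile.Write, so that the lock-free fast path cannot observe the flag before the value on weaker memory models.

```csharp
using System;
using System.Threading;

// Hypothetical variant: plain fields plus explicit volatile accesses
// instead of fields declared with the volatile keyword.
public sealed class ExplicitVolatileVariable<T>
{
    private readonly object syncLock = new object();
    private bool isInitialized;
    private T value;

    public void Bind(T newValue)
    {
        lock (syncLock)
        {
            if (isInitialized)
                throw new InvalidOperationException("Dataflow variable can be set only once.");
            value = newValue;
            // Release-write: the value is stored before the flag becomes visible.
            Volatile.Write(ref isInitialized, true);
            Monitor.PulseAll(syncLock);
        }
    }

    public T Read()
    {
        // Acquire-read on the fast path; no lock is taken once the variable is bound.
        if (!Volatile.Read(ref isInitialized))
        {
            lock (syncLock)
            {
                // Monitor.Wait releases the lock until Bind pulses it.
                while (!isInitialized)
                    Monitor.Wait(syncLock);
            }
        }
        return value;
    }
}
```

The slow path needs no explicit volatile access because both the check and the write happen under the same lock.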
Dataflow Programming Examples in C# and F#
In C#, I will demonstrate a simple example of dataflow programming with the Parallel Extensions library (futures and continuations). Using Task.Factory.ContinueWhenAll, one can achieve results similar to those with dataflow variables, but dataflow variables give developers much more flexibility.
var input1 = new DataflowVariable<int>();
var input2 = new DataflowVariable<int>();
var output1 = new DataflowVariable<int>();
var output2 = new DataflowVariable<int>();
// Each task blocks on its inputs until they are bound.
Task<int> task1 = Task.Factory.StartNew<int>(() =>
{
    output1.Bind(input1 + input2);
    return output1;
});
Task<int> task2 = Task.Factory.StartNew<int>(() =>
{
    output2.Bind(input1 + output1);
    return output2;
});
input1.Bind(1);   // illustrative input values
input2.Bind(2);
Console.WriteLine(10 + output1 + output2);
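For comparison, the Task.Factory.ContinueWhenAll approach mentioned earlier could look roughly like the following sketch. FuturesExample and SumWhenAll are hypothetical names chosen for illustration; the type arguments are spelled out explicitly to keep the overload selection unambiguous.

```csharp
using System.Linq;
using System.Threading.Tasks;

public static class FuturesExample
{
    // Starts two futures and combines them once both have completed.
    public static int SumWhenAll(int a, int b)
    {
        Task<int> input1 = Task.Factory.StartNew(() => a);
        Task<int> input2 = Task.Factory.StartNew(() => b);

        // The continuation runs only after every antecedent task has finished.
        Task<int> output = Task.Factory.ContinueWhenAll<int, int>(
            new[] { input1, input2 },
            inputs => inputs.Sum(t => t.Result));

        return output.Result; // blocks until the continuation completes
    }
}
```

Here the dependency graph has to be declared up front as a list of antecedent tasks, whereas with dataflow variables each task simply reads whatever it needs and blocks until the value is available.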
This article described the basic implementation of dataflow variables in C# and F# programming languages and basic examples of dataflow programming using continuations/futures. Please consider this article as a starting point in a journey into the world of dataflow programming.
- 3rd September, 2010: Initial post