
Broadcasting Data using .NET TPL Dataflow

Thrivikram Hathwar, 18 Jan 2014 CPOL
How to broadcast data using .NET TPL dataflow

Introduction

.NET TPL Dataflow is built on top of the existing .NET Task Parallel Library (TPL) and is mainly intended to promote actor-based programming. In an actor-based model, an actor communicates with other actors or with the outside world by sending and receiving messages. Dataflow is useful when multiple operations have to communicate with each other, or when data has to be processed as soon as it becomes available.

The Dataflow components used in our sample are ActionBlock<TInput>, TransformBlock<TInput, TOutput> and BroadcastBlock<T>. All TPL Dataflow blocks implement the IDataflowBlock interface, which has two methods, Complete() and Fault(), and one property, Completion. Complete() signals that the block should not accept any more messages; the block finishes once the messages it has already accepted are processed. Fault() puts the block into a faulted state with the given exception. The Completion property returns a Task that represents the asynchronous operation of the block and finishes when the block has completed or faulted.
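As a minimal console sketch of Complete() and Completion (the class name CompletionSketch and the integer messages are assumptions made for illustration, not part of the sample application), a block can be fed, told that no more input is coming, and then awaited:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical illustration class; not part of the article's WPF sample.
public static class CompletionSketch
{
    // Posts three values, signals completion and waits for the block to drain.
    public static async Task<List<int>> RunAsync()
    {
        var processed = new List<int>();

        // An ActionBlock handles one message at a time by default,
        // so the list is never touched by two threads at once.
        var block = new ActionBlock<int>(n => processed.Add(n));

        block.Post(1);
        block.Post(2);
        block.Post(3);

        // No more input: the block finishes once the queued messages are handled.
        block.Complete();

        // Completion is the Task to await for "the block is done".
        await block.Completion;

        Console.WriteLine("Status: {0}, processed: {1}",
            block.Completion.Status, processed.Count);
        return processed;
    }

    public static void Main()
    {
        RunAsync().GetAwaiter().GetResult();
    }
}
```

Awaiting Completion before Complete() has been called would wait indefinitely, which is why the call order above matters.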

ITargetBlock<TInput> and ISourceBlock<TOutput> are the other two interfaces worth studying, as the Dataflow blocks implement them. ITargetBlock is for receiving messages and ISourceBlock is for sending them. ActionBlock implements ITargetBlock and is an execution block that calls an Action delegate for each message it receives. TransformBlock implements both ITargetBlock and ISourceBlock and is also an execution block: it receives a message, runs a Func delegate on it and offers the result as a new message. BroadcastBlock also implements both ITargetBlock and ISourceBlock and is mainly used to broadcast messages to multiple components that are interested only in the most recent value sent.
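The interface roles can be seen by assigning blocks to interface-typed variables. This is only a sketch: InterfaceSketch and the doubling delegate are made-up names for illustration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical illustration class; not part of the article's WPF sample.
public static class InterfaceSketch
{
    public static async Task<string> RunAsync()
    {
        string result = null;

        // ActionBlock<T> is only a target: it consumes messages.
        ITargetBlock<string> sink = new ActionBlock<string>(s => result = s);

        // TransformBlock<TInput, TOutput> is both a target and a source.
        var doubler = new TransformBlock<int, string>(n => (n * 2).ToString());
        ITargetBlock<int> asTarget = doubler;
        ISourceBlock<string> asSource = doubler;

        // A source is linked to a target; PropagateCompletion passes
        // the "no more messages" signal along the link.
        asSource.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        asTarget.Post(21);
        asTarget.Complete();
        await sink.Completion;

        return result;
    }

    public static void Main()
    {
        Console.WriteLine(RunAsync().GetAwaiter().GetResult());
    }
}
```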

Jump Start

Let us build a sample WPF application, targeting .NET 4.5 in Visual Studio 2012, that demonstrates broadcasting messages with TPL Dataflow blocks.

The requirement is to build a simple Stock Index Display panel. The display includes the index name, current value, pt. change and % change. The broadcaster broadcasts the index name, current value and previous value of the stock index. The pt. change and % change are calculated as transformations and are displayed only when the user asks for them and data is available.

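I have designed the UI with labels for the index name, current value, pt. change and % change, and with buttons to start broadcasting and to start the transformations.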

The flow can be as mentioned in the steps below:

  1. Build a broadcaster (using BroadcastBlock) which broadcasts stock indices with random values asynchronously.
  2. Build an ActionBlock to receive the broadcast message and display it in the UI (Index and Current Value).
  3. Build a TransformBlock to receive the broadcast message, calculate the pt. change and % change, and send the result as a message for the UI.
  4. Build an ActionBlock to receive the message sent from the TransformBlock and display it in the UI (Pt. change and % change).
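Before moving to WPF, the four steps above can be sketched as a console pipeline. This is a simplified stand-in, not the article's code: the messages are plain integers, the baseline of 1000 and the name PipelineSketch are assumptions, and lists take the place of UI labels.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical illustration class; not part of the article's WPF sample.
public static class PipelineSketch
{
    public static async Task<Tuple<List<int>, List<int>>> RunAsync()
    {
        var displayed = new List<int>(); // stands in for the "current value" label
        var changes = new List<int>();   // stands in for the "pt. change" label

        // Step 1: the broadcaster offers every message to all linked targets.
        var broadcaster = new BroadcastBlock<int>(v => v);
        // Step 2: first consumer shows the raw value.
        var display = new ActionBlock<int>(v => displayed.Add(v));
        // Step 3: second consumer derives a change against an assumed baseline of 1000.
        var calculator = new TransformBlock<int, int>(v => v - 1000);
        // Step 4: the derived value is shown by its own ActionBlock.
        var changeDisplay = new ActionBlock<int>(c => changes.Add(c));

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        broadcaster.LinkTo(display, link);
        broadcaster.LinkTo(calculator, link);
        calculator.LinkTo(changeDisplay, link);

        foreach (var value in new[] { 1200, 900, 1500 })
        {
            await broadcaster.SendAsync(value);
        }

        broadcaster.Complete();
        await Task.WhenAll(display.Completion, changeDisplay.Completion);
        return Tuple.Create(displayed, changes);
    }

    public static void Main()
    {
        var result = RunAsync().GetAwaiter().GetResult();
        Console.WriteLine("Values:  " + string.Join(", ", result.Item1));
        Console.WriteLine("Changes: " + string.Join(", ", result.Item2));
    }
}
```

Both consumers here have unbounded input queues, so neither misses a value; a target with a bounded capacity could skip values that are overwritten while it is busy.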

Note: To work with TPL Dataflow, you need to download and install Microsoft.Tpl.Dataflow using NuGet. Once you create a new WPF project, open Package Manager Console and run the command Install-Package Microsoft.Tpl.Dataflow. Reference to System.Threading.Tasks.Dataflow will be added to the project automatically. Dataflow components are found under the namespace System.Threading.Tasks.Dataflow.

Let us code the application now.

1. Build the BroadCaster

Let's add a class StockIndexBroadCaster which acts as an asynchronous broadcaster of stock indices for our sample. We are going to add an async method BroadCastIndexData() which returns a Task. As seen in the code, a Task.Delay of 2 seconds makes StockIndexBroadCaster broadcast a message every 2 seconds.

Pass the BroadcastBlock of StockIndex type in through the constructor and call the SendAsync method on it to broadcast messages asynchronously. SendAsync returns a Task<bool> (true if the block accepted the message) and hence is awaitable.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace BroadCastingSample
{
    public class StockIndexBroadCaster
    {
        private readonly BroadcastBlock<StockIndex> _stockBroadCaster;

        public StockIndexBroadCaster(BroadcastBlock<StockIndex> broadcaster)
        {
            _stockBroadCaster = broadcaster;
        }

        /// <summary>
        /// Broadcasts a stock index every 2 seconds, asynchronously.
        /// </summary>
        /// <returns>A task that completes after 100 values have been broadcast.</returns>
        public async Task BroadCastIndexData()
        {
            var random = new Random();
            decimal previousval = 1000;
            for (int i = 0; i < 100; i++)
            {
                await Task.Delay(2000);
                var stockIndex = new StockIndex()
                {
                    Index = "TestIndex" + (i + 1),
                    CurrentValue = random.Next(1000, 5000),
                    PreviousValue = previousval
                };
                // Carry the current value forward as the next message's previous value
                previousval = stockIndex.CurrentValue;
                await _stockBroadCaster.SendAsync(stockIndex);
            }
        }
    }
}
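The broadcaster above ignores the result of SendAsync. As a small sketch of what that result means (SendAsyncSketch is a made-up name, and integers replace StockIndex for brevity), the awaited value tells whether the block took the message:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical illustration class; not part of the article's WPF sample.
public static class SendAsyncSketch
{
    public static async Task<Tuple<bool, bool>> RunAsync()
    {
        var broadcaster = new BroadcastBlock<int>(v => v);

        // An open block accepts the message.
        bool acceptedWhileOpen = await broadcaster.SendAsync(1);

        // After Complete() the block declines any further input.
        broadcaster.Complete();
        bool acceptedAfterComplete = await broadcaster.SendAsync(2);

        return Tuple.Create(acceptedWhileOpen, acceptedAfterComplete);
    }

    public static void Main()
    {
        var result = RunAsync().GetAwaiter().GetResult();
        Console.WriteLine("While open: {0}, after Complete(): {1}",
            result.Item1, result.Item2);
    }
}
```

A long-running sender could check this flag and stop its loop once the block no longer accepts messages.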

The StockIndex class shown below is a simple data carrier.

    public class StockIndex
    {
        public string Index { get; set; }

        public decimal CurrentValue { get; set; }

        public decimal PreviousValue { get; set; }
    } 

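In MainWindow.xaml.cs, create the BroadcastBlock as shown below. The constructor argument is a cloning function; f => f hands every target the same StockIndex instance.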

        private BroadcastBlock<StockIndex> _broadcast;

        public MainWindow()
        {
            InitializeComponent();
        }

        private void Window_Loaded(object sender, RoutedEventArgs e)
        {
            LblCurrentValue.Content = string.Empty;
            LblIndex.Content = string.Empty;
            LblPercentChange.Content = string.Empty;
            LblPtChange.Content = string.Empty;
            _broadcast = new BroadcastBlock<StockIndex>(f => f);
        }
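The f => f argument means all targets see the same object. If a consumer might modify the message, a copying function can be passed instead. The sketch below compares the two; the Quote class and CloningSketch are hypothetical names used only for this illustration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical message type, similar in spirit to StockIndex.
public sealed class Quote
{
    public string Name { get; set; }
    public decimal Value { get; set; }
}

// Hypothetical illustration class; not part of the article's WPF sample.
public static class CloningSketch
{
    // Returns true when both targets received the very same object instance.
    public static async Task<bool> TargetsShareInstanceAsync(Func<Quote, Quote> cloner)
    {
        Quote first = null;
        Quote second = null;

        var broadcaster = new BroadcastBlock<Quote>(cloner);
        var a = new ActionBlock<Quote>(q => first = q);
        var b = new ActionBlock<Quote>(q => second = q);

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        broadcaster.LinkTo(a, link);
        broadcaster.LinkTo(b, link);

        await broadcaster.SendAsync(new Quote { Name = "TestIndex1", Value = 1200m });
        broadcaster.Complete();
        await Task.WhenAll(a.Completion, b.Completion);

        return ReferenceEquals(first, second);
    }

    public static void Main()
    {
        // Identity function: targets share one instance.
        bool shared = TargetsShareInstanceAsync(q => q).GetAwaiter().GetResult();
        // Copying function: each target gets its own copy.
        bool sharedWithCopy = TargetsShareInstanceAsync(
            q => new Quote { Name = q.Name, Value = q.Value }).GetAwaiter().GetResult();

        Console.WriteLine("Shared with f => f: {0}, shared with copy: {1}",
            shared, sharedWithCopy);
    }
}
```

For a read-only data carrier such as StockIndex in this sample, sharing the instance is harmless, so f => f is a reasonable choice.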

2. Consume messages of broadcaster

Make the button click handler asynchronous and create an ActionBlock for updating the UI. The UpdateUiAtStart() method returns an ActionBlock of StockIndex which updates the UI as seen below; its TaskScheduler option is taken from the current synchronization context so that the delegate runs on the UI thread. Link the broadcaster to this UI updater using the LinkTo method of the BroadcastBlock, then start broadcasting.

        private async void BtnStart_Click(object sender, RoutedEventArgs e)
        {
            //Create an ActionBlock to update UI
            ActionBlock<StockIndex> updateMeAtStart = UpdateUiAtStart();

            //Link BroadcastBlock to the ActionBlock so that ActionBlock gets the message and updates the UI
            _broadcast.LinkTo(updateMeAtStart);

            //Start broadcasting stock indices
            var stockBroadCaster = new StockIndexBroadCaster(_broadcast);
            await stockBroadCaster.BroadCastIndexData();
        }

        //Returns ActionBlock for updating UI once broadcasting starts
        private ActionBlock<StockIndex> UpdateUiAtStart()
        {
            return new ActionBlock<StockIndex>(x =>
            {
                LblCurrentValue.Content = x.CurrentValue.ToString();
                LblIndex.Content = x.Index;
            },
            new ExecutionDataflowBlockOptions()
            {
                //Run the delegate on the UI thread so that the labels can be updated safely
                TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
            });
        }

3. Build the TransformBlock and update the UI

Create an ActionBlock of Tuple<string, string> for updating the UI after the transformations. UpdateUiAfterTransformation() returns an ActionBlock which updates the UI after receiving a message from the TransformBlock.

Create a TransformBlock (changeCalculator) which accepts a Func delegate as shown below, and link the broadcaster to it so that it receives the broadcast messages. Then link changeCalculator to the updateChanges ActionBlock so that the UI is updated once the calculations are done.

        private void BtnStartTransofrmations_Click(object sender, RoutedEventArgs e)
        {
            //Create an ActionBlock to update UI for transformations
            ActionBlock<Tuple<string, string>> updateChanges = UpdateUiAfterTransformation();

            //TransformBlock which calculates pt. change and % change and returns the result
            var changeCalculator = new TransformBlock<StockIndex, Tuple<string, string>>(f =>
            {
                decimal ptchange = f.CurrentValue - f.PreviousValue;
                //% change is measured relative to the previous value
                decimal percentchange = ptchange * 100 / f.PreviousValue;
                return new Tuple<string, string>
                    (ptchange.ToString(), percentchange.ToString());
            });

            //Link BroadcastBlock to the TransformBlock so that TransformBlock gets the message,
            //calculates the changes and returns the result
            _broadcast.LinkTo(changeCalculator);

            //Link TransformBlock to the ActionBlock so that
            //ActionBlock gets the message and updates the UI
            changeCalculator.LinkTo(updateChanges);
        }

        //Returns ActionBlock for updating UI once transformations are done
        private ActionBlock<Tuple<string, string>> UpdateUiAfterTransformation()
        {
            return new ActionBlock<Tuple<string, string>>(x =>
            {
                LblPtChange.Content = x.Item1;
                LblPercentChange.Content = x.Item2;
            },
            new ExecutionDataflowBlockOptions()
            {
                //Run the delegate on the UI thread so that the labels can be updated safely
                TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
            });
        }
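The arithmetic inside the TransformBlock can be checked on its own. The helper below is a sketch under two assumptions that are not in the original sample: % change is taken relative to the previous value, and the result is rounded to two decimals for display. ChangeCalculator is a hypothetical name.

```csharp
using System;

// Hypothetical helper; not part of the article's WPF sample.
public static class ChangeCalculator
{
    // Returns (pt. change, % change) for a current and a previous index value.
    public static Tuple<decimal, decimal> Calculate(decimal current, decimal previous)
    {
        decimal ptChange = current - previous;

        // Assumption: report 0 % when there is no previous value to compare against.
        decimal percentChange = previous == 0m ? 0m : ptChange * 100m / previous;

        return Tuple.Create(ptChange, Math.Round(percentChange, 2));
    }

    public static void Main()
    {
        var up = Calculate(1100m, 1000m);   // index rose from 1000 to 1100
        var down = Calculate(900m, 1200m);  // index fell from 1200 to 900
        Console.WriteLine("Up:   {0} pts, {1} %", up.Item1, up.Item2);
        Console.WriteLine("Down: {0} pts, {1} %", down.Item1, down.Item2);
    }
}
```

For example, a move from 1000 to 1100 is a change of 100 points, which is 10 % of the previous value.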

Note: More about the LinkTo method can be found in the MSDN documentation.
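One LinkTo overload worth knowing takes a predicate, so that a target only receives the messages it is interested in. The following is a sketch that uses a BufferBlock as the source instead of the sample's BroadcastBlock; LinkFilterSketch and the even/odd split are assumptions chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical illustration class; not part of the article's WPF sample.
public static class LinkFilterSketch
{
    public static async Task<Tuple<List<int>, List<int>>> RunAsync()
    {
        var evens = new List<int>();
        var others = new List<int>();

        var source = new BufferBlock<int>();
        var evenTarget = new ActionBlock<int>(n => evens.Add(n));
        var otherTarget = new ActionBlock<int>(n => others.Add(n));

        var options = new DataflowLinkOptions { PropagateCompletion = true };

        // The first link only lets even numbers through.
        source.LinkTo(evenTarget, options, n => n % 2 == 0);
        // The second link takes whatever the first one declined.
        source.LinkTo(otherTarget, options);

        for (int i = 1; i <= 6; i++)
        {
            await source.SendAsync(i);
        }

        source.Complete();
        await Task.WhenAll(evenTarget.Completion, otherTarget.Completion);
        return Tuple.Create(evens, others);
    }

    public static void Main()
    {
        var result = RunAsync().GetAwaiter().GetResult();
        Console.WriteLine("Evens:  " + string.Join(", ", result.Item1));
        Console.WriteLine("Others: " + string.Join(", ", result.Item2));
    }
}
```

LinkTo also returns an IDisposable; disposing it removes the link, which is one way a display could be switched off again at the user's request.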

Conclusion

We were able to build a broadcasting model using TPL Dataflow. Building parallel dataflows with TPL Dataflow is not complicated, and it keeps the code tidy.

Points of Interest

Exception handling and customizing dataflow blocks are the topics worth exploring next.
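As a starting point for exception handling, the sketch below shows one way a failure surfaces: when a block's delegate throws, the block faults and awaiting its Completion rethrows the exception. FaultSketch and the negative-value rule are assumptions made up for this illustration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical illustration class; not part of the article's WPF sample.
public static class FaultSketch
{
    public static async Task<string> RunAsync()
    {
        var validator = new ActionBlock<decimal>(value =>
        {
            // Assumed rule for the sketch: an index value can never be negative.
            if (value < 0m)
            {
                throw new InvalidOperationException("Negative index value: " + value);
            }
        });

        validator.Post(1200m);
        validator.Post(-5m); // makes the delegate throw, which faults the block
        validator.Complete();

        try
        {
            await validator.Completion;
            return "completed";
        }
        catch (InvalidOperationException ex)
        {
            // Awaiting a faulted Completion task rethrows the original exception.
            return "faulted: " + ex.Message;
        }
    }

    public static void Main()
    {
        Console.WriteLine(RunAsync().GetAwaiter().GetResult());
    }
}
```

In a linked pipeline, setting PropagateCompletion on the links also passes a fault on to the downstream blocks.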

History

  • 18th January, 2014: Initial post

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

Thrivikram Hathwar
Software Developer (Senior) Societe Generale
India

Last Updated 18 Jan 2014
Article Copyright 2014 by Thrivikram Hathwar