Click here to Skip to main content
13,193,551 members (48,744 online)
Click here to Skip to main content
Add your own
alternative version

Stats

50.3K views
1.1K downloads
134 bookmarked
Posted 26 Sep 2016

Multithreading in C# .NET 4.5 (Part 2)

, 30 Sep 2016
Rate this:
Please Sign up or sign in to vote.
A high level overview of the latest threading technologies and patterns with examples.

Introduction

Part 1 established some of the basic principles about how multithreading improves performance as well as the give and take of performance in multithreading design elements. In Part 2, my goal is to expose the latest threading technologies with some high level usage of them. It's tips to get started and is not meant to be a deep dive into any single area of concern or technology.

Background

If you're not familiar with basic threading concepts, please review Part 1.

Before you read this article, you should be familiar with:

  • Collections
  • Basic multithreading
  • Thread slicing
  • Costs of threading
  • Design patterns

A Woven History of Objects

Despite the different tools required for threading and thread safety, it's proper thread management that is arguably most critical to ensuring thread performance optimization. The .NET 1.1 object that first provided thread management is the ThreadPool.

As with many technologies, it's the ability to build on what we've already done that enables us to do more which is definitely the case for threading. The ThreadPool and thread management is really what's being built on most so this article is going to focus on thread management.

Don't get me wrong. There are many other great .NET 1.1 threading objects to include:

  • Mutex/Semaphore
  • Monitor
  • ManualResetEvent/AutomaticResetEvent

When you get into advanced threading optimization, these are valuable tools, but there are two main reasons why I won't focus on them in this article:

  1. It's beyond the scope of the 80% case.
  2. Most modern threading technologies handle their purposes for you.

Fast forward to when .NET 4.0 comes along and we get a boom to threading with introduction of the Task Parallel Library (TPL) and Parallel LINQ (PLINQ). While these have a huge amount of additions to threading features and ease of development, you can really boil down the libraries to introducing the Task. To describe a Task as simply as possible, it's a Thread that has been automatically assigned to a ThreadPool.

Having the system manage threads should be the default behavior, but with the ThreadPool you still have to explicitly create and assign the resources. Having it be taken care of inherently in the Task just makes what we should be doing anyway easier. I regularly joke that my job as a programmer is to write as little code as possible, but these defaults are also meant to help prevent you from shooting yourself in the foot.

Jokes apart, while the TPL changes are about more than just writing less code, PLINQ is seriously just about writing less code.

Below is an example of PLINQ:

Parallel.For(
    0,
    1000,
    new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount },
    (i) => { /* Do work */ });

This is creating 1,000 Task objects which could be done in a simple for loop; however, you have to manage the index in a thread safe manner and there are plenty of standard thread controls you'd have to add in yourself. Notice the ParallelOptions parameter? It's going to limit the number of concurrent threads to the number of cores on the CPU - a good general recommendation.

The ParallelOptions is a great example of how modern threading technologies are leveraging older technologies. Concurrent thread execution limit is accomplished via a semaphore. So that's part of why, for now, I'm not explicitly going into Semaphore and other .NET 1.1 threading object usage.

In addition to the TPL and PLINQ, .NET 4.0 also introduced IObservable. While there was no direct support for IObservable in the standard .NET libraries, there was the easily consumable Reactive Extensions (Rx) via NuGet. Reactive Extensions also contain many features which improve the ease of development and management of threaded work. IObservable is actually leveraging the TPL too.

Next thing you know is .NET 4.5 incorporates the Asynchronous Task Protocol (ATP) and everybody is clamoring over it; however, similarly to the Task was built off of the older ThreadPool management, the ATP was really built off the existing continuation Task abilities.

The boost to efficiency with a continuation Task is that you don't have threads chewing up allocated slice time waiting for subsequent processes to complete. Additionally, it's more efficient to join back on a thread synchronization context, then invoke on the original thread or creating even more threads to handle the continuation.

Sixty Percent of the Time, It Works All the Time

ATP is not a silver bullet, unfortunately a lot of people inexperienced in threading treat it as such. There's still a lot more to learn than ATP to be able to multithread effectively.

Regardless of the awesome technologies available to work with, you have to make sure your code is thread safe. If you are not sure if a particular item you're using is thread safe, odds are it's not. Double check the object's thread safety before doing anything else with it.

As an example, the List{T} is not thread-safe. It means if you're manipulating the collection on multiple threads that it may not end up how you expect. Missing elements, elements that should have been removed, bad count values, etc. Running the sample program below will demonstrate.

class Program
{
    static void Main(string[] args)
    {
        ListThreadSafety();

        Console.WriteLine("Press any key to exit...");
        Console.ReadKey();
    }

    static void ListThreadSafety()
    {
        int count = 1000000;

        var list = new List<int>(count);
    
        Parallel.For(0, count, (i) => list.Add(i));

        Console.WriteLine($"The total count should be {count}.  The list count is {list.Count}.");
    }
}

The Add is not an atomic operation - an operation which can be considered to be a single operation. Underneath the hood, it's manipulating other fields and properties in a series of steps to add the element to the collection. This consequently means it is prone to race conditions.

Race conditions are non-deterministic outcomes depending on thread execution order. Similarly to coding errors having both compiler errors and logical errors, race conditions can come from logical errors too. For example, if you create two threads that are dependent on finishing in creation order even though everything inside those two threads are properly synchronized.

Regardless of how awesome async/await are, they won't do anything to prevent race conditions. It means you have to pay particular attention to what you're threading and how you're threading it.

What gives? Why not just make everything thread safe? Well thread synchronization is expensive so you don't want to use it unless you need it which brings us to efficiency...

Taking something that's not threaded using async and updating it to use async won't always make it substantially better. It depends a lot on what kind of work the threads are doing and the environment it's running in.

Async has the most benefit when your computer is the one doing the heavy lifting. Heavy computations, file IO, etc. Reason being during high CPU usage the more that you can limit wasting thread slices, the more those heavy tasks can get done. If the CPU is idle waiting for external calls to come back, then excessive thread slices aren't really going to have a big performance impact.

The lesson here is for tasks you already have threaded if you want to switch to async/await make sure there's justifiable benefit before spending the development time and risking bugs to update your code. Also remember that for executing simple tasks, it is usually more efficient to just complete the tasks single threaded than taking on the overhead of threading.

Concurrent Collections

The System.Collections.Concurrent namespace was added in .NET 4.0 and (as you probably can guess), they are thread-safe collections optimized for threaded operations. There are five concurrent collections:

What's not so obvious is these collections are optimized for the producer-consumer pattern. The very important distinction in producer-consumer versus general threading is producer-consumer is meant to support adding to and taking from the same collection at the same time.

I personally use the ConcurrentBag{T} and ConcurrentDictionary{TKey,TValue} in general threading just so I don't have to maintain a sync root object, but for threading scenarios with critical performance I wouldn't recommend it by default. In that case, you should look into optimizing the code for the specific implementation.

To justify using the producer-consumer pattern, both production and consumption need to be an expensive task. Expensive in this case simply meaning worth the additional cost and overhead of creating threads. External calls, database operations, file IO, etc.

To implement the producer-consumer pattern properly, threads need to be created for simultaneous production and consumption such as below:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    class Program
    {
        static void Main(string[] args)
        {
            ProducerConsumerExample();

            Console.WriteLine("Press any key to exit...");
            Console.ReadKey();
        }

        static void ProducerConsumerExample()
        {
            var count = 100;

            var productionTasks = new ConcurrentBag<Task>();
            var consumptionTasks = new ConcurrentBag<Task>();

            // Creating blocking collection with bounding to free up threads for consumption
            var blockingCollection = new BlockingCollection<int>(10);

            var consumerMain = new TaskFactory().StartNew(() =>
            {
                while (true)
                {
                    try
                    {
                        // For indeterminable operations use cancellation token
                        var consumed = blockingCollection.Take();

                        consumptionTasks.Add(new TaskFactory().StartNew(() =>
                        {
                            // Expensive operation, i.e. calling an API based on data
                            Thread.Sleep(100);
                            Console.WriteLine($"Consumed {consumed} from blocking collection.");
                        }, TaskCreationOptions.LongRunning));
                    }
                    catch (InvalidOperationException)
                    {
                        // Done
                        Console.WriteLine($"Completed consumption task generation.");
                        break;
                    }
                };
            }, TaskCreationOptions.LongRunning);

            Parallel.For(0, count, (i) =>
            {
                productionTasks.Add(new TaskFactory().StartNew(() =>
                {
                    // Expensive operation, i.e. database retrieval
                    Thread.Sleep(100);                  
                    blockingCollection.Add(i);
                    Console.WriteLine($"Added {i} to blocking collection.");
                }, TaskCreationOptions.LongRunning));
            });

            // If tasks are determinable like this example...
            Task.WaitAll(productionTasks.ToArray());
            blockingCollection.CompleteAdding();

            // Wait on the main consumer to finish adding tasks, then wait on tasks...
            consumerMain.Wait();
            Task.WaitAll(consumptionTasks.ToArray());
        }
    }        
}

In this particular example, the consumer is being ended in a simple way - via exception. The Take() is a blocking call and will throw an InvalidOperationException if called (or in progress) when the BlockingCollection{T} has been completed. There are more elegant ways to terminate processing via a CancellationToken or many other threading mechanisms if needed.

WPF Patterns

A primary goal of WPF implementation is to keep all heavy work off the Dispatcher thread. While that can seem simple, the complexity comes in when trying to then coordinate the results of the work back with the Dispatcher.

Below are basic examples how this can be accomplished:

Example 1 - async/await

public DelegateCommand CheckAsyncCommand { get; set; }

CheckAsyncCommand = new DelegateCommand(async () => await CheckAsync());

public async Task CheckAsync()
{
    await new TaskFactory().StartNew(() =>
    {
        if (Thread.CurrentThread.ManagedThreadId == 
                 Application.Current.Dispatcher.Thread.ManagedThreadId)
        {
            throw new InvalidOperationException();
        }

        Thread.Sleep(2000);
    }, TaskCreationOptions.LongRunning);

    IncrementCount();
}

Example 2 - ContinueWith

public DelegateCommand CheckContinuationCommand { get; set; }

CheckContinuationCommand = new DelegateCommand(() => CheckContinuation());

public void CheckContinuation()
{
    new TaskFactory().StartNew(() =>
    {
        if (Thread.CurrentThread.ManagedThreadId == 
                Application.Current.Dispatcher.Thread.ManagedThreadId)
        {
            throw new InvalidOperationException();
        }

        Thread.Sleep(2000);
    }, TaskCreationOptions.LongRunning).ContinueWith((task) => 
    IncrementCount(), TaskScheduler.FromCurrentSynchronizationContext());

Example 3 - Dispatcher.Invoke

public DelegateCommand CheckInvokeCommand { get; set; }

CheckInvokeCommand = new DelegateCommand(() => CheckInvoke());

public void CheckInvoke()
{
    new TaskFactory().StartNew(() =>
    {
        if (Thread.CurrentThread.ManagedThreadId == 
              Application.Current.Dispatcher.Thread.ManagedThreadId)
        {
            throw new InvalidOperationException();
        }

        Thread.Sleep(2000);

        Application.Current.Dispatcher.Invoke(() => IncrementCount());
    }, TaskCreationOptions.LongRunning);
}

Example 4 - Observable

private Subject<object> _subject = new Subject<object>();

_subject.ObserveOnDispatcher().Subscribe(_ => IncrementCount());
CheckObservableCommand = new DelegateCommand(() => CheckObservable());

public void CheckObservable()
{
    Observable.Start(() =>
    {
        if (Thread.CurrentThread.ManagedThreadId == 
           Application.Current.Dispatcher.Thread.ManagedThreadId)
        {
            throw new InvalidOperationException();
        }

        Thread.Sleep(2000);

        _subject.OnNext("Finished.");
    }, NewThreadScheduler.Default);
}

The first thing to understand with these examples is the DelegateCommand. It is the Prism implementation of the ICommand interface. The commands are then bound to a Button in the application. Henceforth, the DelegateCommand is being executed on the Dispatcher.

If you don't pass the work in the command off to a new thread, the command will be executed solely on the Dispatcher. If you're concerned with having a responsive UI, this is a bad thing...

In the demo project, there is a ProgressBar with IsIndeterminate set. Any work on the Dispatcher will subsequently cause the marquee to pause. The goal is to make it easily and clearly identifiable when the Dispatcher is being used in the demo.

You can verify command execution on the Dispatcher will freeze the marquee with the "Run Synchronous" Button. The other commands using the examples above will do its work on a new thread so it should not significantly freeze the marquee.

NOTE: The marquee may still blip when running the threaded commands because the work to create the new Task is on the Dispatcher.

NOTE: Most mechanisms to create managed threads don't guarantee they'll actually be on a new thread. Use the respective creation options to help ensure a new thread if you want to make sure it's on a new thread.

The async/await implementation should be the default standard because it's relatively simple to implement and successfully frees up the Dispatcher. The con to this approach is its simplicity as well. There's really not a whole lot of room to naturally hook up complex execution or notifications. Granted you could build them in, but that's the point to some of the other examples.

The benefit to the ContinueWith approach is you can piece together the Task in a strategy pattern fashion. There is direct control/influence over any individual Task that could be in the chain. The con with this approach is added complexity forcing everything into a Task.

Dispatcher.Invoke is simplistic, handles most cases, and can be called from any thread. In contrast when calling Increment() without explicitly making sure we're on the Dispatcher, the assumption is the call is always from the Dispatcher. With command binding that is default expectation, although there may be reason to make the call from other background processes which this will handle. The con is that the Invoke is a more expensive call.

The Observable is one of my favorites because there is a ton of feature rich support around them; however, it does require more plumbing and increases code complexity. I favor this approach to using the ContinueWith when there's complex execution required; however, that does introduce another technology that not everyone may be familiar with.

Reactive Extensions

Reactive Extensions (Rx) is a library supporting asynchronous implementations of the observer pattern. There are a ton of useful features and it's built as a monad so you can chain functionality together just like IEnumerable and IQueryable.

There are two basic actors in the observer pattern: subject and observer. The subject is the source of the data and the observer is anyone watching that data.

It's important not to confuse the observer pattern with the publish-suscribe pattern. The latter is more of an architectural pattern that can be built utilizing the observer pattern. Rx makes this a little confusing because it uses some mixed terminology (i.e., you call Subscribe to watch an IObservable{T}).

Anything you can observe in Rx implements the IObservable{T} interface. The subject is supported by the, you guessed it, ISubject{T} interface. The out-of-the-box implementations of ISubject{T} are also IObservable{T}. So you're pushing and listening to the same source object.

Rx is leveraging the TPL so you will see some similar functionality right away like the fact that IObservable{T} is awaitable. Also like the Task has a TaskScheduler, Rx has the IScheduler allowing you to easily influence how the thread is managed. If you caught it in the WPF Observable example, there's even a handy-dandy extension IObservable{T}.ObserveOnDispatcher() to make joining to the Dispatcher even easier.

One of the patterns I really like to implement with IObservable is to abstract the threading details such as:

private Subject<object> _mySubject = new Subject<object>();

public IObservable<object> MySubject
{
    get
    {
        return _mySubject.ObserveOn(NewThreadScheduler.Default);
    }
}

If you're creating a Task that should be started on a new thread, you need start the Task yourself if you want to ensure that happens. The problem is sometimes it's optimal to have the consumer start the Task, like when you want to attach a continuation Task. While it certainly can be done with a Task, this is just a much simpler approach.

Here are some other really useful features...

  • Observable.Merge: combines several IObservable{T} sources into a single IObservable{T}.
  • Observable.Timer: fires a single event at the specified duration.
  • Observable.Interval: fires an event at the specified interval.
  • IObservable{T}.Where: predicate to determine if the IObservable should be fired.
  • IObservable{T}.Throttle: fires the IObservable only after it has stopped changing for the specified duration.
  • IObservable{T}.Take(int count): fires the IObservable for only the specified number of times.
  • IObservable{T}.TakeUntil(IObservable{T} source): fires the IObservable until the specified source fires.
  • IObservable{T}.StartWith(T initialValue): causes the IObservable to fire on subscription with the provided initial value.

With all of these ways to build an IObservable{T}, you can Subscribe to them in a multithreaded fashion with out-of-the-box functionality. It's why I wanted to include talking about Rx in the modern approaches to handling threading in .NET.

I hope that observing data asynchronously with features like this starts to trigger all sorts of potential uses. Unfortunately, I'm not going to turn the article into an article on Reactive Extensions so I won't cover these features in depth. The point here is the Rx library is another great toolbox for asynchronous and threaded programming.

Tips and Tricks

Every thread created should have its logic surrounded in a try {} catch {}. I'm not saying you should swallow the Exception thrown, but inside a thread an unhandled exception is likely to either crash your entire application or kill the thread silently - both bad. You want to make sure to handle any Exception inside a thread in a predictable manner.

NOTE: Exceptions in a PLINQ loop will throw an AggregrateException.

Any time you create a Task, there's no guarentee it will actually create a new thread. If you want to try to nudge new thread creation, make sure to use TaskCreationOptions.LongRunning.

How a Task in PLINQ is stopped/cancelled that is running isn't very intuitive how it usually works. It is common for any Task already generated to actually run to completion. If you need to account for this behavior, please do adequate testing/investigation.

The only way to create a foreground thread is to use a Thread.

The Mutex/Semaphore are a machine level objects. If you want to coordinate access to resources across multiple applications on the same machine this is a way to achieve that.

The BackgroundWorker is inferior to the Task. Unless you're stuck targeting .NET 2.0 or .NET 3.5 don't use it.

Use IObservable{T}.Select to select a common Type (possibly a state) so that you can use Observable.Merge with different source Types.

By default, use a CompositeDisposable to store subscriptions from any observable and Dispose of it accordingly to help ensure your subscriptions aren't the source of a memory leak.

A lock is owned by thread so don't use the same object for a lock in more than one place as it can lead to unintentional pass through.

Points of Interest

The tips and tricks are a few things that come to mind for basic capabilities. I'll try to add more later as I think of them. If you need a tip/trick for a problem, feel free to leave feedback and I'll let you know if I have anything.

History

  • 2016-09-30: Updated code files - updated producer-consumer snippet x2
  • 2016-09-28: Updated producer-consumer example snippet - faulty check to test completion fixed
  • 2016-09-27: Edited article - changed background to foreground
  • 2016-09-26: Initial version

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

PureNsanity
Architect
United States United States
More than fourteen years of experience programming with six years of WPF application development. Dedicated business application developer with large focus on performance tuning for multithreaded applications. Experience developing for small scale applications and large scale distributed systems at senior and architect levels. Experience developing for a wide range of industry applications to include artificial lift, baggage tracking, distribution, customer relation management, and commodities.

You may also be interested in...

Comments and Discussions

 
QuestionAsync + multiple threads Pin
tugrulGtx9-Apr-17 10:26
membertugrulGtx9-Apr-17 10:26 
AnswerRe: Async + multiple threads Pin
PureNsanity10-Apr-17 12:40
professionalPureNsanity10-Apr-17 12:40 
GeneralRe: Async + multiple threads Pin
tugrulGtx10-Apr-17 13:29
membertugrulGtx10-Apr-17 13:29 
GeneralRe: Async + multiple threads Pin
PureNsanity10-Apr-17 14:44
professionalPureNsanity10-Apr-17 14:44 
QuestionMy vote is 5 Pin
Avi Farah31-Oct-16 21:53
memberAvi Farah31-Oct-16 21:53 
General[My vote of 2] My vote of 2 Pin
Erick Mattew30-Sep-16 5:16
memberErick Mattew30-Sep-16 5:16 
GeneralRe: [My vote of 2] My vote of 2 Pin
PureNsanity30-Sep-16 6:15
professionalPureNsanity30-Sep-16 6:15 
QuestionWhen to use which multithreading technique . . . Pin
Member 1211074629-Sep-16 8:20
memberMember 1211074629-Sep-16 8:20 
AnswerRe: When to use which multithreading technique . . . Pin
PureNsanity29-Sep-16 8:49
professionalPureNsanity29-Sep-16 8:49 
GeneralMy vote of 5 Pin
Franc Morales28-Sep-16 18:23
memberFranc Morales28-Sep-16 18:23 
QuestionArticle-Interpretation Pin
MARC5628-Sep-16 1:37
memberMARC5628-Sep-16 1:37 
AnswerRe: Article-Interpretation Pin
PureNsanity28-Sep-16 4:05
professionalPureNsanity28-Sep-16 4:05 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Praise Praise    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.

Permalink | Advertise | Privacy | Terms of Use | Mobile
Web02 | 2.8.171018.2 | Last Updated 30 Sep 2016
Article Copyright 2016 by PureNsanity
Everything else Copyright © CodeProject, 1999-2017
Layout: fixed | fluid