Click here to Skip to main content
Licence CPOL
First Posted 8 Jun 2011
Views 11,146
Downloads 550
Bookmarked 35 times

Concurrent Observable Collection, Dictionary, and Sorted Dictionary

By | 12 Sep 2011 | Article
Provides implementations of concurrent observable collection classes for binding to WPF controls so that the collections can be updated from a thread that isn't the WPF GUI thread
Test Application

Introduction

This article provides a description of how I implemented a Dictionary type class that can be bound to a WPF list control, but can be updated from a thread that is not the WPF user interface thread. This is useful for when you have worker threads updating a Dictionary and you want to have the contents of the Dictionary bound to a control. Microsoft does provide some ObservableCollection classes in .NET 4.0, but unfortunately ObservableDictionary isn't one of them, also the ObservableCollection classes they do provide aren't multi-threading friendly. While the classes I provide in this article aren't as efficient as the collections found in the new .NET 4.0 System.Collections.Concurrent namespace, they do allow worker threads to update an ordered collection bound to a view. There are some things in my implementation that I feel could be done better, so I welcome your comments, both positive and negative, at the bottom of this article.

Background

I wanted a Threadsafe Observable Sorted Dictionary, but before I got there I had to go through these steps:

  • Threadsafe Observable Collection
  • Observable Dictionary
  • Threadsafe Observable Dictionary
  • Observable Sorted Dictionary
  • Threadsafe Observable Sorted Dictionary

The Threadsafe Observable Collection

I started with the .NET framework ObservableCollection and set about making it threadsafe, keeping in mind the following things:

  • Prevent the collection being modified while a value is being read out
  • Prevent the collection being modified for the duration of external event handling
  • The collection is ordered, not a queue, so items need to be inserted
  • Need to be able to hand off an enumerable that, due to the collection's ordered nature, won't be affected by subsequent changes to the collection

What I came up with was a base class that uses a .NET ReaderWriterLock to control read and write access to an encapsulated ObservableCollection object, and iterates through all the event handlers and uses the dispatcher of any that are of DispatcherObject type to forward the collection changed events. To create an enumerable that won't be affected by subsequent changes, I just created a snapshot of the collection, so not the most elegant solution, but sufficient for my needs.

Here's the procedure for when the collection is written to:

  1. Acquire read lock
  2. Upgrade to write lock
  3. Set flag indicating a new enumerable snapshot is required
  4. Update encapsulated ObservableCollection
  5. Handle event from encapsulated collection:
    1. Downgrade lock to read lock
    2. Iterate through event handlers
    3. If event handler is a DispatcherObject invoke the handler using the Dispatcher, else call the handler in the current thread.
  6. If lock is still a write lock, downgrade to a read lock
  7. Release read lock

Here's the procedure for when the collection is read from:

  1. Acquire read lock
  2. Return value
  3. Release read lock

Here's the procedure for when the enumerator is requested:

  1. Acquire read lock
  2. Check collection snapshot flag to see if a new snapshot is required
  3. If new collection snapshot is required:
    1. Acquire write lock on the collection snapshot
    2. Create a new collection snapshot
    3. Clear the collection snapshot update required flag
    4. Release write lock on the collection snapshot
  4. Return the enumerator from the snapshot

Looks fairly straight forward, but there is a problem. Hidden in these procedures is a race condition where a WPF ListView type control requests an enumerator in the middle of a write and has the following occur:

  1. ListView waits for write lock to be released
  2. Collection write procedure releases write lock then waits to invoke update on the ListView thread
  3. ListView gets enumerator for the already updated collection and displays the new list
  4. Collection write procedure fires item added event
  5. ListView gets item added event, and adds an already displayed item to the view, hence incorrectly creating a duplicate in the view

This can occur during view initialization because the enumerator isn't requested until the ListView is first made visible, at which point the list might be in the middle of being updated. To get around this, you need a separate snapshot for each handler, but not all handlers will have a Dispatcher thread that you can map to a snapshot. I played around with some Thread Level Storage type approaches, but surrendered to mapping Dispatcher threads to versions of snapshots. So the procedure for when the collection is written to now has extra steps, and looks like this:

  1. Acquire read lock
  2. Upgrade to write lock
  3. Copy the current collection snapshot handle
  4. Set flag indicating a new enumerable snapshot is required
  5. Update encapsulated ObservableCollection
  6. Handle event from encapsulated ObservableCollection:
    1. Build list of Dispatcher threads that haven't been updated yet
    2. Downgrade lock to read lock
    3. Iterate through event handlers
    4. If event handler is a DispatcherObject invoke the handler using the Dispatcher, else call the handler in the current thread.
    5. Remove dispatcher thread from list of threads that haven't been updated yet
  7. If lock is still a write lock, downgrade to a read lock
  8. Release read lock

On the read side, if the reading thread is in the list of threads that haven't been updated then it is handed the old snapshot.

The solution isn't perfect, and can rarely be tripped up when the view is initializing or responding to a reset and misses 2 updates of the snapshot. This solution does cater to dispatchers on multiple threads though. If you only have 1 dispatcher thread then you could just do the collection update on the dispatcher thread, which would be easy enough to implement starting off with the code I've provided.

Write Procedure Code

/// <summary>
/// Calls the read function passed in, and if it returns true,
/// then calls the next read function, else calls the write
/// function.
/// </summary>
protected TResult DoBaseReadWrite<TResult>(Func<bool> readFuncTest, 
	Func<TResult> readFunc, Func<TResult> writeFunc) {
  readWriteLock.AcquireReaderLock(Timeout.Infinite);
  try {
    if(readFuncTest()) {
      return readFunc();
    } else {
      lockCookie = readWriteLock.UpgradeToWriterLock(Timeout.Infinite);
      try {
        this.previousBaseSnapshot = this.BaseSnapshot;
        newSnapshotRequired = true;
        TResult returnValue = writeFunc();
        return returnValue;
      } finally {
        if(readWriteLock.IsWriterLockHeld) {
          readWriteLock.DowngradeFromWriterLock(ref lockCookie);
          lockCookie = default(LockCookie);
        }
      }
    }
  } finally {
    readWriteLock.ReleaseReaderLock();
  }
}
/// <summary>
/// Handles write access to the base collection when a return value is required
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="writeFunc"></param>
/// <returns></returns>
protected TResult DoBaseWrite<TResult>(Func<TResult> writeFunc) {
  readWriteLock.AcquireReaderLock(Timeout.Infinite);
  try {
    lockCookie = readWriteLock.UpgradeToWriterLock(Timeout.Infinite);
    this.previousBaseSnapshot = this.BaseSnapshot;
    newSnapshotRequired = true;
    return writeFunc();
  } finally {
    if(readWriteLock.IsWriterLockHeld) {
      readWriteLock.DowngradeFromWriterLock(ref lockCookie);
      lockCookie = default(LockCookie);
    }
    readWriteLock.ReleaseReaderLock();
  }
}
/// <summary>
/// Handles passing on the CollectionChanged event when the base collection
/// fires it.
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
protected void OnCollectionChanged(NotifyCollectionChangedEventArgs e) {

  var otherHandlers = new List<NotifyCollectionChangedEventHandler>();
  var eventHandler = this.CollectionChanged;
  if(eventHandler != null) {

    // Iterate through all the handlers, and group by dispatcher thread

    foreach(NotifyCollectionChangedEventHandler handler 
		in eventHandler.GetInvocationList()) {
      DispatcherObject dispatcherObject = handler.Target as DispatcherObject;
      if(dispatcherObject != null && dispatcherObject.CheckAccess() == false) {
            
        // Get the dispatcher thread id, and add it to the list of threads
        // that need to be handed an enumerable for the previous snapshot.
        // The threads are removed from the list when they're used to invoke
        // the collection changed event.
            
        int threadId = dispatcherObject.Dispatcher.Thread.ManagedThreadId;
        List<NotifyCollectionChangedEventHandler> handlers;
        if(requiresPreviousEnumerator.TryGetValue(threadId, out handlers)) {
          handlers.Add(handler);
        } else {
          handlers = new List<NotifyCollectionChangedEventHandler>();
          handlers.Add(handler);
          requiresPreviousEnumerator.TryAdd(threadId, handlers);
        }
      } else {
        otherHandlers.Add(handler);
      }
    }
  }

  readWriteLock.DowngradeFromWriterLock(ref lockCookie);
  lockCookie = default(LockCookie);

  // We have now transitioned to a read lock so another modification, which 
  // requires a write lock, won't occur until we release the read lock, but
  // at the same time anything can read the collection, including enumerators
  // unless the read thread is in the requiresPreviousEnumerator list, in which
  // case it will be handed the previous snapshot.

  if(eventHandler == null)
    return;

  var handlersByThread = new List<KeyValuePair<int, 
	List<NotifyCollectionChangedEventHandler>>>(requiresPreviousEnumerator);
  foreach(var handlers in handlersByThread) {
    var firstHandler = handlers.Value[0];
    DispatcherObject dispatcherObject = firstHandler.Target as DispatcherObject;
    dispatcherObject.Dispatcher.Invoke(DispatcherPriority.DataBind, (Action)(() => {
      List<NotifyCollectionChangedEventHandler> removedHandlers;

      // remove the current thread id deom the requiresPreviousEnumerator list
      requiresPreviousEnumerator.TryRemove(handlers.Key, out removedHandlers);

      // Iterate through all the dispatchers
      foreach(var handler in handlers.Value) {
        try {
          handler(this, e);
        } catch(Exception) { }
      }
    }));
  }

  // Execute non-dispatcher handlers on current thread
  foreach(var handler in otherHandlers) {
    try {
      handler(this, e);
    } catch(Exception) { }
  }
}

Read Procedure Code

/// <summary>
/// Handles read access from the base collection
/// </summary>
protected TResult DoBaseRead<TResult>(Func<TResult> readFunc) {
  readWriteLock.AcquireReaderLock(Timeout.Infinite);
  try {
    return readFunc();
  } finally {
    readWriteLock.ReleaseReaderLock();
  }
}

Get Enumerator Code

/// <summary>
/// Gets the enumerator for a snapshot of the collection
/// </summary>
public IEnumerator<T> GetEnumerator() {
  return this.BaseSnapshot.GetEnumerator();
}

/// <summary>
/// Gets an immutable snapshot of the collection
/// </summary>
public ImmutableCollectionBase<T> BaseSnapshot {
  get {
    return this.DoBaseRead(() => {
      if(this.requiresPreviousEnumerator.ContainsKey
	(Thread.CurrentThread.ManagedThreadId)) {
        return this.previousBaseSnapshot;
      } else {
        UpdateSnapshot();
        return this.baseSnapshot;
      }
    });
  }
}

/// <summary>
/// Updates the snapshot that is used to generate an Enumerator
/// </summary>
private void UpdateSnapshot() {
  if (this.newSnapshotRequired) {
    snapshotLock.AcquireWriterLock(Timeout.Infinite);
    if (this.newSnapshotRequired) {
      this.baseSnapshot = new ImmutableCollection<T>(this.baseCollection);
      this.newSnapshotRequired = false;
    }
    snapshotLock.ReleaseWriterLock();
  }
}

The Observable Dictionary

So now I have a method of wrapping access to an observable collection to make it suitable for updating from threads that are not the GUI thread. So all I need to do is wrap the ObservableDictionary class from .NET, however there isn't one at the current time, so I had to write my own.

The ObservableDictionary class I wrote encapsulates 2 collections:

  • An ObservableCollection of Key-Value pairs, that is used as the basis of the observable part of the ObservableDictionary
  • A Dictionary that maps the Key to the Index in the base ObservableCollection

This however does not allow me to neatly insert items into the ObservableCollection without updating a whole bunch of Dictionary entries, so what I did was store link list nodes in the Dictionary instead of indices where the order of the linking is the same as the order ObservableCollection. The link list nodes have a recursive decrement forward, and increment forward methods for shifting the index they point to when removing or inserting nodes. The code below shows how they work:

/// <summary>
/// This function effectively removes this node from the linked list,
/// and decrements the position index of all the nodes that follow it.
/// It removes the node by changing the nodes that come before and
/// after it to point to each other, thus bypassing this node.
/// </summary>
public void Remove() {
  if (this.Previous != null) {
    this.Previous.Next = this.Next;
  }
  if (this.Next != null) {
    this.Next.Previous = this.Previous;
  }
  DecrementForward();
}

/// <summary>
/// This recursive function decrements the position index of all the nodes
/// in front of this node. Used for when a node is removed from a list.
/// </summary>
private void DecrementForward() {
  if (this.Next != null) {
    this.Next.Index--;
    this.Next.DecrementForward();
  }
}

/// <summary>
/// This recursive function decrements the position index of all the nodes
/// in front of this node. Used for when a node is inserted into a list.
/// </summary>
private void IncrementForward() {
  if (this.Next != null) {
    this.Next.Index++;
    this.Next.IncrementForward();
  }
}

Once I had the ObservableDictionary class, the threadsafe version was just a matter of wrapping the ObservableDictionary with a whole lot of accessors, except for the Keys and Values properties. The Keys and Values are read out of a snapshot of the Dictionary, which is updated if necessary when the Keys or Values properties are read.

The Observable Sorted Dictionary

At this point, the sorted version of the ObservableDictionary was just simply a matter of specifying the index to insert a new item at when an item was added. For this, I used a binary sort algorithm:

/// <summary>
/// Gets the position for a key to be inserted such that the sort order is maintained.
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
public int GetInsertIndex(int count, TKey key, Func<int, TKey> indexToKey) {
  return BinarySearchForIndex(0, count - 1, key, indexToKey);
}

/// <summary>
/// Searches for the index of the insertion point for the key passed in such that
/// the sort order is maintained. Implemented as a non-recursive method.
/// </summary>
/// <param name="low"></param>
/// <param name="high"></param>
/// <param name="key"></param>
/// <returns></returns>
private int BinarySearchForIndex
(int low, int high, TKey key, Func<int, TKey> indexToKey) {
  while (high >= low) {

    // Calculate the mid point and determine if the key passed in
    // should be inserted at this point, below it, or above it.
    int mid = low + ((high - low) >> 1);
    int result = this.Compare(indexToKey(mid), key);

    // Return the current position, or change the search bounds
    // to be above or below the mid point depending on the result.
    if (result == 0)
      return mid;
    else if (result < 0)
      low = mid + 1;
    else
      high = mid - 1;
  }
  return low;
}

As with the ObservableDictionary, it was just a matter of wrapping the ObservableSortedDictionary with accessors from the threadsafe ConcurrentObservableBase class I wrote.

Using the Code

In the Swordfish.NET.General project in the attached source code, I have provided the following collection classes under the namespace Swordfish.NET.Collections:

  • ObservableDictionary
  • ObservableSortedDictionary
  • ConcurrentObservableCollection
  • ConcurrentObservableDictionary
  • ConcurrentObservableSortedDictionary

The attached source code also contains a WPF application that you can use for some light interactive testing of the above collection classes.

Points of Interest

With .NET 4.0, Microsoft has implemented some unordered concurrent collections, including a ConcurrentDictionary.

John Simmons raised the need for an ObservableDictionary on The Code Project back in May 2010 and linked to an implementation.

History

  • First posted on 8th June 2011
  • Fixed some issues found by Olly Hayes on 12th September 2011

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

John Stewien

Founder
Cheesy Design
Taiwan Taiwan

Member

John graduated from the University of South Australia in 1997 with a Bachelor of Electronic Engineering Degree, and since then he has worked on hardware and software in many fields including Aerospace, Defence, and Medical giving him over 10 of years experience in C++ and C# programming. In 2009 John Started his own contracting company doing business between Taiwan and Australia.

Sign Up to vote   Poor Excellent
Add a reason or comment to your vote: x
Votes of 3 or less require a comment

Comments and Discussions

 
You must Sign In to use this message board. (secure sign-in)
 
Search this forum  
 FAQ
    Noise  Layout  Per page   
  Refresh
Questionasp.net PinmemberMooseDePaques12:28 2 Dec '11  
AnswerRe: asp.net PinmemberJohn Stewien13:39 2 Dec '11  
QuestionSorted dictionary data in a WPF grid Pinmemberboulolou16:15 22 Sep '11  
AnswerRe: Sorted dictionary data in a WPF grid PinmemberJohn Stewien14:58 25 Sep '11  
GeneralRe: Sorted dictionary data in a WPF grid Pinmemberboulolou17:15 25 Sep '11  
GeneralRe: Sorted dictionary data in a WPF grid PinmemberJohn Stewien0:40 26 Sep '11  
QuestionDictionary.Keys and .Values not updating? PinmemberOllyHayes23:01 7 Sep '11  
AnswerRe: Dictionary.Keys and .Values not updating? PinmemberJohn Stewien15:58 8 Sep '11  
GeneralRe: Dictionary.Keys and .Values not updating? PinmemberOllyHayes23:22 8 Sep '11  
GeneralRe: Dictionary.Keys and .Values not updating? PinmemberJohn Stewien1:25 9 Sep '11  
GeneralRe: Dictionary.Keys and .Values not updating? PinmemberJohn Stewien3:40 12 Sep '11  
QuestionThreading problem? PinmemberOllyHayes6:11 6 Sep '11  
AnswerRe: Threading problem? PinmemberJohn Stewien16:31 6 Sep '11  
AnswerRe: Threading problem? PinmemberJohn Stewien21:29 6 Sep '11  
GeneralRe: Threading problem? PinmemberOllyHayes21:57 7 Sep '11  
GeneralRe: Threading problem? PinmemberJohn Stewien15:44 8 Sep '11  
GeneralRe: Threading problem? PinmemberJohn Stewien21:50 8 Sep '11  
GeneralRe: Threading problem? PinmemberJohn Stewien15:32 11 Sep '11  

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.

Permalink | Advertise | Privacy | Mobile
Web04 | 2.5.120517.1 | Last Updated 12 Sep 2011
Article Copyright 2011 by John Stewien
Everything else Copyright © CodeProject, 1999-2012
Terms of Use
Layout: fixed | fluid