Introduction
This is the third article in my series. This time, I am going to use my locking
classes to create a thread-safe, type-safe generic collection. I am also going to
use this opportunity to touch on a few concepts, like extension methods.
Background
A quick word: this article uses the ReaderWriterLock wrappers and
ConvergeWait from my previous two articles, and references my DB
class from an upcoming article.
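For readers who have not seen the previous articles, here is a rough sketch of what IDisposable wrappers around ReaderWriterLock might look like. This is an assumption made for illustration only: the real ReaderLock and WriterLock classes may differ, which is why the stand-ins below carry the made-up names SketchReaderLock and SketchWriterLock.

```csharp
using System;
using System.Threading;

// Assumed shape only: the real ReaderLock/WriterLock classes come from the
// previous article and may differ. These stand-ins show the IDisposable idea.
public sealed class SketchReaderLock : IDisposable
{
    private readonly ReaderWriterLock _lock;

    public SketchReaderLock(ReaderWriterLock rwLock)
    {
        _lock = rwLock;
        _lock.AcquireReaderLock(Timeout.Infinite);
    }

    public void Dispose()
    {
        _lock.ReleaseReaderLock();
    }
}

public sealed class SketchWriterLock : IDisposable
{
    private readonly ReaderWriterLock _lock;
    private readonly bool _upgraded;
    private LockCookie _cookie;

    public SketchWriterLock(ReaderWriterLock rwLock)
    {
        _lock = rwLock;
        if (_lock.IsReaderLockHeld)
        {
            // This thread is already reading: upgrade instead of acquiring.
            _cookie = _lock.UpgradeToWriterLock(Timeout.Infinite);
            _upgraded = true;
        }
        else
        {
            _lock.AcquireWriterLock(Timeout.Infinite);
        }
    }

    public void Dispose()
    {
        if (_upgraded)
        {
            // Return to the reader lock that was held before the upgrade.
            _lock.DowngradeFromWriterLock(ref _cookie);
        }
        else
        {
            _lock.ReleaseWriterLock();
        }
    }
}
```

With stand-ins like these, a using block releases the lock when the block exits, even if an exception is thrown inside it.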
Sometimes, when coding, you may need to fill a collection as quickly as possible;
a good example of this is bringing back large amounts of data from a database. By
breaking the retrieve into several chunks, and executing each on a separate thread,
you can drastically speed up the retrieve; however, you either have to do the work
on each thread, or put each record into a shared collection in some thread-safe way.
Working the data in individual threads might work for some situations, but is
unacceptable for situations where the data must be displayed to a user. So the only
choice is to speed up the retrieve and gather the results in a single thread-safe
collection.
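As a rough sketch of the chunking idea, the helper below splits a record count into fixed-size ID ranges, one per worker thread. ChunkPlanner and GetRanges are hypothetical names that are not part of the article's code, and the sketch assumes IDs start at 0 and have no gaps.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the article's code: splits recordCount
// records into chunks of chunkSize consecutive IDs, assuming IDs start at 0.
public static class ChunkPlanner
{
    // Returns one { lowerInclusive, upperExclusive } pair per chunk.
    public static List<int[]> GetRanges(int recordCount, int chunkSize)
    {
        if (chunkSize <= 0)
        {
            throw new ArgumentOutOfRangeException("chunkSize");
        }
        List<int[]> ranges = new List<int[]>();
        for (int lower = 0; lower < recordCount; lower += chunkSize)
        {
            // The last chunk may be shorter than chunkSize.
            int upper = Math.Min(lower + chunkSize, recordCount);
            ranges.Add(new int[] { lower, upper });
        }
        return ranges;
    }
}
```

For 25 records and a chunk size of 10, this yields three ranges: 0 to 10, 10 to 20, and 20 to 25, with the upper bounds exclusive.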
So, let's look at some example code that might return the data in a single collection.
Note: this code makes the assumption that we are getting data out of a table with
a 32-bit auto-incrementing ID column, with no gaps.
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
}
private readonly ConvergeWait _convergeWait = new ConvergeWait(1);
private void Form1_Load(object sender, EventArgs e)
{
Int32 recordCount = DB.ExecuteScalar("select count(*) from table1");
Int32 maxThreads = recordCount / 10;
if (recordCount < 10)
{
maxThreads = 1;
}
else
{
// Round up so that a final partial chunk still gets its own thread.
if ((recordCount % 10) > 0)
{
maxThreads += 1;
}
}
_convergeWait.Reset(maxThreads);
// Each record is stored as an object[] of column values.
List<object[]> list = new List<object[]>();
for (int i = 0; i < maxThreads; i++)
{
Class2 class2 = new Class2(i, _convergeWait, list);
ThreadPool.QueueUserWorkItem(class2.ThreadFunc, null);
}
_convergeWait.WaitOne();
}
}
internal class Class2
{
private readonly Int32 _baseRecordNumberToGet = 0;
private readonly ConvergeWait _wait = null;
private readonly List<object[]> _list = null;
public Class2(int i, ConvergeWait wait, List<object[]> list)
{
_baseRecordNumberToGet = i;
_wait = wait;
_list = list;
}
public void ThreadFunc(object state)
{
try
{
Int32 lowerLimit = _baseRecordNumberToGet*10;
// Exclusive upper bound, so each chunk covers exactly 10 IDs.
Int32 upperLimit = lowerLimit + 10;
string sql = string.Format("select * from table1 where id >= {0} and id < {1}",
lowerLimit,
upperLimit);
using (DbDataReader dr = DB.ExecuteReader(sql))
{
while (dr.Read())
{
object[] oo = new object[dr.FieldCount];
dr.GetValues(oo);
_list.Add(oo);
}
}
}
finally
{
_wait.Set();
}
}
}
The problem with this code is that List<T> is not thread safe: with several threads
calling Add at the same time, it will likely throw on more than half of the Adds.
The obvious solution that most would come up with would be to simply wrap the Add
call in a lock statement.
Of course, there are a number of problems with doing that:
- Locking at the wrong place can really slow down the code unnecessarily.
- Locking the Add externally leaves other places open to the same problem.
- Locking only the Add leaves the other writing functions unprotected, so it only
solves a small portion of our underlying problem.
- The lock statement prevents all access to the protected block by all but a
single thread.
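For comparison, here is a minimal sketch of the lock-based approach described above. LockedAdder and its members are hypothetical names used only for illustration; the point to notice is that every caller must remember to take the same lock, and that readers would be serialized as well.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical illustration: several threads add to one List<int>,
// with each Add wrapped in a lock statement on a shared sync object.
public static class LockedAdder
{
    public static int Run(int threadCount, int addsPerThread)
    {
        List<int> list = new List<int>();
        object sync = new object();
        Thread[] threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++)
        {
            threads[t] = new Thread(delegate()
            {
                for (int i = 0; i < addsPerThread; i++)
                {
                    lock (sync) // only one thread at a time gets past this point
                    {
                        list.Add(i);
                    }
                }
            });
            threads[t].Start();
        }
        foreach (Thread thread in threads)
        {
            thread.Join();
        }
        return list.Count; // safe to read here: all writers have finished
    }
}
```

Calling LockedAdder.Run(4, 1000) should return 4000, because no Add is lost once the calls are serialized.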
So, to solve this problem, instead of using a lock, I want to use a ReaderWriterLock,
which allows multithreaded access for reads and single-threaded access for writes.
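The difference can be seen with a small sketch against System.Threading.ReaderWriterLock itself. ReaderWriterDemo is a made-up name; the sketch holds a reader lock on the calling thread and then checks whether a second reader and a writer on other threads can get in.

```csharp
using System;
using System.Threading;

// Hypothetical demo class: shows that a second reader is admitted while a
// reader lock is held, but a writer has to wait (and here, times out).
public static class ReaderWriterDemo
{
    // Returns { secondReaderGotIn, writerGotIn }.
    public static bool[] Run()
    {
        ReaderWriterLock rwLock = new ReaderWriterLock();
        bool secondReaderGotIn = false;
        bool writerGotIn = false;

        rwLock.AcquireReaderLock(Timeout.Infinite);
        try
        {
            Thread reader = new Thread(delegate()
            {
                try
                {
                    rwLock.AcquireReaderLock(1000);
                    secondReaderGotIn = true;
                    rwLock.ReleaseReaderLock();
                }
                catch (ApplicationException)
                {
                    // Timed out waiting for the reader lock.
                }
            });
            reader.Start();
            reader.Join();

            Thread writer = new Thread(delegate()
            {
                try
                {
                    rwLock.AcquireWriterLock(200);
                    writerGotIn = true;
                    rwLock.ReleaseWriterLock();
                }
                catch (ApplicationException)
                {
                    // Expected: the reader lock above is still held.
                }
            });
            writer.Start();
            writer.Join();
        }
        finally
        {
            rwLock.ReleaseReaderLock();
        }
        return new bool[] { secondReaderGotIn, writerGotIn };
    }
}
```

The second reader is admitted while the first reader lock is held, whereas the writer's request times out with an ApplicationException.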
A quick side note here. ReaderWriterLockSlim could be used instead, of course;
however, I have had issues with it (inconsistent performance and intermittent
memory leaks). Even if these problems are totally imaginary, another reason to
use ReaderWriterLock is to make this code work under .NET 2.0 (as much as possible).
Finally, the complete code (not shown here) allows a programmer to write a dispatcher
class to use any ReaderWriter-type lock they like.
Back to the point: I prefer classes to handle themselves; that way, complexity is
moved out of the implementation code and into the base code. Unfortunately, .NET
doesn't support multiple inheritance, which would be my first choice. Since
I know that a base class won't be the final solution, I will start with the interface
that I will use later.
public interface ILockingObject
{
ReaderWriterLock Lock { get; }
}
Next, the actual implementation of the base class.
public abstract class LockingObject : ILockingObject
{
private readonly ReaderWriterLock _lock = new ReaderWriterLock();
public ReaderWriterLock Lock
{
get
{
return _lock;
}
}
public ReaderLock GetReaderLock()
{
return new ReaderLock(_lock);
}
public ReaderLock GetReaderLock(Int32 millisecondsTimeout)
{
return new ReaderLock(_lock, millisecondsTimeout);
}
public WriterLock GetWriterLock()
{
return new WriterLock(_lock);
}
public WriterLock GetWriterLock(Int32 millisecondsTimeout)
{
return new WriterLock(_lock, millisecondsTimeout);
}
public WriterLock GetWriterLock(
ReaderLock readerLock)
{
return new WriterLock(_lock);
}
public WriterLock GetWriterLock(
ReaderLock readerLock, Int32 milliseconds)
{
return new WriterLock(_lock, milliseconds);
}
}
This is only helpful if I am either totally encapsulating a class or able to change
the source class. The next best alternative is to implement the ILockingObject
interface on a class and use the extension methods offered by .NET 3.5 or better.
With the interface in place, I can write my extension class, so I get these functions
on every single class that implements the ILockingObject interface.
public static class LockingObjectExtensions
{
public static ReaderLock GetReaderLock(this ILockingObject lockingObject)
{
return new ReaderLock(lockingObject.Lock);
}
public static ReaderLock GetReaderLock(this ILockingObject lockingObject,
Int32 millisecondsTimeout)
{
return new ReaderLock(lockingObject.Lock, millisecondsTimeout);
}
public static WriterLock GetWriterLock(this ILockingObject lockingObject)
{
return new WriterLock(lockingObject.Lock);
}
public static WriterLock GetWriterLock(this ILockingObject lockingObject,
Int32 millisecondsTimeout)
{
return new WriterLock(lockingObject.Lock, millisecondsTimeout);
}
public static WriterLock GetWriterLock(this ILockingObject lockingObject,
ReaderLock readerLock)
{
if(lockingObject.Lock != readerLock.Lock)
{
throw new ArgumentException(
"You may not upgrade a reader from another lock.");
}
return new WriterLock(lockingObject.Lock);
}
public static WriterLock GetWriterLock(this ILockingObject lockingObject,
ReaderLock readerLock, Int32 milliseconds)
{
if (lockingObject.Lock != readerLock.Lock)
{
throw new ArgumentException(
"You may not upgrade a reader from another lock.");
}
return new WriterLock(lockingObject.Lock, milliseconds);
}
}
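If extension methods are new to you, the following minimal sketch shows the mechanism on its own: a static method whose first parameter is marked with this becomes callable on every type that implements the interface. IHasName, HasNameExtensions, and Customer are made-up names for illustration only, and the code needs a C# 3.0 or later compiler.

```csharp
// Hypothetical interface, unrelated to the locking code.
public interface IHasName
{
    string Name { get; }
}

public static class HasNameExtensions
{
    // The "this" modifier makes Describe callable as item.Describe().
    public static string Describe(this IHasName item)
    {
        return "Item named " + item.Name;
    }
}

// Customer never declares Describe, but gets it through the interface.
public class Customer : IHasName
{
    private readonly string _name;

    public Customer(string name)
    {
        _name = name;
    }

    public string Name
    {
        get { return _name; }
    }
}
```

With this in place, new Customer("Ann").Describe() returns "Item named Ann", even though Customer contains no Describe method of its own.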
So finally, here is the implementation of the LockingList class. This implementation
is built on the LockingObject base class. If you are using .NET 3.5 or better and
wish to use the extension methods, you can uncomment the commented code. NOTE: those
changes are confined to the lines above #region IList<T> Members.
public class LockingList<T> : LockingObject, IList<T>
{
private readonly List<T> _list = new List<T>();
#region IList<T> Members
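public IEnumerator<T> GetEnumerator()
{
using (this.GetReaderLock())
{
// Enumerate a snapshot: the lock is released as soon as this method returns,
// so handing out a live enumerator would leave the iteration unprotected.
return new List<T>(_list).GetEnumerator();
}
}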
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
public void Add(T item)
{
using (var rl = this.GetReaderLock())
{
using (this.GetWriterLock(rl))
{
_list.Add(item);
}
}
}
public void Clear()
{
using (var rl = this.GetReaderLock())
{
using (this.GetWriterLock(rl))
{
_list.Clear();
}
}
}
public bool Contains(T item)
{
using (this.GetReaderLock())
{
return _list.Contains(item);
}
}
public void CopyTo(T[] array, int arrayIndex)
{
using (this.GetReaderLock())
{
_list.CopyTo(array, arrayIndex);
}
}
public bool Remove(T item)
{
using (var rl = this.GetReaderLock())
{
using (this.GetWriterLock(rl))
{
return _list.Remove(item);
}
}
}
public int Count
{
get
{
using (this.GetReaderLock())
{
return _list.Count;
}
}
}
public bool IsReadOnly
{
get
{
return false;
}
}
public int IndexOf(T item)
{
using (this.GetReaderLock())
{
return _list.IndexOf(item);
}
}
public void Insert(int index, T item)
{
using (var rl = this.GetReaderLock())
{
using (this.GetWriterLock(rl))
{
_list.Insert(index, item);
}
}
}
public void RemoveAt(int index)
{
using (var rl = this.GetReaderLock())
{
using (this.GetWriterLock(rl))
{
_list.RemoveAt(index);
}
}
}
public T this[int index]
{
get
{
using (this.GetReaderLock())
{
return _list[index];
}
}
set
{
using (var rl = this.GetReaderLock())
{
using (this.GetWriterLock(rl))
{
_list[index] = value;
}
}
}
}
#endregion
public void AddRange(IEnumerable<T> collection)
{
using (var rl = this.GetReaderLock())
{
using (this.GetWriterLock(rl))
{
_list.AddRange(collection);
}
}
}
public int BinarySearch(T item)
{
using (this.GetReaderLock())
{
return _list.BinarySearch(item);
}
}
public int BinarySearch(T item, IComparer<T> comparer)
{
using (this.GetReaderLock())
{
return _list.BinarySearch(item, comparer);
}
}
public int BinarySearch(int index, int count, T item, IComparer<T> comparer)
{
using (this.GetReaderLock())
{
return _list.BinarySearch(index, count, item, comparer);
}
}
public int Capacity
{
get
{
using (this.GetReaderLock())
{
return _list.Capacity;
}
}
set
{
using (var rl = this.GetReaderLock())
{
using (this.GetWriterLock(rl))
{
_list.Capacity = value;
}
}
}
}
public List<TOutput> ConvertAll<TOutput>(Converter<T, TOutput> converter)
{
using (this.GetReaderLock())
{
return _list.ConvertAll(converter);
}
}
public T Find(Predicate<T> match)
{
using (this.GetReaderLock())
{
return _list.Find(match);
}
}
public List<T> FindAll(Predicate<T> match)
{
using (this.GetReaderLock())
{
return _list.FindAll(match);
}
}
public int FindIndex(int startIndex, int count, Predicate<T> match)
{
using (this.GetReaderLock())
{
return _list.FindIndex(startIndex, count, match);
}
}
public int FindIndex(int startIndex, Predicate<T> match)
{
using (this.GetReaderLock())
{
return _list.FindIndex(startIndex, match);
}
}
public int FindIndex(Predicate<T> match)
{
using (this.GetReaderLock())
{
return _list.FindIndex(match);
}
}
public T FindLast(Predicate<T> match)
{
using (this.GetReaderLock())
{
return _list.FindLast(match);
}
}
public int FindLastIndex(int startIndex, int count, Predicate<T> match)
{
using (this.GetReaderLock())
{
return _list.FindLastIndex(startIndex, count, match);
}
}
public int FindLastIndex(int startIndex, Predicate<T> match)
{
using (this.GetReaderLock())
{
return _list.FindLastIndex(startIndex, match);
}
}
public int FindLastIndex(Predicate<T> match)
{
using (this.GetReaderLock())
{
return _list.FindLastIndex(match);
}
}
public void ForEach(Action<T> action)
{
using (this.GetReaderLock())
{
_list.ForEach(action);
}
}
public List<T> GetRange(int index, int count)
{
using (this.GetReaderLock())
{
return _list.GetRange(index, count);
}
}
public int IndexOf(T item, int index)
{
using (this.GetReaderLock())
{
return _list.IndexOf(item, index);
}
}
public int IndexOf(T item, int index, int count)
{
using (this.GetReaderLock())
{
return _list.IndexOf(item, index, count);
}
}
public void InsertRange(int index, IEnumerable<T> collection)
{
using (var rl = this.GetReaderLock())
{
using (this.GetWriterLock(rl))
{
_list.InsertRange(index, collection);
}
}
}
public int LastIndexOf(T item)
{
using (this.GetReaderLock())
{
return _list.LastIndexOf(item);
}
}
public int LastIndexOf(T item, int index)
{
using (this.GetReaderLock())
{
return _list.LastIndexOf(item, index);
}
}
public int LastIndexOf(T item, int index, int count)
{
using (this.GetReaderLock())
{
return _list.LastIndexOf(item, index, count);
}
}
// Returns the number of removed items, matching List<T>.RemoveAll.
public int RemoveAll(Predicate<T> match)
{
using (var rl = this.GetReaderLock())
{
using (this.GetWriterLock(rl))
{
return _list.RemoveAll(match);
}
}
}
public void RemoveRange(int index, int count)
{
using (var rl = this.GetReaderLock())
{
using (this.GetWriterLock(rl))
{
_list.RemoveRange(index, count);
}
}
}
public void Reverse()
{
using (var rl = this.GetReaderLock())
{
using (this.GetWriterLock(rl))
{
_list.Reverse();
}
}
}
public void Reverse(int index, int count)
{
using (var rl = this.GetReaderLock())
{
using (this.GetWriterLock(rl))
{
_list.Reverse(index, count);
}
}
}
public void Sort()
{
using (var rl = this.GetReaderLock())
{
using (this.GetWriterLock(rl))
{
_list.Sort();
}
}
}
public void Sort(Comparison<T> comparison)
{
using (var rl = this.GetReaderLock())
{
using (this.GetWriterLock(rl))
{
_list.Sort(comparison);
}
}
}
public void Sort(IComparer<T> comparer)
{
using (var rl = this.GetReaderLock())
{
using (this.GetWriterLock(rl))
{
_list.Sort(comparer);
}
}
}
public void Sort(int index, int count, IComparer<T> comparer)
{
using (var rl = this.GetReaderLock())
{
using (this.GetWriterLock(rl))
{
_list.Sort(index, count, comparer);
}
}
}
public T[] ToArray()
{
using (this.GetReaderLock())
{
return _list.ToArray();
}
}
public void TrimExcess()
{
using (var rl = this.GetReaderLock())
{
using (this.GetWriterLock(rl))
{
_list.TrimExcess();
}
}
}
public bool TrueForAll(Predicate<T> match)
{
using (this.GetReaderLock())
{
return _list.TrueForAll(match);
}
}
}
Using the Code
Using the code is simple. Because I have made sure to wrap all existing functions,
we can simply swap out the class; thus, all that needs to change is the declaration
and the new
.
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
}
private readonly ConvergeWait _convergeWait = new ConvergeWait(1);
private void Form1_Load(object sender, EventArgs e)
{
Int32 recordCount = DB.ExecuteScalar("select count(*) from table1");
Int32 maxThreads = recordCount / 10;
if (recordCount < 10)
{
maxThreads = 1;
}
else
{
// Round up so that a final partial chunk still gets its own thread.
if ((recordCount % 10) > 0)
{
maxThreads += 1;
}
}
_convergeWait.Reset(maxThreads);
// Each record is stored as an object[] of column values.
LockingList<object[]> list = new LockingList<object[]>();
for (int i = 0; i < maxThreads; i++)
{
Class2 class2 = new Class2(i, _convergeWait, list);
ThreadPool.QueueUserWorkItem(class2.ThreadFunc, null);
}
_convergeWait.WaitOne();
}
}
internal class Class2
{
private readonly Int32 _baseRecordNumberToGet = 0;
private readonly ConvergeWait _wait = null;
private readonly LockingList<object[]> _list = null;
public Class2(int i, ConvergeWait wait, LockingList<object[]> list)
{
_baseRecordNumberToGet = i;
_wait = wait;
_list = list;
}
public void ThreadFunc(object state)
{
try
{
Int32 lowerLimit = _baseRecordNumberToGet*10;
// Exclusive upper bound, so each chunk covers exactly 10 IDs.
Int32 upperLimit = lowerLimit + 10;
string sql =
string.Format("select * from table1 where id >= {0} and id < {1}",
lowerLimit,
upperLimit);
using (DbDataReader dr = DB.ExecuteReader(sql))
{
while (dr.Read())
{
object[] oo = new object[dr.FieldCount];
dr.GetValues(oo);
_list.Add(oo);
}
}
}
finally
{
_wait.Set();
}
}
}
Because the ReaderWriterLock gives preference to reads over writes, you will see
that I acquire a "read lock" that is upgraded to a "write lock" rather than simply
trying to get a write lock. Since the slimmed-down version of my
"ReaderWriterLockWrappers", which is documented in my previous article, doesn't
handle this automatically, I actually show it coded here. This is exactly what you
would need to do to minimize deadlocks without the wrappers.
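Without the wrappers, the read-then-upgrade pattern could be written directly against ReaderWriterLock roughly as follows. RawUpgradeList is a hypothetical class used only to show the calls involved (AcquireReaderLock, UpgradeToWriterLock, DowngradeFromWriterLock, ReleaseReaderLock); it is not the article's LockingList.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical class: the same pattern as LockingList.Add, written with
// try/finally against the raw ReaderWriterLock API instead of the wrappers.
public class RawUpgradeList
{
    private readonly ReaderWriterLock _lock = new ReaderWriterLock();
    private readonly List<int> _items = new List<int>();

    public void Add(int item)
    {
        _lock.AcquireReaderLock(Timeout.Infinite);
        try
        {
            // Upgrade the reader lock held by this thread to a writer lock.
            LockCookie cookie = _lock.UpgradeToWriterLock(Timeout.Infinite);
            try
            {
                _items.Add(item);
            }
            finally
            {
                // Restore the reader lock state from before the upgrade.
                _lock.DowngradeFromWriterLock(ref cookie);
            }
        }
        finally
        {
            _lock.ReleaseReaderLock();
        }
    }

    public int Count
    {
        get
        {
            _lock.AcquireReaderLock(Timeout.Infinite);
            try
            {
                return _items.Count;
            }
            finally
            {
                _lock.ReleaseReaderLock();
            }
        }
    }
}
```

The nested try/finally blocks are what the using statements and wrapper classes hide: every acquire is paired with exactly one release, in reverse order.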
Points of Interest
Prior to posting this article, another article was brought to my attention, which
upon initial examination is unbelievably close to my code. (I guess, when faced with
similar problems, people will come up with similar solutions.) While my code was
developed independently, the idea is the same, with a few key differences (other
than the obvious difference in example collections).
My code implements my ILockingObject interface, which exposes the lock as a public
property. This means any outside code can use the same lock, thus enabling outside
blocks to do things like hold the lock externally. Exposing the lock publicly
also means that recursive lock calls are much more likely to happen, which would
be a major problem with the code in that article.
Additionally, my code was originally developed prior to 3.0 being released. The first
effect of this was that there was no ReaderWriterLockSlim. Second, there were no
extension methods, so rather than using extension methods I used news, like this:
public int Count
{
get
{
using (new ReaderLock(_lock))
{
return _dictionary.Count;
}
}
}
I have received some criticism for exposing the lock, and for the possibility of
"stale" reads, so here is a code snippet that covers this.
LockingList<string> collection = new LockingList<string>();
for (int i = 0; i < 200; i++)
{
collection.Add(i.ToString());
}
using (var rl = collection.GetReaderLock())
{
using (collection.GetWriterLock(rl))
{
int index = collection.IndexOf("126");
if (index > -1)
{
collection.RemoveAt(index);
}
}
}
I leave it to the reader to implement the LockingDictionary and other collections.
However, there is a gotcha with ReaderWriterLock, and thus my wrappers, to be aware
of, so I will implement the additional function AddIfNotExists.
Because Dictionary doesn't support duplicate keys, we must check for the key's
existence before adding the value. Because another writer can get in while the read
lock is being upgraded, the check is repeated once the write lock is held.
public bool AddIfNotExists(TKey key, TValue value)
{
using(var rl = this.GetReaderLock())
{
if(_dictionary.ContainsKey(key))
{
return false;
}
using(this.GetWriterLock(rl))
{
if (_dictionary.ContainsKey(key))
{
return false;
}
_dictionary.Add(key, value);
return true;
}
}
}
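To see the double check at work, here is a self-contained sketch that races several threads on the same key. SketchLockingDictionary and AddIfNotExistsDemo are hypothetical names, and the sketch uses the raw ReaderWriterLock API in place of the wrappers, so it only approximates the method above.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for a LockingDictionary, using the raw lock API.
public class SketchLockingDictionary<TKey, TValue>
{
    private readonly ReaderWriterLock _lock = new ReaderWriterLock();
    private readonly Dictionary<TKey, TValue> _dictionary =
        new Dictionary<TKey, TValue>();

    public bool AddIfNotExists(TKey key, TValue value)
    {
        _lock.AcquireReaderLock(Timeout.Infinite);
        try
        {
            if (_dictionary.ContainsKey(key))
            {
                return false;
            }
            LockCookie cookie = _lock.UpgradeToWriterLock(Timeout.Infinite);
            try
            {
                // Check again: another writer may have added the key
                // while this thread was waiting for the upgrade.
                if (_dictionary.ContainsKey(key))
                {
                    return false;
                }
                _dictionary.Add(key, value);
                return true;
            }
            finally
            {
                _lock.DowngradeFromWriterLock(ref cookie);
            }
        }
        finally
        {
            _lock.ReleaseReaderLock();
        }
    }
}

public static class AddIfNotExistsDemo
{
    // Starts threadCount threads that all try to add the same key and
    // returns how many of them were told the add succeeded.
    public static int CountWinners(int threadCount)
    {
        SketchLockingDictionary<string, int> dictionary =
            new SketchLockingDictionary<string, int>();
        int winners = 0;
        Thread[] threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++)
        {
            int value = t; // per-thread copy for the closure
            threads[t] = new Thread(delegate()
            {
                if (dictionary.AddIfNotExists("key", value))
                {
                    Interlocked.Increment(ref winners);
                }
            });
            threads[t].Start();
        }
        foreach (Thread thread in threads)
        {
            thread.Join();
        }
        return winners;
    }
}
```

However many threads take part, CountWinners should report exactly one successful add; dropping the second ContainsKey check would instead risk an ArgumentException from Dictionary.Add when two threads pass the first check together.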
History
- Original article
- Complete rewrite