Task Completion On Event





A road trip to implement generic TaskCompletionSource for non-generic message event types
Introduction
Recently, I implemented a solution to a specific use case: invoking a continuation (using TaskCompletionSource) for an event that passes in a specific message derived from a base class. For example, consider several messages that can be generated from some "external" source (a couple of real-life examples: an Ingenico iSC480, and RabbitMQ messages received from a BeagleBone).
Each of these messages is derived from a base class, so the boilerplate concrete implementation looks like this:
public class Message { }
public class Message1 : Message { }
public class Message2 : Message { }
public class Message3 : Message { }
In my use case, these messages are received effectively asynchronously and the specific message will be received only after some user action has occurred. In my particular case, I know exactly what message I should receive.
It should also be noted that I wanted to avoid writing if-then-else or switch statements based on the message ID or the message packet type. The goals were:
- Be able to await a specific message
- Have the underlying implementation automatically cancel the awaiting task if a different message is received.
So, in my particular case, I wanted to be able to do something like this:
// Do something that expects Message2 to be returned at some point
Message2 msg = await device.WaitForMessage<Message2>();
// Do something once Message2 has been received.
The complexity in this occurs partly because the message packets are received through a standard event handler:
public event EventHandler<MessageEventArgs> MessageReceived;
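So, the overall workflow is: the caller awaits a specific message type, the device raises MessageReceived when a packet arrives, and the event handler completes (or cancels) the awaited task.
MessageEventArgs looks like this (at least for the example in this article):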
public class MessageEventArgs : EventArgs
{
    public Message Message { get; set; }
}
A very important point to this story is that I am only ever expecting one kind of message to allow the continuation. Anything else should cancel processing the continuation.
A Mock Device
We can implement a mock device like this:
public class MockDevice
{
    public event EventHandler<MessageEventArgs> MessageReceived;

    // Raises MessageReceived with a Message1, Message2, or Message3 instance.
    public void SimulateMessage(int n)
    {
        switch (n)
        {
            case 1:
                MessageReceived.Fire(this, new MessageEventArgs() { Message = new Message1() });
                break;
            case 2:
                MessageReceived.Fire(this, new MessageEventArgs() { Message = new Message2() });
                break;
            case 3:
                MessageReceived.Fire(this, new MessageEventArgs() { Message = new Message3() });
                break;
        }
    }
}
In the actual implementation, there is no switch statement; instead, I use class attributes to designate which concrete message packet class gets instantiated for which message, via reflection (as well as populating message-specific properties from the parameters that are acquired separately from the device), but all that is outside the scope of this article.
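For readers curious what an attribute-driven lookup might look like, here is a minimal sketch. It is not the actual implementation: the MessageIdAttribute and MessageFactory names are hypothetical, the IDs are made up, and populating message-specific properties is left out entirely.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

public class Message { }

// Hypothetical attribute: tags a message class with the ID the device sends.
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = false)]
public sealed class MessageIdAttribute : Attribute
{
    public int Id { get; }
    public MessageIdAttribute(int id) { Id = id; }
}

[MessageId(1)] public class Message1 : Message { }
[MessageId(2)] public class Message2 : Message { }
[MessageId(3)] public class Message3 : Message { }

public static class MessageFactory
{
    // Built once via reflection: message ID -> concrete Message subclass.
    private static readonly Dictionary<int, Type> typesById =
        typeof(Message).Assembly.GetTypes()
            .Where(t => typeof(Message).IsAssignableFrom(t) && !t.IsAbstract)
            .Select(t => new { Type = t, Attr = t.GetCustomAttribute<MessageIdAttribute>() })
            .Where(x => x.Attr != null)
            .ToDictionary(x => x.Attr.Id, x => x.Type);

    // Returns a new instance of the class tagged with the given ID,
    // or null when no class claims that ID.
    public static Message Create(int id)
    {
        Type type;
        return typesById.TryGetValue(id, out type)
            ? (Message)Activator.CreateInstance(type)
            : null;
    }
}
```

With something like this in place, the mock device's switch could shrink to a single call such as MessageFactory.Create(n), and adding a new message type would only require a new tagged class.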
A Basic Interface to the Device
The basic interface to the device looks like this:
public class DeviceInterface
{
    public MockDevice Device { get; protected set; }

    public DeviceInterface()
    {
        Device = new MockDevice();
        Device.MessageReceived += OnMessageReceived;
    }

    protected void OnMessageReceived(object sender, MessageEventArgs e)
    {
        // Here we want to set the task completion result.
    }
}
Take 1: Completing With a Message Result
TaskCompletionSource is a useful mechanism for external asynchronous operations. From MSDN:
In many scenarios, it is useful to enable a Task<TResult> to represent an external asynchronous operation. TaskCompletionSource<TResult> is provided for this purpose. It enables the creation of a task that can be handed out to consumers, and those consumers can use the members of the task as they would any other. However, unlike most tasks, the state of a task created by a TaskCompletionSource is controlled explicitly by the methods on TaskCompletionSource. This enables the completion of the external asynchronous operation to be propagated to the underlying Task.
I initially used it like this:
public class DeviceInterface
{
    public MockDevice Device { get; protected set; }
    protected TaskCompletionSource<Message> tcs;

    public DeviceInterface()
    {
        Device = new MockDevice();
        Device.MessageReceived += OnMessageReceived;
    }

    // Hands out a task that completes when the next message arrives.
    public Task<Message> WaitForMessage()
    {
        tcs = new TaskCompletionSource<Message>();
        return tcs.Task;
    }

    protected void OnMessageReceived(object sender, MessageEventArgs e)
    {
        tcs.SetResult(e.Message);
    }
}
Notice the declaration TaskCompletionSource<Message>. This immediately indicates that we have a problem, because we're specifying the result type as Message, rather than one of the desired sub-classes of Message. We'll deal with that later. First, let's just get some basic understanding of how to use TaskCompletionSource under our belt.
A simple test program illustrates that this works:
using System;
using System.Threading;

class Program
{
    static DeviceInterface deviceInterface = new DeviceInterface();

    static void Main(string[] args)
    {
        DoSomethingThatExpectsMessage2();
        Thread.Sleep(500);
        deviceInterface.Device.SimulateMessage(2);
    }

    static async void DoSomethingThatExpectsMessage2()
    {
        // The downcast is needed because the task's result type is Message.
        Message2 msg = (Message2)await deviceInterface.WaitForMessage();
        Console.WriteLine("Message2 received.");
    }
}
But it doesn't work well. Consider this:
deviceInterface.Device.SimulateMessage(1); // let's get a different message!
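What happened? To answer that (although hopefully it's obvious), we need to put a try-catch block around the body of the DoSomethingThatExpectsMessage2 method and print the exception. The cast to Message2 fails with an InvalidCastException.
Quite so. We're expecting message 2, but we got message 1!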
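As a minimal, self-contained sketch of that failure: the Take1Demo class below is a hypothetical stand-in for DeviceInterface from Take 1, and the method returns Task<string> rather than void purely so the outcome can be inspected.

```csharp
using System;
using System.Threading.Tasks;

public class Message { }
public class Message1 : Message { }
public class Message2 : Message { }

public static class Take1Demo
{
    // Stand-in for the TaskCompletionSource<Message> field in DeviceInterface.
    static TaskCompletionSource<Message> tcs;

    static Task<Message> WaitForMessage()
    {
        tcs = new TaskCompletionSource<Message>();
        return tcs.Task;
    }

    public static async Task<string> DoSomethingThatExpectsMessage2()
    {
        try
        {
            Message2 msg = (Message2)await WaitForMessage();
            return "Message2 received.";
        }
        catch (InvalidCastException ex)
        {
            // A Message1 instance cannot be cast to Message2.
            return "Wrong message: " + ex.GetType().Name;
        }
    }

    public static string Run()
    {
        Task<string> pending = DoSomethingThatExpectsMessage2();
        tcs.SetResult(new Message1());   // simulate the wrong message arriving
        return pending.Result;
    }
}
```

The await itself succeeds, because a Message1 is a perfectly valid Message. It is the downcast on the awaiting side that throws, which is why the caller has to defend against it.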
Dynamic Duck Disaster
The amusing thing is, if we were using a duck-typed language, we wouldn't have to deal with the complexities of generics and getting the right type. We can do this too! By using the dynamic keyword, we can simply operate on the type with the assumption that it is the type we want, and if it isn't, well, you'll know at runtime, just like in a duck-typed interpreted language.
dynamic dmsg = await deviceInterface.WaitForMessage();
Console.WriteLine("Message Type: " + dmsg.GetType().ToString());
Notice what happens if we receive a "Message1":
deviceInterface.Device.SimulateMessage(1);
Very cool!
Do not do this.
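To see why, consider what happens once the code touches a member that only the expected type has. The sketch below assumes a hypothetical Payload property on Message2; nothing in the article defines one.

```csharp
using System;
using Microsoft.CSharp.RuntimeBinder;

public class Message { }
public class Message1 : Message { }
public class Message2 : Message
{
    // Hypothetical property that only Message2 has.
    public string Payload { get; set; } = "data";
}

public static class DynamicDemo
{
    public static string ReadPayload(Message received)
    {
        dynamic dmsg = received;
        try
        {
            // Bound at runtime: compiles regardless of the actual type.
            return dmsg.Payload;
        }
        catch (RuntimeBinderException)
        {
            // The runtime binder found no Payload member on the actual type.
            return "no Payload on " + received.GetType().Name;
        }
    }
}
```

The compiler can no longer help here: the mistake surfaces only when the wrong message actually arrives, and only on the line that happens to use the missing member.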
Take 2: Checking for the Right Type
In this version, I save the expected message type. This has the advantage of allowing the code to cancel the task (and thus prevent the continuation from running) if the message type is something different than expected:
public class DeviceInterface
{
    public MockDevice Device { get; protected set; }
    protected TaskCompletionSource<Message> tcs;
    protected Type messageType;

    public DeviceInterface()
    {
        Device = new MockDevice();
        Device.MessageReceived += OnMessageReceived;
    }

    public Task<Message> WaitForMessage<T>()
    {
        tcs = new TaskCompletionSource<Message>();
        messageType = typeof(T);
        return tcs.Task;
    }

    protected void OnMessageReceived(object sender, MessageEventArgs e)
    {
        if (e.Message.GetType() == messageType)
        {
            tcs.SetResult(e.Message);
        }
        else
        {
            // Wrong message: cancel so the continuation never runs.
            tcs.SetCanceled();
        }
    }
}
The DoSomething method looks like this now:
static async void DoSomethingThatExpectsMessage2()
{
    try
    {
        Message2 msg = (Message2)await deviceInterface.WaitForMessage<Message2>();
        Console.WriteLine("Message2 received.");
    }
    catch (TaskCanceledException)
    {
        // We don't care
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }
}
Notice that the desired type is passed in as a generic parameter. While better, there are still things I don't like:
- The downcast to the actual message is still required
- We now have an independent variable that is holding the message type.
Take 3: Ideally, But Doesn't Compile...
Take 4: Encapsulating TaskCompletionSource with an Interface
In order to accomplish our goal, we have to abstract out the generic type <T> such that the event handler OnMessageReceived doesn't need to know what <T> is. This requires encapsulating TaskCompletionSource:
internal class TCS<T> : ITCS where T : Message, new()
{
    public Task<T> Task { get { return internalTcs.Task; } }
    protected TaskCompletionSource<T> internalTcs;

    public TCS()
    {
        internalTcs = new TaskCompletionSource<T>();
    }

    public void SetResult(object val)
    {
        // The cast is what lets the awaiter receive a T instead of a Message.
        internalTcs.SetResult((T)val);
    }
}
The magic really happens in our SetResult, in that we're passing in an object, but cast it to T on the call to SetResult. This allows the awaiter to get exactly the type of message that is being awaited, rather than having to downcast the return value of the await.
But we still need that interface, which looks like this:
public interface ITCS
{
    void SetResult(object val);
}
Now, our device interface can be written using the interface to our wrapper:
public class DeviceInterface
{
    public MockDevice Device { get; protected set; }
    protected ITCS tcs;

    public DeviceInterface()
    {
        Device = new MockDevice();
        Device.MessageReceived += OnMessageReceived;
    }

    public Task<T> WaitForMessage<T>() where T : Message, new()
    {
        TCS<T> wrappedTcs = new TCS<T>();
        tcs = wrappedTcs;
        return wrappedTcs.Task;
    }

    protected void OnMessageReceived(object sender, MessageEventArgs e)
    {
        tcs.SetResult(e.Message);
    }
}
Now the message handler doesn't need to know (or care) what the message type is!
The usage is finally what we want:
Message2 msg = await deviceInterface.WaitForMessage<Message2>();
Bells and Whistles
In our interface, we should expose some additional methods:
public interface ITCS
{
    void SetResult(object val);
    void TrySetResult(object val);
    void SetCanceled();
    void TrySetCanceled();
}
The implementation should be obvious!
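For completeness, here is one way the four members might be filled in. Treat it as a sketch: it simply forwards each call to the wrapped TaskCompletionSource<T> and, to match the void signatures in the interface, discards the bool that the Try methods return.

```csharp
using System;
using System.Threading.Tasks;

public class Message { }
public class Message2 : Message { }

public interface ITCS
{
    void SetResult(object val);
    void TrySetResult(object val);
    void SetCanceled();
    void TrySetCanceled();
}

internal class TCS<T> : ITCS where T : Message, new()
{
    protected TaskCompletionSource<T> internalTcs = new TaskCompletionSource<T>();

    public Task<T> Task { get { return internalTcs.Task; } }

    // Throws InvalidOperationException if the task has already completed,
    // and InvalidCastException if val is not a T.
    public void SetResult(object val) { internalTcs.SetResult((T)val); }

    // Same cast, but does nothing if the task has already completed.
    public void TrySetResult(object val) { internalTcs.TrySetResult((T)val); }

    // Throws InvalidOperationException if the task has already completed.
    public void SetCanceled() { internalTcs.SetCanceled(); }

    // Does nothing if the task has already completed.
    public void TrySetCanceled() { internalTcs.TrySetCanceled(); }
}
```

The Try variants are the safer choice inside an event handler, where a second message may arrive after the task has already been completed or canceled.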
We can also check that the message type is of the type we're expecting to handle in the task completion:
internal class TCS<T> : ITCS where T : Message, new()
{
    protected TaskCompletionSource<T> internalTcs = new TaskCompletionSource<T>();

    public bool IsOfType(Type t)
    {
        return typeof(T) == t;
    }

    public void SetResult(object val)
    {
        Assert.That(IsOfType(val.GetType()), "SetResult called for an object of type " +
            val.GetType().ToString() +
            " when the task completion is expecting an object of type " + typeof(T).ToString());
        internalTcs.SetResult((T)val);
    }
    ...etc...
We can also do some more interesting things in our message handler, like canceling the awaiting task (and thus the continuation) if the wrong message is received:
protected void OnMessageReceived(object sender, MessageEventArgs e)
{
    if (tcs.IsOfType(e.Message.GetType()))
    {
        tcs.SetResult(e.Message);
    }
    else
    {
        tcs.SetCanceled();
    }
}
Which gives us a way to recover from the wrong message:
static async void DoSomethingThatExpectsMessage2()
{
    try
    {
        Message2 msg = await deviceInterface.WaitForMessage<Message2>();
        Console.WriteLine("Message2 received.");
    }
    catch (TaskCanceledException)
    {
        // Do recovery here:
        Console.WriteLine("Task has been canceled!");
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }
}
We can also fire off other events, like an "unsolicited message" event (not illustrated), when we receive an unexpected message.
Which brings me to another point -- we can certainly receive messages when we're not waiting for something, anything. So it'll be useful to implement something like this:
public class DeviceInterface
{
    public event EventHandler<MessageEventArgs> UnsolicitedMessage;
    ...
    protected void OnMessageReceived(object sender, MessageEventArgs e)
    {
        if ((tcs == null) || (tcs.IsDone))
        {
            // Nobody is waiting for anything.
            UnsolicitedMessage.Fire(this, e);
        }
        else
        {
            if (tcs.IsOfType(e.Message.GetType()))
            {
                tcs.SetResult(e.Message);
            }
            else
            {
                // Somebody is waiting, but not for this message.
                tcs.SetCanceled();
                UnsolicitedMessage.Fire(this, e);
            }
        }
    }
}
Because I know that I'm only ever awaiting one specific response, I can check that no task is still pending:
public Task<T> WaitForMessage<T>() where T : Message, new()
{
    Assert.ThatIf(tcs != null, () => tcs.IsDone,
        () => "The task " + tcs.TypeOf.ToString() + " is already awaiting completion.");
    TCS<T> wrappedTcs = new TCS<T>();
    tcs = wrappedTcs;
    return wrappedTcs.Task;
}
This way, if I accidentally do something like this:
DoSomethingThatExpectsMessage2();
DoSomethingThatExpectsMessage3();
Then I get an exception reporting that a task is already awaiting completion.
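The handler and the assertion rely on IsDone, TypeOf, and IsOfType being reachable through ITCS, which the interface listings don't show. Here is a minimal sketch of how those members might fit together. The Waiter class and its Deliver method are hypothetical stand-ins for DeviceInterface and OnMessageReceived, and a plain InvalidOperationException replaces the Assert helper to keep the block self-contained.

```csharp
using System;
using System.Threading.Tasks;

public class Message { }
public class Message2 : Message { }
public class Message3 : Message { }

public interface ITCS
{
    bool IsDone { get; }     // true once the task has completed, faulted, or been canceled
    Type TypeOf { get; }     // the message type being awaited
    bool IsOfType(Type t);
    void SetResult(object val);
    void SetCanceled();
}

internal class TCS<T> : ITCS where T : Message, new()
{
    protected TaskCompletionSource<T> internalTcs = new TaskCompletionSource<T>();

    public Task<T> Task { get { return internalTcs.Task; } }
    public bool IsDone { get { return internalTcs.Task.IsCompleted; } }
    public Type TypeOf { get { return typeof(T); } }
    public bool IsOfType(Type t) { return typeof(T) == t; }
    public void SetResult(object val) { internalTcs.SetResult((T)val); }
    public void SetCanceled() { internalTcs.SetCanceled(); }
}

public class Waiter
{
    protected ITCS tcs;

    public Task<T> WaitForMessage<T>() where T : Message, new()
    {
        if (tcs != null && !tcs.IsDone)
        {
            throw new InvalidOperationException(
                "The task " + tcs.TypeOf + " is already awaiting completion.");
        }
        TCS<T> wrappedTcs = new TCS<T>();
        tcs = wrappedTcs;
        return wrappedTcs.Task;
    }

    // Returns true when the message was unsolicited (nobody was waiting for it).
    public bool Deliver(Message message)
    {
        if (tcs == null || tcs.IsDone) return true;
        if (tcs.IsOfType(message.GetType()))
        {
            tcs.SetResult(message);
            return false;
        }
        tcs.SetCanceled();
        return true;
    }
}
```

Exposing these as members of the interface keeps the handler free of any knowledge of T, which was the point of introducing the wrapper in the first place.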
Extension Methods and Helpers
There are a couple of extension methods and assertion helpers that I've used in this article, which should be self-explanatory:
public static class Assert
{
    public static void That(bool b, string exceptionMessage)
    {
        if (!b) throw new Exception(exceptionMessage);
    }

    // Don't evaluate the message unless b is false.
    public static void That(bool b, Func<string> msg)
    {
        if (!b) throw new Exception(msg());
    }

    /// <summary>
    /// Only evaluate the thenExpr if b is true.
    /// </summary>
    public static void ThatIf(bool b, Func<bool> thenExpr, Func<string> msg)
    {
        if (b)
        {
            Assert.That(thenExpr(), msg);
        }
    }
}
public static class ExtensionMethods
{
    // Raises the event only if it has subscribers.
    public static void Fire<TEventArgs>(this EventHandler<TEventArgs> theEvent,
        object sender, TEventArgs e) where TEventArgs : EventArgs
    {
        if (theEvent != null)
        {
            theEvent(sender, e);
        }
    }

    public static bool IsDone(this Task task)
    {
        // IsCompleted is already true for canceled and faulted tasks;
        // the other two checks are kept for readability.
        return task.IsCanceled || task.IsCompleted || task.IsFaulted;
    }
}
Conclusion
I hope you enjoyed the road trip!