//--------------------------------------------------------------------------
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// File: LinqToTasks.cs
//
//--------------------------------------------------------------------------
using System.Collections;
using System.Collections.Generic;
using System.Linq;
namespace System.Threading.Tasks.Linq
{
/// <summary>
/// Provides LINQ support for Tasks by implementing the primary standard query operators.
/// </summary>
public static class LinqToTasks
{
/// <summary>Projects the result of a task into a new form once the task has completed.</summary>
/// <typeparam name="TSource">The type of the source task's result.</typeparam>
/// <typeparam name="TResult">The type produced by <paramref name="selector"/>.</typeparam>
/// <param name="source">The task whose result is projected.</param>
/// <param name="selector">The function applied to the result of <paramref name="source"/>.</param>
/// <returns>The continuation task that yields the value returned by <paramref name="selector"/>.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="selector"/> is null.
/// </exception>
public static Task<TResult> Select<TSource, TResult>(this Task<TSource> source, Func<TSource, TResult> selector)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }

    // The projection reads the antecedent's Result and hands it to the selector.
    Func<Task<TSource>, TResult> projection = antecedent => selector(antecedent.Result);

    // NotOnCanceled: the continuation is not scheduled when the source was canceled.
    return source.ContinueWith(projection, TaskContinuationOptions.NotOnCanceled);
}
/// <summary>
/// Chains a second asynchronous operation onto a task: when <paramref name="source"/> completes,
/// <paramref name="selector"/> is invoked with its result and the task it returns is unwrapped.
/// </summary>
/// <typeparam name="TSource">The type of the source task's result.</typeparam>
/// <typeparam name="TResult">The result type of the task returned by <paramref name="selector"/>.</typeparam>
/// <param name="source">The task whose result feeds the selector.</param>
/// <param name="selector">A function that produces the next task from the source's result.</param>
/// <returns>A proxy task for the task produced by <paramref name="selector"/>.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="selector"/> is null.
/// </exception>
public static Task<TResult> SelectMany<TSource, TResult>(this Task<TSource> source, Func<TSource, Task<TResult>> selector)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }

    // The continuation itself returns a task, so the immediate result is a task of a task.
    Func<Task<TSource>, Task<TResult>> bind = antecedent => selector(antecedent.Result);
    Task<Task<TResult>> nested = source.ContinueWith(bind, TaskContinuationOptions.NotOnCanceled);

    // Collapse the two levels into a single task for the caller.
    return nested.Unwrap();
}
/// <summary>
/// Chains a second asynchronous operation onto a task and combines both results: when
/// <paramref name="source"/> completes, <paramref name="collectionSelector"/> produces the next task,
/// and when that task completes <paramref name="resultSelector"/> receives both results.
/// </summary>
/// <typeparam name="TSource">The type of the source task's result.</typeparam>
/// <typeparam name="TCollection">The result type of the intermediate task.</typeparam>
/// <typeparam name="TResult">The type produced by <paramref name="resultSelector"/>.</typeparam>
/// <param name="source">The first task.</param>
/// <param name="collectionSelector">Produces the intermediate task from the source's result.</param>
/// <param name="resultSelector">Combines the source's result with the intermediate task's result.</param>
/// <returns>A task that yields the value returned by <paramref name="resultSelector"/>.</returns>
/// <exception cref="ArgumentNullException">Any argument is null.</exception>
public static Task<TResult> SelectMany<TSource, TCollection, TResult>(
    this Task<TSource> source,
    Func<TSource, Task<TCollection>> collectionSelector,
    Func<TSource, TCollection, TResult> resultSelector)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (collectionSelector == null)
    {
        throw new ArgumentNullException("collectionSelector");
    }
    if (resultSelector == null)
    {
        throw new ArgumentNullException("resultSelector");
    }

    Task<Task<TResult>> nested = source.ContinueWith(antecedent =>
    {
        // First stage: obtain the intermediate task from the source's result.
        Task<TCollection> intermediate = collectionSelector(antecedent.Result);

        // Second stage: once the intermediate task is done, combine both results.
        return intermediate.ContinueWith(
            completed => resultSelector(antecedent.Result, completed.Result),
            TaskContinuationOptions.NotOnCanceled);
    }, TaskContinuationOptions.NotOnCanceled);

    return nested.Unwrap();
}
/// <summary>
/// Filters a task by its result: the returned task yields the source's result when
/// <paramref name="predicate"/> accepts it, and is canceled otherwise.
/// </summary>
/// <typeparam name="TSource">The type of the source task's result.</typeparam>
/// <param name="source">The task whose result is tested.</param>
/// <param name="predicate">The test applied to the result of <paramref name="source"/>.</param>
/// <returns>A continuation task carrying the source's result if the predicate returned true.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="predicate"/> is null.
/// </exception>
public static Task<TSource> Where<TSource>(this Task<TSource> source, Func<TSource, bool> predicate)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (predicate == null)
    {
        throw new ArgumentNullException("predicate");
    }

    // The continuation is tied to this source's token so that it can cancel itself
    // when the predicate rejects the value.
    var cancellation = new CancellationTokenSource();

    Func<Task<TSource>, TSource> filter = antecedent =>
    {
        TSource value = antecedent.Result;
        if (predicate(value))
        {
            return value;
        }

        // CancelAndThrow is an extension defined outside this file; presumably it cancels
        // the source and throws so the continuation ends up canceled -- confirm there.
        cancellation.CancelAndThrow();
        return value;
    };

    return source.ContinueWith(
        filter,
        cancellation.Token,
        TaskContinuationOptions.NotOnCanceled,
        TaskScheduler.Default);
}
/// <summary>
/// Joins two tasks on keys extracted from their results, comparing the keys with the
/// default equality comparer for <typeparamref name="TKey"/>.
/// </summary>
/// <typeparam name="TOuter">The type of the outer task's result.</typeparam>
/// <typeparam name="TInner">The type of the inner task's result.</typeparam>
/// <typeparam name="TKey">The type of the join key.</typeparam>
/// <typeparam name="TResult">The type produced by <paramref name="resultSelector"/>.</typeparam>
/// <param name="outer">The outer task.</param>
/// <param name="inner">The inner task.</param>
/// <param name="outerKeySelector">Extracts the key from the outer task's result.</param>
/// <param name="innerKeySelector">Extracts the key from the inner task's result.</param>
/// <param name="resultSelector">Combines the two results.</param>
/// <returns>The task returned by the comparer-taking overload of Join.</returns>
public static Task<TResult> Join<TOuter, TInner, TKey, TResult>(
    this Task<TOuter> outer, Task<TInner> inner,
    Func<TOuter, TKey> outerKeySelector,
    Func<TInner, TKey> innerKeySelector,
    Func<TOuter, TInner, TResult> resultSelector)
{
    // No validation here: the overload being called checks every argument.
    IEqualityComparer<TKey> defaultComparer = EqualityComparer<TKey>.Default;
    return Join<TOuter, TInner, TKey, TResult>(
        outer,
        inner,
        outerKeySelector,
        innerKeySelector,
        resultSelector,
        defaultComparer);
}
/// <summary>
/// Joins two tasks on keys extracted from their results. When both keys are equal according to
/// <paramref name="comparer"/>, the returned task yields the value of <paramref name="resultSelector"/>;
/// otherwise the returned task is canceled.
/// </summary>
/// <typeparam name="TOuter">The type of the outer task's result.</typeparam>
/// <typeparam name="TInner">The type of the inner task's result.</typeparam>
/// <typeparam name="TKey">The type of the join key.</typeparam>
/// <typeparam name="TResult">The type produced by <paramref name="resultSelector"/>.</typeparam>
/// <param name="outer">The outer task.</param>
/// <param name="inner">The inner task.</param>
/// <param name="outerKeySelector">Extracts the key from the outer task's result.</param>
/// <param name="innerKeySelector">Extracts the key from the inner task's result.</param>
/// <param name="resultSelector">Combines the two results when the keys match.</param>
/// <param name="comparer">The comparer used to test the two keys for equality.</param>
/// <returns>A task representing the joined result.</returns>
/// <exception cref="ArgumentNullException">Any argument is null.</exception>
public static Task<TResult> Join<TOuter, TInner, TKey, TResult>(
    this Task<TOuter> outer, Task<TInner> inner,
    Func<TOuter, TKey> outerKeySelector,
    Func<TInner, TKey> innerKeySelector,
    Func<TOuter, TInner, TResult> resultSelector,
    IEqualityComparer<TKey> comparer)
{
    if (outer == null)
    {
        throw new ArgumentNullException("outer");
    }
    if (inner == null)
    {
        throw new ArgumentNullException("inner");
    }
    if (outerKeySelector == null)
    {
        throw new ArgumentNullException("outerKeySelector");
    }
    if (innerKeySelector == null)
    {
        throw new ArgumentNullException("innerKeySelector");
    }
    if (resultSelector == null)
    {
        throw new ArgumentNullException("resultSelector");
    }
    if (comparer == null)
    {
        throw new ArgumentNullException("comparer");
    }

    // Two chained continuations (outer first, then inner), each registered with
    // NotOnCanceled so that cancellation of either antecedent carries through.
    Task<Task<TResult>> nested = outer.ContinueWith(outerDone =>
    {
        var cancellation = new CancellationTokenSource();
        return inner.ContinueWith(innerDone =>
        {
            // Both antecedents have finished by now; WaitAll surfaces their exceptions.
            Task.WaitAll(outer, inner);

            bool keysMatch = comparer.Equals(outerKeySelector(outer.Result), innerKeySelector(inner.Result));
            if (!keysMatch)
            {
                // Keys differ: cancel this task. CancelAndThrow is defined outside this
                // file and is expected to throw, making the return below unreachable.
                cancellation.CancelAndThrow();
                return default(TResult);
            }

            return resultSelector(outer.Result, inner.Result);
        }, cancellation.Token, TaskContinuationOptions.NotOnCanceled, TaskScheduler.Default);
    }, TaskContinuationOptions.NotOnCanceled);

    return nested.Unwrap();
}
/// <summary>
/// Group-joins two tasks on keys extracted from their results, comparing the keys with the
/// default equality comparer for <typeparamref name="TKey"/>.
/// </summary>
/// <typeparam name="TOuter">The type of the outer task's result.</typeparam>
/// <typeparam name="TInner">The type of the inner task's result.</typeparam>
/// <typeparam name="TKey">The type of the join key.</typeparam>
/// <typeparam name="TResult">The type produced by <paramref name="resultSelector"/>.</typeparam>
/// <param name="outer">The outer task.</param>
/// <param name="inner">The inner task.</param>
/// <param name="outerKeySelector">Extracts the key from the outer task's result.</param>
/// <param name="innerKeySelector">Extracts the key from the inner task's result.</param>
/// <param name="resultSelector">Combines the outer result with the inner task.</param>
/// <returns>The task returned by the comparer-taking overload of GroupJoin.</returns>
public static Task<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(
    this Task<TOuter> outer, Task<TInner> inner,
    Func<TOuter, TKey> outerKeySelector,
    Func<TInner, TKey> innerKeySelector,
    Func<TOuter, Task<TInner>, TResult> resultSelector)
{
    // No validation here: the overload being called checks every argument.
    IEqualityComparer<TKey> defaultComparer = EqualityComparer<TKey>.Default;
    return GroupJoin<TOuter, TInner, TKey, TResult>(
        outer,
        inner,
        outerKeySelector,
        innerKeySelector,
        resultSelector,
        defaultComparer);
}
/// <summary>
/// Group-joins two tasks on keys extracted from their results. When both keys are equal according to
/// <paramref name="comparer"/>, the returned task yields the value of <paramref name="resultSelector"/>,
/// which receives the outer result and the inner task itself; otherwise the returned task is canceled.
/// </summary>
/// <typeparam name="TOuter">The type of the outer task's result.</typeparam>
/// <typeparam name="TInner">The type of the inner task's result.</typeparam>
/// <typeparam name="TKey">The type of the join key.</typeparam>
/// <typeparam name="TResult">The type produced by <paramref name="resultSelector"/>.</typeparam>
/// <param name="outer">The outer task.</param>
/// <param name="inner">The inner task.</param>
/// <param name="outerKeySelector">Extracts the key from the outer task's result.</param>
/// <param name="innerKeySelector">Extracts the key from the inner task's result.</param>
/// <param name="resultSelector">Combines the outer result with the inner task when the keys match.</param>
/// <param name="comparer">The comparer used to test the two keys for equality.</param>
/// <returns>A task representing the group-joined result.</returns>
/// <exception cref="ArgumentNullException">Any argument is null.</exception>
public static Task<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(
    this Task<TOuter> outer, Task<TInner> inner,
    Func<TOuter, TKey> outerKeySelector,
    Func<TInner, TKey> innerKeySelector,
    Func<TOuter, Task<TInner>, TResult> resultSelector,
    IEqualityComparer<TKey> comparer)
{
    if (outer == null)
    {
        throw new ArgumentNullException("outer");
    }
    if (inner == null)
    {
        throw new ArgumentNullException("inner");
    }
    if (outerKeySelector == null)
    {
        throw new ArgumentNullException("outerKeySelector");
    }
    if (innerKeySelector == null)
    {
        throw new ArgumentNullException("innerKeySelector");
    }
    if (resultSelector == null)
    {
        throw new ArgumentNullException("resultSelector");
    }
    if (comparer == null)
    {
        throw new ArgumentNullException("comparer");
    }

    // Two chained continuations (outer first, then inner), each registered with
    // NotOnCanceled so that cancellation of either antecedent carries through.
    Task<Task<TResult>> nested = outer.ContinueWith(outerDone =>
    {
        var cancellation = new CancellationTokenSource();
        return inner.ContinueWith(innerDone =>
        {
            // Both antecedents have finished by now; WaitAll surfaces their exceptions.
            Task.WaitAll(outer, inner);

            bool keysMatch = comparer.Equals(outerKeySelector(outer.Result), innerKeySelector(inner.Result));
            if (!keysMatch)
            {
                // Keys differ: cancel this task. CancelAndThrow is defined outside this
                // file and is expected to throw, making the return below unreachable.
                cancellation.CancelAndThrow();
                return default(TResult);
            }

            // Unlike Join, the selector is handed the inner task rather than its result.
            return resultSelector(outer.Result, inner);
        }, cancellation.Token, TaskContinuationOptions.NotOnCanceled, TaskScheduler.Default);
    }, TaskContinuationOptions.NotOnCanceled);

    return nested.Unwrap();
}
/// <summary>
/// Groups the result of a task under a key, using the result itself as the group's element.
/// </summary>
/// <remarks>
/// The C# compiler translates a clause of the form <c>group x by key</c>, where <c>x</c> is the
/// range variable itself, into a call to <c>GroupBy</c> with a single key selector, so this
/// overload is what lets that form of query bind against tasks.
/// </remarks>
/// <typeparam name="TSource">The type of the source task's result.</typeparam>
/// <typeparam name="TKey">The type of the grouping key.</typeparam>
/// <param name="source">The task whose result is grouped.</param>
/// <param name="keySelector">Extracts the key from the source's result.</param>
/// <returns>A task that yields a grouping containing exactly the source's result.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="keySelector"/> is null.
/// </exception>
public static Task<IGrouping<TKey, TSource>> GroupBy<TSource, TKey>(
    this Task<TSource> source, Func<TSource, TKey> keySelector)
{
    // Argument validation is performed by the overload being called.
    return GroupBy<TSource, TKey, TSource>(source, keySelector, element => element);
}

/// <summary>
/// Groups the result of a task under a key, projecting the result into the group's element.
/// </summary>
/// <typeparam name="TSource">The type of the source task's result.</typeparam>
/// <typeparam name="TKey">The type of the grouping key.</typeparam>
/// <typeparam name="TElement">The type of the element stored in the grouping.</typeparam>
/// <param name="source">The task whose result is grouped.</param>
/// <param name="keySelector">Extracts the key from the source's result.</param>
/// <param name="elementSelector">Projects the source's result into the grouped element.</param>
/// <returns>A task that yields a grouping containing exactly one element.</returns>
/// <exception cref="ArgumentNullException">Any argument is null.</exception>
public static Task<IGrouping<TKey, TElement>> GroupBy<TSource, TKey, TElement>(
    this Task<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector)
{
    // Validate arguments
    if (source == null) throw new ArgumentNullException("source");
    if (keySelector == null) throw new ArgumentNullException("keySelector");
    if (elementSelector == null) throw new ArgumentNullException("elementSelector");

    // When the source completes, return a grouping of just the one element.
    // The key selector runs before the element selector.
    return source.ContinueWith(t =>
    {
        var result = t.Result;
        var key = keySelector(result);
        var element = elementSelector(result);
        return (IGrouping<TKey, TElement>)new OneElementGrouping<TKey, TElement> { Key = key, Element = element };
    }, TaskContinuationOptions.NotOnCanceled);
}

/// <summary>Represents a grouping of one element.</summary>
/// <typeparam name="TKey">The type of the key for the element.</typeparam>
/// <typeparam name="TElement">The type of the element.</typeparam>
private class OneElementGrouping<TKey, TElement> : IGrouping<TKey, TElement>
{
    /// <summary>Gets the key under which the single element is grouped.</summary>
    public TKey Key { get; internal set; }

    /// <summary>Gets or sets the single element of the grouping.</summary>
    internal TElement Element { get; set; }

    /// <summary>Returns an enumerator that yields the single element.</summary>
    public IEnumerator<TElement> GetEnumerator() { yield return Element; }

    /// <summary>Returns the non-generic form of the same enumerator.</summary>
    IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }
}
/// <summary>Returns the source task unchanged; a single result is trivially in sorted order.</summary>
/// <typeparam name="TSource">The type of the source task's result.</typeparam>
/// <typeparam name="TKey">The type of the ordering key.</typeparam>
/// <param name="source">The task to "order".</param>
/// <param name="keySelector">Accepted for query-pattern compatibility; never invoked or validated.</param>
/// <returns>The same task instance that was passed as <paramref name="source"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static Task<TSource> OrderBy<TSource, TKey>(this Task<TSource> source, Func<TSource, TKey> keySelector)
{
    if (source != null)
    {
        // One item cannot be out of order, whatever the key selector would compute.
        return source;
    }

    throw new ArgumentNullException("source");
}
/// <summary>Returns the source task unchanged; a single result is trivially in sorted order.</summary>
/// <typeparam name="TSource">The type of the source task's result.</typeparam>
/// <typeparam name="TKey">The type of the ordering key.</typeparam>
/// <param name="source">The task to "order".</param>
/// <param name="keySelector">Accepted for query-pattern compatibility; never invoked or validated.</param>
/// <returns>The same task instance that was passed as <paramref name="source"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static Task<TSource> OrderByDescending<TSource, TKey>(this Task<TSource> source, Func<TSource, TKey> keySelector)
{
    if (source != null)
    {
        // One item cannot be out of order, whatever the key selector would compute.
        return source;
    }

    throw new ArgumentNullException("source");
}
/// <summary>Returns the source task unchanged; a single result is trivially in sorted order.</summary>
/// <typeparam name="TSource">The type of the source task's result.</typeparam>
/// <typeparam name="TKey">The type of the secondary ordering key.</typeparam>
/// <param name="source">The task to "order".</param>
/// <param name="keySelector">Accepted for query-pattern compatibility; never invoked or validated.</param>
/// <returns>The same task instance that was passed as <paramref name="source"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static Task<TSource> ThenBy<TSource, TKey>(this Task<TSource> source, Func<TSource, TKey> keySelector)
{
    if (source != null)
    {
        // One item cannot be out of order, whatever the key selector would compute.
        return source;
    }

    throw new ArgumentNullException("source");
}
/// <summary>Returns the source task unchanged; a single result is trivially in sorted order.</summary>
/// <typeparam name="TSource">The type of the source task's result.</typeparam>
/// <typeparam name="TKey">The type of the secondary ordering key.</typeparam>
/// <param name="source">The task to "order".</param>
/// <param name="keySelector">Accepted for query-pattern compatibility; never invoked or validated.</param>
/// <returns>The same task instance that was passed as <paramref name="source"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static Task<TSource> ThenByDescending<TSource, TKey>(this Task<TSource> source, Func<TSource, TKey> keySelector)
{
    if (source != null)
    {
        // One item cannot be out of order, whatever the key selector would compute.
        return source;
    }

    throw new ArgumentNullException("source");
}
}
}