Processing tasks as they complete with Rx
Microsoft has a couple of articles on processing tasks as they complete instead of waiting for Task.WhenAll: "Start Multiple Async Tasks and Process Them As They Complete" and "Processing tasks as they complete". I'm going to show how to do the same thing with Reactive Extensions (Rx). One way to think about IObservable<T> is as an IEnumerable<T> whose items arrive in the future. And what is a Task<T>[]? A collection of values that arrive in the future, so it's a future IEnumerable too!
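For comparison, here is roughly what the Task.WhenAny loop from the first Microsoft article looks like. This is a minimal sketch: `Work` and `WhenAnyDemo` are hypothetical names, and the web calls are simulated with Task.Delay.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAnyDemo
{
    // Hypothetical stand-in for a web call: finishes after delayMs and returns its id.
    static async Task<int> Work(int id, int delayMs)
    {
        await Task.Delay(delayMs);
        return id;
    }

    // Handles each result as soon as its task finishes, instead of waiting for Task.WhenAll.
    public static async Task<List<int>> ProcessAsCompletedAsync()
    {
        var pending = new List<Task<int>>
        {
            Work(1, 400),
            Work(2, 0),   // already complete when the loop starts
            Work(3, 200),
        };
        var order = new List<int>();

        while (pending.Count > 0)
        {
            // WhenAny completes with whichever pending task finishes first.
            Task<int> finished = await Task.WhenAny(pending);
            pending.Remove(finished);
            // Awaiting the finished task yields its result (or rethrows its exception).
            order.Add(await finished);
        }
        return order;
    }
}
```

Each pass registers a continuation on every task still pending, so for n tasks the loop does O(n²) work. The Rx approach in this post subscribes to each task only once.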
For this example I’m going to use a Console application and hit an OData endpoint.
First we are going to create a class that queries the OData endpoint, figures out how many calls we need to make, and then returns an array of Tasks. The method signature, Task<Task<IEnumerable<T>>[]>, looks a little funky because GetData is itself async (it has to make the first call to get the total count) and each OData Task returns an IEnumerable<T>.
    using System;
    using System.Collections.Generic;
    using System.Data.Services.Client;
    using System.Data.Services.Common;
    using System.Linq;
    using System.Threading.Tasks;

    public class CombineOData
    {
        private readonly DataServiceContext _dataServiceContext;

        // double so the page-count division below is not truncated
        private const double PageSize = 250;

        public CombineOData(Uri oDataSite)
        {
            _dataServiceContext = new DataServiceContext(oDataSite, DataServiceProtocolVersion.V3);
        }

        public async Task<Task<IEnumerable<T>>[]> GetData<T>(string entityName)
        {
            // Ask for the first page plus the total number of entities in the feed
            var query = _dataServiceContext.CreateQuery<T>(entityName)
                .AddQueryOption("$inlinecount", "allpages")
                .AddQueryOption("$top", PageSize);

            // We need to make the first call just to get the total count
            var webCall = await Task.Factory.FromAsync<IEnumerable<T>>(query.BeginExecute, query.EndExecute, null);
            var totalCount = ((QueryOperationResponse<T>) webCall).TotalCount;

            // Wrap the first page in a completed task so we don't hit the website again for data we already have
            var result = new[] { Task.FromResult(webCall) };

            // If there are more entities than fit in one page, start a call for every remaining page
            if (totalCount > PageSize)
            {
                result = AllWebCalls<T>(totalCount, entityName).Concat(result).ToArray();
            }

            return result;
        }

        private IEnumerable<Task<IEnumerable<T>>> AllWebCalls<T>(long entityCount, string entityName)
        {
            // Number of pages still to download after the first one
            var numberOfCalls = Convert.ToInt32(Math.Ceiling(entityCount / PageSize) - 1);
            for (var i = 0; i < numberOfCalls; i++)
            {
                var query = _dataServiceContext.CreateQuery<T>(entityName)
                    .AddQueryOption("$top", PageSize)
                    .AddQueryOption("$inlinecount", "none")
                    .AddQueryOption("$skip", PageSize * (i + 1));
                yield return Task.Factory.FromAsync<IEnumerable<T>>(query.BeginExecute, query.EndExecute, null);
            }
        }
    }
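To check the paging arithmetic on its own, here is the same calculation pulled out of AllWebCalls with no OData calls involved. `PagePlan` and `RemainingSkips` are hypothetical names used only for this sketch.

```csharp
using System;
using System.Collections.Generic;

public static class PagePlan
{
    // Same page size as CombineOData; a double so the division is not truncated.
    public const double PageSize = 250;

    // Returns the $skip value of every request after the first page,
    // mirroring the loop in AllWebCalls.
    public static List<long> RemainingSkips(long totalCount)
    {
        var skips = new List<long>();
        var numberOfCalls = Convert.ToInt32(Math.Ceiling(totalCount / PageSize) - 1);
        for (var i = 0; i < numberOfCalls; i++)
        {
            skips.Add((long)(PageSize * (i + 1)));
        }
        return skips;
    }
}
```

For a feed of 1,001 entities the first call returns entities 0 to 249, and the remaining calls use $skip values of 250, 500, 750 and 1000, the last of which returns a single entity.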
We also need a class to deserialize the OData results into.
    using System.Data.Services.Client;

    public class ProductCatalog
    {
        public DataServiceStreamLink ThumbNailPhoto { get; set; }
        public DataServiceStreamLink LargePhoto { get; set; }
        public int ProductID { get; set; }
        public string ProductNumber { get; set; }
        public string ProductName { get; set; }
        public string ProductModel { get; set; }
        public string ProductCategory { get; set; }
        public string ProductSubcategory { get; set; }
        public string Description { get; set; }
        public string CultureID { get; set; }
        public string Color { get; set; }
        public string Size { get; set; }
        public decimal? Weight { get; set; }
        public decimal ListPrice { get; set; }
        public long ID { get; set; }
    }
Now in our console application we will convert the tasks into IObservables and process the results as they come in.
    using System;
    using System.Linq;
    using System.Reactive.Linq;
    using System.Reactive.Threading.Tasks;
    using System.Threading.Tasks;

    internal class Program
    {
        private static void Main(string[] args)
        {
            var task = Task.Run(async () =>
            {
                var uri = new Uri("http://services.odata.org/AdventureWorksV3/AdventureWorks.svc/");
                var client = new CombineOData(uri);

                // productsTasks is an array of tasks that each return an IEnumerable<ProductCatalog>
                var productsTasks = await client.GetData<ProductCatalog>("ProductCatalog");

                // convert each task to an observable,
                // then merge them into one observable,
                // then flatten each IEnumerable into that observable
                var products = productsTasks.Select(p => p.ToObservable())
                    .Merge()
                    .SelectMany(t => t.ToObservable());

                // Publish gives us an IConnectableObservable because we subscribe twice:
                // once to process the results and once to await completion
                var publisher = products.Publish();

                // log to the console window as results come in, to prove they are processed as they arrive
                publisher.Subscribe(
                    catalog => Console.WriteLine("{0} {1} {2} {3}", catalog.ID, catalog.ProductID, catalog.ProductName, catalog.ProductNumber),
                    ex => Console.WriteLine(ex.Message),
                    () => Console.WriteLine("Completed"));

                // Don't continue past this point until the observable completes;
                // awaiting also connects the publisher and starts the process
                var data = await publisher;

                // proof that we don't reach this line until afterwards (data is the last item)
                Console.WriteLine("After await {0}", data.ID);
            });

            Console.ReadKey();
        }
    }
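The real key is converting each task with ToObservable and then using Merge to combine them into one sequence that emits items in the order the tasks complete. After that we just need Publish, so one subscription can process the items while our method awaits the sequence's completion.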
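Here is a minimal self-contained sketch of the same ToObservable, Merge, Publish and await pipeline that runs without a network connection. It assumes the System.Reactive (Rx.NET) NuGet package, and the OData calls are replaced by hypothetical `FakePage` tasks simulated with Task.Delay; `MergeDemo` and `FakePage` are illustration-only names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class MergeDemo
{
    // Hypothetical stand-in for one OData page request: waits, then returns three ids.
    static async Task<IEnumerable<int>> FakePage(int firstId, int delayMs)
    {
        await Task.Delay(delayMs);
        return Enumerable.Range(firstId, 3);
    }

    public static async Task<List<int>> RunAsync()
    {
        // The first page is slowest and the last is already done,
        // so completion order differs from array order.
        Task<IEnumerable<int>>[] pageTasks =
        {
            FakePage(1, 600),
            FakePage(4, 300),
            FakePage(7, 0),
        };

        var received = new List<int>();

        var items = pageTasks.Select(t => t.ToObservable())   // one observable per task
                             .Merge()                          // one sequence, in completion order
                             .SelectMany(page => page.ToObservable()); // flatten each page

        var publisher = items.Publish();
        // Rx delivers OnNext calls one at a time, so adding to a List here is safe.
        publisher.Subscribe(id => received.Add(id));

        // Awaiting the connectable observable connects it and yields the last item.
        int last = await publisher;
        Console.WriteLine("After await, last item = " + last);
        return received;
    }
}
```

Calling `MergeDemo.RunAsync()` puts 7, 8 and 9 first, because that page's task is already complete when Merge subscribes; the other pages follow as their delays elapse rather than in array order.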