Reactive Extensions

Processing tasks as they complete with Rx

Microsoft has a couple of articles on processing tasks as they complete, instead of waiting for Task.WhenAll: Start Multiple Async Tasks and Process Them As They Complete & Processing tasks as they complete.  I’m going to show how to do it with Reactive Extensions.  One way to think about an IObservable is as a future IEnumerable.  And what is a Task[]?  It’s a future IEnumerable too!

For this example I’m going to use a Console application and hit an OData endpoint.

First we are going to create a class that will query the OData endpoint, figure out how many calls we need to make, and then return an array of Tasks.  The method signature looks a little funky because each OData Task returns an IEnumerable<T>.

/// <summary>
/// Splits the download of an OData entity set into page-sized requests and returns one
/// task per page, so the caller can process each page as soon as it arrives.
/// </summary>
public class CombineOData
{
    private readonly DataServiceContext _dataServiceContext;

    // Number of entities requested per call ($top). It is a whole number, so it is an int;
    // the one calculation that needs fractional math converts explicitly.
    private const int PageSize = 250;

    /// <summary>
    /// Creates a client for the OData service rooted at <paramref name="oDataSite"/>.
    /// </summary>
    /// <param name="oDataSite">Root URI of the OData service.</param>
    /// <exception cref="ArgumentNullException"><paramref name="oDataSite"/> is null.</exception>
    public CombineOData(Uri oDataSite)
    {
        if (oDataSite == null)
        {
            throw new ArgumentNullException("oDataSite");
        }

        _dataServiceContext = new DataServiceContext(oDataSite, DataServiceProtocolVersion.V3);
    }

    /// <summary>
    /// Downloads the first page of <paramref name="entityName"/> to learn the total entity
    /// count, then starts one request for every remaining page.
    /// </summary>
    /// <param name="entityName">Name of the entity set to query.</param>
    /// <returns>
    /// One task per page, in page order. The first element is already completed because its
    /// data was fetched in order to read the count.
    /// </returns>
    /// <remarks>
    /// The method is async, so an invalid <paramref name="entityName"/> is reported through
    /// the returned task (as an <see cref="ArgumentException"/>) rather than thrown directly.
    /// </remarks>
    public async Task<Task<IEnumerable>[]> GetData(string entityName)
    {
        if (string.IsNullOrWhiteSpace(entityName))
        {
            throw new ArgumentException("An entity set name is required.", "entityName");
        }

        // NOTE(review): DataServiceContext.CreateQuery normally takes a type argument; it looks
        // like the generic parameters were dropped from this listing - confirm against the
        // original project before relying on these signatures.
        var query = _dataServiceContext.CreateQuery(entityName);

        // $inlinecount=allpages asks the service to include the total entity count in the response.
        query = query.AddQueryOption("$inlinecount", "allpages")
                     .AddQueryOption("$top", PageSize);

        // The first call is needed to get the total count; its data is kept and reused below.
        var firstPage = await Task.Factory
                                  .FromAsync<IEnumerable>(query.BeginExecute, query.EndExecute, null)
                                  .ConfigureAwait(false);
        var totalCount = ((QueryOperationResponse) firstPage).TotalCount;

        // Wrap the page we already have in a completed task so the service is not hit again for it.
        var result = new[] {Task.FromResult(firstPage)};

        // If there are more entities than one page holds, start the remaining requests.
        // The first page stays at index 0 so the array is in page order.
        if (totalCount > PageSize)
        {
            result = result.Concat(AllWebCalls(totalCount, entityName)).ToArray();
        }

        return result;
    }

    /// <summary>
    /// Lazily creates the requests for every page after the first one. A request is started
    /// when the sequence is advanced to it.
    /// </summary>
    /// <param name="entityCount">Total number of entities reported by the service.</param>
    /// <param name="entityName">Name of the entity set to query.</param>
    private IEnumerable<Task<IEnumerable>> AllWebCalls(long entityCount, string entityName)
    {
        // Number of additional calls: total pages (rounded up) minus the first page already fetched.
        var numberOfCalls = Convert.ToInt32(Math.Ceiling(entityCount / (double) PageSize) - 1);
        for (var i = 0; i < numberOfCalls; i++)
        {
            // 64-bit arithmetic so the offset cannot overflow for very large feeds.
            long skip = (long) PageSize * (i + 1);

            var query =
                _dataServiceContext.CreateQuery(entityName)
                                   .AddQueryOption("$top", PageSize)
                                   .AddQueryOption("$inlinecount", "none")
                                   .AddQueryOption("$skip", skip);

            yield return
                Task.Factory.FromAsync<IEnumerable>(query.BeginExecute, query.EndExecute, null);
        }
    }
}

We also need a class to deserialize the OData call into.

/// <summary>
/// Type that the entries returned by the OData call are deserialized into.
/// </summary>
/// <remarks>
/// NOTE(review): property names presumably have to match the property names of the
/// service's ProductCatalog entity - confirm against the service metadata.
/// </remarks>
public class ProductCatalog
{
    // Identifiers
    public long ID { get; set; }
    public int ProductID { get; set; }
    public string ProductNumber { get; set; }
    public string CultureID { get; set; }

    // Descriptive text
    public string ProductName { get; set; }
    public string ProductModel { get; set; }
    public string ProductCategory { get; set; }
    public string ProductSubcategory { get; set; }
    public string Description { get; set; }

    // Physical attributes and pricing
    public string Color { get; set; }
    public string Size { get; set; }
    public decimal? Weight { get; set; }
    public decimal ListPrice { get; set; }

    // Stream links for the product photos
    public DataServiceStreamLink ThumbNailPhoto { get; set; }
    public DataServiceStreamLink LargePhoto { get; set; }
}

Now in our console application we will convert the tasks into IObservables and process the results as they come in.

/// <summary>
/// Downloads the ProductCatalog feed page by page and prints each entry as soon as the page
/// containing it arrives. The work runs on the thread pool while Main waits for a key press.
/// </summary>
/// <param name="args">Command-line arguments (not used).</param>
private static void Main(string[] args)
{
    Task.Run(async () =>
        {
            // Nothing awaits this task, so without the try/catch any failure (bad URI, network
            // error, empty feed) would vanish silently and the console would just sit idle.
            try
            {
                var uri =
                    new Uri("http://services.odata.org/AdventureWorksV3/AdventureWorks.svc/");
                var client = new CombineOData(uri);

                // productsTasks is an array of tasks, each of which returns one page of entries.
                var productsTasks = await client.GetData("ProductCatalog");

                // convert each task to an observable,
                // then merge them into one observable of pages,
                // then flatten every page (an IEnumerable) into the stream of its entries.
                // The pages are untyped IEnumerables, so Cast gives the entries their type.
                var products = productsTasks.Select(p => p.ToObservable())
                                            .Merge()
                                            .SelectMany(page => page.Cast<ProductCatalog>().ToObservable());

                // Publish gives an IConnectableObservable: we subscribe twice (once to print,
                // once via await) but the source should only run once.
                var publisher = products.Publish();

                // just log to the console window to view the results as they come in,
                // to prove they are processed as they arrive
                publisher.Subscribe(
                    catalog => Console.WriteLine("{0} {1} {2} {3}", catalog.ID, catalog.ProductID, catalog.ProductName,
                                                 catalog.ProductNumber),
                    ex => Console.WriteLine(ex.Message),
                    () => Console.WriteLine("Completed"));

                // Don't continue past this point until the observable is complete.
                // Awaiting the connectable observable also connects it, which starts the process;
                // the await yields the last element (and throws if there was none).
                var data = await publisher;

                // proof that we didn't hit this line until afterwards
                Console.WriteLine("After await {0}", data.ID);
            }
            catch (Exception ex)
            {
                // Also reached for stream errors already printed by the subscriber above,
                // because the await rethrows them.
                Console.WriteLine("Failed: {0}", ex.Message);
            }
        });

    Console.ReadKey();
}

The real key is to convert each task with ToObservable and then Merge them into one sequence. From there we just need to Publish the sequence so our method can await its completion.

Tags: , , ,

Tuesday, April 22nd, 2014 Reactive Extensions No Comments