Monday, January 11, 2010

Silverlight, RIA Services, and the Reactive Framework (Rx)

This post is not specific to RIA Services, but I added it to the title because the example is based on it.

Anyone who is familiar with RIA Services will recognize this piece of code, which is used to load an entity -
EntityQuery<Person> personQuery = DomainContext.GetPersonQuery();
DomainContext.Load(personQuery, loadOperation =>
{
    HandleLoadOperationError(loadOperation);
}, null);
Typically this piece of code will be called from some other code in a simulated synchronous fashion, like this -
public void LoadPerson(Action callback)
{
    EntityQuery<Person> personQuery = DomainContext.GetPersonQuery();
    DomainContext.Load(personQuery, loadOperation =>
    {
        if (!HandleLoadOperationError(loadOperation))
            return;

        if (callback != null) // a delegate cannot be used as a bool in C#
        {
            InitializeData();
            callback(); // Action takes no arguments
        }
    }, null);
}

public void DoMore()
{
    ShowBusyIndicator(); // Disable the active window and show busy indicator
    LoadPerson(() =>
    {
        // This will be called after the asynchronous load in LoadPerson has completed
        // ...
        CloseBusyIndicator(); // Re-enable user interaction
        // ...
    });
}
Let's say we have to display a search page with a bunch of fields, mostly pre-populated combo boxes and list boxes. The data for each combo box and list box comes from a corresponding entity. One way to load the entities could be the following -
public void LoadLookupEntities(Action callback)
{
    DomainContext.Load(DomainContext.GetPersonQuery(), loadOperation1 =>
    {
        if (!HandleLoadOperationError(loadOperation1))
            return;

        DomainContext.Load(DomainContext.GetGenderQuery(), loadOperation2 =>
        {
            if (!HandleLoadOperationError(loadOperation2))
                return;

            DomainContext.Load(DomainContext.GetEthnicityQuery(), loadOperation3 =>
            {
                if (!HandleLoadOperationError(loadOperation3))
                    return;

                // and so on ...

                if (callback != null) // a delegate cannot be used as a bool in C#
                {
                    InitializeData();
                    callback(); // Action takes no arguments
                }
            }, null);
        }, null);
    }, null);
}
Now let's go ahead and refactor LoadLookupEntities with the Reactive Framework (Rx). First we need an extension method, LoadAsync, on DomainContext. It is essentially the same as Load but returns an IObservable<LoadOperation<>> instead of a LoadOperation<>. This is one way to do it (the same technique can be used for async patterns that don't follow the Begin/End pattern) -
public static class DomainContextExtensions
{
    public static IObservable<LoadOperation<TEntity>> LoadAsync<TEntity>(
        this DomainContext domainContext,
        EntityQuery<TEntity> query)
        where TEntity : Entity
    {
        var asyncSubject = new AsyncSubject<LoadOperation<TEntity>>();

        domainContext.Load(query, loadOperation =>
        {
            if (loadOperation.HasError)
                asyncSubject.OnError(loadOperation.Error);
            else
            {
                asyncSubject.OnNext(loadOperation);
                asyncSubject.OnCompleted();
            }
        }, null);

        return asyncSubject;
    }
}
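To see why it is safe to return the subject after Load has already been called, here is a minimal, self-contained sketch of the same wrapping pattern. It assumes the current System.Reactive NuGet package rather than the 2010 Rx preview, and FakeLoader with its Load method is a hypothetical stand-in for DomainContext.Load, not part of RIA Services.

```csharp
using System;
using System.Reactive.Subjects;

// Hypothetical stand-in for a callback-based API such as DomainContext.Load.
public static class FakeLoader
{
    // Invokes the callback synchronously to keep the sketch deterministic.
    public static void Load(string name, Action<string, Exception> callback)
    {
        if (name == null)
            callback(null, new ArgumentNullException(nameof(name)));
        else
            callback("loaded:" + name, null);
    }

    // Same shape as LoadAsync above: bridge the callback to an AsyncSubject.
    public static IObservable<string> LoadAsync(string name)
    {
        var subject = new AsyncSubject<string>();

        Load(name, (result, error) =>
        {
            if (error != null)
                subject.OnError(error);
            else
            {
                subject.OnNext(result);
                subject.OnCompleted();
            }
        });

        return subject;
    }
}

public static class Program
{
    public static void Main()
    {
        // The callback has already fired by the time we subscribe.
        FakeLoader.LoadAsync("Person").Subscribe(
            value => Console.WriteLine(value),
            error => Console.WriteLine("failed: " + error.Message));
        // prints "loaded:Person"
    }
}
```

Here the callback fires before anyone subscribes, yet the subscriber still receives the value, because AsyncSubject caches the final value and replays it (or the error) to late subscribers.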
Once we have the extension method, let's go ahead and use it -
public void LoadLookupEntities(Action callback)
{
    var loadAll =
        // HandleLoadOperationError returns true when the load succeeded,
        // matching the "if (!HandleLoadOperationError(...)) return;" checks earlier.
        from loadOperation1 in DomainContext.LoadAsync(DomainContext.GetPersonQuery())
        where HandleLoadOperationError(loadOperation1)
        from loadOperation2 in DomainContext.LoadAsync(DomainContext.GetGenderQuery())
        where HandleLoadOperationError(loadOperation2)
        from loadOperation3 in DomainContext.LoadAsync(DomainContext.GetEthnicityQuery())
        where HandleLoadOperationError(loadOperation3)
        // ...
        select true;

    loadAll.Subscribe(loadedAll =>
    {
        if (loadedAll)
        {
            InitializeData();
            callback(); // Action takes no arguments
        }
    });
}
Or if we want parallel execution -
var loadAllInParallel =
    Observable.ForkJoin<OperationBase>(
        DomainContext.LoadAsync(DomainContext.GetPersonQuery()).Select(s => s as OperationBase),
        DomainContext.LoadAsync(DomainContext.GetGenderQuery()).Select(s => s as OperationBase),
        DomainContext.LoadAsync(DomainContext.GetEthnicityQuery()).Select(s => s as OperationBase)
    ).Finally(() => System.Diagnostics.Debug.WriteLine("Done!"));

loadAllInParallel.Subscribe(
    loadOperationsAll =>
    {
        InitializeData();
        callback(); // Action takes no arguments
    },
    error =>
    {
        // Handle Errors
    });
That's it!

Why would you want to do this? If you don't know the answer, go ahead and check out the resources listed below. If you still don't know the answer, you don't need it!

My reaction to the Rx framework - Unfreaking believably cool! More importantly, it's so useful that I suspect I will be using it as commonly as I do LINQ.

Note that the examples above are just that - examples. Obviously the error handling leaves a lot to be desired!
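As one possible direction for the error handling, here is a minimal sketch of the sequential composition with an error callback passed to Subscribe. It assumes the current System.Reactive NuGet package, and FakeContext is a hypothetical stand-in for a DomainContext whose LoadAsync reports failures through OnError, as the extension method above does.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical stand-in for DomainContext plus the LoadAsync extension.
public static class FakeContext
{
    // Succeeds with the entity name, or fails through OnError when 'fail' is true.
    public static IObservable<string> LoadAsync(string name, bool fail = false)
    {
        return fail
            ? Observable.Throw<string>(new InvalidOperationException(name + " failed"))
            : Observable.Return(name);
    }

    // Sequential composition: each load starts only after the previous one
    // produced a value, so a failure short-circuits the rest of the chain.
    public static IObservable<bool> LoadAll(bool failGender)
    {
        return
            from person in LoadAsync("Person")
            from gender in LoadAsync("Gender", failGender)
            from ethnicity in LoadAsync("Ethnicity")
            select true;
    }
}

public static class Program
{
    public static void Main()
    {
        FakeContext.LoadAll(failGender: true).Subscribe(
            loadedAll => Console.WriteLine("all loaded"),
            error => Console.WriteLine("load failed: " + error.Message));
        // prints "load failed: Gender failed"
    }
}
```

Because the Gender load fails, the Ethnicity load is never started, and the failure arrives in one place instead of being checked after every step.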

Some Really Good Resources on Rx