Monday, January 11, 2010

Silverlight, RIA Services, and the Reactive Framework (Rx)

This post is not specific to RIA Services but I thought I'd add it to the title since the example is based on RIA Services.

Anyone who is familiar with RIA Services will recognize this piece of code, which is used to load an entity -
EntityQuery<Person> personQuery = DomainContext.GetPersonQuery();
DomainContext.Load(personQuery, loadOperation =>
{
    HandleLoadOperationError(loadOperation);
}, null);
Typically this piece of code will be called from other code in a simulated synchronous fashion, like this -
public void LoadPerson(Action callback)
{
    EntityQuery<Person> personQuery = DomainContext.GetPersonQuery();
    DomainContext.Load(personQuery, loadOperation =>
    {
        if (!HandleLoadOperationError(loadOperation))
            return;

        // Action takes no arguments, and C# needs an explicit null check
        if (callback != null)
        {
            InitializeData();
            callback();
        }
    }, null);
}

public void DoMore()
{
    ShowBusyIndicator(); // Disable the active window and show busy indicator
    LoadPerson(() =>
    {
        // This will be called after the asynchronous load in LoadPerson has completed
        // ...
        CloseBusyIndicator(); // Re-enable user interaction
        // ...
    });
}
Let's say we have to display a search page with a bunch of fields, mostly pre-populated combo boxes and list boxes. The data for the combobox and listbox comes from corresponding Entity objects. One way to load the Entities could be the following -
public void LoadLookupEntities(Action callback)
{
    DomainContext.Load(DomainContext.GetPersonQuery(), loadOperation1 =>
    {
        if (!HandleLoadOperationError(loadOperation1))
            return;

        DomainContext.Load(DomainContext.GetGenderQuery(), loadOperation2 =>
        {
            if (!HandleLoadOperationError(loadOperation2))
                return;

            DomainContext.Load(DomainContext.GetEthnicityQuery(), loadOperation3 =>
            {
                if (!HandleLoadOperationError(loadOperation3))
                    return;

                // and so on ...

                if (callback != null)
                {
                    InitializeData();
                    callback();
                }
            }, null);
        }, null);
    }, null);
}
Now let's refactor LoadLookupEntities with the Reactive Framework (Rx). First we need an extension method, LoadAsync, on DomainContext. It is essentially the same as Load but returns an IObservable<LoadOperation<>> instead of a LoadOperation<>. This is one way to do it (the same technique can be used for other async patterns that don't follow the Begin/End pattern) -
public static class DomainContextExtensions
{
    public static IObservable<LoadOperation<TEntity>> LoadAsync<TEntity>(
        this DomainContext domainContext,
        EntityQuery<TEntity> query)
        where TEntity : Entity
    {
        var asyncSubject = new AsyncSubject<LoadOperation<TEntity>>();

        domainContext.Load(query, loadOperation =>
        {
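            if (loadOperation.HasError)
            {
                // Mark the error as handled so RIA Services does not rethrow it,
                // then surface it to subscribers through OnError
                loadOperation.MarkErrorAsHandled();
                asyncSubject.OnError(loadOperation.Error);
            }
            else
            {
                // AsyncSubject publishes its last value only once OnCompleted is called
                asyncSubject.OnNext(loadOperation);
                asyncSubject.OnCompleted();
            }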
        }, null);

        return asyncSubject;
    }
}
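The same AsyncSubject wrapping is not tied to DomainContext.Load. As a minimal sketch, here it is applied to a made-up callback-style method. FetchNumber, FetchNumberAsync, and the CallbackToObservable class are hypothetical names used only for illustration, and the sketch assumes a current System.Reactive package (where AsyncSubject lives in System.Reactive.Subjects) rather than the 2010 Rx build used in this post.

```csharp
using System;
using System.Reactive.Subjects;
using System.Threading;

public static class CallbackToObservable
{
    // Hypothetical callback-style API standing in for DomainContext.Load:
    // it reports either a value or an exception on a thread-pool thread.
    public static void FetchNumber(Action<int, Exception> callback)
    {
        ThreadPool.QueueUserWorkItem(_ => callback(42, null));
    }

    // Wrap the callback API in an IObservable<int> using an AsyncSubject.
    public static IObservable<int> FetchNumberAsync()
    {
        var subject = new AsyncSubject<int>();

        FetchNumber((value, error) =>
        {
            if (error != null)
            {
                subject.OnError(error);
            }
            else
            {
                // The value is delivered to subscribers once OnCompleted is called
                subject.OnNext(value);
                subject.OnCompleted();
            }
        });

        return subject;
    }
}

public static class Program
{
    public static void Main()
    {
        using (var done = new ManualResetEvent(false))
        {
            CallbackToObservable.FetchNumberAsync().Subscribe(
                value => Console.WriteLine("Got " + value),
                error => { Console.WriteLine("Failed: " + error.Message); done.Set(); },
                () => done.Set());

            // Block until the sequence has completed or failed
            done.WaitOne();
        }
    }
}
```

Because AsyncSubject caches its final value, a subscriber that attaches after the callback has already fired still receives the result, which is why it suits one-shot operations like a load.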
Once we have the extension method let's go ahead and use it.
public void LoadLookupEntities(Action callback)
{
    // Assumes HandleLoadOperationError returns true when the load succeeded,
    // as in the callback-based version above
    var loadAll =
        from loadOperation1 in DomainContext.LoadAsync(DomainContext.GetPersonQuery())
        where HandleLoadOperationError(loadOperation1)
        from loadOperation2 in DomainContext.LoadAsync(DomainContext.GetGenderQuery())
        where HandleLoadOperationError(loadOperation2)
        from loadOperation3 in DomainContext.LoadAsync(DomainContext.GetEthnicityQuery())
        where HandleLoadOperationError(loadOperation3)
        // ...
        select true;

    loadAll.Subscribe(
        loadedAll =>
        {
            if (loadedAll && callback != null)
            {
                InitializeData();
                callback();
            }
        },
        error =>
        {
            // LoadAsync reports failed loads through OnError, so they end up here
        });
}
Or if we want parallel execution -
var loadAllInParallel =
    Observable.ForkJoin<OperationBase>(
        DomainContext.LoadAsync(DomainContext.GetPersonQuery()).Select(s => s as OperationBase),
        DomainContext.LoadAsync(DomainContext.GetGenderQuery()).Select(s => s as OperationBase),
        DomainContext.LoadAsync(DomainContext.GetEthnicityQuery()).Select(s => s as OperationBase)
    ).Finally(() => System.Diagnostics.Debug.WriteLine("Done!"));

loadAllInParallel.Subscribe(
    loadOperationsAll =>
    {
        InitializeData();
        callback();
    },
    error =>
    {
        // Handle Errors
    });
That's it!

Why would you want to do this? If you don't know the answer, go ahead and check out the resources listed below. If you still don't know the answer, you don't need it!

My reaction to the Rx framework - unfreaking-believably cool! More importantly, it's so useful that I suspect I will be using it as often as I use LINQ.

Note that the examples above are just that - examples. Obviously the error handling leaves a lot to be desired!

Some Really Good Resources on Rx

5 comments:

Merrill Marie said...

I have a desperate, real life question/situation ..

1. I created a Silverlight Business Application .. and have added 4 pages in addition to the main, home, and about pages.
2. Each page is one of your completed chap 09 solutions ..
3. On each “page” I go to my database for the Products download.
4. Example:

Page 1.

I go to the SQL data and create my product context. Everything is cool.

When I go to Page 2, why do I have to go back to the SQL database and re-download the data? I think I already have it in my DomainContext.Entities.

I am assuming that the data is in the domain universe, somewhere .. can I access it without having to return to the SQL database when I go to a new page?

Thanks

silverlight application developer said...

You have a really nice blog here and I am saving it in my favorites list. Keep posting similar stuff.

Unknown said...

It's great to see all this coding and functionality about Silverlight development. Thanks for sharing about Silverlight.

Anonymous said...

How can I accomplish something like this, where I want to use the result from the first query to restrict a second query? I'm getting a "Contains not supported" error.

var load5 = Context.LoadAsync(Context.GetAuditsQuery());
load5.Subscribe(l =>
{
    var sectionsQuery =
        Context.GetAuditSectionsQuery().Where(
            s => l.Entities.Select(x => x.Id).Contains(s.AuditId));
    var load6 = Context.LoadAsync(sectionsQuery);

    load6.Subscribe(l2 => { });
});

Rami AbuGhazaleh said...

Thank you, Wilfred! :)