Observable Cache At A Glance

In my previous post I introduced the observable list which was released in version 4 of Dynamic Data.

For a working demonstration of dynamic data see Dynamic Trader.

Although Dynamic Data has always had an observable cache I have never posted an ‘at a glance’ offering, so I have put together this post for completeness. The observable cache is the big brother of the observable list. It has existed for years, is very well tried and tested, and a lot of trouble has been taken to ensure good performance can be easily achieved. This document briefly outlines the usage and behaviour of the observable cache.

In my next post I will examine the differences between the observable cache and the observable list, and in which circumstances one should be favoured over the other. I will also examine how performance can be fine tuned.

Create and maintain a cache

Create an observable cache like this:

var myCache = new SourceCache<TObject,TKey>(t => t.Key);

The cache can be amended by calling the edit methods. For example:

myCache.Clear();
myCache.AddOrUpdate(myItems);

The Clear and AddOrUpdate methods above will each produce a distinct change notification. In order to increase efficiency when making multiple amendments, the cache provides a means of batch editing. This is achieved using the .BatchUpdate method which ensures only a single change notification is produced.

myCache.BatchUpdate(innerCache =>
              {
                  innerCache.Clear();
                  innerCache.AddOrUpdate(myItems);
              });
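As a rough sketch of the difference, the following counts the change notifications produced by two separate edits and by one batched edit. It assumes a recent DynamicData NuGet package, where as far as I know the batch method is named Edit rather than the BatchUpdate shown above; the Item record and its values are hypothetical and exist only for illustration.

```csharp
using System;
using System.Linq;
using DynamicData;

// Hypothetical items, keyed by Id (illustration only).
var myItems = Enumerable.Range(1, 3).Select(i => new Item(i, $"item {i}")).ToList();
var myCache = new SourceCache<Item, int>(item => item.Id);
myCache.AddOrUpdate(myItems);

// Count every change set that reaches a subscriber.
var notifications = 0;
using var subscription = myCache.Connect().Subscribe(_ => notifications++);

// Two separate edits: each call publishes its own change set.
var before = notifications;
myCache.Clear();
myCache.AddOrUpdate(myItems);
var separateEdits = notifications - before;

// One batched edit: both amendments are published together.
// Assumption: Edit is the current name for the post's BatchUpdate.
before = notifications;
myCache.Edit(innerCache =>
{
    innerCache.Clear();
    innerCache.AddOrUpdate(myItems);
});
var batchedEdit = notifications - before;

Console.WriteLine($"separate edits: {separateEdits}, batched edit: {batchedEdit}");

record Item(int Id, string Name);
```

Counting the difference around each operation keeps the initial change set, which is delivered on subscription, out of the comparison.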

If myCache is to be exposed publicly it can be made read only using .AsObservableCache

IObservableCache<TObject,TKey> readonlyCache = myCache.AsObservableCache();

which hides the edit methods.

The cache is observed by calling myCache.Connect() like this:

IObservable<IChangeSet<TObject,TKey>> myCacheObservable = myCache.Connect();

This creates an observable change set for which there are dozens of operators. The changes are transmitted as an Rx observable, so they are fluent and composable.

Observing dynamic data

Dynamic Data is based on the concept of creating and manipulating observable change sets. An observable change set is a collection of changes which have been applied to the underlying data source. The change set is always preceded with any data which is already in the underlying collection.

The primary method of creating observable change sets is to connect to an instance of the cache. There are, however, alternative methods of producing observable change sets, depending on the data source.

Connect to a Cache

The most common means of creating an observable change set is by calling Connect on an observable cache

var myObservableChangeSet = myDynamicDataSource.Connect();

The observable change set always begins with any items which are in the cache when the subscription is made.
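A small sketch of that behaviour, assuming a recent DynamicData NuGet package: two items are loaded before subscribing, and the first change set received reports them as adds. The currency pair strings are made-up sample data, and each string is used as its own key purely for brevity.

```csharp
using System;
using DynamicData;

// Each string acts as its own key (an assumption made for brevity).
var cache = new SourceCache<string, string>(s => s);
cache.AddOrUpdate("GBP/USD");
cache.AddOrUpdate("EUR/USD");

var addsInFirstChangeSet = -1;
using var subscription = cache.Connect().Subscribe(changes =>
{
    // Remember how many adds the very first change set carried.
    if (addsInFirstChangeSet < 0) addsInFirstChangeSet = changes.Adds;
    Console.WriteLine($"{changes.Count} change(s), of which {changes.Adds} add(s)");
});

// Later edits arrive as further change sets on the same subscription.
cache.AddOrUpdate("USD/JPY");
```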

Create an Observable Change Set from an Rx Observable

Given either of the following Rx observables:

IObservable<T> myObservable;
IObservable<IEnumerable<T>> myObservable;

an observable change set can be created by calling .ToObservableChangeSet like this:

var myObservableChangeSet = myObservable.ToObservableChangeSet(t=> t.key);
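To make this concrete, here is a minimal sketch that feeds a plain Rx subject into a change set and materialises it as a cache. It assumes recent DynamicData and System.Reactive packages; the Price record, its values and the priceFeed name are hypothetical.

```csharp
using System;
using System.Reactive.Subjects;
using DynamicData;

// A plain Rx source standing in for any IObservable<T>.
var priceFeed = new Subject<Price>();

// Key each price by its currency pair and keep the result in a cache.
using var latestPrices = priceFeed
    .ToObservableChangeSet(p => p.Pair)
    .AsObservableCache();

priceFeed.OnNext(new Price("GBP/USD", 1.25m));
priceFeed.OnNext(new Price("EUR/USD", 1.08m));
priceFeed.OnNext(new Price("GBP/USD", 1.26m)); // same key, so it replaces the earlier value

Console.WriteLine(latestPrices.Count);
Console.WriteLine(latestPrices.Lookup("GBP/USD").Value.Rate);

record Price(string Pair, decimal Rate);
```

Because items are keyed, a second value for the same key is treated as an update rather than a new item.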

Chain together operators

Since dynamic data is based on the IObservable interface and each operator returns an observable change set, operators can be chained together and flexibly composed.

For example, if you have an observable cache containing trades you can connect to it and sequentially apply multiple operators:

var myFluentOperation = myTrades.Connect()
                    .Filter(trade => trade.Status == TradeStatus.Live)
                    .Transform(trade => new TradeProxy(trade))
                    .Sort(SortExpressionComparer<TradeProxy>.Descending(t => t.Timestamp))
                    .Subscribe(changes => { /* do something with the result... */ });

The result is a true observable. When values are added, replaced or removed from myTrades, the result will always reflect these changes.

Create a Derived Cache

Any observable change set can be converted into an observable cache by calling the AsObservableCache method

var readOnlyCache = myTrades.Connect()
            .Filter(trade => trade.Status == TradeStatus.Live)
            .AsObservableCache();

which produces a self-maintaining cache of live trades.
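The self-maintaining behaviour can be sketched as follows, assuming a recent DynamicData NuGet package. The Trade record and TradeStatus enum are simplified, hypothetical stand-ins for the types used in the demo project.

```csharp
using System;
using DynamicData;

var myTrades = new SourceCache<Trade, int>(trade => trade.Id);

// Derived, read only cache containing live trades only.
using var liveTrades = myTrades.Connect()
    .Filter(trade => trade.Status == TradeStatus.Live)
    .AsObservableCache();

myTrades.AddOrUpdate(new Trade(1, TradeStatus.Live));
myTrades.AddOrUpdate(new Trade(2, TradeStatus.Live));
myTrades.AddOrUpdate(new Trade(3, TradeStatus.Closed));
var liveAfterLoad = liveTrades.Count;

// Replacing trade 2 with a closed version drops it from the derived cache.
myTrades.AddOrUpdate(new Trade(2, TradeStatus.Closed));
var liveAfterClose = liveTrades.Count;

Console.WriteLine($"live after load: {liveAfterLoad}, live after close: {liveAfterClose}");

// Hypothetical, simplified types (illustration only).
enum TradeStatus { Live, Closed }
record Trade(int Id, TradeStatus Status);
```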

Operator overview

Dynamic data is built on top of Rx but since Rx does not deal with collection changes, there are specific operators which account for adds, updates and removes for a large number of operations. These are detailed in the following sections.

Restriction Operators

These operators reduce the number of changes, and hence the underlying data, when applied.

OPERATOR DESCRIPTION
ExpireAfter Time based expiry for a cache
Filter The semantic equivalent of Rx.Where. There is an overload to allow the predicate to be changed
IgnoreUpdateWhen Ignores an update when the condition is met.
IncludeUpdateWhen Only includes the update when the condition is met.
LimitSizeTo Limits the size of the source cache to the specified limit, providing a notification when an item is removed due to the size limit being exceeded
NotEmpty Prevents empty changesets. By default this is applied by dynamic data operators so it is only required for custom operators
Page Applies paging to the underlying data source. Parameters are provided and changed by providing an observable of page requests
SkipInitial Defers the subscription until the underlying data has loaded and skips the first set of changes
Top Limits the size of the result set to the specified number of items
Virtualise Resulting changes include only a virtualised subset of the underlying data. Parameters are provided and changed by providing an observable of virtual requests
WhereReasonsAre Includes changes for the specified reasons only
WhereReasonsAreNot Excludes changes for the specified reasons

Transformative Operators

OPERATOR DESCRIPTION
AsObservableCache Converts an observable change set into a read only observable cache / hides the edit methods of the source cache.
Convert Light weight conversion / casting of a type as specified by a value selector
DistinctValues Return a changeset of distinct values as specified by a value selector
Group Semantic equivalent of Rx.GroupBy operator
QueryWhenChanged The latest copy of the underlying data is exposed for querying i) after each modification to the underlying data ii) upon subscription. The method accepts a result selector which allows any resulting observable to be returned
RemoveKey Removes the key from each change. This changes the change set from a cache change set to a list changeset
Sort Allow sort operators using a specified comparer. There is an overload to allow the sort to be changed
ToCollection Converts the changeset into a fully formed collection. Each change in the source results in a new read only collection
Transform The semantic equivalent of Rx.Select
TransformMany Equivalent to a select many transform
TransformSafe The semantic equivalent of Rx.Select. Provides an overload to handle any errors encountered when converting an item
TransformToTree Transforms the object to a fully recursive tree, creating a hierarchy based on the pivot function

Join Operators

The following operators apply boolean logic between two or more observable change sets.

OPERATOR DESCRIPTION
And Applies a logical And between the underlying data in two or more observable change sets
Or Applies a logical Or between the underlying data in two or more observable change sets
Xor Applies a logical Xor between the underlying data in two or more observable change sets
Except Takes the results of the first observable change set and ignores any items produced by one or more other change sets

Aggregation Operators

The aggregation operators are also new to Dynamic Data version 4. They produce light weight continuous aggregation of the underlying observable change set.

OPERATOR DESCRIPTION
Count Counts the number of items in the underlying data
Sum The sum of values matching the specified value selector
Avg The average value matching the specified value selector
Maximum The maximum value matching the specified value selector
Minimum The minimum value matching the specified value selector
StdDev The standard deviation of all the values matching the specified value selector

Inspection of Items Operators

These operators concentrate on applying logic to individual items within the change set.

OPERATOR DESCRIPTION
DisposeMany Dispose each item when removed from the stream. Also disposes all items when the stream is completed.
ForEachChange Provides a callback for each individual item change
MergeMany Dynamically merges the observable which is selected from each item in the stream, and unmerges the item when it is no longer part of the stream.
OnItemRemoved Callback for each item as and when it is being removed from the stream. This is not the same as restricting using WhereReasonsAre as an item is also considered as being removed when a subscribed stream has been disposed
SubscribeMany Subscribes to each item when it is added to the stream and unsubscribes when it is removed. All items will be unsubscribed when the stream is disposed
TrueForAny Produces a boolean observable indicating whether any of the values from the specified observable match the equality condition
TrueForAll Produces a boolean observable indicating whether all of the values from the specified observable match the equality condition
Watch Returns an observable of any changes for a single item which matches a specified key
WatchValue Overload of watch which returns the changed value only
WhenAnyChanged Property changed observation of each item in the underlying collection which provides a notification when any item has changed. The notification includes the sender and value of the changed property
WhenAnyValueChanged Property changed observation of each item in the underlying collection which provides a notification when any item has changed. The notification only includes the value of the changed property

Additional Operators

OPERATOR DESCRIPTION
Adapt Injects side effects using IChangeSetAdaptor interface
Bind Ensures a specified observable collection reflects the underlying data
Batch This is a short cut to applying a standard Rx time based buffer followed by FlattenBufferResult (see below)
BufferIf Conditional buffer which accepts a boolean observable to turn buffering on or off
ChangeKey Changes the unique item key
Clone Populate a dictionary instance with the latest changes
DeferUntilLoaded Delay subscription until the source is populated with data
FlattenBufferResult A changeset is already a collection and therefore becomes a nested collection when Rx buffer operations are applied. Use this to flatten the result back to a single change set
PopulateInto Populates a source cache instance with the latest changes
RefCount Change set equivalent to Publish().RefCount(). The source is cached so long as there is at least 1 subscriber

Observable List At A Glance

Dynamic data v4 has been taken out of beta and officially released.

For source code see Dynamic Data on GitHub
For install see Dynamic Data On Nuget

Dynamic data has always had an observable cache and now after a busy 4 months development and 188 commits there is an observable list. I also could not resist throwing in some aggregation operations. These new features are outlined in this post.

Getting Started

Create an observable list like this:

var myInts= new SourceList<int>();

There are direct edit methods, for example

myInts.AddRange(Enumerable.Range(0, 10000)); 
myInts.Add(99999); 
myInts.Remove(99999);

Each amend operation produces a change notification. A much more efficient option is to batch edit, which produces a single notification.

myInts.Edit(innerList =>
{
   innerList.Clear();
   innerList.AddRange(Enumerable.Range(0, 10000));
});

If SourceList is to be exposed publicly it can be made read only

IObservableList<int> readonlyInts = myInts.AsObservableList();

which hides the edit methods.

The list changes can be observed by calling myInts.Connect(). This creates an observable change set for which there are dozens of list specific operators. The changes are transmitted as an Rx observable so are fluent and composable.

Chain together operators

The following filters the observable list to include only odd numbers and applies a sum operation.

int total = 0;
var sumOfOddNumbers = myInts.Connect()
.Filter(i => i%2 == 1)
.Sum(i => i)
.Subscribe(result => total = result);

The result is a true observable. When values are added, replaced or removed from myInts , the result will always reflect these changes.
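A complete version of the example might look like the sketch below. It assumes a recent DynamicData NuGet package and, as far as I know, the aggregation operators need the DynamicData.Aggregation namespace, which the snippet above does not show. A small range is used so the totals can be checked by hand.

```csharp
using System;
using System.Linq;
using DynamicData;
using DynamicData.Aggregation; // assumption: Sum and friends live in this namespace

var myInts = new SourceList<int>();
var total = 0;

// Continuously sum the odd numbers in the list.
using var subscription = myInts.Connect()
    .Filter(i => i % 2 == 1)
    .Sum(i => i)
    .Subscribe(result => total = result);

myInts.AddRange(Enumerable.Range(0, 10));
var totalAfterLoad = total;   // 1 + 3 + 5 + 7 + 9

myInts.Add(11);
var totalAfterAdd = total;    // previous total plus 11

myInts.Remove(9);
var totalAfterRemove = total; // previous total minus 9

Console.WriteLine($"{totalAfterLoad}, {totalAfterAdd}, {totalAfterRemove}");
```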

Create a derived list

Any observable change set can be converted into an observable list so for example


var myOddNumberList = myInts.Connect()
.Filter(i => i%2 == 1)
.AsObservableList();

produces a read only observable list of odd numbers.

Dynamic data complements Rx but since Rx does not deal with collections I have created many list specific operators to account for adds, updates, replaces and moves. These are detailed below.

Observable List Operators (35 operators and counting)

The following table is a brief description of all the observable operators which can be applied to the observable list. I will use it as the basis for further detailed documentation. This will take some time but will be done in the near future.

Operator Description
Adapt Injects side effects using IChangeSetAdaptor interface.
AsObservableList Converts an observable change set into a read only observable list / hides the edit methods of a source list.
Bind Ensures a specified observable collection reflects the underlying data
BufferIf Conditional buffer which accepts a boolean observable to turn buffering on or off
Clone Populate an IList instance with the latest changes.
Convert Light weight conversion / casting of a type as specified by a value selector
DeferUntilLoaded Delay subscription until the source is populated with data.
DisposeMany Dispose each item when removed from the stream. Also disposes all items when the stream is completed.
DistinctValues Return a changeset of distinct values as specified by a value selector
ExpireAfter Time based expiry for a source list
Filter The semantic equivalent of Rx.Where. There is an overload to allow the predicate to be changed
FlattenBufferResult A changeset is already a collection and therefore becomes a nested collection when Rx buffer operations are applied. Use this to flatten the result back to a single change set
GroupOn Semantic equivalent of Rx.GroupBy operator
LimitSizeTo Limits the size of the source cache to the specified limit, providing a notification when an item is removed due to the size limit being exceeded
MergeMany Dynamically merges the observable which is selected from each item in the stream, and unmerges the item when it is no longer part of the stream.
NotEmpty Prevents a zero count change set notification
OnItemRemoved Callback for each item as and when it is being removed from the stream
Page Applies paging to the data source
PopulateInto Populates an ISourceList instance with the latest changes
QueryWhenChanged The latest copy of the underlying data is exposed for querying i) after each modification to the underlying data ii) upon subscription
RefCount Change set equivalent to Publish().RefCount(). The source is cached so long as there is at least 1 subscriber.
SkipInitial Defer the subscription until there is underlying data loaded and skip first set of changes
Sort Allow sort operators using a specified comparer. There is an overload to allow the sort to be changed
SubscribeMany Subscribes to each item when it is added to the stream and unsubscribes when it is removed. All items will be unsubscribed when the stream is disposed
ToCollection Converts the changeset into a fully formed collection. Each change in the source results in a new read only collection
Top Limits the size of the result set to the specified number of items
Transform The semantic equivalent of Rx.Select
TransformMany Equivalent to a select many transform
Virtualise Virtualises the data. Parameters are provided and changed using a virtualising controller
WhenAnyChanged Property changed observation of each item in the underlying collection which provides a notification when any item has changed. The notification includes the sender and value of the changed property.
WhenAnyValueChanged Property changed observation of each item in the underlying collection which provides a notification when any item has changed. The notification only includes the value of the changed property.
WhereReasonsAre Includes changes for the specified reasons only
WhereReasonsAreNot Excludes changes for the specified reasons
WithKey Applies a key to each item. This changes the change set from a list change set to a cache changeset

This list has fewer operators than the observable cache but will be expanded in future releases.

Aggregation Operators

The following aggregation operators are also new to Dynamic Data version 4. They apply to both the observable list and the observable cache. With the exception of Count(), which requires no parameter, the value to continually compute on is specified as a value selector Func valueSelector

Operator Description
Count Counts the number of items in the underlying data
Sum The sum of values matching the specified value selector
Avg The average value matching the specified value selector
Maximum The maximum value matching the specified value selector
Minimum The minimum value matching the specified value selector
StdDev The standard deviation of all the values matching the specified value selector

Logical collection operators

I stated previously that my next post would be an illustration of how to use Dynamic Data to virtualise data (there is a Virtualise operator). Although powerful, the example I had in mind just seemed too boring so I decided to put it off until I can think of a means of making the demo interesting. Instead I will unveil the logical collection operators together with what I hope you find to be a clear example.

Dynamic data can apply logical / set type operators across two or more dynamic data sources. Suppose we have the 2 dynamic data sources, sourceA and sourceB, we can apply the following logical collection operators:

Operator Syntax What gets included
Or sourceA.Or(sourceB) Items which are in sourceA or sourceB
And sourceA.And(sourceB) Items which are in sourceA and sourceB
Except sourceA.Except(sourceB) Items which are in sourceA and not sourceB

The only constraint is that the left and right data sources must be of the same type. The power of dynamic data is that when items are added, updated or removed in either sourceA or sourceB, the result automatically updates to reflect this.
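The three operators can be tried out with a sketch like the following, assuming a recent DynamicData NuGet package. Both sources hold strings keyed by themselves, the sample values are made up, and Show is a hypothetical helper for printing.

```csharp
using System;
using System.Linq;
using DynamicData;

// Two sources of the same type; each string is its own key.
var sourceA = new SourceCache<string, string>(s => s);
var sourceB = new SourceCache<string, string>(s => s);

using var either  = sourceA.Connect().Or(sourceB.Connect()).AsObservableCache();
using var both    = sourceA.Connect().And(sourceB.Connect()).AsObservableCache();
using var onlyInA = sourceA.Connect().Except(sourceB.Connect()).AsObservableCache();

// Hypothetical helper: keys of a cache as a sorted, comma separated string.
string Show(IObservableCache<string, string> cache) =>
    string.Join(", ", cache.Keys.OrderBy(k => k, StringComparer.Ordinal));

sourceA.AddOrUpdate(new[] { "GBP/USD", "EUR/USD" });
sourceB.AddOrUpdate(new[] { "EUR/USD", "USD/JPY" });
Console.WriteLine($"Or: [{Show(either)}]  And: [{Show(both)}]  Except: [{Show(onlyInA)}]");
var eitherCount = either.Count;
var bothCountBefore = both.Count;
var onlyInACountBefore = onlyInA.Count;

// The results follow later edits to either source.
sourceB.AddOrUpdate("GBP/USD");
Console.WriteLine($"Or: [{Show(either)}]  And: [{Show(both)}]  Except: [{Show(onlyInA)}]");
var bothCountAfter = both.Count;
var onlyInACountAfter = onlyInA.Count;
```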

The truth is it is only occasionally that I have had the practical reason to apply the above operators but when I have I have been delighted that I took the trouble to program them in the first place as adding observers to two dynamic data sources and manually combining them to produce a third is tedious and boring code indeed.

Now for the example of when this can be useful.

Create a dynamic list of search hints

On several of the screens I have created for Dynamic Trader there is a search text box which is used to create and apply a predicate to the trades data source. For all the examples in the project it is produced using the following code.

return trade => trade.CurrencyPair.Contains(searchText, StringComparison.OrdinalIgnoreCase)
    || trade.Customer.Contains(searchText, StringComparison.OrdinalIgnoreCase);

This predicate applies to the currency pair and the customer fields so I think it would be good user experience to provide some hints to help the user understand what makes a valid search term. That’s why I have changed the text box to a combo to produce the following.

Combine 2 dynamic collections to form a list of hints

The combo box displays a list which is made up of the customers and currency pairs in the underlying dynamic data source. As the user types, the list is filtered accordingly. Also, as the underlying data changes, the resulting drop down list changes with it. As with all examples in the demo project we have one in-memory data source which is accessed from the ITradeService interface and for brevity is not shown in the code below.

First we need a distinct dynamic data source of customers from the trades data source.

var customers = tradesDataSource.DistinctValues(trade => trade.Customer);

And to get a distinct dynamic data source of currency pairs from the same trades data source.

var currencypairs = tradesDataSource.DistinctValues(trade => trade.CurrencyPair);

Both currency pair and customer are string fields so the two data sources can be combined like this

var combinedstrings = customers.Or(currencypairs);

And now we have a single observable change set of currency pair and customer strings. From here we do the standard dynamic data stuff to filter, sort and bind the result to the combo box.

//Filter the combined list according to user entered input and bind it to hints
var loader = combinedstrings
    .Filter(filter)     //filter strings using a filter controller according to user entered text
    .Sort(SortExpressionComparer<string>.Ascending(str=>str))
    .ObserveOn(schedulerProvider.MainThread)
    .Bind(_hints)       //bind to hints list
    .Subscribe();

I have skipped over a lot of detail such as filtering as I have described the process in many posts before and I did not want to obscure the crux of the example which is the Or operator. Additionally I also introduced the DistinctValues operator which I think from the above code is self explanatory.

With this example there is scope for one more powerful operator. If the result produced by the filter is large, and given that the binding operation has to take place on the UI thread, you can use the Top operator to limit the number of items returned by the result set. It is applied after the sort operator.

    .Sort(SortExpressionComparer<string>.Ascending(str=>str))
    .Top(15) //limit result to a maximum of 15 items
    //....do binding

This will make the search combo responsive and non blocking even for filtering large lists. The whole example is very easy and produced in 70 lines of code. If you don’t believe me look at SearchHints.cs.

Summary

In the last post I promised to illustrate the Virtualise operator but decided against it for now, but in this short article I have shown 5 new operators which I hope will convince you that dynamic data can make life so much easier for the handling of asynchronous dynamic collections in any application.

Although the examples in this blog have mostly ended up in binding operations, I emphasise that dynamic data is for collections first and foremost whether on a mobile device, desktop or server. I have put results on the screen simply to visually show what is happening to the data. I have a few more app / binding samples in mind and after that I think I will write a purely logical example which involves no screen – perhaps the foundation of an algo-trading component.

Links to source code

Search hints object SearchHints.cs
View model of code using search hints LiveTradesViewer.cs
All examples here Dynamic trader demo on GitHub
Dynamic data Source code on Github

Dynamically Sort, Filter And Page Data

This is the first in a new series of blog posts where I will illustrate how dynamic data and reactive extensions can be used to improve the performance of WPF applications. Answering this question on stack overflow has prompted me to write what I plan to be a 3 part mini-series. In this part I will examine paging of in-memory data to reduce how much data a grid binds to, in subsequent posts I will illustrate virtualising data, and finally I will look at injecting behaviours into visible rows.

It has become my custom to start each post with an image then I explain how I got there. So here’s the image, it shows a screen which can filter, sort and page in-memory data.

Paging with dynamic data

If you cannot be bothered reading the remainder of this article, the demo and code can be studied via the following links

All the code in the demo project is here Dynamic data demo project on GitHub
View model PagedDataViewer.cs
View PagedDataView.xaml
Dynamic data source code here Github

Why use paging

So the first question is why would you page data when you can simply bind to all of it? That’s a reasonable question and mostly I would say there is no need. However, for large collections, or collections which update rapidly, the main thread can often block whilst the collection is updating. I have found this to be the case even with virtualization enabled in the xaml. This is because the observable collection can only be updated on the main thread, which is clearly problematic as it blocks. Additionally the ListCollectionView may have to apply sort operations which are very expensive as the ListCollectionView has to linearly find the correct position of each item. As the collection gets larger the linear find and replace operations get slower and slower. I have found from bitter experience that binding to more than 10,000 rows in WPF can be problematic.

Actually I could go on for a while about performance bottlenecks in WPF both from the data and the xaml perspective but that is a debate which we can have on another day.

There are of course many solutions to the problem but now I will concentrate on using dynamic data to create a paged view.

Create controllers to dynamically change observables

Dynamic data provides a load of extensions and some controllers to dynamically interrogate the data. For our screen we need a few controllers to dynamically filter, sort and page.

    var pageController = new PageController();  
    var filterController = new FilterController<T>(); 
    var sortController = new SortController<T>(); 

where the values can be changed like this

    //to change page
    pageController.Change(new PageRequest(1,100));
    //to change filter, pass a new predicate
    filterController.Change(myobject => true /* replace with your own predicate logic */);
    //to change sort, pass a new comparer (an IComparer<T>)
    sortController.Change(comparer);

Use these controllers to build a filtered, sorted and paged dynamic data stream

As with all the examples in this blog, the data is fed from a shared in-memory cache which is exposed through the ITradeService interface. The following code is only marginally different from code in previous posts.

    //this is an extension of observable collection optimised for dynamic data
    var collection = new ObservableCollectionExtended<TradeProxy>();

    var loader = tradeService.All.Connect()
       .Filter(filterController) // apply user filter
       .Transform(trade => new TradeProxy(trade), new ParallelisationOptions(ParallelType.Ordered, 5))
       .Sort(sortController, SortOptimisations.ComparesImmutableValuesOnly)
       .Page(pageController) // applies the paging and returns only results affecting the current page
       .ObserveOn(schedulerProvider.MainThread)
       //ensure page parameters class knows which page we are on
       .Do(changes => _pageParameters.Update(changes.Response))
       .Bind(collection) // update observable collection bindings
       .DisposeMany()    //dispose when no longer required
       .Subscribe();

In one line of code the data has been transformed, filtered, sorted and the current page is bound and reflected in the observable collection. And as if by magic the observable collection will self-maintain when any of the controller parameters change or when any of the data changes. At any time the parameters of the controllers can be changed to dynamically change the results of the current page.

The result with this small segment of code is that by applying the page operator we have significantly reduced the number of records bound to the grid and therefore reduced the work load on the main thread.

Hooray, let’s open the champagne. Almost that time, but not quite. We have not set the controller parameters yet, nor have we created any means for the user to change the page, sort or apply a filter. For the user to enter these values I have created a couple of supporting objects which are explained below.

Apply Page Changes

PageParameterData.cs is the class containing the latest page details as well as commands to move to the next and previous page. The commands are bound to the skip previous and next buttons and when these are pressed the page number property changes. This fires a notification which is observed using some simple Rx.

We observe the size and current page properties as follows.

//observe size and current page
var currentPageChanged = PageParameters.ObservePropertyValue(p => p.CurrentPage).Select(prop => prop.Value);
var pageSizeChanged = PageParameters.ObservePropertyValue(p => p.PageSize).Select(prop => prop.Value);

//combine values, create request object and change the controller.
var pageChanger = currentPageChanged.CombineLatest(pageSizeChanged,
                           (page, size) => new PageRequest(page, size))
                           .DistinctUntilChanged()
                           .Sample(TimeSpan.FromMilliseconds(100))
                           .Subscribe(pageController.Change);

The latest values of each are combined into a new PageRequest and the page controller is updated to this value. This reapplies the page logic producing a next page response which includes the next page of data.

Apply Filtering

As with several example screens in the dynamic data menu we have the SearchText property on the main view model. We observe changes, build a predicate and update the filter controller.

  var filterApplier = this.ObservePropertyValue(t => t.SearchText)
                .Throttle(TimeSpan.FromMilliseconds(250))
                .Select(propargs => BuildFilter(propargs.Value))
                .Subscribe(filterController.Change);

where the build filter function is as follows

private Func<Trade, bool> BuildFilter(string searchText)
 {
     if (string.IsNullOrEmpty(searchText)) return trade => true;
     return t => t.CurrencyPair.Contains(searchText, StringComparison.OrdinalIgnoreCase) 
                          || t.Customer.Contains(searchText, StringComparison.OrdinalIgnoreCase);
 }
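The predicate builder can be exercised on its own, without any view model or controller, which makes its behaviour easy to verify. The sketch below uses only the .NET base library; the Trade record and the sample data are hypothetical stand-ins for the demo project's types, and it assumes a runtime where string.Contains accepts a StringComparison (.NET Core 2.1 or later).

```csharp
using System;
using System.Linq;

// Same logic as the BuildFilter function above, as a local function.
static Func<Trade, bool> BuildFilter(string searchText)
{
    // Empty search text matches everything.
    if (string.IsNullOrEmpty(searchText)) return trade => true;
    return t => t.CurrencyPair.Contains(searchText, StringComparison.OrdinalIgnoreCase)
             || t.Customer.Contains(searchText, StringComparison.OrdinalIgnoreCase);
}

// Made-up sample data (illustration only).
var trades = new[]
{
    new Trade("GBP/USD", "Acme Bank"),
    new Trade("EUR/JPY", "Globex"),
};

var byPair     = trades.Count(BuildFilter("gbp"));     // matches on currency pair, ignoring case
var byCustomer = trades.Count(BuildFilter("GLOBEX"));  // matches on customer, ignoring case
var everything = trades.Count(BuildFilter(""));        // no search text, so no filtering
var nothing    = trades.Count(BuildFilter("xyz"));     // matches neither field

Console.WriteLine($"{byPair}, {byCustomer}, {everything}, {nothing}");

// Hypothetical, simplified trade type.
record Trade(string CurrencyPair, string Customer);
```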

Apply Sorting

SortParameterData.cs is the view model to bind the sorting data. The following code observes the selected item and applies the selected comparer to the new sort controller.

  var sortChange = SortParameters.ObservePropertyValue(t => t.SelectedItem)
          .Select(prop => prop.Value.Comparer)
          .ObserveOn(schedulerProvider.TaskPool)
          //Change the sort controller
          .Subscribe(sortController.Change);

Summary

This code is surprisingly easy, with the main view model having about 100 lines of code. You probably would not believe me if I said I wrote all of it in under 3 hours. Admittedly I had the infrastructure for the page changing and the sorting from another project, but nonetheless I can assure you that when you are up to speed with dynamic data, you will find the manipulation of collections of data very easy indeed.

Next time I will be doing something similar yet simpler by showing how dynamic data can virtualise data.

Getting Started

Although I have been blogging about dynamic data I have so far omitted any specific documentation. This is because dynamic data is functionally very rich and hence there is such a huge amount to document. Frankly with over 50 operators to explain I have been daunted. But at last I am on the case, so this is the beginning.

The documents live on Dynamic data documents on GitHub and for the next month or two I will keep updating these.

The core concept

It is perhaps easiest to think of dynamic data as reactive extensions (rx) for collections but more accurately dynamic data is a bunch of rx operators based on the concept of an observable change set. The change set notifies listeners of any changes to an underlying source and has the following signature.

IObservable<IChangeSet<TObject,TKey>> myFirstObservableChangeSet;

where IChangeSet represents a set of adds, updates, removes and moves (for sort dependent operators). Each observer receives the changes, applies some logic and in turn notifies its own change set. In this way complex chains of operators can easily be composed.

The only constraint of dynamic data is an object needs to have a key specified. This was a design choice right from the beginning as the internals of dynamic data need to identify any object and be able to look it up quickly and efficiently.
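To make the idea of a change set concrete, here is a minimal sketch which prints every change that flows out of a keyed source. The Trade record and its properties are hypothetical, invented only for this illustration, and the sketch assumes a recent DynamicData package where SourceCache, Connect, AddOrUpdate and Remove are available.

```csharp
using System;
using DynamicData;

// Hypothetical item type, used only for this sketch
public record Trade(int Id, string Status);

public static class ChangeSetDemo
{
    public static void Main()
    {
        // The key selector tells dynamic data how to identify each item
        using var cache = new SourceCache<Trade, int>(t => t.Id);

        // Each notification is a change set: a batch of individual changes
        using var subscription = cache.Connect().Subscribe(changes =>
        {
            foreach (var change in changes)
            {
                Console.WriteLine($"{change.Reason}: key={change.Key}, value={change.Current}");
            }
        });

        cache.AddOrUpdate(new Trade(1, "Live"));   // new key, so reported as an add
        cache.AddOrUpdate(new Trade(1, "Closed")); // same key, so reported as an update
        cache.Remove(1);                           // reported as a remove
    }
}
```

Because every item carries a key, the second AddOrUpdate is recognised as a change to an existing item rather than a second item.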

Creating an observable change set

To open up the world of dynamic data to any object, we need to feed the data into some mechanism which produces the observable change set. Unless you are creating a custom operator there is no need to create one directly, as there are several out of the box means of doing so.

The easiest way is to feed directly into dynamic data from a standard rx observable.

// The source can be either a stream of single items
IObservable<T> myObservable;
// or a stream of batches of items
IObservable<IEnumerable<T>> myObservable;
// Use the hashcode for the key
var mydynamicdatasource = myObservable.ToObservableChangeSet();
// or specify a key like this
var mydynamicdatasource = myObservable.ToObservableChangeSet(t => t.Key);

The problem with the above is the collection will grow forever so there are overloads to specify size limitation or expiry times (not shown).
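The size limit and expiry overloads mentioned above might be used along these lines. This is only a sketch: the Tick record is hypothetical, and the expireAfter and limitSizeTo parameter names are assumed from recent DynamicData releases, so check them against the version you have installed.

```csharp
using System;
using System.Reactive.Subjects;
using DynamicData;

// Hypothetical item type, used only for this sketch
public record Tick(int Id, decimal Price);

public static class LimitedStreamDemo
{
    public static void Main()
    {
        var ticks = new Subject<Tick>();

        using var subscription = ticks
            .ToObservableChangeSet(
                t => t.Id,                                // key selector
                expireAfter: _ => TimeSpan.FromMinutes(5), // assumed parameter name: remove items after 5 minutes
                limitSizeTo: 1000)                         // assumed parameter name: keep at most 1000 items
            .Subscribe(changes => Console.WriteLine($"{changes.Count} change(s)"));

        ticks.OnNext(new Tick(1, 1.25m));
    }
}
```

Items which expire or are pushed out by the size limit should arrive downstream as removes, so consumers stay in step without any extra plumbing.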

To have much more control over the root collection we need an in-memory data store which has the requisite add, update and remove methods. As above, the cache can be created with or without specifying a key.

// Use the hash code for the key
var mycache  = new SourceCache<TObject>();
// or specify a key like this
var mycache  = new SourceCache<TObject,TKey>(t => t.Key);

The cache produces an observable change set via its Connect methods.

var observableChangeSet = mycache.Connect();

Another way is to connect directly from an observable collection. You can do this:

var myobservablecollection= new ObservableCollection<T>();
// Use the hashcode for the key
var mydynamicdatasource = myobservablecollection.ToObservableChangeSet();
// or specify a key like this
var mydynamicdatasource = myobservablecollection.ToObservableChangeSet(t => t.Key);

This method is only recommended for simple queries which act only on the UI thread as ObservableCollection is not thread safe.

One other point worth making here is any observable change set can be converted into a cache.

var mycache = somedynamicdatasource.AsObservableCache();

This cache has the same connection methods as a source cache but is read only.

Examples

Now you know how to create the source observable, here are a few quick fire examples. But first, what is the expected behaviour and what are the standard conventions? Simple answer to that one.

  1. All operators must comply with the Rx guidelines.
  2. When an observer subscribes the initial items of the underlying source always form the first batch of changes.
  3. Empty change sets should never be fired.

In all of these examples the resulting sequences always exactly reflect the items in the cache. This is where the power of adds, updates and removes comes into its own, as all the operations are maintained with no consumer based plumbing.
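The second convention is easy to see for yourself. In this sketch (the Trade record is hypothetical and a recent DynamicData package is assumed) the cache is populated before anyone subscribes, yet the late subscriber still receives the existing items as its first change set.

```csharp
using System;
using DynamicData;

// Hypothetical item type, used only for this sketch
public record Trade(int Id, string Customer);

public static class InitialBatchDemo
{
    public static void Main()
    {
        using var cache = new SourceCache<Trade, int>(t => t.Id);

        // Load the cache before there are any subscribers
        cache.AddOrUpdate(new[]
        {
            new Trade(1, "Alice"),
            new Trade(2, "Bob")
        });

        // A late subscriber is sent the current contents as adds in its first change set
        using var subscription = cache.Connect().Subscribe(changes =>
            Console.WriteLine($"Received {changes.Adds} add(s) out of {changes.Count} change(s)"));
    }
}
```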

Example 1: filters a stream of live trades, creates a proxy for each trade and orders the result by most recent first. As the source is modified the observable collection will automatically reflect changes.

//Dynamic data has its own take on an observable collection (optimised for populating from dynamic data observables)
var list = new ObservableCollectionExtended<TradeProxy>();
var myoperation = somedynamicdatasource
                    .Filter(trade=>trade.Status == TradeStatus.Live) 
                    .Transform(trade => new TradeProxy(trade))
                    .Sort(SortExpressionComparer<TradeProxy>.Descending(t => t.Timestamp))
                    .ObserveOnDispatcher()
                    .Bind(list) 
                    .DisposeMany()
                    .Subscribe();

Oh and I forgot to say, TradeProxy is disposable and DisposeMany() ensures items are disposed when no longer part of the stream.

Example 2: for filters which can be dynamically changed, we can use a filter controller.

var filtercontroller = new FilterController<Trade>();
var myoperation = somedynamicdatasource.Filter(filtercontroller);

//can invoke a filter change any time, with any predicate
filtercontroller.Change(trade => trade.Status == TradeStatus.Live);

Example 3: produces a stream which is grouped by status. If an item changes status it will be moved to the new group and when a group has no items the group will automatically be removed.

var myoperation = somedynamicdatasource
                    .Group(trade => trade.Status); //This is NOT Rx's GroupBy

Example 4: Suppose I am editing some trades and each trade exposes an observable which reports whether it is valid. If I want to know when all items are valid, this will do the job.

IObservable<bool> allValid = somedynamicdatasource
                    .TrueForAll(trade => trade.IsValidObservable, (trade, isvalid) => isvalid);

This operator flattens the observables and returns the combined state in one line of code. I love it.

Example 5: will wire and un-wire items from the observable when they are added, updated or removed from the source.

var myoperation = somedynamicdatasource.Connect() 
                .MergeMany(trade=> trade.ObservePropertyChanged(t=>t.Amount))
                .Subscribe(ObservableOfAmountChangedForAllItems => { /* do something with IObservable<PropChangedArg> */ });

Example 6: Produces a distinct change set of currency pairs

var currencyPairs= somedynamicdatasource
                    .DistinctValues(trade => trade.CurrencyPair);
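
For anyone who wants to try the distinct values operator in isolation, the following sketch wires it to a small source cache and prints what comes out. The Trade record is hypothetical and a recent DynamicData package is assumed.

```csharp
using System;
using DynamicData;

// Hypothetical item type, used only for this sketch
public record Trade(int Id, string CurrencyPair);

public static class DistinctValuesDemo
{
    public static void Main()
    {
        using var cache = new SourceCache<Trade, int>(t => t.Id);

        using var subscription = cache.Connect()
            .DistinctValues(trade => trade.CurrencyPair)
            .Subscribe(changes =>
            {
                // Each change describes a currency pair entering or leaving the distinct set
                foreach (var change in changes)
                {
                    Console.WriteLine($"{change.Reason}: {change.Current}");
                }
            });

        // Three trades but only two distinct currency pairs
        cache.AddOrUpdate(new[]
        {
            new Trade(1, "GBP/USD"),
            new Trade(2, "EUR/USD"),
            new Trade(3, "GBP/USD")
        });

        // Removing the only EUR/USD trade takes that pair out of the distinct set
        cache.Remove(2);
    }
}
```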

The above examples are just a brief glimpse of what dynamic data can do. It will all be documented in time.

Want to know more?

There is so much more which will be documented but if you want to find out more I suggest:

  1. Download the WPF trading example and go through the example screens
  2. Or try it out directly: Dynamic Data on NuGet

Integration with ReactiveUI

I have released DynamicData.ReactiveUI which is a very simple adaptor layer to assist with binding dynamic data observables to ReactiveUI’s ReactiveList object.

Install from Nuget

Nuget

If you are not familiar with dynamic data and what it enables I suggest:

  1. Go through previous posts in the blog
  2. Download the example wpf app from github
  3. .

To cut to the chase, here’s an example. The code takes existing in-memory trade objects, transforms them into a view model proxy and updates the target reactive list object with the resulting change sets. It also pre-filters the data with live trades, applies a user entered filter, orders the resulting data and disposes the proxy when no longer required. Phew – all that in effectively one line of code.

I did the same example without using ReactiveUI code. See Trading Example Part 3. Integrate with UI, where I have explained the code in greater detail.

    public class RxUiViewer : ReactiveObject, IDisposable
    {
       //this is the target list which we will populate from the dynamic data stream
        private readonly ReactiveList<TradeProxy> _data = new ReactiveList<TradeProxy>();
        //the filter controller is used to inject filtering into an observable
        private readonly FilterController<Trade> _filter = new FilterController<Trade>();
        private readonly IDisposable _cleanUp;
        private string _searchText;

        public RxUiViewer(ITradeService tradeService)
        {
            //Change the filter when the user entered search text changes
            var filterApplier = this.WhenAnyValue(x => x.SearchText)
                .Throttle(TimeSpan.FromMilliseconds(250))
                .Subscribe(_ => ApplyFilter());
           
            ApplyFilter();

            var loader = tradeService.Trades
                .Connect(trade => trade.Status == TradeStatus.Live) //prefilter live trades only
                .Filter(_filter)    // apply user filter
                   //if targeting the Net4 or Net45 platform, parallelisation can be used for transforms because it is quicker
                .Transform(trade => new TradeProxy(trade), new ParallelisationOptions(ParallelType.Ordered, 5))
                .Sort(SortExpressionComparer<TradeProxy>.Descending(t => t.Timestamp), SortOptimisations.ComparesImmutableValuesOnly)
                .ObserveOn(RxApp.MainThreadScheduler)
                .Bind(_data)        //bind the results to the ReactiveList 
                .DisposeMany()      //since TradeProxy is disposable dispose when no longer required
                .Subscribe();

            _cleanUp = new CompositeDisposable(loader, _filter, filterApplier);
        }

        private void ApplyFilter()
        {
            if (string.IsNullOrEmpty(SearchText))
            {
                _filter.ChangeToIncludeAll();
            }
            else
            {
                _filter.Change(t => t.CurrencyPair.Contains(SearchText, StringComparison.OrdinalIgnoreCase) ||
                                    t.Customer.Contains(SearchText, StringComparison.OrdinalIgnoreCase));
            }
        }

        public string SearchText
        {
            get { return _searchText; }
            set { this.RaiseAndSetIfChanged(ref _searchText, value); }
        }

        public IReadOnlyReactiveList<TradeProxy> Data
        {
            get { return _data; }
        }

        public void Dispose()
        {
            _cleanUp.Dispose();
        }
    }

coupled with a little xaml can produce this

ReactiveUIImage

Very little code, but a very powerful example.

Gone Portable

Dynamic data is now a portable class library available on most platforms. See below.

Portable

Additionally there is a separate dotnet 4.0 library because I know there are enterprises out there stuck in the old days (investment banks maybe?).

Now you can do some cool rx for collections stuff on WP8, iOS and Android as well as Windows desktop and server.

I will explain what the plinq assemblies enable in a post in the near future.