Observable Cache At A Glance

In my previous post I introduced the observable list which was released in version 4 of Dynamic Data.

For a working demonstration of dynamic data see Dynamic Trader.

Although Dynamic Data has always had an observable cache I have never posted an ‘at a glance’ offering so I have put together this post for completeness. The observable cache is the big brother of the observable list. It has existed for years, is very well tried and tested and a lot of trouble has been taking to ensure good performance can be easily achieved. This document briefly outlines the usage and behaviour of the observable cache.

In my next post I will examine the differences between the observable cache and observable list and which circumstances should one be favoured over the other. I will also examine how performance can be fine tuned.

Create and maintain a cache

Create an observable cache like this:

var myCache = new SourceCache<TObject,TKey>(t => key);

This can be amending calling the the edit methods. For example:

myCache.Clear();
myCache.AddOrUpdate(myItems);

The Clear and AddOrUpdate methods above will each produce a distinct change notification. In order to increase efficiency when making multiple amendments, the cache provides a means of batch editing. This is achieved using the .BatchUpdate method which ensures only a single change notification is produced.

myCache.BatchUpdate(innerCache =>
              {
                  innerCache.Clear();
                  innerCache.AddOrUpdate(myItems);
              });

If myCache is to be exposed publicly it can be made read only using .AsObservableCache

IObservableCache<TObject,TKey> readonlyCache = myCache.AsObservableCache();

which hides the edit methods.

The cache is observed by calling myCache.Connect() like this:

IObservable<IChangeSet<TObject,TKey>> myCacheObservable = myCache.Connect();

This creates an observable change set for which there are dozens of operators. The changes are transmitted as an Rx observable, so they are fluent and composable.

Observing dynamic data

Dynamic Data is based on the concept of creating and manipulating observable change sets. An observable change set is a collection of changes which have been applied to the underlying data source. The change set is always preceded with any data which is already in the underlying collection.

The primary method of creating observable change sets is to connect to an instances of the cache. There are alternative methods to produce observable change sets however, depending on the data source.

Connect to a Cache

The most common means of creating an observable change set is by calling Connect on an observable cache

var myObservableChangeSet = myDynamicDataSource.Connect();

The observable change set always begins with any items which are in the cache when the subscription is made.

Create an Observable Change Set from an Rx Observable

Given either of the following Rx observables:

IObservable<T> myObservable;
IObservable<IEnumerable<T>> myObservable;

an observable change set can be created like by calling .ToObservableChangeSet like this:

var myObservableChangeSet = myObservable.ToObservableChangeSet(t=> t.key);

Chain together operators

Since dynamic data is based on the IObservable interface and each operators returns an observable change set, operators can be chained together and flexibly composed.

For example if you have an observable cache containing trades you can sequentially apply multiple operators,

var myFluentOperation = myTrades
                    .Filter(trade=>trade.Status == TradeStatus.Live) 
                    .Transform(trade => new TradeProxy(trade))
                    .Sort(SortExpressionComparer<TradeProxy>.Descending(t => t.Timestamp))
                    .Subscribe(changes=>//do something with the result...);  

The result is a true observable. When values are added, replaced or removed from myTrades, the result will always reflect these changes.

Create a Derived Cache

Any observable change set can be converted into an observable cache by calling the AsObservableCache method

var readOnlyCache = myTrades
            .Filter(trade=>trade.Status == TradeStatus.Live) 
            .AsObservableCache();

which produces a self-maintaining cache of live trades.

Operator overview

Dynamic data is built on top of Rx but since Rx does not deal with collection changes, there are specific operators which account for adds, updates and removes for a large number of operations. There are detailed in the following sections.

Restriction Operators

These operators reduce the amount of changes and hence underlying data when applied.

OPERATOR DESCRIPTION
ExpireAfter Time based expiry for a cache
Filter The semantic equivalent of Rx.Where. There is an overload to allow the predicate to be changed
IgnoreUpdateWhen Ignores an update when the condition is met.
IncludeUpdateWhen Only includes the update when the condition is met.
LimitSizeTo Limits the size of the source cache to the specified limit, providing a notification when an item is removed due to the size limit being exceeded
NotEmpty Prevent empty changesets. By default this is applied by dynamic data operators so if only required for custom operators
Page Applies paging to the the underlying data source. Parameters are provided and changed by providing an observable of page requests
SkipInitial Defer the subscription until there is underlying data has loaded and skip first set of changes
Top Limits the size of the result set to the specified number of items
Virtualise Resulting changes only includes only a virtualised subset of the underlying data. Parameters are provided and changed by providing an observable of virtual requests
WhereReasonsAre Includes changes for the specified reasons only
WhereReasonsAreNot Excludes changes for the specified reasons

Transformative Operators

OPERATOR DESCRIPTION
AsObservableCache Converts an observable change set into a read only observable cache / hides the edit methods of the source cache.
Convert Light weight conversion / casting of a type as specified by a value selector
DistinctValues Return a changeset of distinct values as specified by a value selector
Group Semantic equivalent of Rx.GroupBy operator
QueryWhenChanged The latest copy of the underlying data is exposed for querying i) after each modification to the underlying data ii) upon subscription. The method accepts a result selector which accepts allows any resulting observable to be returned
RemoveKey Removes the key from each change. This changes the change set from a cache change set to a list changeset
Sort Allow sort operators using a specified comparer. There is an overload to allow the sort to be changed
ToCollection Converts the changeset into a fully formed collection. Each change in the source results in a new read only collection
Transform The semantic equivalent of Rx.Select
TransformMany Equivalent to a select many transform
TransformSafe The semantic equivalent of Rx.Select. Provides and overload to handle any errors encountered when converting an item
TransformToTree Transforms the object to a fully recursive tree, create a hiearchy based on the pivot function

Join Operators

The following operators apply boolean logic logic between two or more observable change sets.

OPERATOR DESCRIPTION
And Applies a logical And between the underlying data in two or more observable change sets
Or Applies a logical Or between the underlying data in two or more observable change sets
Xor Applies a logical Xor between the underlying data in two or more observable change sets
Except Takes the results of the first observable change set and ignore any items produced by one or more other change sets

Aggregation Operators

The aggregation operators are also new to Dynamic Data version 4. They produce light weight continuous aggregation of the underlying observable change set.

OPERATOR DESCRIPTION
Count Counts the number of items in the underlying data
Sum The sum of values matching the specified value selector
Avg The average value matching the specified value selector
Maximum The maximum value matching the specified value selector
Minimum The minimum value matching the specified value selector
StdDev The standard deviation of all the values matching the specified value selector

Inspection of Items Operators

These operators concentrate on applying logic to individual items within the change set.

OPERATOR DESCRIPTION
DisposeMany Dispose each item when removed from the stream. Also disposes all items when the stream is completed.
ForEachChange Provides a callback for each individual item change
MergeMany Dynamically merges the observable which is selected from each item in the stream, and unmerges the item when it is no longer part of the stream.
OnItemRemoved Callback for each item as and when it is being removed from the stream. This is not the same as restricting using WhereReasonsAre as an item is also considered as being removed when a subscribed stream has been disposed
SubscribeMany Subscribes to each item when it is added to the stream and unsubcribes when it is removed. All items will be unsubscribed when the stream is disposed
TrueForAny Produces a boolean observable indicating whether the resulting value of whether any of the values from the specified observable matches the equality condition
TrueForAll Produces a boolean observable indicating whether the resulting value of whether all of the values from the specified observable matches the equality condition
Watch Returns an observable of any changes for a single item which matches a specified key
WatchValue Overload of watch which returns the changed value only
WhenAnyChanged Property changed observation of each item in the underlying collection which provides a notification when any item has changed. The notification includes the sender and value of the changed property
WhenAnyValueChanged Property changed observation of each item in the underlying collection which provides a notification when any item has changed. The notification only includes the value of the changed property

Additional Operators

OPERATOR DESCRIPTION
Adapt Injects side effects using IChangeSetAdaptor interface
Bind Ensures a specified observable collection reflects the underling data
Batch This is a short cut to applying a standard Rx time based buffer followed by FlattenBufferResult (see below)
BufferIf Conditional buffer which accepts a boolean observable to turn buffering on or off
ChangeKey Changes the unique item key
Clone Populate a dictionary instance with the latest changes
DeferUntilLoaded Delay subscription until the source is populated with data
FlattenBufferResult A changeset is already a collection and therefore becomes a nested collection when Rx buffer operatons are applied. Use this to flatten the result back to a single change set
PopulateInto Populates a source cache instance with the latest changes
RefCount Change set equivalent to Publish().RefCount(). The source is cached so long as there is at least 1 subscriber

Observable List At A Glance

Dynamic data v4 has been taken out of beta and officially released.

For source code see Dynamic Data on GitHub
For install see Dynamic Data On Nuget

Dynamic data has always had an observable cache and now after a busy 4 months development and 188 commits there is an observable list. I also could not resist throwing in some aggregation operations. These new features are outlined in this post.

Getting Started

Create an observable list like this:

var myInts= new SourceList<int>();

There are direct edit methods, for example

myInts.AddRange(Enumerable.Range(0, 10000)); 
myInts.Add(99999); 
myInts.Remove(99999);

Each amend operation will produced a change notification. A much more efficient option is to batch edit which produces a single notification.

myInts.Edit(innerList =>
{
   innerList.Clear();
   innerList.AddRange(Enumerable.Range(0, 10000));
});

If SourceList is to be exposed publicly it can be made read only

IObservableList<int> readonlyInts = myInts.AsObservableList();

which hides the edit methods.

The list changes can be observed by calling myInts.Connect(). This creates an observable change set for which there are dozens of list specific operators. The changes are transmitted as an Rx observable so are fluent and composable.

Chain together operators

The following filters the observable list to include only odd numbers and applies a sum operation.

int total = 0;
var sumOfOddNumbers = myInts.Connect()
.Filter(i => i%2 == 1)
.Sum(i => i)
.Subscribe(result => total = result);

The result is a true observable. When values are added, replaced or removed from myInts , the result will always reflect these changes.

Create a derived list

Any observable change set can be converted into an observable list so for example


var myOddNumberList = myInts.Connect()
.Filter(i => i%2 == 1)
.AsObservableList();

produces a read only observable list of odd numbers.

Dynamic data compliments Rx but since Rx does not deal with collections I have created many list specific operators to account for adds, updates, replaces and moves. There are detail below.

Observable List Operators (35 operators and counting)

The following table is a brief description of all the observable operators which can be applied to the observable list. I will use to as a basic of further detailed documentation. This will take some time but will be done in near future.

Operator Description
Adapt Injects side effects using IChangeSetAdaptor interface.
AsObservableList Converts an observable change set into a read only observable list / hides the edit methods of a source list.
Bind Ensures a specified observable collection reflects the underling data
BufferIf Conditional buffer which accepts a boolean observable to turn buffering on or off
Clone Populate an IList instance with the latest changes.
Convert Light weight conversion / casting of a type as specified by a value selector
DeferUntilLoaded Delay subscription until the source is populated with data.
DisposeMany Dispose each item when removed from the stream. Also disposes all items when the stream is completed.
DistinctValues Return a changeset of distinct values as specified by a value selector
ExpireAfter Time based expiry for a source list
Filter The semantic equivalent of Rx.Where. There is an overload to allow the predicate to be changed
FlattenBufferResult A changeset is already a collection and therefore becomes a nested collection when Rx buffer operatons are applied. Use this to flatten the result back to a single change set
GroupOn Semantic equivalent of Rx.GroupBy operator
LimitSizeTo Limits the size of the source cache to the specified limit, providing a notification when an item is removed due to the size limit being exceeded
MergeMany Dynamically merges the observable which is selected from each item in the stream, and unmerges the item when it is no longer part of the stream.
NotEmpty Prevents a zero count change set notification
OnItemRemoved Callback for each item as and when it is being removed from the stream
Page Applies paging to the the data source
PopulateInto Populates an ISourceList instance with the latest changes
QueryWhenChanged The latest copy of the underlying data is exposed for querying i) after each modification to the underlying data ii) upon subscription
RefCount Change set equivalent to Publish().RefCount(). The source is cached so long as there is at least 1 subscriber.
SkipInitial Defer the subscription until there is underlying data loaded and skip first set of changes
Sort Allow sort operators using a specified comparer. There is an overload to allow the sort to be changed
SubscribeMany Subscribes to each item when it is added to the stream and unsubcribes when it is removed. All items will be unsubscribed when the stream is disposed
ToCollection Converts the changeset into a fully formed collection. Each change in the source results in a new read only collection
Top Limits the size of the result set to the specified number of items
Transform The semantic equivalent of Rx.Select
TransformMany Equivalent to a select many transform
Virtualise Virtualises the data. Parameters are provided and changed using a virtualising controller
WhenAnyChanged Property changed observation of each item in the underlying collection which provides a notification when any item has changed. The notification includes the sender and value of the changed property.
WhenAnyValueChanged Property changed observation of each item in the underlying collection which provides a notification when any item has changed. The notification only includes the value of the changed property.
WhereReasonsAre Includes changes for the specified reasons only
WhereReasonsAreNot Excludes changes for the specified reasons
WithKey Applies a key to each item. This changes the change set from a list change set to a cache changeset

This list has less operators than the observable cache but will be expanded in future releases.

Aggregation Operators

The following aggregation are also new to Dynamic Data version 4. The apply to both observable list and observable cache. With the exception of Count() which requires no parameter, the value to continually compute on is specified as a value selector Func valueSelector

Operator Syntax
Count Counts the number of items in the underlying data
Sum The sum of values matching the specified value selector
Avg The average value matching the specified value selector
Maximum The maximum value matching the specified value selector
Minimum The minimum value matching the specified value selector
StdDev The standard deviation of all the values matching the specified value selector

Reactive Tree using Dynamic Data

For this post I assume familiarity with Dynamic Data or at least familiarity with some of my previous posts, so for those of you who are not already familiar with dynamic data see What is dynamic data? for a brief overview.

Dynamic data has evolved over several years where each new operator has been created to overcome a necessity for a practical problem which I have been working on. It is so functionally rich I always assume that there is not a lot that it cannot do. This complacency was shaken when recently I was asked if it was possible to use Dynamic Data to convert a flat observable data stream to a fully hierarchical structure. I immediately replied yes of course without weighing up in advance what the solution could be.

I quickly identified the need for a new ‘convert a stream to a tree’ operator so I took the challenge to implement it. However the more I looked into the problem the bigger the challenge become and I started to doubt whether I could pull it off.

The complexity: Writing the operator

The contract of dynamic data guarantees any operator will reflect items which are added, updated or removed from its source collection. For this to hold true for a recursive hierarchy is an extreme challenge. For every item there has to be a node which has references to both the parent and child nodes, and these have to be maintained as changes are received. The big challenge is twofold:

  1. To ensure accurate reflection of underling data
  2. Ensure good performance

My first solution was to create a node which applied a filter to find its children and in turn each child also applied a filter to find its children. I quickly got this solution working as I was able to implement it using a combination of the existing dynamic data filter and transform operators. The code was simple but alas had poor performance as each node had to create a filter from the original cache.

My mind was melting trying to find a better solution, then I realised creating and maintaining a nested structure does not necessarily have to be so mind numbingly complex as I first suspected. The algorithm can be made easy as follows:

  1. For each item in the flat list create a cache of nodes.
  2. As each node in the cache changes look back into the cache to find the parent and add / remove from the parent accordingly
  3. The output is the cache of nodes filtered so that only nodes with no parent are included in the result

Part 1 and part 3 of this algorithm can be achieved with existing operators which left me to implement part 2 in a single iteration. This means there is absolutely no need for recursion within the operator which has led to good performance.

I have called this new operator TransformToTree. It has been implemented and released on Nuget as beta version.

Herein I will illustrate how the operator is used and then walk through a working example.

The Simplicity: Create an observable tree

The following object defines an employee which has a boss id who naturally is also an employee.

public class Employee
{
    public int Id {get;set;}
    public string Name {get;set;}
    public int BossId {get;set;}
}

To transform the employee into a fully recursive tree structure you first of all need a cache of employees

var employees = new SourceCache<Employee, int>(x => x.Id)

and now to transform this into a the tree structure use the new TransformToTree operator.

var myTree = employees.TransformToTree(employee => employee.BossId);

At that is that, we have an observable tree. The resulting observable is a deeply nested node structure representing an organisational hierarchy. As the cache of employees is maintained the nodes of the tree will completely self maintain.

Do this if you need to cache the tree.

var myTreeCache = myTree.AsObservableCache();

but perhaps a more obvious example would be to display the result on a tree control on a gui.

A WPF Example

Full source code for demo here

This example takes a flat hierarchy of employees, binds the nodes to a WPF TreeView and adds some buttons which change the underlying data in order to demonstrate that the tree reflects changes to the underlying data i.e. truly reactive.

Screenshot

I will not be illustrate the entire code base here as it would make this post too long. So instead I will outline what each part of the code does together with an extract of the core functionality.

Code file What does it do
MainWindow.xaml Xaml producing the view
EmployeesViewModel.cs The main view model for the example
EmployeeViewModel.cs Recursive view model for each node
EmployeeService.cs Data source for employees. Also provides methods to sack or promote employees

Most of the code in this example is boiler plate so below I will explain only the key parts of the code.

In EmployeesViewModel.cs the following code transforms the employee data into the tree structure then the second transform function takes the node which dynamic data has provided and transforms it into a xaml friendly employee view model.

var treeLoader = employeeService.Employees.Connect()
    //produce the nested tree observable
    .TransformToTree(employee => employee.BossId)
    //Transform each node into a view model
    .Transform(node => new EmployeeViewModel(node, Promote,Sack))
    .Bind(_employeeViewModels)
    .DisposeMany()
    .Subscribe();

Promote and Sack are methods which call into the employee service and change the underlying employee data. These are invoked by commands in the employee view model. I have included these actions purely to show that changing the underlying data source is reflected in the tree structure proving it is truly a reactive tree.

The employee view model is a full recursive view model. The example project loads 25,000 employees which implies there are 25,000 nodes in the tree. Clearly the tree view would struggle binding to such a large tree. To circumvent this problem the child view models are lazy loaded when the parent node is expanded. The following snippet shows how this is achieved.

public EmployeeViewModel(Node<EmployeeDto, int> node, Action promoteAction, Action sackAction, EmployeeViewModel parent = null)
{
    //.................................
    //Setting of backing fields not shown...

    //Wrap loader for the nested view model inside a lazy so we can control when it is invoked
    var childrenLoader = new Lazy(() => node.Children.Connect()
        .Transform(e => new EmployeeViewModel(e, promoteAction, sackAction,this))
        .Bind(Inferiors)
        .DisposeMany()
        .Subscribe());

    //return true when the children should be loaded
    //(i.e. if current node is a root, otherwise when the parent expands)
    var shouldExpand = node.IsRoot
         ? Observable.Return(true)
         : Parent.Value.ObservePropertyValue(This => This.IsExpanded).Value();

    //wire the observable
    var expander =shouldExpand
        .Where(isExpanded => isExpanded)
        .Take(1)
        .Subscribe(_ =>
        {
        //force lazy loading
        var x = childrenLoader.Value;
        });

    //Not all code show....
}

And that is about it. All that is left is create the xaml to bind to the a tree view. This is pretty standard xaml so I will proffer no further explanation here.

This operator and post were inspired by a question about whether dynamic data could create a hierarchy. Since then I have been asked whether I am going to expand dynamic data to include continuous aggregations and the answers is big YES. Watch this space.

Logical collection operators

I stated previously that my next post would be an illustration of how to use Dynamic Data to virtualise data (there is a Virtualise operator). Although powerful the example I had in mind just seemed too boring so I decided to put if off until I can think of a means of making the demo interesting. Instead I will unveil the logical collection operators together with what I hope you find to be a clear example.

Dynamic data can apply logical / set type operators across two or more dynamic data sources. Suppose we have the 2 dynamic data sources, sourceA and sourceB, we can apply the following logical collection operators:

Operator Syntax What gets included
Or sourceA.Or(sourceB) Items which in sourceA or sourceB
And sourceA.And(sourceB) Items which are in sourceA and sourceB
Except sourceA.Except(sourceB) Items which are in sourceA and not sourceB

The constraint is the left and right data source are of the same type. The power of dynamic data is when items get added, updated and removed in either sourceA or sourceB the result automatically updates to reflect this.

The truth is it is only occasionally that I have had the practical reason to apply the above operators but when I have I have been delighted that I took the trouble to program them in the first place as adding observers to two dynamic data sources and manually combining them to produce a third is tedious and boring code indeed.

Now for the example of when this can be useful.

Create a dynamic list of search hints

On several of the screens I have created for Dynamic Trader there is a search text box which is used to create and apply a predicate to the trades data source. For all the examples in the project it is produced using the following code.

return  trade => trade.CurrencyPair.Contains(searchText,stringComparison.OrdinalIgnoreCase) 
 ||  trade.Customer.Contains(searchText, StringComparison.OrdinalIgnoreCase);

This predicate applies to the currency pair and the customer fields so I think it would be good user experience to provide some hints to help the user understand what makes a valid search term. That’s why I have changed the text box to a combo to produce the following.

Combine 2 dynamic collections to form a list of hints

The combo box displays a list which is made up of the customers and currency pairs in the underlying dynamic data source. As the user types the list is filtered accordingly. Also as the underlying data changes, the resulting drop down list will also change accordingly. As with all examples in the demo project we have one in-memory data source which is accessed from theITradeService interface and for brevity is not show in the code below.

First we need a distinct dynamic data source of customers from the trades data source.

var customers = tradesDataSource.DistinctValues(trade => trade.Customer);

And to get a distinct dynamic data source of currency pairs from the same trades data source.

var customers = tradesDataSource.DistinctValues(trade => trade.CurrencyPair);

Both currency pair and customer are string fields so the two data sources can be combined like this

var combinedstrings = customers.Or(currencypairs)

And now we have single observable change set of currency pair and customer strings. Herein we do the standard dynamic data stuff to filter, sort and bind the result to the combo box.

//Filter the combined list according to user entered input and bind it to hints
var loader = combinedstrings
    .Filter(filter)     //filter strings using a filter controller according to user entered text
    .Sort(SortExpressionComparer<string>.Ascending(str=>str))
    .ObserveOn(schedulerProvider.MainThread)
    .Bind(_hints)       //bind to hints list
    .Subscribe();

I have skipped over a lot of detail such as filtering as I have described the process in many posts before and I did not want to obscure the crux of the example which is the Or operator. Additionally I also introduced the DistinctValues operator which I think from the above code is self explanatory.

With this example there is scope for one more powerful operator. If the result produced by the filter is large and being as the binding operation has to take place in the UI thread, you can use the Top operator to limit the number of items returned by the result set. It is applied after the sort operator.

    .Sort(SortExpressionComparer<string>.Ascending(str=>str))
    .Top(15) //limit result to a maximum of 15 items
    //....do binding

This will make the search combo responsive and non blocking even for filtering large lists. The whole example is very easy and produced in 70 lines of code. If you don’t believe me look at SearchHints.cs.

Summary

In the last post I promised to illustrate the Virtualise operator but decided against it for now, but in this short article I have shown 5 new operators which I hope will convince you that dynamic data can make life so much easier for the handling of asynchronous dynamic collections in any application.

Although the examples in this blog have mostly ended up in binding operations, I emphasise that dynamic data is for collections first and foremost whether on a mobile device, desktop or server. I have put results on the screen simply to visually show what is happening to the data. I have a few more app / binding samples in mind and after that I think I will write a purely logical example which involves no screen – perhaps the foundation of an algo-trading component.

Links to source code

Search hints object SearchHints.cs
View model of code using search hints LiveTradesViewer.cs
All examples here Dynamic trader demo on GitHub
Dynamic data Source code on Github

Dynamically Sort, Filter And Page Data

This is the first in a new series of blog where I will illustrate how dynamic data and reactive extensions can be used to improve the performance of WPF applications. Answering this question on stack overflow has prompted me to write what I plan to be a 3 part mini-series. In this part I will examine paging of in-memory data to reduce how much data a grid binds to and in subsequent posts I will illustrate virtualising data, and finally I will look at injecting behaviours into visible rows.

It has become my custom to start each post with an image then I explain how I got there. So here’s the image, it shows a screen which can filter, sort and page in-memory data.

Paging with dynamic data

If you cannot be bothered reading the remainder of this article, the demo and code can be studied via the following links

All the code is in the demo project is here Dynamic data demo project on GitHub
View model PagedDataViewer.cs
View PagedDataView.xaml
Dynamic data source code here Github

Why use paging

So the first question is why would you page data when I can simply bind to all of it? That’s a reasonable question and mostly I would say there is no need. However for large collections or collections which rapidly update the main thread can often block whilst the collection is updating. I have found this to be the case even with virtualization enabled in the xaml. This is because the observable collection can only be updated on the main thread which is clearly problematic as it blocks. Additionally the ListCollectionView may have to apply sort operations which are very expensive as the ListCollectionView has to linearly find the correct position of each item. As the collection gets larger the linear find and replace operations get slower and slower. I have found from bitter experience that binding to more than 10,000 rows in WPF can be problematic.

Actually I could go on for a while about performance bottlenecks in WPF both from the data and the xaml perspective but that is debate which we can have on another day.

There are of course many solutions to the problem but now I will concentrate of using dynamic data to create a paged view.

Create controllers to dynamically change observables

Dynamic data provides a load of extensions and some controllers to dynamically interrogate the data. For our screen we need a few controllers to dynamically filter, sort and page.

    var pageController = new PageController();  
    var filterController = new FilterController<T>(); 
    var sortController = new SortController<T>(); 

where the values can be changed like this

    //to change page
    pageController.Change(new PageRequest(1,100));
    //to change filter 
    filterController.Change(myobject=> //return a predicate);
    //to change sort
    sortController .Change( //return an IComparable<>);

Use these controller to build a filtered, sorted and paged dynamic data stream

As with all the example in this blog, the data is fed from a shared in-memory cache which is exposed through the ITradeService interface. The following code is only marginally different from code in previous posts.

    //this is an extension of observable collection optimised for dynamic data
    var collection = new ObservableCollectionExtended<TradeProxy>();

var loader = tradeService.All .Connect() 
   .Filter(filterController) // apply user filter
   .Transform(trade => new TradeProxy(trade), new ParallelisationOptions(ParallelType.Ordered, 5))
   .Sort(sortContoller, SortOptimisations.ComparesImmutableValuesOnly)
   .Page(pageController) // this applies the paging and returns on result effecting the current page
   .ObserveOn(schedulerProvider.MainThread)
    //ensure page parameters class knows which page we are on
   .Do(changes => _pageParameters.Update(changes.Response))
   .Bind(_data)     // update observable collection bindings
   .DisposeMany()   //dispose when no longer required
   .Subscribe();

In one line of code the data has been transformed, filtered, sorted and the current page is bound and reflected in the observable collection. And as if by magic the observable collection will self-maintain when any of the controller parameters change or when any of the data changes. At any time the parameters of the controllers can be changed to dynamically change the results of the current page.

The result with this small segment of code is that by applying the page operator we have significantly reduced the number of records bound to the grid and therefore reduced the work load on the main thread.

Hooray, let’s open the champagne. Almost that time, but not quite. We have not set the controller parameters yet nor have we created anything means for the user to changing the page, sort or apply a filter. For the user to enter these values I have created a couple supporting objects which are explained below.

Apply Page Changes

PageParameterData.cs is the class containing the latest page details as well as commands to move to the next and previous page. The commands are bound to the skip previous and next buttons and when these are pressed the page number property changes. This fires a notification which is observed using some simple Rx.

We observe the size and current page properties as followings.

//observe size and current page
var currentPageChanged = PageParameters.ObservePropertyValue(p => p.CurrentPage).Select(prop => prop.Value);
var pageSizeChanged = PageParameters.ObservePropertyValue(p => p.PageSize).Select(prop => prop.Value);

//combine values, create request object and change the controller.
var pageChanger = currentPageChanged.CombineLatest(pageSizeChanged,
                           (page, size) => new PageRequest(page, size))
                           .DistinctUntilChanged()
                           .Sample(TimeSpan.FromMilliseconds(100))
                           .Subscribe(pageController.Change);

The latest values of each are combined into a new PageRequest and the page controller is updated to this value. This reapplies the page logic producing a next page response which includes the next page of data.

Apply Filtering

As with several example screens in the dynamic data menu we have the SearchText property on the main view model. We observe changes, build a predicate and update the filter controller.

  var filterApplier = this.ObservePropertyValue(t => t.SearchText)
                .Throttle(TimeSpan.FromMilliseconds(250))
                .Select(propargs => BuildFilter(propargs.Value))
                .Subscribe(filterController.Change);

where the build filter function is as follows

private Func<Trade, bool> BuildFilter(string searchText)
 {
     if (string.IsNullOrEmpty(searchText)) return trade => true;
     return t => t.CurrencyPair.Contains(searchText, StringComparison.OrdinalIgnoreCase) 
                          || t.Customer.Contains(searchText, StringComparison.OrdinalIgnoreCase);
 }

Apply Sorting

SortParameterData.cs is the view model to bind the sorting data. The following code observes the selected item and applies the selected comparer to the new sort controller.

  var sortChange = SortParameters.ObservePropertyValue(t => t.SelectedItem).Select(prop=>prop.Value.Comparer)
          .ObserveOn(schedulerProvider.TaskPool)
          //Change the sort controller
          .Subscribe(sortContoller.Change);

Summary

This code is surprisingly easy with the main view model having about 100 lines of code. You probably would not believe if I said I wrote all of it in under 3 hours. Admittedly I have the infrastructure for the page changing and the sorting from another project but nonetheless I can assure you that when you are up to speed with dynamic data, you will regard the manipulation of collections of data very easy indeed.

Next time I will be doing something similar yet simpler by showing how dynamic data can virtualise data.

Trading Demo Part 5 – Aggregate Dynamic Data

This is part 5 of building a trading system series. The idea has been to take one source of rapidly changing trading data and demonstrate the range, power and simplicity of applying dynamic data operators to it. Throughout the series I have made the assumption that the reader is already familiar and comfortable with the absolutely brilliant reactive extensions framework. For those who are not I strongly recommend to check them out. There is a steep learning curve but I assure anyone that once you start thinking reactive you will never turn back. You will probably wonder how you ever programmed without it.

If you are new to dynamic data I suggest you first read this Getting Started and if you have not seen the previous parts they are

1. Expose the data as a service
2. Manage market data
3. Integration with the UI
4. Filter on calculated values

Also if you want to download the code which goes with this example, go to Trading Demo on GitHub and download and run in visual studio. Open the menu option labelled Trading Positions and you will see the following screen.

Aggregated Positions

This screen connects to the trade service data, groups by currency pair and aggregates the totals for each grouping of currency pairs. With dynamic data doing this becomes so simple with very little code involved as I am about to show.

The starting point is to group the trade data by currency pair. This is achieved using dynamic data’s Group operator.

//where tradeService is the trade service created in Part 1.
var mygroup = tradeService.Live.Connect()
                          .Group(trade => trade.CurrencyPair)

This does not look like much but the group operator is very powerful indeed. As with all things reactive, changes in the trade service are automatically propagated to the appropriate grouping. Dynamic Data extends the reactive framework by accounting for adds, updates and removes. So when trades are created, closed or amended the groups will automatically reflect these changes.

The grouping object creates a cache of each group and it’s signature is like this.

public interface IGroup<TObject, TKey, out TGroupKey>
{
    TGroupKey Key {get;}
    IObservableCache<TObject, TKey> Cache { get; }
}

Three generic types, a bit ugly I admit but we have a dynamic collection of caches, one for each item matching the group selector in the underlying data source. When grouping I almost always apply a transform function to do something with the group. In this example we want to aggregate totals for each group, so first I have created an object which accepts the grouping in it’s constructor and apply some dynamic aggregations on the cache. For this we need to use dynamic data’s Transform operator. This is akin to the standard reactive Select operator but as with several other operators in dynamic data I have to use an alternative semantic dues to ambiguous reference problems for extensions.

var transformedGroup = tradeService.Live.Connect()
                .Group(trade => trade.CurrencyPair)
                .Transform(group => new CurrencyPairPosition(group))

This means for each group we have a CurrencyPairPosition object which will be used to dynamically calculate the sum total of the buys and sells so that the overall position can be calculated. For the aggregation I have used another dynamic data operator QueryWhenChanged. The following exert illustrates it’s usage

//constructor only for illustration
public CurrencyPairPosition(IGroup<Trade, long, string> tradesByCurrencyPair)
{
    var aggregations = tradesByCurrencyPair.Cache.Connect()
                        .QueryWhenChanged(query => //query and return a result
}

For each change in the underlying cache the operator is invoked. The parameter exposes a querying API so the underling cache can be queried in a functional manner. The consumer must return a result which becomes an observable of whatever is returned. In this example we have used some standard linq to objects to calculate the buy and sell totals and return a new object which contains these values.

If we expand out the above code we easily create an observable of trade positions for each currency pair,

  var aggregations = tradesByCurrencyPair .Cache.Connect()
                       .QueryWhenChanged(query =>
                        {
                            var buy = query.Items
                                        .Where(trade => trade.BuyOrSell == BuyOrSell.Buy)
                                        .Sum(trade=>trade.Amount);
                            var sell = query.Items
                                        .Where(trade => trade.BuyOrSell == BuyOrSell.Sell)
                                        .Sum(trade => trade.Amount);
                            var count = query.Count;
                            return new TradesPosition(buy,sell,count);
                        });
                //subscribe to the result and set a property to the latest

We are at the point where we have a CurrencyPairPosition object which always reflects the latest position so the last things is to expand the consuming code so we can bind to the collection of these.

 //not all code show and _data is a derived Observable collection
  var subscriber = tradeService.Live.Connect()
                .Group(trade => trade.CurrencyPair)
                .Transform(group => new CurrencyPairPosition(group))
                .Sort(SortExpressionComparer<CurrencyPairPosition>.Ascending(t => t.CurrencyPair))
                .ObserveOn(schedulerProvider.MainThread)
                .Bind(_data)
                .DisposeMany()
                .Subscribe();

This will maintain the sorted order and ensure that CurrencyPairPosition objects are disposed when no longer required. Now a little xaml bound to the data property can produce the screen at the top of the page.

In summary, Dynamic Data makes the reactive management of collections of data very very easy indeed. If you look at the code list below, you will see just how little is involved. There are about 170 lines of code over three c# classes which also includes namespace and using declarations. And what’s even better is despite the underlying data rapidly moving, nowhere have I had to concern myself with thread safety and all the operators are completely thread-safe so long as all code functionally remains inside the operators.

The full code for all the objects is

1. PositionsViewer.cs (The view model)
2. CurrencyPairPosition.cs
3. TradesPosition.cs (the object which holds the result)
4. PositionsView.xaml (the screen)

Getting Started

Although I have been blogging about dynamic data I have so far omitted any specific documentation. This is because dynamic data is functionally very rich and hence there is such a huge amount to document. Frankly with over 50 operators to explain I have been daunted. But at last I am on the case, so this is the beginning.

The documents live on Dynamic data documents on GitHub and for the next month or two I will keep updating these.

The core concept

It is perhaps easiest to think of dynamic data as reactive extensions (rx) for collections but more accurately dynamic data is a bunch of rx operators based on the concept of an observable change set. The change set notifies listeners of any changes to an underlying source and has the following signature.

IObservable<IChangeSet<TObject,TKey>> myFirstObservableChangeSet;

where IChangeSet represents a set of adds, updates, removes and moves (for sort dependent operators). Each observer receives the changes, applies some logic and in turn notifies it’s own changeset. In this way complex chains of operators can easily be chained together.

The only constraint of dynamic data is an object needs to have a key specified. This was a design choice right from the beginning as the internals of dynamic data need to identify any object and be able to look it up quickly and efficiently.

Creating an observable change set

To open up the world of dynamic data to any object, we need to feed the data into some mechanism which produces the observable change set. Unless you are creating a custom operator then there is no need to directly create one as there are several out of the box means of doing so.

The easiest way is to feed directly into dynamic data from an standard rx observable.

IObservable<T> myObservable;
IObservable<IEnumerable<T>> myObservable;
// Use the hashcode for the key
var mydynamicdatasource = myObservable.ToObservableChangeSet();
// or specify a key like this
var mydynamicdatasource = myObservable.ToObservableChangeSet(t=> t.key);

The problem with the above is the collection will grow forever so there are overloads to specify size limitation or expiry times (not shown).

To have much more control over the root collection then we need an in-memory data store which has the requisite add, update and remove methods. Like the above the cache can be created with or without specifying a key

// Use the hash code for the key
var mycache  = new SourceCache<TObject>();
// or specify a key like this
var mycache  = new SourceCache<TObject,TKey>(t => t.Key);

The cache produces an observable change set via it’s connect methods.

var oberverableChangeSet = mycache.Connect();

Another way is to directly from an observable collection, you can do this

var myobservablecollection= new ObservableCollection<T>();
// Use the hashcode for the key
var mydynamicdatasource = myobservablecollection.ToObservableChangeSet();
// or specify a key like this
var mydynamicdatasource = myobservablecollection.ToObservableChangeSet(t => t.Key);

This method is only recommended for simple queries which act only on the UI thread as ObservableCollection is not thread safe.

One other point worth making here is any observable change set can be converted into a cache.

var mycache = somedynamicdatasource.AsObservableCache();

This cache has the same connection methods as a source cache but is read only.

Examples

Now you know how to create the source observable, here are some few quick fire examples. But first, what is the expected behaviour or any standard conventions? Simple answer to that one.

  1. All operators must comply with the Rx guidelines.
  2. When an observer subscribes the initial items of the underlying source always form the first batch of changes.
  3. Empty change sets should never be fired.

In all of these examples the resulting sequences always exactly reflect the items is the cache. This is where the power of add, update and removes comes into it’s own as all the operations are maintained with no consumer based plumbing.

Example 1: filters a stream of live trades, creates a proxy for each trade and orders the result by most recent first. As the source is modified the observable collection will automatically reflect changes.

//Dynamic data has it's own take on an observable collection (optimised for populating from dynamic data observables)
var list = new ObservableCollectionExtended<TradeProxy>();
var myoperation = somedynamicdatasource
                    .Filter(trade=>trade.Status == TradeStatus.Live) 
                    .Transform(trade => new TradeProxy(trade))
                    .Sort(SortExpressionComparer<TradeProxy>.Descending(t => t.Timestamp))
                    .ObserveOnDispatcher()
                    .Bind(list) 
                    .DisposeMany()
                    .Subscribe()

Oh and I forgot to say, TradeProxy is disposable and DisposeMany() ensures items are disposed when no longer part of the stream.

Example 2: for filters which can be dynamically changed, we can use a filter controller

var filtercontroller = new FilterController<Trade>()
var myoperation = somedynamicdatasource.Filter(filtercontroller) 

//can invoke a filter change any time
filtercontroller.Change(trade=>//return some predicate);

Example 3: produces a stream which is grouped by status. If an item changes status it will be moved to the new group and when a group has no items the group will automatically be removed.

var myoperation = somedynamicdatasource
                    .Group(trade=>trade.Status) //This is NOT Rx's GroupBy 

Example 4: Suppose I am editing some trades and I have an observable on each trades which validates but I want to know when all items are valid then this will do the job.

IObservable<bool> allValid = somedynamicdatasource
                    .TrueForAll(trade => trade.IsValidObservable, (trade, isvalid) => isvalid)

This operator flattens the observables and returns the combined state in one line of code. I love it.

Example 5: will wire and un-wire items from the observable when they are added, updated or removed from the source.

var myoperation = somedynamicdatasource.Connect() 
                .MergeMany(trade=> trade.ObservePropertyChanged(t=>t.Amount))
                .Subscribe(ObservableOfAmountChangedForAllItems=>//do something with IObservable<PropChangedArg>)

Example 6: Produces a distinct change set of currency pairs

var currencyPairs= somedynamicdatasource
                    .DistinctValues(trade => trade.CurrencyPair)

The above examples are just a brief glimpse of what dynamic data can do. It will all be documented in time.

Want to know more?

There is so much more which will be documented but if you want to find out more I suggest:
Download the WPF trading example and go through the example screens
or try it out directly Dynamic data on Nuget

Integration with ReactiveUI

I have released DynamicData.ReactiveUI which is a very simple adaptor layer to assist with binding dynamic data observables with reactiveui’s ReactiveList object.

Install from Nuget

Nuget

If you are not familiar with dynamic data and what it enables I suggest:

  1. Go through previous posts in the blog
  2. Download the example wpf app from github
  3. .

To cut to the chase, here’s an example. The code takes existing in-memory trade objects, transforms them into a view model proxy and updates the target reactive list object with the resulting change sets. It also pre-filters the data with live trades, applies a user entered filter, orders the resulting data and disposes the proxy when no longer required. Phew – all that in effectively one line of code.

I did the same example without using Reactive UI code see Trading Example Part 3. Integrate with UI where I have explained the code in greater detail

    public class RxUiViewer : ReactiveObject, IDisposable
    {
       //this is the target list which we will populate from the dynamic data stream
        private readonly ReactiveList<TradeProxy> _data = new ReactiveList<TradeProxy>();
        //the filter controller is used to inject filtering into a observable
        private readonly FilterController<Trade> _filter = new FilterController<Trade>();
        private readonly IDisposable _cleanUp;
        private string _searchText;

        public RxUiViewer(ITradeService tradeService)
        {
            //Change the filter when the user entered search text changes
            var filterApplier = this.WhenAnyValue(x => x.SearchText)
                .Throttle(TimeSpan.FromMilliseconds(250))
                .Subscribe(_ => ApplyFilter());
           
            ApplyFilter();

            var loader = tradeService.Trades
                .Connect(trade => trade.Status == TradeStatus.Live) //prefilter live trades only
                .Filter(_filter)    // apply user filter
                   //if targetting Net4 or Net45 platform can use parallelisation for transforms 'cause it's quicker
                .Transform(trade => new TradeProxy(trade), new ParallelisationOptions(ParallelType.Ordered, 5))
                .Sort(SortExpressionComparer<TradeProxy>.Descending(t => t.Timestamp), SortOptimisations.ComparesImmutableValuesOnly)
                .ObserveOn(RxApp.MainThreadScheduler)
                .Bind(_data)        //bind the results to the ReactiveList 
                .DisposeMany()      //since TradeProxy is disposable dispose when no longer required
                .Subscribe();

            _cleanUp = new CompositeDisposable(loader, _filter, filterApplier);
        }

        private void ApplyFilter()
        {
            if (string.IsNullOrEmpty(SearchText))
            {
                _filter.ChangeToIncludeAll();
            }
            else
            {
                _filter.Change(t => t.CurrencyPair.Contains(SearchText, StringComparison.OrdinalIgnoreCase) ||
                                    t.Customer.Contains(SearchText, StringComparison.OrdinalIgnoreCase));
            }
        }

        public string SearchText
        {
            get { return _searchText; }
            set { this.RaiseAndSetIfChanged(ref _searchText, value); }
        }

        public IReadOnlyReactiveList<TradeProxy> Data
        {
            get { return _data; }
        }

        public void Dispose()
        {
            _cleanUp.Dispose();
        }
    }

coupled with a little xaml can produce this

ReactiveUIImage

Very little code, but a very powerful example,

Gone Portable

Dynamic data is now a portable class library available on most platforms. See below.

Portable

Additionally there is a separate dotnet 4.0 library because I know there are enterprises out there stuck in the old days (investment banks maybe?).

Now you can do some cool rx for collections stuff of WP8, IOS and Android as well as windows desktop and server.

I will explain what the plinq assemblies enable in a post in the near future.

Trading Example Part 4. Filter on calculated values

Intrinsic to collection change notifications, items can be notified by add, update and remove events. If ordering is supported a collection will support a move notification. Of course Dynamic Data supports these. Yet something which is often overlooked is data or functions of data are sometimes by necessity mutable. How can a collection which notifies respond to mutable changes?

To deal with this scenario, dynamic data introduces the concept of an Evaluate notification. This notification forms part of a change set and tells a consumer an item needs to be re-assessed or re-evaluated. This may effect things like filtering or sorting of an item. I feel a concrete example best illustrates this concept. Suppose we have the following function:

Func<Trade,bool> isNearToMarketPrice = trade => return Math.Abs(trade.PercentFromMarket) <= 1 %

Where PercentFromMarket is a calculation which is recalculated with each and every market data tick. Using standard linq a list of near to market trades can be retrieved as follows.

var nearToMaketTrades = myListOfTrades.Where(isNearToMarketPrice);

The manifest problem with this query is it pull based only and has no means of observing when the market price has been recalculated, or alternatively how can re-evaluation be forced. Based on practical experience of dealing with this kind problem I introduced multiple means of injecting the evaluate command into a dynamic data stream. Here I will use a filter controller illustrated in a previous blog and we will apply it to the trade service here.

In summary any of the controllers in dynamic data are used to inject commands and meta data into a stream. For this example we need a dynamic filter which is applied to a stream of trades.

//create a filter controller and set it's filter
var filter = new FilterController<Trade>();
filter.Change(trade => Math.Abs(trade.PercentFromMarket) <= percentFromMarket());

//create a stream of live trades where the trade matches the above filter
var filteredByPercent  = myTradeService.Trades
                         .Connect(trade=>trade.Status==TradeStatus.Live)
                         .Synchronize(locker)
                         .Filter(filter)

The filter controller has an overload to force re-evaluation.

filter.Revalue() // to reevaluate all
// or
filter.Reevaluate(Func<T,bool> itemSelector) //to re-evaluate selected items

The only missing element is when do we invoke re-evalulation. We have 2 choices. Either the service which calculates market prices provides a notification of trades which have been re-calculated or we poll on a period. Option 1 would be suitable for algo trading where everything must happen preferably with zero latency but for simplicity in the example we will poll as follows.

//re-evaluate filter periodically
var reevaluator = Observable.Interval(TimeSpan.FromMilliseconds(250))
                         .Subscribe(_ => filter.Reevaluate());

And that is that. We have a live stream of trades, where closed trades are automatically filtered out from the source and the filter controller constantly re-applies to ensure only trades near to the market are included in the result.   And as ever what is beginning to become my catch phrase – that is easy!

After wrapping the function into a cold observable, here’s the final code.

public IObservable<IChangeSet<Trade, long>> Query(Func<uint> percentFromMarket)
{
    if (percentFromMarket == null) throw new ArgumentNullException("percentFromMarket");
    return Observable.Create<IChangeSet<Trade, long>>
	(observer =>
	 {
	     var locker = new object();
	     Func<Trade,bool> nearToMaketTrades = trade => Math.Abs(trade.PercentFromMarket) <= percentFromMarket();
	     var filter = new FilterController<Trade>(nearToMaketTrades);

	     //re-evaluate filter periodically
	     var reevaluator = Observable.Interval(TimeSpan.FromMilliseconds(250))
		 .Synchronize(locker)
		 .Subscribe(_ => filter.Reevaluate());

	     //filter on live trades matching % specified
	     var subscriber = _tradeService.Trades
		 .Connect(trade=>trade.Status==TradeStatus.Live)
		 .Synchronize(locker)
		 .Filter(filter)
		 .SubscribeSafe(observer);

	     return new CompositeDisposable(subscriber, filter, reevaluator);
	 });
}

This observable will form part of a future post where I want to build the foundation of an auto trading system.

But for now, in a few lines of code (see NearToMarketViewer.cs) we can put the data onto the screen.

Near to market