What is Dynamic Data?

Dynamic Data is a portable class library which brings the power of Reactive Extensions (Rx) to collections.

Mutable collections frequently experience additions, updates, and removals (among other changes). Dynamic Data provides two collection implementations, an observable list and an observable cache that expose changes to the collection via an observable change set. The resulting observable change sets can be manipulated and transformed using Dynamic Data’s robust and powerful array of change set operators. These operators receive change notifications, apply some logic, and subsequently provide their own change notifications. Because of this, operators are fully composable and can be chained together to perform powerful and very complicated operations while maintaining simple, fluent code.

Using Dynamic Data’s collections and change set operators make in-memory data management extremely easy and can reduce the size and complexity of your code base by abstracting complicated and often repetitive collection based operations.

An example please

Given a Trade object create an observable list like this

var myTrades = new SourceList<Trade>();

or a cache which requires that a unique key is specified

var myTrades = new SourceCache<Trade,long>(trade => trade.Id);

Either of these collections are made observable by calling the Connect() method which produces an observable change set.

var myObservableTrades = myTrades.Connect();

This example first filters a stream of trades to select only live trades, then creates a proxy for each live trade, and finally orders the results by most recent first. The resulting trade proxies are bound on the dispatcher thread to the specified observable collection.

ReadOnlyObservableCollection<TradeProxy> data;

var loader = myObservableTrades
    .Filter(trade => trade.Status == TradeStatus.Live) //filter on live trades only
    .Transform(trade => new TradeProxy(trade))         //create a proxy
    .Sort(SortExpressionComparer<TradeProxy>.Descending(t => t.Timestamp)) 
    .ObserveOnDispatcher()          //ensure operation is on the UI thread
    .Bind(out data)         //Populate the observable collection
    .DisposeMany()          //Dispose TradeProxy when no longer required
    .Subscribe();  

Since the TradeProxy object is disposable, the DisposeMany operator ensures that the proxy objects are disposed when they are no longer part of this observable stream.

This is the tip of the iceberg. The observable cache over 60 operators and the observable list has around 35 operators and counting.

Want to find out more?

Read the following for a quick overview

1. Observable Cache at a glance
2. Observable List at a glance
3. Getting Started
4. Binding example

Download the demo and source code

1. Demo WPF trading system demo on GitHub
2. Demo WPF reactive tree demo on GitHub
3. Dynamic Data source code on GitHub

Some funky UI specific stuff

1. Dynamically filter, sort and page
2. Reactive Tree
3. Advanced search hints with logical collection operators
4. Aggregation of dynamic data

Work through the ‘Creating a trading system’ series

1. Expose the data as a service
2. Manage market data
3. Integration with the UI
4. Filter on calculated values
5. Aggregation of dynamic data

Example integration with ReactiveUI

1. Integration with ReactiveUI for a simple example.
2. Log entry viewer for an advanced example.

Install from Nuget

1. Dynamic Data
or 2. Dynamic Data Adaptors for Reactive UI if you are already using ReactiveUI

Advertisements

Observable Cache At A Glance

In my previous post I introduced the observable list which was released in version 4 of Dynamic Data.

For a working demonstration of dynamic data see Dynamic Trader.

Although Dynamic Data has always had an observable cache I have never posted an ‘at a glance’ offering so I have put together this post for completeness. The observable cache is the big brother of the observable list. It has existed for years, is very well tried and tested and a lot of trouble has been taking to ensure good performance can be easily achieved. This document briefly outlines the usage and behaviour of the observable cache.

In my next post I will examine the differences between the observable cache and observable list and which circumstances should one be favoured over the other. I will also examine how performance can be fine tuned.

Create and maintain a cache

Create an observable cache like this:

var myCache = new SourceCache<TObject,TKey>(t => key);

This can be amending calling the the edit methods. For example:

myCache.Clear();
myCache.AddOrUpdate(myItems);

The Clear and AddOrUpdate methods above will each produce a distinct change notification. In order to increase efficiency when making multiple amendments, the cache provides a means of batch editing. This is achieved using the .BatchUpdate method which ensures only a single change notification is produced.

myCache.BatchUpdate(innerCache =>
              {
                  innerCache.Clear();
                  innerCache.AddOrUpdate(myItems);
              });

If myCache is to be exposed publicly it can be made read only using .AsObservableCache

IObservableCache<TObject,TKey> readonlyCache = myCache.AsObservableCache();

which hides the edit methods.

The cache is observed by calling myCache.Connect() like this:

IObservable<IChangeSet<TObject,TKey>> myCacheObservable = myCache.Connect();

This creates an observable change set for which there are dozens of operators. The changes are transmitted as an Rx observable, so they are fluent and composable.

Observing dynamic data

Dynamic Data is based on the concept of creating and manipulating observable change sets. An observable change set is a collection of changes which have been applied to the underlying data source. The change set is always preceded with any data which is already in the underlying collection.

The primary method of creating observable change sets is to connect to an instances of the cache. There are alternative methods to produce observable change sets however, depending on the data source.

Connect to a Cache

The most common means of creating an observable change set is by calling Connect on an observable cache

var myObservableChangeSet = myDynamicDataSource.Connect();

The observable change set always begins with any items which are in the cache when the subscription is made.

Create an Observable Change Set from an Rx Observable

Given either of the following Rx observables:

IObservable<T> myObservable;
IObservable<IEnumerable<T>> myObservable;

an observable change set can be created like by calling .ToObservableChangeSet like this:

var myObservableChangeSet = myObservable.ToObservableChangeSet(t=> t.key);

Chain together operators

Since dynamic data is based on the IObservable interface and each operators returns an observable change set, operators can be chained together and flexibly composed.

For example if you have an observable cache containing trades you can sequentially apply multiple operators,

var myFluentOperation = myTrades
                    .Filter(trade=>trade.Status == TradeStatus.Live) 
                    .Transform(trade => new TradeProxy(trade))
                    .Sort(SortExpressionComparer<TradeProxy>.Descending(t => t.Timestamp))
                    .Subscribe(changes=>//do something with the result...);  

The result is a true observable. When values are added, replaced or removed from myTrades, the result will always reflect these changes.

Create a Derived Cache

Any observable change set can be converted into an observable cache by calling the AsObservableCache method

var readOnlyCache = myTrades
            .Filter(trade=>trade.Status == TradeStatus.Live) 
            .AsObservableCache();

which produces a self-maintaining cache of live trades.

Operator overview

Dynamic data is built on top of Rx but since Rx does not deal with collection changes, there are specific operators which account for adds, updates and removes for a large number of operations. There are detailed in the following sections.

Restriction Operators

These operators reduce the amount of changes and hence underlying data when applied.

OPERATOR DESCRIPTION
ExpireAfter Time based expiry for a cache
Filter The semantic equivalent of Rx.Where. There is an overload to allow the predicate to be changed
IgnoreUpdateWhen Ignores an update when the condition is met.
IncludeUpdateWhen Only includes the update when the condition is met.
LimitSizeTo Limits the size of the source cache to the specified limit, providing a notification when an item is removed due to the size limit being exceeded
NotEmpty Prevent empty changesets. By default this is applied by dynamic data operators so if only required for custom operators
Page Applies paging to the the underlying data source. Parameters are provided and changed by providing an observable of page requests
SkipInitial Defer the subscription until there is underlying data has loaded and skip first set of changes
Top Limits the size of the result set to the specified number of items
Virtualise Resulting changes only includes only a virtualised subset of the underlying data. Parameters are provided and changed by providing an observable of virtual requests
WhereReasonsAre Includes changes for the specified reasons only
WhereReasonsAreNot Excludes changes for the specified reasons

Transformative Operators

OPERATOR DESCRIPTION
AsObservableCache Converts an observable change set into a read only observable cache / hides the edit methods of the source cache.
Convert Light weight conversion / casting of a type as specified by a value selector
DistinctValues Return a changeset of distinct values as specified by a value selector
Group Semantic equivalent of Rx.GroupBy operator
QueryWhenChanged The latest copy of the underlying data is exposed for querying i) after each modification to the underlying data ii) upon subscription. The method accepts a result selector which accepts allows any resulting observable to be returned
RemoveKey Removes the key from each change. This changes the change set from a cache change set to a list changeset
Sort Allow sort operators using a specified comparer. There is an overload to allow the sort to be changed
ToCollection Converts the changeset into a fully formed collection. Each change in the source results in a new read only collection
Transform The semantic equivalent of Rx.Select
TransformMany Equivalent to a select many transform
TransformSafe The semantic equivalent of Rx.Select. Provides and overload to handle any errors encountered when converting an item
TransformToTree Transforms the object to a fully recursive tree, create a hiearchy based on the pivot function

Join Operators

The following operators apply boolean logic logic between two or more observable change sets.

OPERATOR DESCRIPTION
And Applies a logical And between the underlying data in two or more observable change sets
Or Applies a logical Or between the underlying data in two or more observable change sets
Xor Applies a logical Xor between the underlying data in two or more observable change sets
Except Takes the results of the first observable change set and ignore any items produced by one or more other change sets

Aggregation Operators

The aggregation operators are also new to Dynamic Data version 4. They produce light weight continuous aggregation of the underlying observable change set.

OPERATOR DESCRIPTION
Count Counts the number of items in the underlying data
Sum The sum of values matching the specified value selector
Avg The average value matching the specified value selector
Maximum The maximum value matching the specified value selector
Minimum The minimum value matching the specified value selector
StdDev The standard deviation of all the values matching the specified value selector

Inspection of Items Operators

These operators concentrate on applying logic to individual items within the change set.

OPERATOR DESCRIPTION
DisposeMany Dispose each item when removed from the stream. Also disposes all items when the stream is completed.
ForEachChange Provides a callback for each individual item change
MergeMany Dynamically merges the observable which is selected from each item in the stream, and unmerges the item when it is no longer part of the stream.
OnItemRemoved Callback for each item as and when it is being removed from the stream. This is not the same as restricting using WhereReasonsAre as an item is also considered as being removed when a subscribed stream has been disposed
SubscribeMany Subscribes to each item when it is added to the stream and unsubcribes when it is removed. All items will be unsubscribed when the stream is disposed
TrueForAny Produces a boolean observable indicating whether the resulting value of whether any of the values from the specified observable matches the equality condition
TrueForAll Produces a boolean observable indicating whether the resulting value of whether all of the values from the specified observable matches the equality condition
Watch Returns an observable of any changes for a single item which matches a specified key
WatchValue Overload of watch which returns the changed value only
WhenAnyChanged Property changed observation of each item in the underlying collection which provides a notification when any item has changed. The notification includes the sender and value of the changed property
WhenAnyValueChanged Property changed observation of each item in the underlying collection which provides a notification when any item has changed. The notification only includes the value of the changed property

Additional Operators

OPERATOR DESCRIPTION
Adapt Injects side effects using IChangeSetAdaptor interface
Bind Ensures a specified observable collection reflects the underling data
Batch This is a short cut to applying a standard Rx time based buffer followed by FlattenBufferResult (see below)
BufferIf Conditional buffer which accepts a boolean observable to turn buffering on or off
ChangeKey Changes the unique item key
Clone Populate a dictionary instance with the latest changes
DeferUntilLoaded Delay subscription until the source is populated with data
FlattenBufferResult A changeset is already a collection and therefore becomes a nested collection when Rx buffer operatons are applied. Use this to flatten the result back to a single change set
PopulateInto Populates a source cache instance with the latest changes
RefCount Change set equivalent to Publish().RefCount(). The source is cached so long as there is at least 1 subscriber

Observable List At A Glance

Dynamic data v4 has been taken out of beta and officially released.

For source code see Dynamic Data on GitHub
For install see Dynamic Data On Nuget

Dynamic data has always had an observable cache and now after a busy 4 months development and 188 commits there is an observable list. I also could not resist throwing in some aggregation operations. These new features are outlined in this post.

Getting Started

Create an observable list like this:

var myInts= new SourceList<int>();

There are direct edit methods, for example

myInts.AddRange(Enumerable.Range(0, 10000)); 
myInts.Add(99999); 
myInts.Remove(99999);

Each amend operation will produced a change notification. A much more efficient option is to batch edit which produces a single notification.

myInts.Edit(innerList =>
{
   innerList.Clear();
   innerList.AddRange(Enumerable.Range(0, 10000));
});

If SourceList is to be exposed publicly it can be made read only

IObservableList<int> readonlyInts = myInts.AsObservableList();

which hides the edit methods.

The list changes can be observed by calling myInts.Connect(). This creates an observable change set for which there are dozens of list specific operators. The changes are transmitted as an Rx observable so are fluent and composable.

Chain together operators

The following filters the observable list to include only odd numbers and applies a sum operation.

int total = 0;
var sumOfOddNumbers = myInts.Connect()
.Filter(i => i%2 == 1)
.Sum(i => i)
.Subscribe(result => total = result);

The result is a true observable. When values are added, replaced or removed from myInts , the result will always reflect these changes.

Create a derived list

Any observable change set can be converted into an observable list so for example


var myOddNumberList = myInts.Connect()
.Filter(i => i%2 == 1)
.AsObservableList();

produces a read only observable list of odd numbers.

Dynamic data compliments Rx but since Rx does not deal with collections I have created many list specific operators to account for adds, updates, replaces and moves. There are detail below.

Observable List Operators (35 operators and counting)

The following table is a brief description of all the observable operators which can be applied to the observable list. I will use to as a basic of further detailed documentation. This will take some time but will be done in near future.

Operator Description
Adapt Injects side effects using IChangeSetAdaptor interface.
AsObservableList Converts an observable change set into a read only observable list / hides the edit methods of a source list.
Bind Ensures a specified observable collection reflects the underling data
BufferIf Conditional buffer which accepts a boolean observable to turn buffering on or off
Clone Populate an IList instance with the latest changes.
Convert Light weight conversion / casting of a type as specified by a value selector
DeferUntilLoaded Delay subscription until the source is populated with data.
DisposeMany Dispose each item when removed from the stream. Also disposes all items when the stream is completed.
DistinctValues Return a changeset of distinct values as specified by a value selector
ExpireAfter Time based expiry for a source list
Filter The semantic equivalent of Rx.Where. There is an overload to allow the predicate to be changed
FlattenBufferResult A changeset is already a collection and therefore becomes a nested collection when Rx buffer operatons are applied. Use this to flatten the result back to a single change set
GroupOn Semantic equivalent of Rx.GroupBy operator
LimitSizeTo Limits the size of the source cache to the specified limit, providing a notification when an item is removed due to the size limit being exceeded
MergeMany Dynamically merges the observable which is selected from each item in the stream, and unmerges the item when it is no longer part of the stream.
NotEmpty Prevents a zero count change set notification
OnItemRemoved Callback for each item as and when it is being removed from the stream
Page Applies paging to the the data source
PopulateInto Populates an ISourceList instance with the latest changes
QueryWhenChanged The latest copy of the underlying data is exposed for querying i) after each modification to the underlying data ii) upon subscription
RefCount Change set equivalent to Publish().RefCount(). The source is cached so long as there is at least 1 subscriber.
SkipInitial Defer the subscription until there is underlying data loaded and skip first set of changes
Sort Allow sort operators using a specified comparer. There is an overload to allow the sort to be changed
SubscribeMany Subscribes to each item when it is added to the stream and unsubcribes when it is removed. All items will be unsubscribed when the stream is disposed
ToCollection Converts the changeset into a fully formed collection. Each change in the source results in a new read only collection
Top Limits the size of the result set to the specified number of items
Transform The semantic equivalent of Rx.Select
TransformMany Equivalent to a select many transform
Virtualise Virtualises the data. Parameters are provided and changed using a virtualising controller
WhenAnyChanged Property changed observation of each item in the underlying collection which provides a notification when any item has changed. The notification includes the sender and value of the changed property.
WhenAnyValueChanged Property changed observation of each item in the underlying collection which provides a notification when any item has changed. The notification only includes the value of the changed property.
WhereReasonsAre Includes changes for the specified reasons only
WhereReasonsAreNot Excludes changes for the specified reasons
WithKey Applies a key to each item. This changes the change set from a list change set to a cache changeset

This list has less operators than the observable cache but will be expanded in future releases.

Aggregation Operators

The following aggregation are also new to Dynamic Data version 4. The apply to both observable list and observable cache. With the exception of Count() which requires no parameter, the value to continually compute on is specified as a value selector Func valueSelector

Operator Syntax
Count Counts the number of items in the underlying data
Sum The sum of values matching the specified value selector
Avg The average value matching the specified value selector
Maximum The maximum value matching the specified value selector
Minimum The minimum value matching the specified value selector
StdDev The standard deviation of all the values matching the specified value selector

Reactive Tree using Dynamic Data

For this post I assume familiarity with Dynamic Data or at least familiarity with some of my previous posts, so for those of you who are not already familiar with dynamic data see What is dynamic data? for a brief overview.

Dynamic data has evolved over several years where each new operator has been created to overcome a necessity for a practical problem which I have been working on. It is so functionally rich I always assume that there is not a lot that it cannot do. This complacency was shaken when recently I was asked if it was possible to use Dynamic Data to convert a flat observable data stream to a fully hierarchical structure. I immediately replied yes of course without weighing up in advance what the solution could be.

I quickly identified the need for a new ‘convert a stream to a tree’ operator so I took the challenge to implement it. However the more I looked into the problem the bigger the challenge become and I started to doubt whether I could pull it off.

The complexity: Writing the operator

The contract of dynamic data guarantees any operator will reflect items which are added, updated or removed from its source collection. For this to hold true for a recursive hierarchy is an extreme challenge. For every item there has to be a node which has references to both the parent and child nodes, and these have to be maintained as changes are received. The big challenge is twofold:

  1. To ensure accurate reflection of underling data
  2. Ensure good performance

My first solution was to create a node which applied a filter to find its children and in turn each child also applied a filter to find its children. I quickly got this solution working as I was able to implement it using a combination of the existing dynamic data filter and transform operators. The code was simple but alas had poor performance as each node had to create a filter from the original cache.

My mind was melting trying to find a better solution, then I realised creating and maintaining a nested structure does not necessarily have to be so mind numbingly complex as I first suspected. The algorithm can be made easy as follows:

  1. For each item in the flat list create a cache of nodes.
  2. As each node in the cache changes look back into the cache to find the parent and add / remove from the parent accordingly
  3. The output is the cache of nodes filtered so that only nodes with no parent are included in the result

Part 1 and part 3 of this algorithm can be achieved with existing operators which left me to implement part 2 in a single iteration. This means there is absolutely no need for recursion within the operator which has led to good performance.

I have called this new operator TransformToTree. It has been implemented and released on Nuget as beta version.

Herein I will illustrate how the operator is used and then walk through a working example.

The Simplicity: Create an observable tree

The following object defines an employee which has a boss id who naturally is also an employee.

public class Employee
{
    public int Id {get;set;}
    public string Name {get;set;}
    public int BossId {get;set;}
}

To transform the employee into a fully recursive tree structure you first of all need a cache of employees

var employees = new SourceCache<Employee, int>(x => x.Id)

and now to transform this into a the tree structure use the new TransformToTree operator.

var myTree = employees.TransformToTree(employee => employee.BossId);

At that is that, we have an observable tree. The resulting observable is a deeply nested node structure representing an organisational hierarchy. As the cache of employees is maintained the nodes of the tree will completely self maintain.

Do this if you need to cache the tree.

var myTreeCache = myTree.AsObservableCache();

but perhaps a more obvious example would be to display the result on a tree control on a gui.

A WPF Example

Full source code for demo here

This example takes a flat hierarchy of employees, binds the nodes to a WPF TreeView and adds some buttons which change the underlying data in order to demonstrate that the tree reflects changes to the underlying data i.e. truly reactive.

Screenshot

I will not be illustrate the entire code base here as it would make this post too long. So instead I will outline what each part of the code does together with an extract of the core functionality.

Code file What does it do
MainWindow.xaml Xaml producing the view
EmployeesViewModel.cs The main view model for the example
EmployeeViewModel.cs Recursive view model for each node
EmployeeService.cs Data source for employees. Also provides methods to sack or promote employees

Most of the code in this example is boiler plate so below I will explain only the key parts of the code.

In EmployeesViewModel.cs the following code transforms the employee data into the tree structure then the second transform function takes the node which dynamic data has provided and transforms it into a xaml friendly employee view model.

var treeLoader = employeeService.Employees.Connect()
    //produce the nested tree observable
    .TransformToTree(employee => employee.BossId)
    //Transform each node into a view model
    .Transform(node => new EmployeeViewModel(node, Promote,Sack))
    .Bind(_employeeViewModels)
    .DisposeMany()
    .Subscribe();

Promote and Sack are methods which call into the employee service and change the underlying employee data. These are invoked by commands in the employee view model. I have included these actions purely to show that changing the underlying data source is reflected in the tree structure proving it is truly a reactive tree.

The employee view model is a full recursive view model. The example project loads 25,000 employees which implies there are 25,000 nodes in the tree. Clearly the tree view would struggle binding to such a large tree. To circumvent this problem the child view models are lazy loaded when the parent node is expanded. The following snippet shows how this is achieved.

public EmployeeViewModel(Node<EmployeeDto, int> node, Action promoteAction, Action sackAction, EmployeeViewModel parent = null)
{
    //.................................
    //Setting of backing fields not shown...

    //Wrap loader for the nested view model inside a lazy so we can control when it is invoked
    var childrenLoader = new Lazy(() => node.Children.Connect()
        .Transform(e => new EmployeeViewModel(e, promoteAction, sackAction,this))
        .Bind(Inferiors)
        .DisposeMany()
        .Subscribe());

    //return true when the children should be loaded
    //(i.e. if current node is a root, otherwise when the parent expands)
    var shouldExpand = node.IsRoot
         ? Observable.Return(true)
         : Parent.Value.ObservePropertyValue(This => This.IsExpanded).Value();

    //wire the observable
    var expander =shouldExpand
        .Where(isExpanded => isExpanded)
        .Take(1)
        .Subscribe(_ =>
        {
        //force lazy loading
        var x = childrenLoader.Value;
        });

    //Not all code show....
}

And that is about it. All that is left is create the xaml to bind to the a tree view. This is pretty standard xaml so I will proffer no further explanation here.

This operator and post were inspired by a question about whether dynamic data could create a hierarchy. Since then I have been asked whether I am going to expand dynamic data to include continuous aggregations and the answers is big YES. Watch this space.

Logical collection operators

I stated previously that my next post would be an illustration of how to use Dynamic Data to virtualise data (there is a Virtualise operator). Although powerful the example I had in mind just seemed too boring so I decided to put if off until I can think of a means of making the demo interesting. Instead I will unveil the logical collection operators together with what I hope you find to be a clear example.

Dynamic data can apply logical / set type operators across two or more dynamic data sources. Suppose we have the 2 dynamic data sources, sourceA and sourceB, we can apply the following logical collection operators:

Operator Syntax What gets included
Or sourceA.Or(sourceB) Items which in sourceA or sourceB
And sourceA.And(sourceB) Items which are in sourceA and sourceB
Except sourceA.Except(sourceB) Items which are in sourceA and not sourceB

The constraint is the left and right data source are of the same type. The power of dynamic data is when items get added, updated and removed in either sourceA or sourceB the result automatically updates to reflect this.

The truth is it is only occasionally that I have had the practical reason to apply the above operators but when I have I have been delighted that I took the trouble to program them in the first place as adding observers to two dynamic data sources and manually combining them to produce a third is tedious and boring code indeed.

Now for the example of when this can be useful.

Create a dynamic list of search hints

On several of the screens I have created for Dynamic Trader there is a search text box which is used to create and apply a predicate to the trades data source. For all the examples in the project it is produced using the following code.

return  trade => trade.CurrencyPair.Contains(searchText,stringComparison.OrdinalIgnoreCase) 
 ||  trade.Customer.Contains(searchText, StringComparison.OrdinalIgnoreCase);

This predicate applies to the currency pair and the customer fields so I think it would be good user experience to provide some hints to help the user understand what makes a valid search term. That’s why I have changed the text box to a combo to produce the following.

Combine 2 dynamic collections to form a list of hints

The combo box displays a list which is made up of the customers and currency pairs in the underlying dynamic data source. As the user types the list is filtered accordingly. Also as the underlying data changes, the resulting drop down list will also change accordingly. As with all examples in the demo project we have one in-memory data source which is accessed from theITradeService interface and for brevity is not show in the code below.

First we need a distinct dynamic data source of customers from the trades data source.

var customers = tradesDataSource.DistinctValues(trade => trade.Customer);

And to get a distinct dynamic data source of currency pairs from the same trades data source.

var customers = tradesDataSource.DistinctValues(trade => trade.CurrencyPair);

Both currency pair and customer are string fields so the two data sources can be combined like this

var combinedstrings = customers.Or(currencypairs)

And now we have single observable change set of currency pair and customer strings. Herein we do the standard dynamic data stuff to filter, sort and bind the result to the combo box.

//Filter the combined list according to user entered input and bind it to hints
var loader = combinedstrings
    .Filter(filter)     //filter strings using a filter controller according to user entered text
    .Sort(SortExpressionComparer<string>.Ascending(str=>str))
    .ObserveOn(schedulerProvider.MainThread)
    .Bind(_hints)       //bind to hints list
    .Subscribe();

I have skipped over a lot of detail such as filtering as I have described the process in many posts before and I did not want to obscure the crux of the example which is the Or operator. Additionally I also introduced the DistinctValues operator which I think from the above code is self explanatory.

With this example there is scope for one more powerful operator. If the result produced by the filter is large and being as the binding operation has to take place in the UI thread, you can use the Top operator to limit the number of items returned by the result set. It is applied after the sort operator.

    .Sort(SortExpressionComparer<string>.Ascending(str=>str))
    .Top(15) //limit result to a maximum of 15 items
    //....do binding

This will make the search combo responsive and non blocking even for filtering large lists. The whole example is very easy and produced in 70 lines of code. If you don’t believe me look at SearchHints.cs.

Summary

In the last post I promised to illustrate the Virtualise operator but decided against it for now, but in this short article I have shown 5 new operators which I hope will convince you that dynamic data can make life so much easier for the handling of asynchronous dynamic collections in any application.

Although the examples in this blog have mostly ended up in binding operations, I emphasise that dynamic data is for collections first and foremost whether on a mobile device, desktop or server. I have put results on the screen simply to visually show what is happening to the data. I have a few more app / binding samples in mind and after that I think I will write a purely logical example which involves no screen – perhaps the foundation of an algo-trading component.

Links to source code

Search hints object SearchHints.cs
View model of code using search hints LiveTradesViewer.cs
All examples here Dynamic trader demo on GitHub
Dynamic data Source code on Github

Dynamically Sort, Filter And Page Data

This is the first in a new series of blog where I will illustrate how dynamic data and reactive extensions can be used to improve the performance of WPF applications. Answering this question on stack overflow has prompted me to write what I plan to be a 3 part mini-series. In this part I will examine paging of in-memory data to reduce how much data a grid binds to and in subsequent posts I will illustrate virtualising data, and finally I will look at injecting behaviours into visible rows.

It has become my custom to start each post with an image then I explain how I got there. So here’s the image, it shows a screen which can filter, sort and page in-memory data.

Paging with dynamic data

If you cannot be bothered reading the remainder of this article, the demo and code can be studied via the following links

All the code is in the demo project is here Dynamic data demo project on GitHub
View model PagedDataViewer.cs
View PagedDataView.xaml
Dynamic data source code here Github

Why use paging

So the first question is why would you page data when I can simply bind to all of it? That’s a reasonable question and mostly I would say there is no need. However for large collections or collections which rapidly update the main thread can often block whilst the collection is updating. I have found this to be the case even with virtualization enabled in the xaml. This is because the observable collection can only be updated on the main thread which is clearly problematic as it blocks. Additionally the ListCollectionView may have to apply sort operations which are very expensive as the ListCollectionView has to linearly find the correct position of each item. As the collection gets larger the linear find and replace operations get slower and slower. I have found from bitter experience that binding to more than 10,000 rows in WPF can be problematic.

Actually I could go on for a while about performance bottlenecks in WPF both from the data and the xaml perspective but that is debate which we can have on another day.

There are of course many solutions to the problem but now I will concentrate of using dynamic data to create a paged view.

Create controllers to dynamically change observables

Dynamic data provides a load of extensions and some controllers to dynamically interrogate the data. For our screen we need a few controllers to dynamically filter, sort and page.

    var pageController = new PageController();  
    var filterController = new FilterController<T>(); 
    var sortController = new SortController<T>(); 

where the values can be changed like this

    //to change page
    pageController.Change(new PageRequest(1,100));
    //to change filter 
    filterController.Change(myobject=> //return a predicate);
    //to change sort
    sortController .Change( //return an IComparable<>);

Use these controller to build a filtered, sorted and paged dynamic data stream

As with all the example in this blog, the data is fed from a shared in-memory cache which is exposed through the ITradeService interface. The following code is only marginally different from code in previous posts.

    //this is an extension of observable collection optimised for dynamic data
    var collection = new ObservableCollectionExtended<TradeProxy>();

var loader = tradeService.All .Connect() 
   .Filter(filterController) // apply user filter
   .Transform(trade => new TradeProxy(trade), new ParallelisationOptions(ParallelType.Ordered, 5))
   .Sort(sortContoller, SortOptimisations.ComparesImmutableValuesOnly)
   .Page(pageController) // this applies the paging and returns on result effecting the current page
   .ObserveOn(schedulerProvider.MainThread)
    //ensure page parameters class knows which page we are on
   .Do(changes => _pageParameters.Update(changes.Response))
   .Bind(_data)     // update observable collection bindings
   .DisposeMany()   //dispose when no longer required
   .Subscribe();

In one line of code the data has been transformed, filtered, sorted and the current page is bound and reflected in the observable collection. And as if by magic the observable collection will self-maintain when any of the controller parameters change or when any of the data changes. At any time the parameters of the controllers can be changed to dynamically change the results of the current page.

The result with this small segment of code is that by applying the page operator we have significantly reduced the number of records bound to the grid and therefore reduced the work load on the main thread.

Hooray, let’s open the champagne. Almost that time, but not quite. We have not set the controller parameters yet nor have we created anything means for the user to changing the page, sort or apply a filter. For the user to enter these values I have created a couple supporting objects which are explained below.

Apply Page Changes

PageParameterData.cs is the class containing the latest page details as well as commands to move to the next and previous page. The commands are bound to the skip previous and next buttons and when these are pressed the page number property changes. This fires a notification which is observed using some simple Rx.

We observe the size and current page properties as followings.

//observe size and current page
var currentPageChanged = PageParameters.ObservePropertyValue(p => p.CurrentPage).Select(prop => prop.Value);
var pageSizeChanged = PageParameters.ObservePropertyValue(p => p.PageSize).Select(prop => prop.Value);

//combine values, create request object and change the controller.
var pageChanger = currentPageChanged.CombineLatest(pageSizeChanged,
                           (page, size) => new PageRequest(page, size))
                           .DistinctUntilChanged()
                           .Sample(TimeSpan.FromMilliseconds(100))
                           .Subscribe(pageController.Change);

The latest values of each are combined into a new PageRequest and the page controller is updated to this value. This reapplies the page logic producing a next page response which includes the next page of data.

Apply Filtering

As with several example screens in the dynamic data menu we have the SearchText property on the main view model. We observe changes, build a predicate and update the filter controller.

  var filterApplier = this.ObservePropertyValue(t => t.SearchText)
                .Throttle(TimeSpan.FromMilliseconds(250))
                .Select(propargs => BuildFilter(propargs.Value))
                .Subscribe(filterController.Change);

where the build filter function is as follows

private Func<Trade, bool> BuildFilter(string searchText)
 {
     if (string.IsNullOrEmpty(searchText)) return trade => true;
     return t => t.CurrencyPair.Contains(searchText, StringComparison.OrdinalIgnoreCase) 
                          || t.Customer.Contains(searchText, StringComparison.OrdinalIgnoreCase);
 }

Apply Sorting

SortParameterData.cs is the view model to bind the sorting data. The following code observes the selected item and applies the selected comparer to the new sort controller.

  var sortChange = SortParameters.ObservePropertyValue(t => t.SelectedItem).Select(prop=>prop.Value.Comparer)
          .ObserveOn(schedulerProvider.TaskPool)
          //Change the sort controller
          .Subscribe(sortContoller.Change);

Summary

This code is surprisingly easy with the main view model having about 100 lines of code. You probably would not believe if I said I wrote all of it in under 3 hours. Admittedly I have the infrastructure for the page changing and the sorting from another project but nonetheless I can assure you that when you are up to speed with dynamic data, you will regard the manipulation of collections of data very easy indeed.

Next time I will be doing something similar yet simpler by showing how dynamic data can virtualise data.

Log Entry Viewer Using Dynamic Data and ReactiveUI

Log Entry Viewer

In this post I show how to create a log entry viewer which uses both Dynamic Data and ReactiveUI. This example screen allows viewing, filtering and removal of log entry items from memory.

There are several features which I hope act as a good example of dynamic data, reactive ui and where reactive ui and dynamic data can compliment each other. This post is long as there is loads of explanation required yet the implementation is reasonably easy and straight forward.

All the code is in the demo project is here Dynamic data demo project on GitHub
The main view model is here LogEntryViewer.cs
The log entry service code LogEntryService.cs

Apart from some domain objects and a little xaml, the above classes consisting of a few hundred lines of code is all that is required to implement the screen. It is easy once you understand a few key concepts about dynamic data and a few key concepts about reactive ui. I hope after you read this article and gone though the code, you will think the same.

1. Create an log entry observable

We need to get a reactive log stream of log entry elements. Using Log4Net we can create a custom appender see ReactiveLogAppender.cs. The appended publishes each log entry to a static subject ISubject. This subject is exposed via a static property ReactiveLogAppender.LogEntryObservable {get;} and will be used by a service to load and manage log entries.

2. Populate an observable cache and expose as a service

There are overloads in dynamic data which convert a standard rx observable into a cache. For example you could do the following which takes the log entry observable, converts it first of all into an observable change set, then caches the result. The size limiter is optional and ensures the cache does not indefinitely grow. When the size limit is reached the oldest items are removed from the internal cache. There is also an overload for time limited expiration of items.

var logEntryCache = ReactiveLogAppender.LogEntryObservable
                     .ToObservableChangeSet(le=>le.Key,limitSizeTo: 10000) 
                     .AsObservableCache(); 

This code results in a read only observable cache. But for this example I want to expose methods to remove items, so instead I create an editable cache as follows. Since this is a cache, I have had to specify a unique key which for the log entry domain object is a counter.

var editableCache = new SourceCache<LogEntry, long>(l => l.Key);
//limit size
var sizeLimiter = editableCache.LimitSizeTo(10000).Subscribe();
//load from observable
var loader = ReactiveLogAppender.LogEntryObservable
                    .Subscribe(editableCache.AddOrUpdate);

If you want to make the cache read only, call the following method.

var readonlycache=editableCache.AsObservableCache();

For this example we will expose the cache via a service

public interface ILogEntryService
{
  IObservableCache<LogEntry, long> Items { get; }
  void Add(LogEntry items);
  void Remove(IEnumerable<LogEntry> items);
  void Remove(IEnumerable<long> keys);
}

Implementing these methods is trivial so I will not bore you will all the details of the service . The complete code is here LogEntryService.cs. This service is instantiated when the system starts as we need to catch all log entries even before the user opens the log entry view screen.

3. Create the view model, using features from both dynamic data and reactive ui

There is a lot happening in this screen so I will break it down into it’s functional parts

a) Populate a reactive list and enable user filtering

I have written extensively in previous posts about filtering and loading data into an observable collection using dynamic data observables. But to recap in dynamic data any adds, updates and removes are notified via an observable change set. Each operator is responsible for responding to a source observable, doing something and then in turn publishing it’s own change set. This way long and complicated transformation streams of data can be composed with very simple fluent and declarative code.

For the log entry example we take the data from log entry service and create an observable which processes the result and binds the data to the screen using the following code.

//this is the target list which is bound to the view
var reactiveList = new ReactiveList<LogEntryProxy>();

//a controller which allows changing a filter any time
var filter = = new FilterController<LogEntryProxy>(l => true);

 //filter, sort and populate reactive list,
var dataLoader = logEntryService.Items.Connect() //connect to the cache
     .Transform(le => new LogEntryProxy(le))     //Create a proxy
     .Filter(filter)                             //Apply a dynamic filter
     .Sort(SortExpressionComparer<LogEntryProxy>.Descending(l => l.TimeStamp))
     .ObserveOn(RxApp.MainThreadScheduler)
     .Bind(reactiveList)                          
     .DisposeMany()                              //automatic disposal
     .Subscribe();

The above code takes data from the log entry service, creates a proxy, sorts by most recent first, binds to the reactive list and finally when the proxy is finished with it is disposed.

The missing ingredient is the code to apply a filter. For this we need a string field with a 2 way binding to a view, and when it changes a new filter is build and applied.

//Binding to allow a user to to enter some text 
 public string SearchText
 {
     get { return _searchText; }
     set { this.RaiseAndSetIfChanged(ref _searchText, value); } 
 }

//using reactive ui operator to respond to any change 
var filterApplier = this.WhenAnyValue(x => x.SearchText)
                .Throttle(TimeSpan.FromMilliseconds(250))
                .Select(BuildFilter)
                .Subscribe(filter.Change); //apply predicate to filter controller

 //build the predicate
 private Func<LogEntryProxy, bool> BuildFilter(string searchText)
 {
     if (string.IsNullOrEmpty(SearchText))
         return logentry => true;

     return logentry => logentry.Message.Contains(SearchText, StringComparison.OrdinalIgnoreCase)
                     || logentry.Level.ToString().Contains(SearchText, StringComparison.OrdinalIgnoreCase);
 }

Now we have an observable list of log entry items where the user can enter some search text and the result will match the text. Not terribly exciting so far, but terribly simple!

Herein we will make the the screen more interesting and create some stuff which will give user some feed back.

b) Highlight new items

As this is a log entry view items are continually being added so the first bit of user feed back is to highlight newly added rows. The solution to this is simply achieved using reactive code made better by applying a the reactive ui ToProperty() operator. I will create a flag on the LogEntryProxy which will be used to highlight new rows for 2 seconds.

To create a reactive ui property:

//the super cool lazy backing field
 private readonly ObservableAsPropertyHelper<bool> _recent;

//which is exposed like this.
 public bool Recent  {  get { return _recent.Value; } 

and populated like this

_recent = Observable.Create<bool>(observer =>
                                {
                                    var isRecent = DateTime.Now.Subtract(original.TimeStamp).TotalSeconds < 2;
                                    if (!isRecent)   return Disposable.Empty;
                                    observer.OnNext(true);
                                    return Observable.Timer(TimeSpan.FromSeconds(2)).Select(_=>false).SubscribeSafe(observer);
                                }).ToProperty(this,lep=>lep.Recent);

Full code LogEntryProxy.cs

The power of ToProperty is it is lazy. It encapsulates a strategy which I have used for a long time to radically speed up reactive code where code is invoked only on demand, yet I had never encapsulated the concept so well. This is why it is one of my favourite parts of RXUI,

As a practical explanation, if I opened a list with 10,000 items where each row has an observable then 10,000 observables are created as part of loading the list even though a user can only see about 30 at the time. This is clearly performance and memory intensive. In the trading systems which I write each row can have dozens of observations on each proxy so only loading what is required when it is required is a very powerful strategy indeed.

The only more efficient performance and memory strategy which could be deployed is to make observables visibility aware meaning they can be hydrated and de-hydrated on demand. This requires a higher level of control i.e. some form of controller which perhaps I will write about in the future. But not now.

The resulting flag on the proxy can be used to apply a simple xaml trigger in wpf, enabling the recently added rows to be highlighted

<Style TargetType="{x:Type ListViewItem}" BasedOn="{StaticResource MetroListViewItem}">
    <Style.Triggers>
     <DataTrigger Binding="{Binding Recent}" Value="True">
             <Setter Property="Background" Value="{DynamicResource SecondaryAccentBrush}"/>
         <Setter Property="Foreground"  Value="{DynamicResource SecondaryAccentForegroundBrush}"/>   
     </DataTrigger>
    </Style.Triggers>
</Style>

I’ve got to be honest here, I have no idea how I would apply the above trigger using reactive ui binding capabilities, If anyone can let me know, then please do so,

c) Create a delete command and provide some user text

In the example, the user can delete log entry items. So we need a command which can be enabled or disabled according to whether items are selected. The starting point is I created a controller that manages which items are selected by exposing an observable collection of selected items. SelectionController.cs and it’s implementation is not part of what I am trying to illustrate so I will profer no extra explantion about it.

Knowing what items are selected enables us to build the delete command.

The first task is to convert the observable collection into a strongly typed observable change set which provides an observable of adds, updates and removes. Dynamic data provides an operator for this out of the box.

//create the selection controller. This is bound using an attached property
var selectionController = new SelectionController()

//Selected property is ObservableCollection<object>, so convert the select items into an observable change set of log entries
var selectedItems = selectionController.Selected.ToObservableChangeSet().Transform(obj => (LogEntryProxy)obj);

This observable can be directly converted into a command using a combination of dynamic data’s QueryWhenChanged operator and reactive ui’s ToCommand operator.

//make a command out of selected items, enabling the command when there is a selection 
 var deleteCommand = selectedItems
                          .QueryWhenChanged(query => query.Count > 0)
                          .ToCommand();

QueryWhenChanged exposes the inner items of the list when any change has occurred and it returns an observable of whatever data is returned. In this example query.Count>0 is a boolean so the operator returns a boolean observable which enables the command conversion.

This command can now be directly bound to a button. It will be enabled only when items are selected. However so far we are doing nothing when the command is invoked so we must subscribe to the reactive command as follows.

//Assign action when the command is invoked
 var commandInvoker =  this.WhenAnyObservable(x => x.DeleteCommand)
          .Subscribe(_ =>
          {
              //do the code to remove items from LogEntryService
          });

Just one finishing touch for the delete operation, we could do with a message tailored from the user selection. Again I will use the same QueryWhenChanged operator but this time convert the resulting string observable to a property.

var deleteItemsText = selectedItems.QueryWhenChanged(query =>
       {
           if (query.Count == 0) return "Select log entries to delete";
           if (query.Count == 1) return "Delete selected log entry?";
           return string.Format("Delete {0} log entries?", query.Count);
       })
       .StartWith("Select log entries to delete")
       .ToProperty(this, viewmodel => viewmodel.DeleteItemsText);

  //deleted items property
   public string DeleteItemsText    { get { return _deleteItemsText.Value; }   }

A simple binding to DeleteItemsText can now display the appropriate message.

d) Fade out deleted items

The next visual feedback to apply is to fade out when an item is deleted. Instead of removing an item immediately it will gradually fade out and then be removed.

Translated into code, this means each removed item needs to be delayed for a period, and upon the delay period commencing, a flag should be set on the object which will tell the view to run a fade out transition. There are many solutions to this problem but I found the most functional was to create a dynamic data based custom operator. I came up with an extension.

public static IObservable<IChangeSet<TObject, TKey>> DelayRemove<TObject, TKey>(this IObservable<IChangeSet<TObject, TKey>> source,  TimeSpan delayPeriod, Action<TObject> onDefer)
{
   if (source == null) throw new ArgumentNullException("source");
   if (onDefer == null) throw new ArgumentNullException("onDefer");

   return Observable.Create<IChangeSet<TObject, TKey>>(observer =>
   {
       var locker = new object();
       var shared = source.Publish();
       var notRemoved = shared.WhereReasonsAreNot(ChangeReason.Remove)
       .Synchronize(locker);

       var removes = shared.WhereReasonsAre(ChangeReason.Remove)
       .Do(changes => changes.Select(change => change.Current).ForEach(onDefer))
       .Delay(delayPeriod)
       .Synchronize(locker);

       var subscriber = notRemoved.Merge(removes).SubscribeSafe(observer);
       return new CompositeDisposable(subscriber, shared.Connect());

   });
}

In this operator the observable is split into 2 observables, where one part is the adds and updates and the other is the removes. The removed part has a delay applied and when delayed calls back to the consumer.

This operator can now be applied to any dynamic data observable. In our case adjust the observable which we use to load the reactive list and make a DelayRemove call.

//filter, sort and populate reactive list,
var dataLoader = logEntryService.Items.Connect()
                .DelayRemove(TimeSpan.FromSeconds(0.75),proxy =>
                      {
                          //set a property which will start a xaml fadeout storyboard
                          proxy.FlagForRemove();
                          //deselect property other selected brush will be used
                          _selectionController.DeSelect(proxy);
                      })
                 .//more operators to filter and load the list

The xaml for the delay

<Style TargetType="{x:Type ListViewItem}" BasedOn="{StaticResource MetroListViewItem}">
       <Style.Triggers>
       <DataTrigger Binding="{Binding Removing}" Value="True">
           <DataTrigger.EnterActions>
               <BeginStoryboard>
                   <Storyboard>
                       <DoubleAnimation
                               Storyboard.TargetProperty="(TextBlock.Opacity)"
                               From="1.0" To="0.1" Duration="0:0:0.75"/>
                   </Storyboard>
               </BeginStoryboard>
           </DataTrigger.EnterActions>
       </DataTrigger>
   </Style.Triggers>
 </Style>

e) Log count summary

The final feature is a summary of the log count. Yet again. There is nothing new below which has not already been explained above so all I need to say is the following code queries the log entry service and produces an aggregated summary of the different log levels.

//aggregate total items
var summariser = logEntryService.Items.Connect()
      .QueryWhenChanged(query =>
      {
          var items = query.Items.ToList();
          var debug = items.Count(le => le.Level == LogLevel.Debug);
          var info = items.Count(le => le.Level == LogLevel.Info);
          var warn = items.Count(le => le.Level == LogLevel.Warning);
          var error = items.Count(le => le.Level == LogLevel.Error);
          return new LogEntrySummary(debug, info, warn, error);
      })
      .Subscribe(s => Summary = s);

This can now be bound to display a running aggregation of the log level item counts.

4. Integrate Reactive UI into the existing sample dynamic data project

a) Use another IOC container

Reactive UI has it’s own dependency injection mechanism yet my demo application was already set up to use the marvellous structure map. You can if you wish use the built in mechanism for all your registrations but most of us have have our own favourite IOC container. Thankfully ReactiveUI accommodates our choices by exposing an API to allow use of any mechanism.

I found out how to use this API after a quick google search. This custom-structuremap-dependency-resolver-for-reactiveui-5 taught me to implement IMutableDependencyResolver and register it. However this example uses reactive ui 5 and reactive ui 6 now sits on top of Splat, so the registration takes place with Splat.

//create a structure map container
var container =  new Container(x=> x.AddRegistry<AppRegistry>());
//ReactiveUIDependencyResolver implements IMutableDependencyResolver
var resolver =  new ReactiveUIDependencyResolver(container);

//...Register views (see next section)

//register IMutableDependencyResolver
Locator.Current = resolver;

See ReactiveUIDependencyResolver.cs for implementation of IMutableDependencyResolver using structure map.

b) Set up ReactiveUI view hosting

I defer now to this excellent post A GitHub Client with WPF and ReactiveUI Part 2 for the brief and concise explanation of how views are handled using ReactiveUI’s ViewModelViewHost. It taught me everything I need to know to get up an running with the reactive ui way of doing things. Well maybe not everything but it got me off to a running start. So I will not dwell too much on details as I do not wish to replicate that post, but i will briefly explain.

ReactiveUI makes use of a view model host control ViewModelViewHost which exposes a view model dependency property. When some content is bound to the ViewModel property, the dependency resolver resolves the view which is registered via splat.

I created a control with the following definition

<reactiveUi:ViewModelViewHost ViewModel="{Binding Content}"
                        HorizontalContentAlignment="Stretch"
                        VerticalContentAlignment="Stretch"/>

For full xaml RxUiHostView.xaml
The view model which this is bound to RxUIHostViewModel.cs

In demo app, when the view model is required, it is directly resolved and bound to the above xaml. This is where reactive ui takes control. For the view model host to resolve the correct view the dependency resolver must be told what view to use for each view model. This is achieved as followings using the dependency resolver.

//register the log entry view example
resolver.Register(() => new LogEntryView(), typeof(IViewFor<LogEntryViewer>));

This has associated the view and the view model. The view must implement IViewFor which is illustrated next.

c) View to View Model Bindings

Reactive ui provides it’s own binding functionality. I am not too well versed in all of the details so I have not gone the whole hog, but for those who are interested this in the code being my view.

I have set up a few of the bindings using reactive ui bindings for illustrative purposes and the remainder I use wpf bindings.

 public partial class LogEntryView : UserControl, IViewFor<LogEntryViewer>
 {
     public LogEntryView()
     {
         InitializeComponent();

         //use wpf bindings for the data context    
         this.WhenAnyValue(x => x.ViewModel).BindTo(this, x => x.DataContext);

         //use rx ui binding for the search text box and delete command
         this.Bind(ViewModel, model => model.SearchText, view => view.SearchTextBox.Text);
         this.OneWayBind(ViewModel, model => model.DeleteCommand, view => view.DeleteButton.Command);
         //should be able to bind to a command as follows but for some reason it kept throwing, so I quickly qave up!
          //  this.BindCommand(ViewModel, model => model.DeleteCommand, view => view.DeleteButton);
     }

     public static readonly DependencyProperty ViewModelProperty = DependencyProperty.Register(
         "ViewModel", typeof (LogEntryViewer), typeof (LogEntryView), new PropertyMetadata(default(LogEntryViewer)));

     public LogEntryViewer ViewModel
     {
         get { return (LogEntryViewer) GetValue(ViewModelProperty); }
         set { SetValue(ViewModelProperty, value); }
     }

     object IViewFor.ViewModel
     {
         get { return ViewModel; }
         set { ViewModel = (LogEntryViewer)value; }
     }
 }

If anyone wants to change the bindings for this screen to completely use rx ui bindings, please be my guest. Make the changes in github and submit a pull request. Thanks!

5. That’s all folks

Both dynamic data and ReactiveUI are functionally very rich and this article only touches the surface of both but I believe the code in this example will be a good starting point for anyone wishing to find out about either of these libraries.

Trading Demo Part 5 – Aggregate Dynamic Data

This is part 5 of building a trading system series. The idea has been to take one source of rapidly changing trading data and demonstrate the range, power and simplicity of applying dynamic data operators to it. Throughout the series I have made the assumption that the reader is already familiar and comfortable with the absolutely brilliant reactive extensions framework. For those who are not I strongly recommend to check them out. There is a steep learning curve but I assure anyone that once you start thinking reactive you will never turn back. You will probably wonder how you ever programmed without it.

If you are new to dynamic data I suggest you first read this Getting Started and if you have not seen the previous parts they are

1. Expose the data as a service
2. Manage market data
3. Integration with the UI
4. Filter on calculated values

Also if you want to download the code which goes with this example, go to Trading Demo on GitHub and download and run in visual studio. Open the menu option labelled Trading Positions and you will see the following screen.

Aggregated Positions

This screen connects to the trade service data, groups by currency pair and aggregates the totals for each grouping of currency pairs. With dynamic data doing this becomes so simple with very little code involved as I am about to show.

The starting point is to group the trade data by currency pair. This is achieved using dynamic data’s Group operator.

//where tradeService is the trade service created in Part 1.
var mygroup = tradeService.Live.Connect()
                          .Group(trade => trade.CurrencyPair)

This does not look like much but the group operator is very powerful indeed. As with all things reactive, changes in the trade service are automatically propagated to the appropriate grouping. Dynamic Data extends the reactive framework by accounting for adds, updates and removes. So when trades are created, closed or amended the groups will automatically reflect these changes.

The grouping object creates a cache of each group and it’s signature is like this.

public interface IGroup<TObject, TKey, out TGroupKey>
{
    TGroupKey Key {get;}
    IObservableCache<TObject, TKey> Cache { get; }
}

Three generic types, a bit ugly I admit but we have a dynamic collection of caches, one for each item matching the group selector in the underlying data source. When grouping I almost always apply a transform function to do something with the group. In this example we want to aggregate totals for each group, so first I have created an object which accepts the grouping in it’s constructor and apply some dynamic aggregations on the cache. For this we need to use dynamic data’s Transform operator. This is akin to the standard reactive Select operator but as with several other operators in dynamic data I have to use an alternative semantic dues to ambiguous reference problems for extensions.

var transformedGroup = tradeService.Live.Connect()
                .Group(trade => trade.CurrencyPair)
                .Transform(group => new CurrencyPairPosition(group))

This means for each group we have a CurrencyPairPosition object which will be used to dynamically calculate the sum total of the buys and sells so that the overall position can be calculated. For the aggregation I have used another dynamic data operator QueryWhenChanged. The following exert illustrates it’s usage

//constructor only for illustration
public CurrencyPairPosition(IGroup<Trade, long, string> tradesByCurrencyPair)
{
    var aggregations = tradesByCurrencyPair.Cache.Connect()
                        .QueryWhenChanged(query => //query and return a result
}

For each change in the underlying cache the operator is invoked. The parameter exposes a querying API so the underling cache can be queried in a functional manner. The consumer must return a result which becomes an observable of whatever is returned. In this example we have used some standard linq to objects to calculate the buy and sell totals and return a new object which contains these values.

If we expand out the above code we easily create an observable of trade positions for each currency pair,

  var aggregations = tradesByCurrencyPair .Cache.Connect()
                       .QueryWhenChanged(query =>
                        {
                            var buy = query.Items
                                        .Where(trade => trade.BuyOrSell == BuyOrSell.Buy)
                                        .Sum(trade=>trade.Amount);
                            var sell = query.Items
                                        .Where(trade => trade.BuyOrSell == BuyOrSell.Sell)
                                        .Sum(trade => trade.Amount);
                            var count = query.Count;
                            return new TradesPosition(buy,sell,count);
                        });
                //subscribe to the result and set a property to the latest

We are at the point where we have a CurrencyPairPosition object which always reflects the latest position so the last things is to expand the consuming code so we can bind to the collection of these.

 //not all code show and _data is a derived Observable collection
  var subscriber = tradeService.Live.Connect()
                .Group(trade => trade.CurrencyPair)
                .Transform(group => new CurrencyPairPosition(group))
                .Sort(SortExpressionComparer<CurrencyPairPosition>.Ascending(t => t.CurrencyPair))
                .ObserveOn(schedulerProvider.MainThread)
                .Bind(_data)
                .DisposeMany()
                .Subscribe();

This will maintain the sorted order and ensure that CurrencyPairPosition objects are disposed when no longer required. Now a little xaml bound to the data property can produce the screen at the top of the page.

In summary, Dynamic Data makes the reactive management of collections of data very very easy indeed. If you look at the code list below, you will see just how little is involved. There are about 170 lines of code over three c# classes which also includes namespace and using declarations. And what’s even better is despite the underlying data rapidly moving, nowhere have I had to concern myself with thread safety and all the operators are completely thread-safe so long as all code functionally remains inside the operators.

The full code for all the objects is

1. PositionsViewer.cs (The view model)
2. CurrencyPairPosition.cs
3. TradesPosition.cs (the object which holds the result)
4. PositionsView.xaml (the screen)