Observable List At A Glance

Dynamic data v4 has been taken out of beta and officially released.

For source code see Dynamic Data on GitHub
For install see Dynamic Data On Nuget

Dynamic data has always had an observable cache and now after a busy 4 months development and 188 commits there is an observable list. I also could not resist throwing in some aggregation operations. These new features are outlined in this post.

Getting Started

Create an observable list like this:

var myInts= new SourceList<int>();

There are direct edit methods, for example

myInts.AddRange(Enumerable.Range(0, 10000)); 
myInts.Add(99999); 
myInts.Remove(99999);

Each amend operation will produced a change notification. A much more efficient option is to batch edit which produces a single notification.

myInts.Edit(innerList =>
{
   innerList.Clear();
   innerList.AddRange(Enumerable.Range(0, 10000));
});

If SourceList is to be exposed publicly it can be made read only

IObservableList<int> readonlyInts = myInts.AsObservableList();

which hides the edit methods.

The list changes can be observed by calling myInts.Connect(). This creates an observable change set for which there are dozens of list specific operators. The changes are transmitted as an Rx observable so are fluent and composable.

Chain together operators

The following filters the observable list to include only odd numbers and applies a sum operation.

int total = 0;
var sumOfOddNumbers = myInts.Connect()
.Filter(i => i%2 == 1)
.Sum(i => i)
.Subscribe(result => total = result);

The result is a true observable. When values are added, replaced or removed from myInts , the result will always reflect these changes.

Create a derived list

Any observable change set can be converted into an observable list so for example


var myOddNumberList = myInts.Connect()
.Filter(i => i%2 == 1)
.AsObservableList();

produces a read only observable list of odd numbers.

Dynamic data compliments Rx but since Rx does not deal with collections I have created many list specific operators to account for adds, updates, replaces and moves. There are detail below.

Observable List Operators (35 operators and counting)

The following table is a brief description of all the observable operators which can be applied to the observable list. I will use to as a basic of further detailed documentation. This will take some time but will be done in near future.

Operator Description
Adapt Injects side effects using IChangeSetAdaptor interface.
AsObservableList Converts an observable change set into a read only observable list / hides the edit methods of a source list.
Bind Ensures a specified observable collection reflects the underling data
BufferIf Conditional buffer which accepts a boolean observable to turn buffering on or off
Clone Populate an IList instance with the latest changes.
Convert Light weight conversion / casting of a type as specified by a value selector
DeferUntilLoaded Delay subscription until the source is populated with data.
DisposeMany Dispose each item when removed from the stream. Also disposes all items when the stream is completed.
DistinctValues Return a changeset of distinct values as specified by a value selector
ExpireAfter Time based expiry for a source list
Filter The semantic equivalent of Rx.Where. There is an overload to allow the predicate to be changed
FlattenBufferResult A changeset is already a collection and therefore becomes a nested collection when Rx buffer operatons are applied. Use this to flatten the result back to a single change set
GroupOn Semantic equivalent of Rx.GroupBy operator
LimitSizeTo Limits the size of the source cache to the specified limit, providing a notification when an item is removed due to the size limit being exceeded
MergeMany Dynamically merges the observable which is selected from each item in the stream, and unmerges the item when it is no longer part of the stream.
NotEmpty Prevents a zero count change set notification
OnItemRemoved Callback for each item as and when it is being removed from the stream
Page Applies paging to the the data source
PopulateInto Populates an ISourceList instance with the latest changes
QueryWhenChanged The latest copy of the underlying data is exposed for querying i) after each modification to the underlying data ii) upon subscription
RefCount Change set equivalent to Publish().RefCount(). The source is cached so long as there is at least 1 subscriber.
SkipInitial Defer the subscription until there is underlying data loaded and skip first set of changes
Sort Allow sort operators using a specified comparer. There is an overload to allow the sort to be changed
SubscribeMany Subscribes to each item when it is added to the stream and unsubcribes when it is removed. All items will be unsubscribed when the stream is disposed
ToCollection Converts the changeset into a fully formed collection. Each change in the source results in a new read only collection
Top Limits the size of the result set to the specified number of items
Transform The semantic equivalent of Rx.Select
TransformMany Equivalent to a select many transform
Virtualise Virtualises the data. Parameters are provided and changed using a virtualising controller
WhenAnyChanged Property changed observation of each item in the underlying collection which provides a notification when any item has changed. The notification includes the sender and value of the changed property.
WhenAnyValueChanged Property changed observation of each item in the underlying collection which provides a notification when any item has changed. The notification only includes the value of the changed property.
WhereReasonsAre Includes changes for the specified reasons only
WhereReasonsAreNot Excludes changes for the specified reasons
WithKey Applies a key to each item. This changes the change set from a list change set to a cache changeset

This list has less operators than the observable cache but will be expanded in future releases.

Aggregation Operators

The following aggregation are also new to Dynamic Data version 4. The apply to both observable list and observable cache. With the exception of Count() which requires no parameter, the value to continually compute on is specified as a value selector Func valueSelector

Operator Syntax
Count Counts the number of items in the underlying data
Sum The sum of values matching the specified value selector
Avg The average value matching the specified value selector
Maximum The maximum value matching the specified value selector
Minimum The minimum value matching the specified value selector
StdDev The standard deviation of all the values matching the specified value selector

Reactive Tree using Dynamic Data

For this post I assume familiarity with Dynamic Data or at least familiarity with some of my previous posts, so for those of you who are not already familiar with dynamic data see What is dynamic data? for a brief overview.

Dynamic data has evolved over several years where each new operator has been created to overcome a necessity for a practical problem which I have been working on. It is so functionally rich I always assume that there is not a lot that it cannot do. This complacency was shaken when recently I was asked if it was possible to use Dynamic Data to convert a flat observable data stream to a fully hierarchical structure. I immediately replied yes of course without weighing up in advance what the solution could be.

I quickly identified the need for a new ‘convert a stream to a tree’ operator so I took the challenge to implement it. However the more I looked into the problem the bigger the challenge become and I started to doubt whether I could pull it off.

The complexity: Writing the operator

The contract of dynamic data guarantees any operator will reflect items which are added, updated or removed from its source collection. For this to hold true for a recursive hierarchy is an extreme challenge. For every item there has to be a node which has references to both the parent and child nodes, and these have to be maintained as changes are received. The big challenge is twofold:

  1. To ensure accurate reflection of underling data
  2. Ensure good performance

My first solution was to create a node which applied a filter to find its children and in turn each child also applied a filter to find its children. I quickly got this solution working as I was able to implement it using a combination of the existing dynamic data filter and transform operators. The code was simple but alas had poor performance as each node had to create a filter from the original cache.

My mind was melting trying to find a better solution, then I realised creating and maintaining a nested structure does not necessarily have to be so mind numbingly complex as I first suspected. The algorithm can be made easy as follows:

  1. For each item in the flat list create a cache of nodes.
  2. As each node in the cache changes look back into the cache to find the parent and add / remove from the parent accordingly
  3. The output is the cache of nodes filtered so that only nodes with no parent are included in the result

Part 1 and part 3 of this algorithm can be achieved with existing operators which left me to implement part 2 in a single iteration. This means there is absolutely no need for recursion within the operator which has led to good performance.

I have called this new operator TransformToTree. It has been implemented and released on Nuget as beta version.

Herein I will illustrate how the operator is used and then walk through a working example.

The Simplicity: Create an observable tree

The following object defines an employee which has a boss id who naturally is also an employee.

public class Employee
{
    public int Id {get;set;}
    public string Name {get;set;}
    public int BossId {get;set;}
}

To transform the employee into a fully recursive tree structure you first of all need a cache of employees

var employees = new SourceCache<Employee, int>(x => x.Id)

and now to transform this into a the tree structure use the new TransformToTree operator.

var myTree = employees.TransformToTree(employee => employee.BossId);

At that is that, we have an observable tree. The resulting observable is a deeply nested node structure representing an organisational hierarchy. As the cache of employees is maintained the nodes of the tree will completely self maintain.

Do this if you need to cache the tree.

var myTreeCache = myTree.AsObservableCache();

but perhaps a more obvious example would be to display the result on a tree control on a gui.

A WPF Example

Full source code for demo here

This example takes a flat hierarchy of employees, binds the nodes to a WPF TreeView and adds some buttons which change the underlying data in order to demonstrate that the tree reflects changes to the underlying data i.e. truly reactive.

Screenshot

I will not be illustrate the entire code base here as it would make this post too long. So instead I will outline what each part of the code does together with an extract of the core functionality.

Code file What does it do
MainWindow.xaml Xaml producing the view
EmployeesViewModel.cs The main view model for the example
EmployeeViewModel.cs Recursive view model for each node
EmployeeService.cs Data source for employees. Also provides methods to sack or promote employees

Most of the code in this example is boiler plate so below I will explain only the key parts of the code.

In EmployeesViewModel.cs the following code transforms the employee data into the tree structure then the second transform function takes the node which dynamic data has provided and transforms it into a xaml friendly employee view model.

var treeLoader = employeeService.Employees.Connect()
    //produce the nested tree observable
    .TransformToTree(employee => employee.BossId)
    //Transform each node into a view model
    .Transform(node => new EmployeeViewModel(node, Promote,Sack))
    .Bind(_employeeViewModels)
    .DisposeMany()
    .Subscribe();

Promote and Sack are methods which call into the employee service and change the underlying employee data. These are invoked by commands in the employee view model. I have included these actions purely to show that changing the underlying data source is reflected in the tree structure proving it is truly a reactive tree.

The employee view model is a full recursive view model. The example project loads 25,000 employees which implies there are 25,000 nodes in the tree. Clearly the tree view would struggle binding to such a large tree. To circumvent this problem the child view models are lazy loaded when the parent node is expanded. The following snippet shows how this is achieved.

public EmployeeViewModel(Node<EmployeeDto, int> node, Action promoteAction, Action sackAction, EmployeeViewModel parent = null)
{
    //.................................
    //Setting of backing fields not shown...

    //Wrap loader for the nested view model inside a lazy so we can control when it is invoked
    var childrenLoader = new Lazy(() => node.Children.Connect()
        .Transform(e => new EmployeeViewModel(e, promoteAction, sackAction,this))
        .Bind(Inferiors)
        .DisposeMany()
        .Subscribe());

    //return true when the children should be loaded
    //(i.e. if current node is a root, otherwise when the parent expands)
    var shouldExpand = node.IsRoot
         ? Observable.Return(true)
         : Parent.Value.ObservePropertyValue(This => This.IsExpanded).Value();

    //wire the observable
    var expander =shouldExpand
        .Where(isExpanded => isExpanded)
        .Take(1)
        .Subscribe(_ =>
        {
        //force lazy loading
        var x = childrenLoader.Value;
        });

    //Not all code show....
}

And that is about it. All that is left is create the xaml to bind to the a tree view. This is pretty standard xaml so I will proffer no further explanation here.

This operator and post were inspired by a question about whether dynamic data could create a hierarchy. Since then I have been asked whether I am going to expand dynamic data to include continuous aggregations and the answers is big YES. Watch this space.