Trading Demo Part 5 – Aggregate Dynamic Data

This is part 5 of the series on building a trading system. The idea has been to take one source of rapidly changing trading data and demonstrate the range, power and simplicity of applying dynamic data operators to it. Throughout the series I have assumed that the reader is already familiar and comfortable with the absolutely brilliant reactive extensions framework. For those who are not, I strongly recommend checking it out. There is a steep learning curve, but I assure you that once you start thinking reactively you will never turn back. You will probably wonder how you ever programmed without it.

If you are new to dynamic data I suggest you first read Getting Started. If you have not seen the previous parts, they are:

1. Expose the data as a service
2. Manage market data
3. Integration with the UI
4. Filter on calculated values

Also, if you want to download the code which goes with this example, go to Trading Demo on GitHub, then download it and run it in Visual Studio. Open the menu option labelled Trading Positions and you will see the following screen.

Aggregated Positions

This screen connects to the trade service data, groups by currency pair and aggregates the totals for each group. With dynamic data this takes very little code, as I am about to show.

The starting point is to group the trade data by currency pair. This is achieved using dynamic data’s Group operator.

//where tradeService is the trade service created in Part 1.
var mygroup = tradeService.Live.Connect()
                          .Group(trade => trade.CurrencyPair);

This does not look like much but the group operator is very powerful indeed. As with all things reactive, changes in the trade service are automatically propagated to the appropriate grouping. Dynamic Data extends the reactive framework by accounting for adds, updates and removes. So when trades are created, closed or amended the groups will automatically reflect these changes.
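To make the add, update and remove behaviour concrete, here is a rough sketch rather than code from the demo. It assumes the DynamicData NuGet package, and the pared-down Trade record and the GroupSketch class are hypothetical names invented for illustration. It captures the group keys after adding two trades and again after amending one of them into a different currency pair.

```csharp
using System;
using System.Linq;
using DynamicData;

// Hypothetical, pared-down trade; the demo's real Trade class has more members.
public sealed record Trade(long Id, string CurrencyPair, decimal Amount);

public static class GroupSketch
{
    // Returns the comma-separated group keys after the adds and after the amend.
    public static (string AfterAdds, string AfterAmend) Run()
    {
        using var source = new SourceCache<Trade, long>(trade => trade.Id);

        // Materialise the groups into a read-only cache so they can be inspected.
        using var groups = source.Connect()
            .Group(trade => trade.CurrencyPair)
            .AsObservableCache();

        source.AddOrUpdate(new Trade(1, "GBP/USD", 100m));
        source.AddOrUpdate(new Trade(2, "EUR/USD", 250m));
        var afterAdds = string.Join(",", groups.Keys.OrderBy(key => key));

        // Same key, different currency pair: trade 1 moves to the EUR/USD group.
        source.AddOrUpdate(new Trade(1, "EUR/USD", 100m));
        var afterAmend = string.Join(",", groups.Keys.OrderBy(key => key));

        return (afterAdds, afterAmend);
    }
}
```

Once trade 1 is amended, the GBP/USD group has no members left, so it should drop out of the grouped result without any extra plumbing in the consuming code.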

The grouping object creates a cache for each group and its signature looks like this.

public interface IGroup<TObject, TKey, out TGroupKey>
{
    TGroupKey Key {get;}
    IObservableCache<TObject, TKey> Cache { get; }
}

Three generic types is a bit ugly, I admit, but what we get is a dynamic collection of caches, one for each distinct value of the group selector in the underlying data source. When grouping I almost always apply a transform function to do something with the group. In this example we want to aggregate totals for each group, so I have first created an object which accepts the group in its constructor and applies some dynamic aggregations to the cache. For this we need dynamic data’s Transform operator. It is akin to the standard reactive Select operator, but as with several other operators in dynamic data I have had to use a different name due to ambiguous reference problems with extension methods.

var transformedGroup = tradeService.Live.Connect()
                .Group(trade => trade.CurrencyPair)
                .Transform(group => new CurrencyPairPosition(group));

This means that for each group we have a CurrencyPairPosition object which dynamically calculates the sum total of the buys and sells, so that the overall position can be derived. For the aggregation I have used another dynamic data operator, QueryWhenChanged. The following excerpt illustrates its usage.

//constructor only for illustration
public CurrencyPairPosition(IGroup<Trade, long, string> tradesByCurrencyPair)
{
    //query the cache and return a result; the count is just a placeholder here
    var aggregations = tradesByCurrencyPair.Cache.Connect()
                        .QueryWhenChanged(query => query.Count);
}

The operator is invoked for each change in the underlying cache. Its parameter exposes a querying API so the underlying cache can be queried in a functional manner. Whatever the function returns becomes the element type of the resulting observable. In this example we use some standard LINQ to Objects to calculate the buy and sell totals and return a new object which contains these values.

If we expand out the above code we easily create an observable of trade positions for each currency pair:

  var aggregations = tradesByCurrencyPair.Cache.Connect()
                       .QueryWhenChanged(query =>
                        {
                            var buy = query.Items
                                        .Where(trade => trade.BuyOrSell == BuyOrSell.Buy)
                                        .Sum(trade=>trade.Amount);
                            var sell = query.Items
                                        .Where(trade => trade.BuyOrSell == BuyOrSell.Sell)
                                        .Sum(trade => trade.Amount);
                            var count = query.Count;
                            return new TradesPosition(buy,sell,count);
                        });
                //subscribe to the result and set a property to the latest
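Putting the pieces together, the whole class might look roughly like the sketch below. This is my own minimal reconstruction for illustration, not the file from the repository. It assumes the DynamicData NuGet package, and the Trade and TradesPosition shapes, including the idea that the net position is buys minus sells, are assumptions.

```csharp
using System;
using System.Linq;
using DynamicData;

public enum BuyOrSell { Buy, Sell }

// Hypothetical minimal shapes; the demo's real classes carry more members.
public sealed record Trade(long Id, string CurrencyPair, BuyOrSell BuyOrSell, decimal Amount);

public sealed record TradesPosition(decimal Buy, decimal Sell, int Count)
{
    // Assumption: the net position is total buys minus total sells.
    public decimal Position => Buy - Sell;
}

public sealed class CurrencyPairPosition : IDisposable
{
    private readonly IDisposable _cleanUp;

    public CurrencyPairPosition(IGroup<Trade, long, string> tradesByCurrencyPair)
    {
        CurrencyPair = tradesByCurrencyPair.Key;

        // Re-run the query on every change and keep only the latest result.
        _cleanUp = tradesByCurrencyPair.Cache.Connect()
            .QueryWhenChanged(query => new TradesPosition(
                query.Items.Where(t => t.BuyOrSell == BuyOrSell.Buy).Sum(t => t.Amount),
                query.Items.Where(t => t.BuyOrSell == BuyOrSell.Sell).Sum(t => t.Amount),
                query.Count))
            .Subscribe(position => Position = position);
    }

    public string CurrencyPair { get; }

    // Latest aggregate; starts empty until the first change arrives.
    public TradesPosition Position { get; private set; } = new(0m, 0m, 0);

    public void Dispose() => _cleanUp.Dispose();
}
```

A class that is bound to a screen would also need to raise a property changed notification when Position is set. That is left out here to keep the sketch short.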

We are now at the point where we have a CurrencyPairPosition object which always reflects the latest position, so the last thing is to expand the consuming code so we can bind to the collection of these.

 //not all code is shown; _data is a derived observable collection
  var subscriber = tradeService.Live.Connect()
                .Group(trade => trade.CurrencyPair)
                .Transform(group => new CurrencyPairPosition(group))
                .Sort(SortExpressionComparer<CurrencyPairPosition>.Ascending(t => t.CurrencyPair))
                .ObserveOn(schedulerProvider.MainThread)
                .Bind(_data)
                .DisposeMany()
                .Subscribe();

This will maintain the sorted order and ensure that CurrencyPairPosition objects are disposed when no longer required. Now a little XAML bound to the data property can produce the screen at the top of the page.

In summary, Dynamic Data makes the reactive management of collections of data very easy indeed. If you look at the code listed below, you will see just how little is involved: about 170 lines of code over three C# classes, including namespace and using declarations. Even better, despite the underlying data moving rapidly, nowhere have I had to concern myself with thread safety. All the operators are thread-safe so long as the code stays inside the operators.

The full code for all the objects is:

1. PositionsViewer.cs (The view model)
2. CurrencyPairPosition.cs
3. TradesPosition.cs (the object which holds the result)
4. PositionsView.xaml (the screen)


How to use the Material Design theme with Dragablz Tab Control

In this post I will demonstrate how to – very quickly – combine Dragablz and MaterialDesignColors in WPF to create a great looking control which supports full tear out and can use the Google Material Design colour palette.

Dragablz Tab Control and Material Design

Start a new WPF project.  We rely on two NuGet packages, so get them installed straight away.  Install from the Package Manager tool in Visual Studio, or, from the NuGet console run these commands:

In the MainWindow.xaml, setup a simple usage of Dragablz TabablzControl:

If you run this project now you will already have a tab control that supports Chrome-style tearing out of tabs. But it won’t look too good. So, the next step is to bring in the Material Design colours, and tell Dragablz to use the Material Design style.

Open up your App.xaml. We have to merge in three dictionaries.  The first two are to set up your…


Getting Started

Although I have been blogging about dynamic data I have so far omitted any specific documentation. This is because dynamic data is functionally very rich and hence there is such a huge amount to document. Frankly with over 50 operators to explain I have been daunted. But at last I am on the case, so this is the beginning.

The documents live at Dynamic data documents on GitHub, and I will keep updating them over the next month or two.

The core concept

It is perhaps easiest to think of dynamic data as reactive extensions (rx) for collections but more accurately dynamic data is a bunch of rx operators based on the concept of an observable change set. The change set notifies listeners of any changes to an underlying source and has the following signature.

IObservable<IChangeSet<TObject,TKey>> myFirstObservableChangeSet;

where IChangeSet represents a set of adds, updates, removes and moves (for sort dependent operators). Each observer receives the changes, applies some logic and in turn publishes its own change set. In this way operators can easily be chained together into complex pipelines.

The only constraint dynamic data imposes is that each object must have a key. This was a design choice right from the beginning, as the internals of dynamic data need to identify any object and be able to look it up quickly and efficiently.

Creating an observable change set

To open up the world of dynamic data to any object, we need to feed the data into some mechanism which produces the observable change set. Unless you are creating a custom operator there is no need to create one directly, as there are several out of the box means of doing so.

The easiest way is to feed directly into dynamic data from a standard rx observable.

// the source can be an observable of single items...
IObservable<T> myObservable;
// ...or an observable of batches of items
IObservable<IEnumerable<T>> myObservable;
// Use the hashcode for the key
var mydynamicdatasource = myObservable.ToObservableChangeSet();
// or specify a key like this
var mydynamicdatasource = myObservable.ToObservableChangeSet(t => t.Key);

The problem with the above is the collection will grow forever so there are overloads to specify size limitation or expiry times (not shown).

To have much more control over the root collection we need an in-memory data store which has the requisite add, update and remove methods. As above, the cache can be created with or without specifying a key.

// Use the hash code for the key
var mycache  = new SourceCache<TObject>();
// or specify a key like this
var mycache  = new SourceCache<TObject,TKey>(t => t.Key);

The cache produces an observable change set via its Connect method.

var observableChangeSet = mycache.Connect();
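To see what actually flows through a change set, the sketch below writes to a keyed source cache and records the reason and key of every change it is notified of. It is only an illustration under stated assumptions: it needs the DynamicData NuGet package, and the Trade record and ChangeSetSketch class are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using DynamicData;

// Hypothetical, pared-down trade used only for this illustration.
public sealed record Trade(long Id, string CurrencyPair, decimal Amount);

public static class ChangeSetSketch
{
    // Returns one "Reason:Key" entry per change received from the cache.
    public static List<string> Run()
    {
        var log = new List<string>();
        using var cache = new SourceCache<Trade, long>(trade => trade.Id);

        // Each notification is a change set: a batch of individual changes.
        using var subscription = cache.Connect().Subscribe(changes =>
        {
            foreach (var change in changes)
            {
                log.Add($"{change.Reason}:{change.Key}");
            }
        });

        cache.AddOrUpdate(new Trade(1, "GBP/USD", 100m)); // new key, so an add
        cache.AddOrUpdate(new Trade(1, "GBP/USD", 150m)); // existing key, so an update
        cache.Remove(1L);                                 // remove by key

        return log;
    }
}
```

Because the key decides whether a write is an add or an update, the consumer never has to track that itself.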

Another way is to create the change set directly from an observable collection, like this.

var myobservablecollection= new ObservableCollection<T>();
// Use the hashcode for the key
var mydynamicdatasource = myobservablecollection.ToObservableChangeSet();
// or specify a key like this
var mydynamicdatasource = myobservablecollection.ToObservableChangeSet(t => t.Key);

This method is only recommended for simple queries which act only on the UI thread as ObservableCollection is not thread safe.

One other point worth making here is any observable change set can be converted into a cache.

var mycache = somedynamicdatasource.AsObservableCache();

This cache has the same connection methods as a source cache but is read only.

Examples

Now that you know how to create the source observable, here are a few quick-fire examples. But first, what behaviour and conventions can you expect? The answer is simple.

  1. All operators must comply with the Rx guidelines.
  2. When an observer subscribes the initial items of the underlying source always form the first batch of changes.
  3. Empty change sets should never be fired.

In all of these examples the resulting sequences always exactly reflect the items in the cache. This is where the power of adds, updates and removes comes into its own, as all the operations are maintained with no consumer-side plumbing.

Example 1: filters a stream of live trades, creates a proxy for each trade and orders the result by most recent first. As the source is modified the observable collection will automatically reflect changes.

//Dynamic data has its own take on an observable collection (optimised for populating from dynamic data observables)
var list = new ObservableCollectionExtended<TradeProxy>();
var myoperation = somedynamicdatasource
                    .Filter(trade => trade.Status == TradeStatus.Live)
                    .Transform(trade => new TradeProxy(trade))
                    .Sort(SortExpressionComparer<TradeProxy>.Descending(t => t.Timestamp))
                    .ObserveOnDispatcher()
                    .Bind(list)
                    .DisposeMany()
                    .Subscribe();

Oh and I forgot to say, TradeProxy is disposable and DisposeMany() ensures items are disposed when no longer part of the stream.

Example 2: for filters which can be dynamically changed, we can use a filter controller

var filtercontroller = new FilterController<Trade>();
var myoperation = somedynamicdatasource.Filter(filtercontroller);

//can invoke a filter change at any time, passing any predicate
filtercontroller.Change(trade => trade.Status == TradeStatus.Live);
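The same idea can also be expressed without a controller object. Later DynamicData releases have a Filter overload that accepts an observable of predicates, and the sketch below uses it. It assumes the DynamicData and System.Reactive NuGet packages, and the Trade record, the amount threshold and the DynamicFilterSketch class are hypothetical.

```csharp
using System;
using System.Reactive.Subjects;
using DynamicData;

// Hypothetical, pared-down trade used only for this illustration.
public sealed record Trade(long Id, string CurrencyPair, decimal Amount);

public static class DynamicFilterSketch
{
    // Returns the number of items passing the filter before and after it changes.
    public static (int Before, int After) Run()
    {
        using var source = new SourceCache<Trade, long>(trade => trade.Id);
        source.AddOrUpdate(new Trade(1, "GBP/USD", 50m));
        source.AddOrUpdate(new Trade(2, "EUR/USD", 500m));

        // Start by letting everything through; push a new predicate to re-filter.
        using var predicate = new BehaviorSubject<Func<Trade, bool>>(_ => true);

        using var filtered = source.Connect()
            .Filter(predicate)
            .AsObservableCache();

        var before = filtered.Count;

        // Changing the predicate re-evaluates every item in the source.
        predicate.OnNext(trade => trade.Amount >= 100m);
        var after = filtered.Count;

        return (before, after);
    }
}
```

Using a BehaviorSubject means the filter has a predicate from the moment of subscription. Any other observable of predicates would do, for example one derived from a search box.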

Example 3: produces a stream which is grouped by status. If an item changes status it will be moved to the new group and when a group has no items the group will automatically be removed.

var myoperation = somedynamicdatasource
                    .Group(trade => trade.Status); //This is NOT Rx's GroupBy

Example 4: suppose I am editing some trades and each trade exposes an observable which reports whether it is valid. If I want to know when all items are valid, this will do the job.

IObservable<bool> allValid = somedynamicdatasource
                    .TrueForAll(trade => trade.IsValidObservable, (trade, isvalid) => isvalid);

This operator flattens the observables and returns the combined state in one line of code. I love it.

Example 5: wires and un-wires an observable for each item as it is added to, updated in or removed from the source, and merges the results into a single stream.

var myoperation = somedynamicdatasource.Connect()
                .MergeMany(trade => trade.ObservePropertyChanged(t => t.Amount))
                .Subscribe(amountChanged => { /* do something with each PropChangedArg notification */ });

Example 6: produces a distinct change set of currency pairs.

var currencyPairs = somedynamicdatasource
                    .DistinctValues(trade => trade.CurrencyPair);
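As a rough illustration of how the distinct values track the source, the sketch below adds three trades over two currency pairs and then removes the only trade for one of them. It assumes the DynamicData NuGet package. The Trade record and DistinctSketch class are hypothetical names.

```csharp
using System;
using System.Linq;
using DynamicData;

// Hypothetical, pared-down trade used only for this illustration.
public sealed record Trade(long Id, string CurrencyPair);

public static class DistinctSketch
{
    // Returns the distinct currency pairs before and after a trade is removed.
    public static (string Before, string After) Run()
    {
        using var source = new SourceCache<Trade, long>(trade => trade.Id);

        // Materialise the distinct values so they can be inspected.
        using var pairs = source.Connect()
            .DistinctValues(trade => trade.CurrencyPair)
            .AsObservableCache();

        source.AddOrUpdate(new Trade(1, "GBP/USD"));
        source.AddOrUpdate(new Trade(2, "GBP/USD"));
        source.AddOrUpdate(new Trade(3, "EUR/USD"));
        var before = string.Join(",", pairs.Items.OrderBy(pair => pair));

        // Trade 3 was the only EUR/USD trade, so that value should disappear.
        source.Remove(3L);
        var after = string.Join(",", pairs.Items.OrderBy(pair => pair));

        return (before, after);
    }
}
```

A value stays in the distinct set for as long as at least one item in the source still produces it.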

The above examples are just a brief glimpse of what dynamic data can do. It will all be documented in time.

Want to know more?

There is so much more which will be documented, but if you want to find out more in the meantime I suggest you either download the WPF trading example and go through the example screens, or try it out directly via Dynamic data on NuGet.