Trading Example Part 4. Filter on calculated values

Collection change notifications are built around add, update and remove events, and a collection which supports ordering will also raise a move notification. Of course Dynamic Data supports all of these. Yet something which is often overlooked is that data, or functions of data, are sometimes by necessity mutable. How can a collection which notifies respond to mutable changes?

To deal with this scenario, Dynamic Data introduces the concept of an Evaluate notification. This notification forms part of a change set and tells a consumer that an item needs to be re-assessed or re-evaluated. This may affect things like the filtering or sorting of an item. I feel a concrete example best illustrates this concept. Suppose we have the following function:

Func<Trade, bool> isNearToMarketPrice = trade => Math.Abs(trade.PercentFromMarket) <= 1; //within 1% of the market

Where PercentFromMarket is a value which is recalculated with each and every market data tick. Using standard LINQ, a list of near to market trades can be retrieved as follows.

var nearToMarketTrades = myListOfTrades.Where(isNearToMarketPrice);

The manifest problem with this query is that it is pull based only: it has no means of observing when the market price has been recalculated, and there is no way to force a re-evaluation. Based on practical experience of dealing with this kind of problem, I introduced several means of injecting the evaluate command into a Dynamic Data stream. Here I will use a filter controller, illustrated in a previous blog, and apply it to the trade service.
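To make the pull-based limitation concrete, here is a minimal sketch which uses nothing but LINQ. The Trade class is a hypothetical stand-in with a settable PercentFromMarket, and the example is written as a C# 9 top-level program.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Two trades: one within 1% of the market, one well outside it.
var trades = new List<Trade>
{
    new Trade { Id = 1, PercentFromMarket = 0.5m },
    new Trade { Id = 2, PercentFromMarket = 3.0m },
};

Func<Trade, bool> isNearToMarketPrice = trade => Math.Abs(trade.PercentFromMarket) <= 1m;

// Materialise the query once: this is a snapshot, not a live view.
var snapshot = trades.Where(isNearToMarketPrice).ToList();

// A market tick moves trade 2 to within 1% of the market.
trades[1].PercentFromMarket = 0.2m;

// Nothing told the snapshot about the tick; only asking again picks it up.
var requeried = trades.Where(isNearToMarketPrice).ToList();

Console.WriteLine($"snapshot: {snapshot.Count}, requeried: {requeried.Count}");
// prints "snapshot: 1, requeried: 2"

// Hypothetical stand-in for the Trade class used in this series.
class Trade
{
    public long Id { get; set; }
    public decimal PercentFromMarket { get; set; }
}
```

The consumer has to know when to ask again, which is exactly the knowledge a pull-based query does not have.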

In summary, any of the controllers in Dynamic Data are used to inject commands and metadata into a stream. For this example we need a dynamic filter which is applied to a stream of trades.

//create a filter controller and set its filter
var filter = new FilterController<Trade>();
filter.Change(trade => Math.Abs(trade.PercentFromMarket) <= percentFromMarket());

//create a stream of live trades where the trade matches the above filter
var filteredByPercent = myTradeService.Trades
                         .Connect(trade => trade.Status == TradeStatus.Live)
                         .Synchronize(locker)
                         .Filter(filter);

The filter controller can be told to force re-evaluation.

filter.Reevaluate(); //to re-evaluate all
// or
filter.Reevaluate(Func<T,bool> itemSelector) //to re-evaluate selected items
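The idea behind re-evaluation can be sketched without the library. The ReevaluatingFilter class below is a hypothetical helper written for this illustration and is not how FilterController is implemented: it simply re-applies a predicate to mutable items and reports only the items whose inclusion has changed. Trade is again a hypothetical stand-in, and the example is a C# 9 top-level program.

```csharp
using System;
using System.Collections.Generic;

var trades = new List<Trade>
{
    new Trade { Id = 1, PercentFromMarket = 0.5m },
    new Trade { Id = 2, PercentFromMarket = 3.0m },
};

var filter = new ReevaluatingFilter<Trade>(trades, t => Math.Abs(t.PercentFromMarket) <= 1m);
var first = filter.Reevaluate();      // trade 1 enters the result

trades[0].PercentFromMarket = 2.5m;   // tick: trade 1 drifts away from the market
trades[1].PercentFromMarket = 0.1m;   // tick: trade 2 moves near to the market

var second = filter.Reevaluate();
foreach (var (trade, included) in second)
    Console.WriteLine($"Trade {trade.Id}: {(included ? "added" : "removed")}");
// prints "Trade 1: removed" then "Trade 2: added"

// Hypothetical stand-in for the Trade class used in this series.
class Trade
{
    public long Id { get; set; }
    public decimal PercentFromMarket { get; set; }
}

// Hypothetical helper, for illustration only.
class ReevaluatingFilter<T>
{
    private readonly IEnumerable<T> _source;
    private readonly Func<T, bool> _predicate;
    private readonly HashSet<T> _included = new HashSet<T>();

    public ReevaluatingFilter(IEnumerable<T> source, Func<T, bool> predicate)
    {
        _source = source;
        _predicate = predicate;
    }

    public IReadOnlyCollection<T> Included => _included;

    // Re-applies the predicate to every source item and returns only the differences.
    public List<(T Item, bool Included)> Reevaluate()
    {
        var changes = new List<(T Item, bool Included)>();
        foreach (var item in _source)
        {
            var matches = _predicate(item);
            if (matches && _included.Add(item)) changes.Add((item, true));
            else if (!matches && _included.Remove(item)) changes.Add((item, false));
        }
        return changes;
    }
}
```

Items whose inclusion has not changed produce no output at all, which is what keeps downstream consumers cheap when re-evaluation is frequent.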

The only missing element is when to invoke re-evaluation. We have 2 choices: either the service which calculates market prices provides a notification of the trades which have been re-calculated, or we poll on a period. Option 1 would be suitable for algo trading, where everything must happen with as little latency as possible, but for simplicity in this example we will poll as follows.

//re-evaluate filter periodically
var reevaluator = Observable.Interval(TimeSpan.FromMilliseconds(250))
                         .Subscribe(_ => filter.Reevaluate());

And that is that. We have a live stream of trades, where closed trades are automatically filtered out at the source and the filter controller is constantly re-applied to ensure only trades near to the market are included in the result. And as ever, what is beginning to become my catchphrase: that is easy!

After wrapping the function into a cold observable, here’s the final code.

public IObservable<IChangeSet<Trade, long>> Query(Func<uint> percentFromMarket)
{
    if (percentFromMarket == null) throw new ArgumentNullException("percentFromMarket");
    return Observable.Create<IChangeSet<Trade, long>>
    (observer =>
     {
         var locker = new object();
         Func<Trade, bool> nearToMarketTrades = trade => Math.Abs(trade.PercentFromMarket) <= percentFromMarket();
         var filter = new FilterController<Trade>(nearToMarketTrades);

         //re-evaluate filter periodically
         var reevaluator = Observable.Interval(TimeSpan.FromMilliseconds(250))
             .Synchronize(locker)
             .Subscribe(_ => filter.Reevaluate());

         //filter on live trades matching % specified
         var subscriber = _tradeService.Trades
             .Connect(trade => trade.Status == TradeStatus.Live)
             .Synchronize(locker)
             .Filter(filter)
             .SubscribeSafe(observer);

         return new CompositeDisposable(subscriber, filter, reevaluator);
     });
}

This observable will form part of a future post where I want to build the foundation of an auto trading system.

But for now, in a few lines of code (see NearToMarketViewer.cs) we can put the data onto the screen.

[Screenshot: Near to market]

Demo app

I have been busy creating example code showing how to use Dynamic Data, yet this blog has fallen well behind the code. The demo now has 5 simple yet powerful examples with a 6th in mind. The problem is I love being creative with code too much to be spending my time writing about it. With New Year on the way perhaps a resolution is required.

Although I am no designer, I like systems to look good. After all, during the development cycle F5 can be pressed tens of thousands of times, so I have taken the trouble to apply some styles.

A big thanks to MahApps for their awesome WPF library and to Dragablz for creating Chrome style draggable tabs.

[Screenshot: Trading Demo Menu]

As you can see from the image, the menu page has hyperlinks to the code behind and to this blog. Hopefully this will give a coherent means of seeing how the data on the screen matches up with the code.

Almost forgot, but the demo is on GitHub at https://github.com/RolandPheasant/TradingDemo

Trading Example Part 3. Integrate with UI

This is the third part of the trading example series. If you are not already familiar with the previous parts, part 1 is here and part 2 is here. Also, a fully working demo project is here. Since publishing part 2 I have embellished the sample code to make it look more production-like, so if you previously downloaded it I suggest you download it again.

So far we have created a trade service and a job which updates the market prices. Now I am going to demonstrate how Dynamic Data can help to present the data on a WPF screen.

There are a few ingredients we need to throw into the mix:

  1. A filter controller, as I want the user to be able to search for trades by entering some text.
  2. A proxy to the Trade object, as the market price changes and WPF requires an INotifyPropertyChanged invocation.
  3. An observable collection, so the screen can be updated with changed items.

The filter controller, which is used to reapply filters within a dynamic stream, is constructed like this:

private readonly FilterController<Trade> _filter = new FilterController<Trade>();

I recommend using the Dynamic Data version of observable collection

private readonly IObservableCollection<TradeProxy> _data = new ObservableCollectionExtended<TradeProxy>();

which is optimised for binding dynamic streams. If however you choose to use the standard observable collection or the one provided by ReactiveUI, you can, but you will have to write your own operator to update the bindings. I will discuss how in a future post.

Finally we need a simple proxy of the trade object.

public class TradeProxy : AbstractNotifyPropertyChanged, IDisposable, IEquatable<TradeProxy>
{
    private readonly Trade _trade;
    private readonly IDisposable _cleanUp;

    public TradeProxy(Trade trade)
    {
        _trade = trade;

        //market price changed is an observable on the trade object
        _cleanUp = trade.MarketPriceChanged
                        .Subscribe(_ => OnPropertyChanged("MarketPrice"));
    }

    public decimal MarketPrice
    {
        get { return _trade.MarketPrice; }
    }

    // additional members below (not shown)
}
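For readers who want to see the proxy pattern end to end, here is a self-contained sketch which uses only the base class library. It is not the demo's TradeProxy: the Trade class, its SetMarketPrice method and its plain .NET event are assumptions made so the example runs without Rx or Dynamic Data, and INotifyPropertyChanged is implemented by hand rather than through AbstractNotifyPropertyChanged. It is written as a C# 9 top-level program.

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel;

var trade = new Trade();
var raised = new List<string>();

using (var proxy = new TradeProxy(trade))
{
    proxy.PropertyChanged += (_, e) => raised.Add(e.PropertyName);
    trade.SetMarketPrice(1.25m);   // proxy raises PropertyChanged("MarketPrice")
}

trade.SetMarketPrice(1.30m);       // proxy has been disposed, so nothing is raised

Console.WriteLine(string.Join(",", raised)); // prints "MarketPrice"

// Hypothetical trade which announces price changes with a plain event.
class Trade
{
    public decimal MarketPrice { get; private set; }
    public event Action MarketPriceChanged;

    public void SetMarketPrice(decimal price)
    {
        MarketPrice = price;
        MarketPriceChanged?.Invoke();
    }
}

// Forwards price changes to the binding layer and detaches on Dispose.
class TradeProxy : INotifyPropertyChanged, IDisposable
{
    private readonly Trade _trade;
    public event PropertyChangedEventHandler PropertyChanged;

    public TradeProxy(Trade trade)
    {
        _trade = trade;
        _trade.MarketPriceChanged += OnMarketPriceChanged;
    }

    public decimal MarketPrice => _trade.MarketPrice;

    private void OnMarketPriceChanged() =>
        PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(MarketPrice)));

    public void Dispose() => _trade.MarketPriceChanged -= OnMarketPriceChanged;
}
```

Disposal matters because the trade holds a reference to every proxy which is still attached to its event, so a proxy that is never disposed is never collected.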

With these elements in place we can now easily get data from the trade service, filter it, convert it to a proxy, bind to an observable collection and dispose the proxy when it is no longer required.

            var loader = tradeService.Trades
                .Connect(trade => trade.Status == TradeStatus.Live) //prefilter live trades only
                .Filter(_filter) // apply user filter
                .Transform(trade => new TradeProxy(trade))
                .Sort(SortExpressionComparer<TradeProxy>.Descending(t => t.Timestamp),SortOptimisations.ComparesImmutableValuesOnly)
                .ObserveOnDispatcher()
                .Bind(_data)   // update observable collection bindings
                .DisposeMany() //since TradeProxy is disposable dispose when no longer required
                .Subscribe();

The only missing code is to apply a user entered filter. It looks something like this:

 var filterApplier =//..watch for changes to the search text bindings
 .Sample(TimeSpan.FromMilliseconds(250))
 .Subscribe(_ =>  {
                     Func<Trade,bool> predicate= //build search predicate;
                     _filter.Change(predicate);
                  }
            );

I will not explain the XAML required as this is beyond the scope of Dynamic Data. But a few lines of XAML bound to the resulting observable collection can give this.

[Screenshot: Live trades 3]

Was that easy? Take a look at the source code LiveTradesViewer.cs. 80 lines of code including white space.

All I will say now is: don’t tell your boss that you can do all this in a few lines of code, otherwise you may get a pay cut!

Trading Example Part 2. Manage market data

Part 1 of this series is here. Demo code is here.

In this part we will monitor trades and update them with the latest market data. A live trade will typically have market data which is constantly streaming. So the problem to solve is to create the functionality to monitor live trades and keep the market price up to date on each trade.

For this we require 2 streams of data.

  1. Live trade stream
  2. Live market data stream

and we need to join them together and keep them in sync.

The live trade stream is easy using dynamic data.

var stream = tradeService.Trades.Connect()
                .Filter(trade => trade.Status == TradeStatus.Live)

And we’ll take it as a given that we can get the market price via a reactive stream.  The method signature would look something like this.

IObservable<decimal> ObservePrice(string currencyPair)

Each trade has a currency pair so we need to subscribe to streams for each currency pair. There are many ways to crack this particular egg but for the demo, I will first of all group the trades by currency pair. Again dynamic data makes it easy. Just group by currency pair.

var stream = tradeService.Trades.Connect()
                .Filter(trade => trade.Status == TradeStatus.Live)
                .Group(trade => trade.CurrencyPair)

This will give an observable cache of trades for each distinct currency pair. How cool is that? No code at all to maintain the grouping! When trades are closed they will be removed from the grouping. And even better, if a trade is updated to use a different currency pair, the trade will be removed from the old group and added to the new one.

Now that the trades are grouped by currency pair, we need to subscribe to the price stream and update each trade with the latest price as it changes. For this I will introduce a new operator, ‘SubscribeMany’, which subscribes to each item in a stream.

var stream = tradeService.Trades.Connect()
  .Filter(trade => trade.Status == TradeStatus.Live)
  .Group(trade => trade.CurrencyPair)
  .SubscribeMany(grouping =>
  {
   //do something cool and return a disposable because it will be
   // disposed when removed from the stream
  })

At a glance this seems nothing special, but it allows an action to be applied when an item is added or updated in a stream, and the resulting subscription to be disposed when the item is removed from the stream. In this case the operation applies to each distinct currency pair in the live trades query. What does this enable? It is perhaps best illustrated by the following concrete example.

//...
.SubscribeMany(grouping =>
{
    var locker = new object();
    decimal latestPrice = 0;

    //1. Subscribe to price and update trades with the latest price
    var priceHasChanged = MarketPriceService.ObservePrice(grouping.Key)
        .Synchronize(locker)
        .Subscribe(price =>
        {
            latestPrice = price;
            UpdateTradesWithPrice(grouping.Cache.Items, latestPrice);
        });

    //2. Connect to data changes and update with the latest price. Exclude items which are removed from the stream because they are no longer live
    var dataHasChanged = grouping.Cache.Connect()
        .WhereReasonsAre(ChangeReason.Add, ChangeReason.Update)
        .Synchronize(locker)
        .Subscribe(changes => UpdateTradesWithPrice(changes.Select(change => change.Current), latestPrice));

    return new CompositeDisposable(priceHasChanged, dataHasChanged);
})
.Subscribe();

where UpdateTradesWithPrice is a method as follows

private void UpdateTradesWithPrice(IEnumerable<Trade> trades, decimal price)
{
  foreach (var trade in trades)
  {
     trade.Price = price;
  }
}
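The subscribe-on-add, dispose-on-remove lifecycle which SubscribeMany takes care of can be sketched by hand as follows. This is an illustration of the pattern only and says nothing about how the operator is implemented: OnAdded, OnRemoved and ActionDisposable are hypothetical names, and the example is a C# 9 top-level program.

```csharp
using System;
using System.Collections.Generic;

var subscriptions = new Dictionary<string, IDisposable>();
var log = new List<string>();

// Called when a key (here a currency pair) enters the stream.
void OnAdded(string currencyPair)
{
    log.Add($"subscribed {currencyPair}");
    subscriptions[currencyPair] = new ActionDisposable(() => log.Add($"disposed {currencyPair}"));
}

// Called when the key leaves the stream: dispose whatever OnAdded created.
void OnRemoved(string currencyPair)
{
    if (subscriptions.Remove(currencyPair, out var subscription))
        subscription.Dispose();
}

OnAdded("GBP/USD");
OnAdded("EUR/USD");
OnRemoved("GBP/USD");

Console.WriteLine(string.Join("; ", log));
// prints "subscribed GBP/USD; subscribed EUR/USD; disposed GBP/USD"

// Hypothetical helper which runs an action once when disposed.
sealed class ActionDisposable : IDisposable
{
    private Action _onDispose;

    public ActionDisposable(Action onDispose) { _onDispose = onDispose; }

    public void Dispose()
    {
        _onDispose?.Invoke();
        _onDispose = null; // make repeated Dispose calls harmless
    }
}
```

With the operator, this bookkeeping disappears: the lambda returns a disposable and the stream decides when to dispose it.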

Now we have a function where live trades will be updated with the latest market data, always. And what’s more, the example is totally thread safe. And even better, there is very little plumbing code required.

Believe me, this is as complex as Dynamic Data can be. So if you follow this you can truly run with Dynamic Data. Also I promise that all subsequent posts will be easy.

Trading Example Part 1. Create a trade service

One of the things I love about Dynamic Data is that no matter what the nature of the data you are dealing with, the code will look the same. I also love the power of data transformations and just how easy it is to tame sequences of data. However, a lot of the capability and power is not obvious at a glance, so this is the start of a series of posts where, step by step, I will illustrate usage of the library. I have decided to go in at the deep end and demonstrate a trading system, as these systems generally have rapidly moving data and loads of complexity. What I hope to achieve is to show that by using Dynamic Data your code can be simplified greatly.

Source code can be found here

We will start off with a simple trade object

    public class Trade
    {
        public long Id { get; private set; }
        public string CurrencyPair { get; private set; }
        public string Customer { get; private set; }
        public decimal Price { get; set; }
        public TradeStatus Status { get; private set; }

        public Trade(long id, string customer, string currencyPair, TradeStatus status)
        {
            Id = id;
            Customer = customer;
            CurrencyPair = currencyPair;
            Status = status;
        }
    }

I want a service which exposes a load of trades.

    public interface ITradeService
    {
        IObservableCache<Trade, long> Trades { get; }
    }

Implementing this is a trivial matter. For clarity not all code is shown.

    public class TradeService : ITradeService
    {
        private readonly IObservableCache<Trade, long> _tradesCache;

        public TradeService()
        {
            //create a cache specifying that the unique key is id
            var tradesSource = new SourceCache<Trade, long>(trade => trade.Id);

            //call AsObservableCache() to hide the update methods as we are exposing the cache
            _tradesCache = tradesSource.AsObservableCache();

           //code to emulate an external trade provider
            var tradeLoader = GenerateTradesAndMaintainCache();

            //.....
        }

        public IObservableCache<Trade, long> Trades
        {
            get { return _tradesCache; }
        }
    }
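The reason for hiding the update methods can be shown with a plain dictionary. The sketch below is only an analogy and does not use Dynamic Data: TradeStore and its Load method are hypothetical, and ReadOnlyDictionary from the base class library stands in for the read-only view that AsObservableCache() provides in the service above. It is written as a C# 9 top-level program.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;

var store = new TradeStore();
var view = store.Trades;          // consumers only ever see this read-only view

store.Load(1, "GBP/USD");         // the owner updates the private source

Console.WriteLine(view.Count);    // prints 1, because the view is live

// Hypothetical service which owns the data and exposes it read-only.
class TradeStore
{
    private readonly Dictionary<long, string> _source = new Dictionary<long, string>();

    public TradeStore()
    {
        // Wraps the source without copying it, so later changes remain visible.
        Trades = new ReadOnlyDictionary<long, string>(_source);
    }

    // No Add or Remove members are reachable through this property.
    public IReadOnlyDictionary<long, string> Trades { get; }

    public void Load(long id, string currencyPair) => _source[id] = currencyPair;
}
```

Only the owner can change the data, while every consumer still sees the changes.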

We now have a trades service which I will be using in subsequent posts to demonstrate the functionality of dynamic data.

Part 2

Binding Example

This is the first in a series of real world dynamic data examples.

Let’s assume I have some data which is exposed via a service.

 public interface IPersonService
 {
      IObservableCache<Person, PersonKey> Cache { get; }
 }

and I want to bind the data to a screen. In dynamic data it is done like this.

 //this is a simple extension of observable collection which comes as part of the package
 var collection = new ObservableCollectionExtended<Person>();
 
 //Dynamic data is an extension of reactive so subscribe at the end of the chain
 var loader = personService.Cache.Connect()
 .Sort(SortExpressionComparer<Person>.Ascending(p=>p.FirstName).ThenByAscending(p=>p.LastName))
 .ObserveOnDispatcher()
 .Bind(collection)
 .Subscribe(); 

The binding will always reflect the data in the cache (and in the stated sort order).  That was easy so let’s extend this so we can apply a filter. This is also easy.

var loader = personService.Cache.Connect().Filter(p=>p.Age>50) 

or use a filter controller which allows the filter to be changed any time.

 //create a filter controller 
 var filterController = new FilterController<Person>();
 
 //filter can be changed any time by applying the following method
 filterController.Change(p=>p.Age>50);

 var loader = personService.Cache.Connect().Filter(filterController)

And that is that.  We can filter, sort and bind changeable data with very little code.

What is Dynamic Data?

Dynamic Data is a portable class library which brings the power of Reactive Extensions (Rx) to collections.

Mutable collections frequently experience additions, updates, and removals (among other changes). Dynamic Data provides two collection implementations, an observable list and an observable cache that expose changes to the collection via an observable change set. The resulting observable change sets can be manipulated and transformed using Dynamic Data’s robust and powerful array of change set operators. These operators receive change notifications, apply some logic, and subsequently provide their own change notifications. Because of this, operators are fully composable and can be chained together to perform powerful and very complicated operations while maintaining simple, fluent code.

Using Dynamic Data’s collections and change set operators make in-memory data management extremely easy and can reduce the size and complexity of your code base by abstracting complicated and often repetitive collection based operations.

An example please

Given a Trade object create an observable list like this

var myTrades = new SourceList<Trade>();

or a cache which requires that a unique key is specified

var myTrades = new SourceCache<Trade,long>(trade => trade.Id);

Either of these collections is made observable by calling the Connect() method, which produces an observable change set.

var myObservableTrades = myTrades.Connect();

This example first filters a stream of trades to select only live trades, then creates a proxy for each live trade, and finally orders the results by most recent first. The resulting trade proxies are bound on the dispatcher thread to the specified observable collection.

ReadOnlyObservableCollection<TradeProxy> data;

var loader = myObservableTrades
    .Filter(trade => trade.Status == TradeStatus.Live) //filter on live trades only
    .Transform(trade => new TradeProxy(trade))         //create a proxy
    .Sort(SortExpressionComparer<TradeProxy>.Descending(t => t.Timestamp)) 
    .ObserveOnDispatcher()          //ensure operation is on the UI thread
    .Bind(out data)         //Populate the observable collection
    .DisposeMany()          //Dispose TradeProxy when no longer required
    .Subscribe();  

Since the TradeProxy object is disposable, the DisposeMany operator ensures that the proxy objects are disposed when they are no longer part of this observable stream.

This is the tip of the iceberg. The observable cache has over 60 operators and the observable list has around 35 operators and counting.

Want to find out more?

Read the following for a quick overview

1. Observable Cache at a glance
2. Observable List at a glance
3. Getting Started
4. Binding example

Download the demo and source code

1. Demo WPF trading system demo on GitHub
2. Demo WPF reactive tree demo on GitHub
3. Dynamic Data source code on GitHub

Some funky UI specific stuff

1. Dynamically filter, sort and page
2. Reactive Tree
3. Advanced search hints with logical collection operators
4. Aggregation of dynamic data

Work through the ‘Creating a trading system’ series

1. Expose the data as a service
2. Manage market data
3. Integration with the UI
4. Filter on calculated values
5. Aggregation of dynamic data

Example integration with ReactiveUI

1. Integration with ReactiveUI for a simple example.
2. Log entry viewer for an advanced example.

Install from NuGet

1. Dynamic Data
2. or Dynamic Data Adaptors for ReactiveUI if you are already using ReactiveUI