Trading Example Part 2. Manage market data

Part 1 to this series is here. Demo code is here

In this part we will monitor trades and update them with the latest market data. A live trade will typically have market data which is constantly streaming. So the problem to solve is to create the functionality to monitor live trades and keep the market price up to date on each trade.

For this we require 2 streams of data.

  1. Live trade stream
  2. Live market data stream

and we need to join them together and keep them in sync.

The live trade stream is easy using dynamic data.

var stream = tradeService.Trades.Connect()
                .Filter(trade => trade.Status == TradeStatus.Live)

And we’ll take it as a given that we can get the market price via a reactive stream.  The method signature would look something like this.

IObservable<decimal> ObservePrice(string currencyPair)

Each trade has a currency pair so we need to subscribe to streams for each currency pair. There are many ways to crack this particular egg but for the demo, I will first of all group the trades by currency pair. Again dynamic data makes it easy. Just group by currency pair.

var stream = tradeService.Trades.Connect()
                .Filter(trade => trade.Status == TradeStatus.Live)
                .Group(trade => trade.CurrencyPair)

This will give an observable cache of trades for each distinct currency pair. How cool is that? No code at all to maintain the grouping!  When trades are closed they will be removed from the grouping. And even better if a trade is updated to use a different currency pair the trade will be moved from the old group and added to the new one.  Now the trades are grouped by currency pair, we need to subscribe to the price stream and update each trade with the latest price as it changes.  For this I will introduce a new operator ‘SubscribeMany’ which subscribes to each item in a stream.

var stream = = tradeService.Trades.Connect()
  .Filter(trade => trade.Status == TradeStatus.Live)
  .Group(trade => trade.CurrencyPair)
  .SubscribeMany(grouping =>
  {
   //do something cool and return a disposable beacuse it will be
   // disposed when removed from the stream
  })

At a glance this seems nothing special but what it enables is to apply an action when the item is added or updated in a stream and to unsubscribe when an item is removed from a stream. In this case the operation applies to each distinct currency pair in the live trades query. What does this enable? Perhaps best illustrated by the following concrete example.

//...
    .SubscribeMany(grouping =>
   {
     var locker = new object();
     decimal latestPrice = 0;

    //1. Subscribe to price and update trades with the latest price
    var priceHasChanged = MarketPriceService.ObservePrice(grouping.Key)
    .Synchronize(locker)
    .Subscribe(price =>
   {
    latestPrice = price;
    UpdateTradesWithPrice(grouping.Cache.Items, latestPrice);
   });

   //2. Connect to data changes and update with the latest price. Exclude items which are removed from the stream because they are no longer live
   var dataHasChanged = grouping.Cache.Connect()
   .WhereReasonsAre(ChangeReason.Add, ChangeReason.Update)
   .Synchronize(locker)
   .Subscribe(changes => UpdateTradesWithPrice(changes.Select(change => change.Current), latestPrice));

   return new CompositeDisposable(priceHasChanged, dataHasChanged);
 })
.Subscribe()

where UpdateTradesWithPrice is a method as follows

private void UpdateTradesWithPrice(IEnumerable<Trade> trades, decimal price)
{
  foreach (var trade in trades)
  {
     trade.Price = price;
  }
}

Now we are function where live trades will be updated with the latest market data – always. And what’s more the example is totally thread safe. And even better there is very little plumbing code required.

Believe me this is a complex as dynamic data can be.  So if you follow this you can truly run with dynamic data. Also I promise that all subsequent posts will be easy.

Advertisement

Trading Example Part 1. Create a trade service

One of the things I love about dynamic data is no matter what the nature of the data which you are dealing with the code will look the same. I also love the power of data transformations and just how easy it is to tame sequences of data. However a lot of the capability and power is not obvious at a glance so this is the start of a series of posts where step by step I will illustrate usage of the library. I have decided to go in at the deep end and demonstrate a trading system as these systems generally have rapidly moving data and loads of complexity. What I hope to achieve is to show that by using dynamic data your code can be simplified greatly.

Source code can be found here

We will start off with a simple trade object

    public class Trade
    {
        public long Id { get; private set; }
        public string CurrencyPair { get; private set; }
        public string Customer { get; private set; }
        public decimal Price { get; set; }
        public TradeStatus Status { get; private set; }

        public Trade(long id, string customer, string currencyPair, TradeStatus status)
        {
            Id = id;
            Customer = customer;
            CurrencyPair = currencyPair;
            Status = status;
        }
    }

I want a service which exposes a load of trades.

    public interface ITradeService
    {
        IObservableCache<Trade, long> Trades { get; }
    }

Implementing this is a trivial matter. For clarity not all code is shown.

    public class TradeService : ITradeService
    {
        private readonly IObservableCache<Trade, long> _tradesCache;

        public TradeService()
        {
            //create a cache specifying that the unique key is id
            var tradesSource = new SourceCache<Trade, long>(trade => trade.Id);

            //call AsObservableCache() to hide the update methods as we are exposing the cache
            _tradesCache = tradesSource.AsObservableCache();

           //code to emulate an external trade provider
            var tradeLoader = GenerateTradesAndMaintainCache();

            //.....
        }

        public IObservableCache<Trade, long> Trades
        {
            get { return _tradesCache; }
        }
    }

We now have a trades service which I will be using in subsequent posts to demonstrate the functionality of dynamic data.

Part 2

Binding Example

This is the first in a series of real world dynamic data examples.

Lets’s assume I have a data  which is exposed via a service.

 public interface IPersonService
 {
      IObservableCache<Person, PersonKey> Cache { get; }
 }

and I want to bind the data to a screen. In dynamic data it is done like this.

 //this is a simple extension of observable collection which comes as part of the package
 var collection = new ObservableCollectionExtended<Person>();
 
 //Dynamic data is an extension of reactive so subscribe at the end of the chain
 var loader = personService.Cache.Connect()
 .Sort(SortExpressionComparer<Person>.Ascending(p=>p.FirstName).ThenByAscending(p=>p.LastName))
 .ObserveOnDispatcher()
 .Bind(collection)
 .Subscribe(); 

The binding will always reflect the data in the cache (and in the stated sort order).  That was easy so let’s extend this so we can apply a filter. This is also easy.

var loader = personService.Cache.Connect().Filter(p=>p.Age>50) 

or use a filter controller which allows the filter to be changed any time.

 //create a filter controller 
 var filterController = new FilterController<Person>();
 
 //filter can be changed any time by applying the following method
 filterController.Change(p=>p.Age>50);

 var loader = personService.Cache.Connect().Filter(filterController)

And that is that.  We can filter, sort and bind changeable data with very little code.