Trading Example Part 2. Manage market data

Part 1 to this series is here. Demo code is here

In this part we will monitor trades and update them with the latest market data. A live trade will typically have market data which is constantly streaming. So the problem to solve is to create the functionality to monitor live trades and keep the market price up to date on each trade.

For this we require 2 streams of data.

  1. Live trade stream
  2. Live market data stream

and we need to join them together and keep them in sync.

The live trade stream is easy using dynamic data.

var stream = tradeService.Trades.Connect()
                .Filter(trade => trade.Status == TradeStatus.Live)

And we’ll take it as a given that we can get the market price via a reactive stream.  The method signature would look something like this.

IObservable<decimal> ObservePrice(string currencyPair)

Each trade has a currency pair so we need to subscribe to streams for each currency pair. There are many ways to crack this particular egg but for the demo, I will first of all group the trades by currency pair. Again dynamic data makes it easy. Just group by currency pair.

var stream = tradeService.Trades.Connect()
                .Filter(trade => trade.Status == TradeStatus.Live)
                .Group(trade => trade.CurrencyPair)

This will give an observable cache of trades for each distinct currency pair. How cool is that? No code at all to maintain the grouping!  When trades are closed they will be removed from the grouping. And even better if a trade is updated to use a different currency pair the trade will be moved from the old group and added to the new one.  Now the trades are grouped by currency pair, we need to subscribe to the price stream and update each trade with the latest price as it changes.  For this I will introduce a new operator ‘SubscribeMany’ which subscribes to each item in a stream.

var stream = = tradeService.Trades.Connect()
  .Filter(trade => trade.Status == TradeStatus.Live)
  .Group(trade => trade.CurrencyPair)
  .SubscribeMany(grouping =>
  {
   //do something cool and return a disposable beacuse it will be
   // disposed when removed from the stream
  })

At a glance this seems nothing special but what it enables is to apply an action when the item is added or updated in a stream and to unsubscribe when an item is removed from a stream. In this case the operation applies to each distinct currency pair in the live trades query. What does this enable? Perhaps best illustrated by the following concrete example.

//...
    .SubscribeMany(grouping =>
   {
     var locker = new object();
     decimal latestPrice = 0;

    //1. Subscribe to price and update trades with the latest price
    var priceHasChanged = MarketPriceService.ObservePrice(grouping.Key)
    .Synchronize(locker)
    .Subscribe(price =>
   {
    latestPrice = price;
    UpdateTradesWithPrice(grouping.Cache.Items, latestPrice);
   });

   //2. Connect to data changes and update with the latest price. Exclude items which are removed from the stream because they are no longer live
   var dataHasChanged = grouping.Cache.Connect()
   .WhereReasonsAre(ChangeReason.Add, ChangeReason.Update)
   .Synchronize(locker)
   .Subscribe(changes => UpdateTradesWithPrice(changes.Select(change => change.Current), latestPrice));

   return new CompositeDisposable(priceHasChanged, dataHasChanged);
 })
.Subscribe()

where UpdateTradesWithPrice is a method as follows

private void UpdateTradesWithPrice(IEnumerable<Trade> trades, decimal price)
{
  foreach (var trade in trades)
  {
     trade.Price = price;
  }
}

Now we are function where live trades will be updated with the latest market data – always. And what’s more the example is totally thread safe. And even better there is very little plumbing code required.

Believe me this is a complex as dynamic data can be.  So if you follow this you can truly run with dynamic data. Also I promise that all subsequent posts will be easy.

Leave a comment