Integration with ReactiveUI

I have released DynamicData.ReactiveUI which is a very simple adaptor layer to assist with binding dynamic data observables with reactiveui’s ReactiveList object.

Install from Nuget

Nuget

If you are not familiar with dynamic data and what it enables I suggest:

  1. Go through previous posts in the blog
  2. Download the example wpf app from github
  3. .

To cut to the chase, here’s an example. The code takes existing in-memory trade objects, transforms them into a view model proxy and updates the target reactive list object with the resulting change sets. It also pre-filters the data with live trades, applies a user entered filter, orders the resulting data and disposes the proxy when no longer required. Phew – all that in effectively one line of code.

I did the same example without using Reactive UI code see Trading Example Part 3. Integrate with UI where I have explained the code in greater detail

    public class RxUiViewer : ReactiveObject, IDisposable
    {
       //this is the target list which we will populate from the dynamic data stream
        private readonly ReactiveList<TradeProxy> _data = new ReactiveList<TradeProxy>();
        //the filter controller is used to inject filtering into a observable
        private readonly FilterController<Trade> _filter = new FilterController<Trade>();
        private readonly IDisposable _cleanUp;
        private string _searchText;

        public RxUiViewer(ITradeService tradeService)
        {
            //Change the filter when the user entered search text changes
            var filterApplier = this.WhenAnyValue(x => x.SearchText)
                .Throttle(TimeSpan.FromMilliseconds(250))
                .Subscribe(_ => ApplyFilter());
           
            ApplyFilter();

            var loader = tradeService.Trades
                .Connect(trade => trade.Status == TradeStatus.Live) //prefilter live trades only
                .Filter(_filter)    // apply user filter
                   //if targetting Net4 or Net45 platform can use parallelisation for transforms 'cause it's quicker
                .Transform(trade => new TradeProxy(trade), new ParallelisationOptions(ParallelType.Ordered, 5))
                .Sort(SortExpressionComparer<TradeProxy>.Descending(t => t.Timestamp), SortOptimisations.ComparesImmutableValuesOnly)
                .ObserveOn(RxApp.MainThreadScheduler)
                .Bind(_data)        //bind the results to the ReactiveList 
                .DisposeMany()      //since TradeProxy is disposable dispose when no longer required
                .Subscribe();

            _cleanUp = new CompositeDisposable(loader, _filter, filterApplier);
        }

        private void ApplyFilter()
        {
            if (string.IsNullOrEmpty(SearchText))
            {
                _filter.ChangeToIncludeAll();
            }
            else
            {
                _filter.Change(t => t.CurrencyPair.Contains(SearchText, StringComparison.OrdinalIgnoreCase) ||
                                    t.Customer.Contains(SearchText, StringComparison.OrdinalIgnoreCase));
            }
        }

        public string SearchText
        {
            get { return _searchText; }
            set { this.RaiseAndSetIfChanged(ref _searchText, value); }
        }

        public IReadOnlyReactiveList<TradeProxy> Data
        {
            get { return _data; }
        }

        public void Dispose()
        {
            _cleanUp.Dispose();
        }
    }

coupled with a little xaml can produce this

ReactiveUIImage

Very little code, but a very powerful example,

Advertisement

Gone Portable

Dynamic data is now a portable class library available on most platforms. See below.

Portable

Additionally there is a separate dotnet 4.0 library because I know there are enterprises out there stuck in the old days (investment banks maybe?).

Now you can do some cool rx for collections stuff of WP8, IOS and Android as well as windows desktop and server.

I will explain what the plinq assemblies enable in a post in the near future.

Trading Example Part 4. Filter on calculated values

Intrinsic to collection change notifications, items can be notified by add, update and remove events. If ordering is supported a collection will support a move notification. Of course Dynamic Data supports these. Yet something which is often overlooked is data or functions of data are sometimes by necessity mutable. How can a collection which notifies respond to mutable changes?

To deal with this scenario, dynamic data introduces the concept of an Evaluate notification. This notification forms part of a change set and tells a consumer an item needs to be re-assessed or re-evaluated. This may effect things like filtering or sorting of an item. I feel a concrete example best illustrates this concept. Suppose we have the following function:

Func<Trade,bool> isNearToMarketPrice = trade => return Math.Abs(trade.PercentFromMarket) <= 1 %

Where PercentFromMarket is a calculation which is recalculated with each and every market data tick. Using standard linq a list of near to market trades can be retrieved as follows.

var nearToMaketTrades = myListOfTrades.Where(isNearToMarketPrice);

The manifest problem with this query is it pull based only and has no means of observing when the market price has been recalculated, or alternatively how can re-evaluation be forced. Based on practical experience of dealing with this kind problem I introduced multiple means of injecting the evaluate command into a dynamic data stream. Here I will use a filter controller illustrated in a previous blog and we will apply it to the trade service here.

In summary any of the controllers in dynamic data are used to inject commands and meta data into a stream. For this example we need a dynamic filter which is applied to a stream of trades.

//create a filter controller and set it's filter
var filter = new FilterController<Trade>();
filter.Change(trade => Math.Abs(trade.PercentFromMarket) <= percentFromMarket());

//create a stream of live trades where the trade matches the above filter
var filteredByPercent  = myTradeService.Trades
                         .Connect(trade=>trade.Status==TradeStatus.Live)
                         .Synchronize(locker)
                         .Filter(filter)

The filter controller has an overload to force re-evaluation.

filter.Revalue() // to reevaluate all
// or
filter.Reevaluate(Func<T,bool> itemSelector) //to re-evaluate selected items

The only missing element is when do we invoke re-evalulation. We have 2 choices. Either the service which calculates market prices provides a notification of trades which have been re-calculated or we poll on a period. Option 1 would be suitable for algo trading where everything must happen preferably with zero latency but for simplicity in the example we will poll as follows.

//re-evaluate filter periodically
var reevaluator = Observable.Interval(TimeSpan.FromMilliseconds(250))
                         .Subscribe(_ => filter.Reevaluate());

And that is that. We have a live stream of trades, where closed trades are automatically filtered out from the source and the filter controller constantly re-applies to ensure only trades near to the market are included in the result.   And as ever what is beginning to become my catch phrase – that is easy!

After wrapping the function into a cold observable, here’s the final code.

public IObservable<IChangeSet<Trade, long>> Query(Func<uint> percentFromMarket)
{
    if (percentFromMarket == null) throw new ArgumentNullException("percentFromMarket");
    return Observable.Create<IChangeSet<Trade, long>>
	(observer =>
	 {
	     var locker = new object();
	     Func<Trade,bool> nearToMaketTrades = trade => Math.Abs(trade.PercentFromMarket) <= percentFromMarket();
	     var filter = new FilterController<Trade>(nearToMaketTrades);

	     //re-evaluate filter periodically
	     var reevaluator = Observable.Interval(TimeSpan.FromMilliseconds(250))
		 .Synchronize(locker)
		 .Subscribe(_ => filter.Reevaluate());

	     //filter on live trades matching % specified
	     var subscriber = _tradeService.Trades
		 .Connect(trade=>trade.Status==TradeStatus.Live)
		 .Synchronize(locker)
		 .Filter(filter)
		 .SubscribeSafe(observer);

	     return new CompositeDisposable(subscriber, filter, reevaluator);
	 });
}

This observable will form part of a future post where I want to build the foundation of an auto trading system.

But for now, in a few lines of code (see NearToMarketViewer.cs) we can put the data onto the screen.

Near to market