Log Entry Viewer Using Dynamic Data and ReactiveUI

Log Entry Viewer

In this post I show how to create a log entry viewer which uses both Dynamic Data and ReactiveUI. This example screen allows viewing, filtering and removal of log entry items from memory.

There are several features which I hope act as a good example of dynamic data, reactive ui and where reactive ui and dynamic data can compliment each other. This post is long as there is loads of explanation required yet the implementation is reasonably easy and straight forward.

All the code is in the demo project is here Dynamic data demo project on GitHub
The main view model is here LogEntryViewer.cs
The log entry service code LogEntryService.cs

Apart from some domain objects and a little xaml, the above classes consisting of a few hundred lines of code is all that is required to implement the screen. It is easy once you understand a few key concepts about dynamic data and a few key concepts about reactive ui. I hope after you read this article and gone though the code, you will think the same.

1. Create an log entry observable

We need to get a reactive log stream of log entry elements. Using Log4Net we can create a custom appender see ReactiveLogAppender.cs. The appended publishes each log entry to a static subject ISubject. This subject is exposed via a static property ReactiveLogAppender.LogEntryObservable {get;} and will be used by a service to load and manage log entries.

2. Populate an observable cache and expose as a service

There are overloads in dynamic data which convert a standard rx observable into a cache. For example you could do the following which takes the log entry observable, converts it first of all into an observable change set, then caches the result. The size limiter is optional and ensures the cache does not indefinitely grow. When the size limit is reached the oldest items are removed from the internal cache. There is also an overload for time limited expiration of items.

var logEntryCache = ReactiveLogAppender.LogEntryObservable
                     .ToObservableChangeSet(le=>le.Key,limitSizeTo: 10000) 
                     .AsObservableCache(); 

This code results in a read only observable cache. But for this example I want to expose methods to remove items, so instead I create an editable cache as follows. Since this is a cache, I have had to specify a unique key which for the log entry domain object is a counter.

var editableCache = new SourceCache<LogEntry, long>(l => l.Key);
//limit size
var sizeLimiter = editableCache.LimitSizeTo(10000).Subscribe();
//load from observable
var loader = ReactiveLogAppender.LogEntryObservable
                    .Subscribe(editableCache.AddOrUpdate);

If you want to make the cache read only, call the following method.

var readonlycache=editableCache.AsObservableCache();

For this example we will expose the cache via a service

public interface ILogEntryService
{
  IObservableCache<LogEntry, long> Items { get; }
  void Add(LogEntry items);
  void Remove(IEnumerable<LogEntry> items);
  void Remove(IEnumerable<long> keys);
}

Implementing these methods is trivial so I will not bore you will all the details of the service . The complete code is here LogEntryService.cs. This service is instantiated when the system starts as we need to catch all log entries even before the user opens the log entry view screen.

3. Create the view model, using features from both dynamic data and reactive ui

There is a lot happening in this screen so I will break it down into it’s functional parts

a) Populate a reactive list and enable user filtering

I have written extensively in previous posts about filtering and loading data into an observable collection using dynamic data observables. But to recap in dynamic data any adds, updates and removes are notified via an observable change set. Each operator is responsible for responding to a source observable, doing something and then in turn publishing it’s own change set. This way long and complicated transformation streams of data can be composed with very simple fluent and declarative code.

For the log entry example we take the data from log entry service and create an observable which processes the result and binds the data to the screen using the following code.

//this is the target list which is bound to the view
var reactiveList = new ReactiveList<LogEntryProxy>();

//a controller which allows changing a filter any time
var filter = = new FilterController<LogEntryProxy>(l => true);

 //filter, sort and populate reactive list,
var dataLoader = logEntryService.Items.Connect() //connect to the cache
     .Transform(le => new LogEntryProxy(le))     //Create a proxy
     .Filter(filter)                             //Apply a dynamic filter
     .Sort(SortExpressionComparer<LogEntryProxy>.Descending(l => l.TimeStamp))
     .ObserveOn(RxApp.MainThreadScheduler)
     .Bind(reactiveList)                          
     .DisposeMany()                              //automatic disposal
     .Subscribe();

The above code takes data from the log entry service, creates a proxy, sorts by most recent first, binds to the reactive list and finally when the proxy is finished with it is disposed.

The missing ingredient is the code to apply a filter. For this we need a string field with a 2 way binding to a view, and when it changes a new filter is build and applied.

//Binding to allow a user to to enter some text 
 public string SearchText
 {
     get { return _searchText; }
     set { this.RaiseAndSetIfChanged(ref _searchText, value); } 
 }

//using reactive ui operator to respond to any change 
var filterApplier = this.WhenAnyValue(x => x.SearchText)
                .Throttle(TimeSpan.FromMilliseconds(250))
                .Select(BuildFilter)
                .Subscribe(filter.Change); //apply predicate to filter controller

 //build the predicate
 private Func<LogEntryProxy, bool> BuildFilter(string searchText)
 {
     if (string.IsNullOrEmpty(SearchText))
         return logentry => true;

     return logentry => logentry.Message.Contains(SearchText, StringComparison.OrdinalIgnoreCase)
                     || logentry.Level.ToString().Contains(SearchText, StringComparison.OrdinalIgnoreCase);
 }

Now we have an observable list of log entry items where the user can enter some search text and the result will match the text. Not terribly exciting so far, but terribly simple!

Herein we will make the the screen more interesting and create some stuff which will give user some feed back.

b) Highlight new items

As this is a log entry view items are continually being added so the first bit of user feed back is to highlight newly added rows. The solution to this is simply achieved using reactive code made better by applying a the reactive ui ToProperty() operator. I will create a flag on the LogEntryProxy which will be used to highlight new rows for 2 seconds.

To create a reactive ui property:

//the super cool lazy backing field
 private readonly ObservableAsPropertyHelper<bool> _recent;

//which is exposed like this.
 public bool Recent  {  get { return _recent.Value; } 

and populated like this

_recent = Observable.Create<bool>(observer =>
                                {
                                    var isRecent = DateTime.Now.Subtract(original.TimeStamp).TotalSeconds < 2;
                                    if (!isRecent)   return Disposable.Empty;
                                    observer.OnNext(true);
                                    return Observable.Timer(TimeSpan.FromSeconds(2)).Select(_=>false).SubscribeSafe(observer);
                                }).ToProperty(this,lep=>lep.Recent);

Full code LogEntryProxy.cs

The power of ToProperty is it is lazy. It encapsulates a strategy which I have used for a long time to radically speed up reactive code where code is invoked only on demand, yet I had never encapsulated the concept so well. This is why it is one of my favourite parts of RXUI,

As a practical explanation, if I opened a list with 10,000 items where each row has an observable then 10,000 observables are created as part of loading the list even though a user can only see about 30 at the time. This is clearly performance and memory intensive. In the trading systems which I write each row can have dozens of observations on each proxy so only loading what is required when it is required is a very powerful strategy indeed.

The only more efficient performance and memory strategy which could be deployed is to make observables visibility aware meaning they can be hydrated and de-hydrated on demand. This requires a higher level of control i.e. some form of controller which perhaps I will write about in the future. But not now.

The resulting flag on the proxy can be used to apply a simple xaml trigger in wpf, enabling the recently added rows to be highlighted

<Style TargetType="{x:Type ListViewItem}" BasedOn="{StaticResource MetroListViewItem}">
    <Style.Triggers>
     <DataTrigger Binding="{Binding Recent}" Value="True">
             <Setter Property="Background" Value="{DynamicResource SecondaryAccentBrush}"/>
         <Setter Property="Foreground"  Value="{DynamicResource SecondaryAccentForegroundBrush}"/>   
     </DataTrigger>
    </Style.Triggers>
</Style>

I’ve got to be honest here, I have no idea how I would apply the above trigger using reactive ui binding capabilities, If anyone can let me know, then please do so,

c) Create a delete command and provide some user text

In the example, the user can delete log entry items. So we need a command which can be enabled or disabled according to whether items are selected. The starting point is I created a controller that manages which items are selected by exposing an observable collection of selected items. SelectionController.cs and it’s implementation is not part of what I am trying to illustrate so I will profer no extra explantion about it.

Knowing what items are selected enables us to build the delete command.

The first task is to convert the observable collection into a strongly typed observable change set which provides an observable of adds, updates and removes. Dynamic data provides an operator for this out of the box.

//create the selection controller. This is bound using an attached property
var selectionController = new SelectionController()

//Selected property is ObservableCollection<object>, so convert the select items into an observable change set of log entries
var selectedItems = selectionController.Selected.ToObservableChangeSet().Transform(obj => (LogEntryProxy)obj);

This observable can be directly converted into a command using a combination of dynamic data’s QueryWhenChanged operator and reactive ui’s ToCommand operator.

//make a command out of selected items, enabling the command when there is a selection 
 var deleteCommand = selectedItems
                          .QueryWhenChanged(query => query.Count > 0)
                          .ToCommand();

QueryWhenChanged exposes the inner items of the list when any change has occurred and it returns an observable of whatever data is returned. In this example query.Count>0 is a boolean so the operator returns a boolean observable which enables the command conversion.

This command can now be directly bound to a button. It will be enabled only when items are selected. However so far we are doing nothing when the command is invoked so we must subscribe to the reactive command as follows.

//Assign action when the command is invoked
 var commandInvoker =  this.WhenAnyObservable(x => x.DeleteCommand)
          .Subscribe(_ =>
          {
              //do the code to remove items from LogEntryService
          });

Just one finishing touch for the delete operation, we could do with a message tailored from the user selection. Again I will use the same QueryWhenChanged operator but this time convert the resulting string observable to a property.

var deleteItemsText = selectedItems.QueryWhenChanged(query =>
       {
           if (query.Count == 0) return "Select log entries to delete";
           if (query.Count == 1) return "Delete selected log entry?";
           return string.Format("Delete {0} log entries?", query.Count);
       })
       .StartWith("Select log entries to delete")
       .ToProperty(this, viewmodel => viewmodel.DeleteItemsText);

  //deleted items property
   public string DeleteItemsText    { get { return _deleteItemsText.Value; }   }

A simple binding to DeleteItemsText can now display the appropriate message.

d) Fade out deleted items

The next visual feedback to apply is to fade out when an item is deleted. Instead of removing an item immediately it will gradually fade out and then be removed.

Translated into code, this means each removed item needs to be delayed for a period, and upon the delay period commencing, a flag should be set on the object which will tell the view to run a fade out transition. There are many solutions to this problem but I found the most functional was to create a dynamic data based custom operator. I came up with an extension.

public static IObservable<IChangeSet<TObject, TKey>> DelayRemove<TObject, TKey>(this IObservable<IChangeSet<TObject, TKey>> source,  TimeSpan delayPeriod, Action<TObject> onDefer)
{
   if (source == null) throw new ArgumentNullException("source");
   if (onDefer == null) throw new ArgumentNullException("onDefer");

   return Observable.Create<IChangeSet<TObject, TKey>>(observer =>
   {
       var locker = new object();
       var shared = source.Publish();
       var notRemoved = shared.WhereReasonsAreNot(ChangeReason.Remove)
       .Synchronize(locker);

       var removes = shared.WhereReasonsAre(ChangeReason.Remove)
       .Do(changes => changes.Select(change => change.Current).ForEach(onDefer))
       .Delay(delayPeriod)
       .Synchronize(locker);

       var subscriber = notRemoved.Merge(removes).SubscribeSafe(observer);
       return new CompositeDisposable(subscriber, shared.Connect());

   });
}

In this operator the observable is split into 2 observables, where one part is the adds and updates and the other is the removes. The removed part has a delay applied and when delayed calls back to the consumer.

This operator can now be applied to any dynamic data observable. In our case adjust the observable which we use to load the reactive list and make a DelayRemove call.

//filter, sort and populate reactive list,
var dataLoader = logEntryService.Items.Connect()
                .DelayRemove(TimeSpan.FromSeconds(0.75),proxy =>
                      {
                          //set a property which will start a xaml fadeout storyboard
                          proxy.FlagForRemove();
                          //deselect property other selected brush will be used
                          _selectionController.DeSelect(proxy);
                      })
                 .//more operators to filter and load the list

The xaml for the delay

<Style TargetType="{x:Type ListViewItem}" BasedOn="{StaticResource MetroListViewItem}">
       <Style.Triggers>
       <DataTrigger Binding="{Binding Removing}" Value="True">
           <DataTrigger.EnterActions>
               <BeginStoryboard>
                   <Storyboard>
                       <DoubleAnimation
                               Storyboard.TargetProperty="(TextBlock.Opacity)"
                               From="1.0" To="0.1" Duration="0:0:0.75"/>
                   </Storyboard>
               </BeginStoryboard>
           </DataTrigger.EnterActions>
       </DataTrigger>
   </Style.Triggers>
 </Style>

e) Log count summary

The final feature is a summary of the log count. Yet again. There is nothing new below which has not already been explained above so all I need to say is the following code queries the log entry service and produces an aggregated summary of the different log levels.

//aggregate total items
var summariser = logEntryService.Items.Connect()
      .QueryWhenChanged(query =>
      {
          var items = query.Items.ToList();
          var debug = items.Count(le => le.Level == LogLevel.Debug);
          var info = items.Count(le => le.Level == LogLevel.Info);
          var warn = items.Count(le => le.Level == LogLevel.Warning);
          var error = items.Count(le => le.Level == LogLevel.Error);
          return new LogEntrySummary(debug, info, warn, error);
      })
      .Subscribe(s => Summary = s);

This can now be bound to display a running aggregation of the log level item counts.

4. Integrate Reactive UI into the existing sample dynamic data project

a) Use another IOC container

Reactive UI has it’s own dependency injection mechanism yet my demo application was already set up to use the marvellous structure map. You can if you wish use the built in mechanism for all your registrations but most of us have have our own favourite IOC container. Thankfully ReactiveUI accommodates our choices by exposing an API to allow use of any mechanism.

I found out how to use this API after a quick google search. This custom-structuremap-dependency-resolver-for-reactiveui-5 taught me to implement IMutableDependencyResolver and register it. However this example uses reactive ui 5 and reactive ui 6 now sits on top of Splat, so the registration takes place with Splat.

//create a structure map container
var container =  new Container(x=> x.AddRegistry<AppRegistry>());
//ReactiveUIDependencyResolver implements IMutableDependencyResolver
var resolver =  new ReactiveUIDependencyResolver(container);

//...Register views (see next section)

//register IMutableDependencyResolver
Locator.Current = resolver;

See ReactiveUIDependencyResolver.cs for implementation of IMutableDependencyResolver using structure map.

b) Set up ReactiveUI view hosting

I defer now to this excellent post A GitHub Client with WPF and ReactiveUI Part 2 for the brief and concise explanation of how views are handled using ReactiveUI’s ViewModelViewHost. It taught me everything I need to know to get up an running with the reactive ui way of doing things. Well maybe not everything but it got me off to a running start. So I will not dwell too much on details as I do not wish to replicate that post, but i will briefly explain.

ReactiveUI makes use of a view model host control ViewModelViewHost which exposes a view model dependency property. When some content is bound to the ViewModel property, the dependency resolver resolves the view which is registered via splat.

I created a control with the following definition

<reactiveUi:ViewModelViewHost ViewModel="{Binding Content}"
                        HorizontalContentAlignment="Stretch"
                        VerticalContentAlignment="Stretch"/>

For full xaml RxUiHostView.xaml
The view model which this is bound to RxUIHostViewModel.cs

In demo app, when the view model is required, it is directly resolved and bound to the above xaml. This is where reactive ui takes control. For the view model host to resolve the correct view the dependency resolver must be told what view to use for each view model. This is achieved as followings using the dependency resolver.

//register the log entry view example
resolver.Register(() => new LogEntryView(), typeof(IViewFor<LogEntryViewer>));

This has associated the view and the view model. The view must implement IViewFor which is illustrated next.

c) View to View Model Bindings

Reactive ui provides it’s own binding functionality. I am not too well versed in all of the details so I have not gone the whole hog, but for those who are interested this in the code being my view.

I have set up a few of the bindings using reactive ui bindings for illustrative purposes and the remainder I use wpf bindings.

 public partial class LogEntryView : UserControl, IViewFor<LogEntryViewer>
 {
     public LogEntryView()
     {
         InitializeComponent();

         //use wpf bindings for the data context    
         this.WhenAnyValue(x => x.ViewModel).BindTo(this, x => x.DataContext);

         //use rx ui binding for the search text box and delete command
         this.Bind(ViewModel, model => model.SearchText, view => view.SearchTextBox.Text);
         this.OneWayBind(ViewModel, model => model.DeleteCommand, view => view.DeleteButton.Command);
         //should be able to bind to a command as follows but for some reason it kept throwing, so I quickly qave up!
          //  this.BindCommand(ViewModel, model => model.DeleteCommand, view => view.DeleteButton);
     }

     public static readonly DependencyProperty ViewModelProperty = DependencyProperty.Register(
         "ViewModel", typeof (LogEntryViewer), typeof (LogEntryView), new PropertyMetadata(default(LogEntryViewer)));

     public LogEntryViewer ViewModel
     {
         get { return (LogEntryViewer) GetValue(ViewModelProperty); }
         set { SetValue(ViewModelProperty, value); }
     }

     object IViewFor.ViewModel
     {
         get { return ViewModel; }
         set { ViewModel = (LogEntryViewer)value; }
     }
 }

If anyone wants to change the bindings for this screen to completely use rx ui bindings, please be my guest. Make the changes in github and submit a pull request. Thanks!

5. That’s all folks

Both dynamic data and ReactiveUI are functionally very rich and this article only touches the surface of both but I believe the code in this example will be a good starting point for anyone wishing to find out about either of these libraries.

Trading Demo Part 5 – Aggregate Dynamic Data

This is part 5 of building a trading system series. The idea has been to take one source of rapidly changing trading data and demonstrate the range, power and simplicity of applying dynamic data operators to it. Throughout the series I have made the assumption that the reader is already familiar and comfortable with the absolutely brilliant reactive extensions framework. For those who are not I strongly recommend to check them out. There is a steep learning curve but I assure anyone that once you start thinking reactive you will never turn back. You will probably wonder how you ever programmed without it.

If you are new to dynamic data I suggest you first read this Getting Started and if you have not seen the previous parts they are

1. Expose the data as a service
2. Manage market data
3. Integration with the UI
4. Filter on calculated values

Also if you want to download the code which goes with this example, go to Trading Demo on GitHub and download and run in visual studio. Open the menu option labelled Trading Positions and you will see the following screen.

Aggregated Positions

This screen connects to the trade service data, groups by currency pair and aggregates the totals for each grouping of currency pairs. With dynamic data doing this becomes so simple with very little code involved as I am about to show.

The starting point is to group the trade data by currency pair. This is achieved using dynamic data’s Group operator.

//where tradeService is the trade service created in Part 1.
var mygroup = tradeService.Live.Connect()
                          .Group(trade => trade.CurrencyPair)

This does not look like much but the group operator is very powerful indeed. As with all things reactive, changes in the trade service are automatically propagated to the appropriate grouping. Dynamic Data extends the reactive framework by accounting for adds, updates and removes. So when trades are created, closed or amended the groups will automatically reflect these changes.

The grouping object creates a cache of each group and it’s signature is like this.

public interface IGroup<TObject, TKey, out TGroupKey>
{
    TGroupKey Key {get;}
    IObservableCache<TObject, TKey> Cache { get; }
}

Three generic types, a bit ugly I admit but we have a dynamic collection of caches, one for each item matching the group selector in the underlying data source. When grouping I almost always apply a transform function to do something with the group. In this example we want to aggregate totals for each group, so first I have created an object which accepts the grouping in it’s constructor and apply some dynamic aggregations on the cache. For this we need to use dynamic data’s Transform operator. This is akin to the standard reactive Select operator but as with several other operators in dynamic data I have to use an alternative semantic dues to ambiguous reference problems for extensions.

var transformedGroup = tradeService.Live.Connect()
                .Group(trade => trade.CurrencyPair)
                .Transform(group => new CurrencyPairPosition(group))

This means for each group we have a CurrencyPairPosition object which will be used to dynamically calculate the sum total of the buys and sells so that the overall position can be calculated. For the aggregation I have used another dynamic data operator QueryWhenChanged. The following exert illustrates it’s usage

//constructor only for illustration
public CurrencyPairPosition(IGroup<Trade, long, string> tradesByCurrencyPair)
{
    var aggregations = tradesByCurrencyPair.Cache.Connect()
                        .QueryWhenChanged(query => //query and return a result
}

For each change in the underlying cache the operator is invoked. The parameter exposes a querying API so the underling cache can be queried in a functional manner. The consumer must return a result which becomes an observable of whatever is returned. In this example we have used some standard linq to objects to calculate the buy and sell totals and return a new object which contains these values.

If we expand out the above code we easily create an observable of trade positions for each currency pair,

  var aggregations = tradesByCurrencyPair .Cache.Connect()
                       .QueryWhenChanged(query =>
                        {
                            var buy = query.Items
                                        .Where(trade => trade.BuyOrSell == BuyOrSell.Buy)
                                        .Sum(trade=>trade.Amount);
                            var sell = query.Items
                                        .Where(trade => trade.BuyOrSell == BuyOrSell.Sell)
                                        .Sum(trade => trade.Amount);
                            var count = query.Count;
                            return new TradesPosition(buy,sell,count);
                        });
                //subscribe to the result and set a property to the latest

We are at the point where we have a CurrencyPairPosition object which always reflects the latest position so the last things is to expand the consuming code so we can bind to the collection of these.

 //not all code show and _data is a derived Observable collection
  var subscriber = tradeService.Live.Connect()
                .Group(trade => trade.CurrencyPair)
                .Transform(group => new CurrencyPairPosition(group))
                .Sort(SortExpressionComparer<CurrencyPairPosition>.Ascending(t => t.CurrencyPair))
                .ObserveOn(schedulerProvider.MainThread)
                .Bind(_data)
                .DisposeMany()
                .Subscribe();

This will maintain the sorted order and ensure that CurrencyPairPosition objects are disposed when no longer required. Now a little xaml bound to the data property can produce the screen at the top of the page.

In summary, Dynamic Data makes the reactive management of collections of data very very easy indeed. If you look at the code list below, you will see just how little is involved. There are about 170 lines of code over three c# classes which also includes namespace and using declarations. And what’s even better is despite the underlying data rapidly moving, nowhere have I had to concern myself with thread safety and all the operators are completely thread-safe so long as all code functionally remains inside the operators.

The full code for all the objects is

1. PositionsViewer.cs (The view model)
2. CurrencyPairPosition.cs
3. TradesPosition.cs (the object which holds the result)
4. PositionsView.xaml (the screen)

Integration with ReactiveUI

I have released DynamicData.ReactiveUI which is a very simple adaptor layer to assist with binding dynamic data observables with reactiveui’s ReactiveList object.

Install from Nuget

Nuget

If you are not familiar with dynamic data and what it enables I suggest:

  1. Go through previous posts in the blog
  2. Download the example wpf app from github
  3. .

To cut to the chase, here’s an example. The code takes existing in-memory trade objects, transforms them into a view model proxy and updates the target reactive list object with the resulting change sets. It also pre-filters the data with live trades, applies a user entered filter, orders the resulting data and disposes the proxy when no longer required. Phew – all that in effectively one line of code.

I did the same example without using Reactive UI code see Trading Example Part 3. Integrate with UI where I have explained the code in greater detail

    public class RxUiViewer : ReactiveObject, IDisposable
    {
       //this is the target list which we will populate from the dynamic data stream
        private readonly ReactiveList<TradeProxy> _data = new ReactiveList<TradeProxy>();
        //the filter controller is used to inject filtering into a observable
        private readonly FilterController<Trade> _filter = new FilterController<Trade>();
        private readonly IDisposable _cleanUp;
        private string _searchText;

        public RxUiViewer(ITradeService tradeService)
        {
            //Change the filter when the user entered search text changes
            var filterApplier = this.WhenAnyValue(x => x.SearchText)
                .Throttle(TimeSpan.FromMilliseconds(250))
                .Subscribe(_ => ApplyFilter());
           
            ApplyFilter();

            var loader = tradeService.Trades
                .Connect(trade => trade.Status == TradeStatus.Live) //prefilter live trades only
                .Filter(_filter)    // apply user filter
                   //if targetting Net4 or Net45 platform can use parallelisation for transforms 'cause it's quicker
                .Transform(trade => new TradeProxy(trade), new ParallelisationOptions(ParallelType.Ordered, 5))
                .Sort(SortExpressionComparer<TradeProxy>.Descending(t => t.Timestamp), SortOptimisations.ComparesImmutableValuesOnly)
                .ObserveOn(RxApp.MainThreadScheduler)
                .Bind(_data)        //bind the results to the ReactiveList 
                .DisposeMany()      //since TradeProxy is disposable dispose when no longer required
                .Subscribe();

            _cleanUp = new CompositeDisposable(loader, _filter, filterApplier);
        }

        private void ApplyFilter()
        {
            if (string.IsNullOrEmpty(SearchText))
            {
                _filter.ChangeToIncludeAll();
            }
            else
            {
                _filter.Change(t => t.CurrencyPair.Contains(SearchText, StringComparison.OrdinalIgnoreCase) ||
                                    t.Customer.Contains(SearchText, StringComparison.OrdinalIgnoreCase));
            }
        }

        public string SearchText
        {
            get { return _searchText; }
            set { this.RaiseAndSetIfChanged(ref _searchText, value); }
        }

        public IReadOnlyReactiveList<TradeProxy> Data
        {
            get { return _data; }
        }

        public void Dispose()
        {
            _cleanUp.Dispose();
        }
    }

coupled with a little xaml can produce this

ReactiveUIImage

Very little code, but a very powerful example,

Gone Portable

Dynamic data is now a portable class library available on most platforms. See below.

Portable

Additionally there is a separate dotnet 4.0 library because I know there are enterprises out there stuck in the old days (investment banks maybe?).

Now you can do some cool rx for collections stuff of WP8, IOS and Android as well as windows desktop and server.

I will explain what the plinq assemblies enable in a post in the near future.

Trading Example Part 4. Filter on calculated values

Intrinsic to collection change notifications, items can be notified by add, update and remove events. If ordering is supported a collection will support a move notification. Of course Dynamic Data supports these. Yet something which is often overlooked is data or functions of data are sometimes by necessity mutable. How can a collection which notifies respond to mutable changes?

To deal with this scenario, dynamic data introduces the concept of an Evaluate notification. This notification forms part of a change set and tells a consumer an item needs to be re-assessed or re-evaluated. This may effect things like filtering or sorting of an item. I feel a concrete example best illustrates this concept. Suppose we have the following function:

Func<Trade,bool> isNearToMarketPrice = trade => return Math.Abs(trade.PercentFromMarket) <= 1 %

Where PercentFromMarket is a calculation which is recalculated with each and every market data tick. Using standard linq a list of near to market trades can be retrieved as follows.

var nearToMaketTrades = myListOfTrades.Where(isNearToMarketPrice);

The manifest problem with this query is it pull based only and has no means of observing when the market price has been recalculated, or alternatively how can re-evaluation be forced. Based on practical experience of dealing with this kind problem I introduced multiple means of injecting the evaluate command into a dynamic data stream. Here I will use a filter controller illustrated in a previous blog and we will apply it to the trade service here.

In summary any of the controllers in dynamic data are used to inject commands and meta data into a stream. For this example we need a dynamic filter which is applied to a stream of trades.

//create a filter controller and set it's filter
var filter = new FilterController<Trade>();
filter.Change(trade => Math.Abs(trade.PercentFromMarket) <= percentFromMarket());

//create a stream of live trades where the trade matches the above filter
var filteredByPercent  = myTradeService.Trades
                         .Connect(trade=>trade.Status==TradeStatus.Live)
                         .Synchronize(locker)
                         .Filter(filter)

The filter controller has an overload to force re-evaluation.

filter.Revalue() // to reevaluate all
// or
filter.Reevaluate(Func<T,bool> itemSelector) //to re-evaluate selected items

The only missing element is when do we invoke re-evalulation. We have 2 choices. Either the service which calculates market prices provides a notification of trades which have been re-calculated or we poll on a period. Option 1 would be suitable for algo trading where everything must happen preferably with zero latency but for simplicity in the example we will poll as follows.

//re-evaluate filter periodically
var reevaluator = Observable.Interval(TimeSpan.FromMilliseconds(250))
                         .Subscribe(_ => filter.Reevaluate());

And that is that. We have a live stream of trades, where closed trades are automatically filtered out from the source and the filter controller constantly re-applies to ensure only trades near to the market are included in the result.   And as ever what is beginning to become my catch phrase – that is easy!

After wrapping the function into a cold observable, here’s the final code.

public IObservable<IChangeSet<Trade, long>> Query(Func<uint> percentFromMarket)
{
    if (percentFromMarket == null) throw new ArgumentNullException("percentFromMarket");
    return Observable.Create<IChangeSet<Trade, long>>
	(observer =>
	 {
	     var locker = new object();
	     Func<Trade,bool> nearToMaketTrades = trade => Math.Abs(trade.PercentFromMarket) <= percentFromMarket();
	     var filter = new FilterController<Trade>(nearToMaketTrades);

	     //re-evaluate filter periodically
	     var reevaluator = Observable.Interval(TimeSpan.FromMilliseconds(250))
		 .Synchronize(locker)
		 .Subscribe(_ => filter.Reevaluate());

	     //filter on live trades matching % specified
	     var subscriber = _tradeService.Trades
		 .Connect(trade=>trade.Status==TradeStatus.Live)
		 .Synchronize(locker)
		 .Filter(filter)
		 .SubscribeSafe(observer);

	     return new CompositeDisposable(subscriber, filter, reevaluator);
	 });
}

This observable will form part of a future post where I want to build the foundation of an auto trading system.

But for now, in a few lines of code (see NearToMarketViewer.cs) we can put the data onto the screen.

Near to market

Demo app

I have been busy creating example code of how to use dynamic dynamic data yet this blog has fallen well behind the code. The demo now has 5 simple yet powerful examples with a 6th in mind. The problem is I love being creative with code too much to be spending my time writing about it. With new years on the way perhaps a resolution is required.

Although I am no designer I like systems to look good. After all during the development cycle F5 can be pressed tens of thousands of times so I have taken the trouble to apply some styles.

A big thanks to MahApps for their awesome wpf library and to Dragablz for creating Chrome style draggable tabs.

Trading Demo Menu

As you can see from the image, the menu page has hyperlinks to the code behind and to this blog. Hopefully this will give a coherent means of seeing how the data on the screen matches up with the code.

Almost forgot, but the demo is on GitHub at https://github.com/RolandPheasant/TradingDemo

Trading Example Part 3. Integrate with UI

This is the third part of the trading example series. If you are not already familiar with the previous parts they are, part 1 is here and part 2 here. Also a fully working demo project is here. Since publishing part 2 I have embellished the sample code to make it look more production like so if you previously downloaded it I suggest you download it again.

So far we have created a trade service and a job which updates the market prices. Now I am going to demonstrate how dynamic data can help to present the data onto a WPF screen.

There are few ingredients we need to throw into the mix:

  1. A filter controller as I want the user to be able to search for trades by entering some text.
  2. A proxy to the Trade object as the market price changes and WPF requires an INotifyPropertyChanged invocation.
  3. An observable collection so the screen can be updated with changed items,

The filter controller  which is used to reapply filters within a dynamic stream  is constructed this

private readonly FilterController<Trade> _filter = new FilterController<Trade>();

I recommend using the dynamic data  version of  observable collection

  private readonly IObservableCollection<TradeProxy> _data = new ObservableCollectionExtended<TradeProxy>();        

which is optimised for binding dynamic streams. If however you choose to use the standard observable collection or the one provided by ReactiveUI, you can but will have to write you own operator to update the bindings. I will discuss how to in a future post.

Finally we need a simple proxy of the trade object.

public class TradeProxy:AbstractNotifyPropertyChanged, IDisposable, IEquatable<TradeProxy>;
 {
    private readonly Trade _trade;
    private readonly IDisposable _cleanUp;

    public TradeProxy(Trade trade)
    {
    _trade = trade;

    //market price changed is a observable on the trade object
    _cleanUp = trade.MarketPriceChanged
                     .Subscribe(_ =>; OnPropertyChanged("MarketPrice"));
    }

 public decimal MarketPrice
 {
      get { return _trade.MarketPrice; }
 }

 // additional members below (not show)

With these elements in place we can now easily get data from the trade service, filter it, convert it to a proxy, bind to an observable collection and dispose the proxy when it is no longer required.

            var loader = tradeService.Trades
                .Connect(trade => trade.Status == TradeStatus.Live) //prefilter live trades only
                .Filter(_filter) // apply user filter
                .Transform(trade => new TradeProxy(trade))
                .Sort(SortExpressionComparer<TradeProxy>.Descending(t => t.Timestamp),SortOptimisations.ComparesImmutableValuesOnly)
                .ObserveOnDispatcher()
                .Bind(_data)   // update observable collection bindings
                .DisposeMany() //since TradeProxy is disposable dispose when no longer required
                .Subscribe();

The only missing code is to apply a user entered filter. It looks something like this:

 var filterApplier =//..watch for changes to the search text bindings
 .Sample(TimeSpan.FromMilliseconds(250))
 .Subscribe(_ =>  {
                     Func<Trade,bool> predicate= //build search predicate;
                     _filter.Change(predicate);
                  }
            );

I will not explain the xaml required as this is beyond dynamic data. But a few lines of xaml bound to the result of observable collection can give this.
Live trades 3

Was that easy? Take a look at the source code LiveTradesViewer.cs. 80 lines of code including white space.

All I say now is don’t tell your boss that you can do all this in a few lines of code otherwise you may have a pay cut!

Trading Example Part 2. Manage market data

Part 1 to this series is here. Demo code is here

In this part we will monitor trades and update them with the latest market data. A live trade will typically have market data which is constantly streaming. So the problem to solve is to create the functionality to monitor live trades and keep the market price up to date on each trade.

For this we require 2 streams of data.

  1. Live trade stream
  2. Live market data stream

and we need to join them together and keep them in sync.

The live trade stream is easy using dynamic data.

var stream = tradeService.Trades.Connect()
                .Filter(trade => trade.Status == TradeStatus.Live)

And we’ll take it as a given that we can get the market price via a reactive stream.  The method signature would look something like this.

IObservable<decimal> ObservePrice(string currencyPair)

Each trade has a currency pair so we need to subscribe to streams for each currency pair. There are many ways to crack this particular egg but for the demo, I will first of all group the trades by currency pair. Again dynamic data makes it easy. Just group by currency pair.

var stream = tradeService.Trades.Connect()
                .Filter(trade => trade.Status == TradeStatus.Live)
                .Group(trade => trade.CurrencyPair)

This will give an observable cache of trades for each distinct currency pair. How cool is that? No code at all to maintain the grouping!  When trades are closed they will be removed from the grouping. And even better if a trade is updated to use a different currency pair the trade will be moved from the old group and added to the new one.  Now the trades are grouped by currency pair, we need to subscribe to the price stream and update each trade with the latest price as it changes.  For this I will introduce a new operator ‘SubscribeMany’ which subscribes to each item in a stream.

var stream = = tradeService.Trades.Connect()
  .Filter(trade => trade.Status == TradeStatus.Live)
  .Group(trade => trade.CurrencyPair)
  .SubscribeMany(grouping =>
  {
   //do something cool and return a disposable beacuse it will be
   // disposed when removed from the stream
  })

At a glance this seems nothing special but what it enables is to apply an action when the item is added or updated in a stream and to unsubscribe when an item is removed from a stream. In this case the operation applies to each distinct currency pair in the live trades query. What does this enable? Perhaps best illustrated by the following concrete example.

//...
    .SubscribeMany(grouping =>
   {
     var locker = new object();
     decimal latestPrice = 0;

    //1. Subscribe to price and update trades with the latest price
    var priceHasChanged = MarketPriceService.ObservePrice(grouping.Key)
    .Synchronize(locker)
    .Subscribe(price =>
   {
    latestPrice = price;
    UpdateTradesWithPrice(grouping.Cache.Items, latestPrice);
   });

   //2. Connect to data changes and update with the latest price. Exclude items which are removed from the stream because they are no longer live
   var dataHasChanged = grouping.Cache.Connect()
   .WhereReasonsAre(ChangeReason.Add, ChangeReason.Update)
   .Synchronize(locker)
   .Subscribe(changes => UpdateTradesWithPrice(changes.Select(change => change.Current), latestPrice));

   return new CompositeDisposable(priceHasChanged, dataHasChanged);
 })
.Subscribe()

where UpdateTradesWithPrice is a method as follows

private void UpdateTradesWithPrice(IEnumerable<Trade> trades, decimal price)
{
  foreach (var trade in trades)
  {
     trade.Price = price;
  }
}

Now we are function where live trades will be updated with the latest market data – always. And what’s more the example is totally thread safe. And even better there is very little plumbing code required.

Believe me this is a complex as dynamic data can be.  So if you follow this you can truly run with dynamic data. Also I promise that all subsequent posts will be easy.

Trading Example Part 1. Create a trade service

One of the things I love about dynamic data is no matter what the nature of the data which you are dealing with the code will look the same. I also love the power of data transformations and just how easy it is to tame sequences of data. However a lot of the capability and power is not obvious at a glance so this is the start of a series of posts where step by step I will illustrate usage of the library. I have decided to go in at the deep end and demonstrate a trading system as these systems generally have rapidly moving data and loads of complexity. What I hope to achieve is to show that by using dynamic data your code can be simplified greatly.

Source code can be found here

We will start off with a simple trade object

    public class Trade
    {
        public long Id { get; private set; }
        public string CurrencyPair { get; private set; }
        public string Customer { get; private set; }
        public decimal Price { get; set; }
        public TradeStatus Status { get; private set; }

        public Trade(long id, string customer, string currencyPair, TradeStatus status)
        {
            Id = id;
            Customer = customer;
            CurrencyPair = currencyPair;
            Status = status;
        }
    }

I want a service which exposes a load of trades.

    public interface ITradeService
    {
        IObservableCache<Trade, long> Trades { get; }
    }

Implementing this is a trivial matter. For clarity not all code is shown.

    public class TradeService : ITradeService
    {
        private readonly IObservableCache<Trade, long> _tradesCache;

        public TradeService()
        {
            //create a cache specifying that the unique key is id
            var tradesSource = new SourceCache<Trade, long>(trade => trade.Id);

            //call AsObservableCache() to hide the update methods as we are exposing the cache
            _tradesCache = tradesSource.AsObservableCache();

           //code to emulate an external trade provider
            var tradeLoader = GenerateTradesAndMaintainCache();

            //.....
        }

        public IObservableCache<Trade, long> Trades
        {
            get { return _tradesCache; }
        }
    }

We now have a trades service which I will be using in subsequent posts to demonstrate the functionality of dynamic data.

Part 2

Binding Example

This is the first in a series of real world dynamic data examples.

Lets’s assume I have a data  which is exposed via a service.

 public interface IPersonService
 {
      IObservableCache<Person, PersonKey> Cache { get; }
 }

and I want to bind the data to a screen. In dynamic data it is done like this.

 //this is a simple extension of observable collection which comes as part of the package
 var collection = new ObservableCollectionExtended<Person>();
 
 //Dynamic data is an extension of reactive so subscribe at the end of the chain
 var loader = personService.Cache.Connect()
 .Sort(SortExpressionComparer<Person>.Ascending(p=>p.FirstName).ThenByAscending(p=>p.LastName))
 .ObserveOnDispatcher()
 .Bind(collection)
 .Subscribe(); 

The binding will always reflect the data in the cache (and in the stated sort order).  That was easy so let’s extend this so we can apply a filter. This is also easy.

var loader = personService.Cache.Connect().Filter(p=>p.Age>50) 

or use a filter controller which allows the filter to be changed any time.

 //create a filter controller 
 var filterController = new FilterController<Person>();
 
 //filter can be changed any time by applying the following method
 filterController.Change(p=>p.Age>50);

 var loader = personService.Cache.Connect().Filter(filterController)

And that is that.  We can filter, sort and bind changeable data with very little code.