Observable Cache At A Glance

In my previous post I introduced the observable list which was released in version 4 of Dynamic Data.

For a working demonstration of dynamic data see Dynamic Trader.

Although Dynamic Data has always had an observable cache I have never posted an ‘at a glance’ offering so I have put together this post for completeness. The observable cache is the big brother of the observable list. It has existed for years, is very well tried and tested and a lot of trouble has been taking to ensure good performance can be easily achieved. This document briefly outlines the usage and behaviour of the observable cache.

In my next post I will examine the differences between the observable cache and observable list and which circumstances should one be favoured over the other. I will also examine how performance can be fine tuned.

Create and maintain a cache

Create an observable cache like this:

var myCache = new SourceCache<TObject,TKey>(t => key);

This can be amending calling the the edit methods. For example:

myCache.Clear();
myCache.AddOrUpdate(myItems);

The Clear and AddOrUpdate methods above will each produce a distinct change notification. In order to increase efficiency when making multiple amendments, the cache provides a means of batch editing. This is achieved using the .BatchUpdate method which ensures only a single change notification is produced.

myCache.BatchUpdate(innerCache =>
              {
                  innerCache.Clear();
                  innerCache.AddOrUpdate(myItems);
              });

If myCache is to be exposed publicly it can be made read only using .AsObservableCache

IObservableCache<TObject,TKey> readonlyCache = myCache.AsObservableCache();

which hides the edit methods.

The cache is observed by calling myCache.Connect() like this:

IObservable<IChangeSet<TObject,TKey>> myCacheObservable = myCache.Connect();

This creates an observable change set for which there are dozens of operators. The changes are transmitted as an Rx observable, so they are fluent and composable.

Observing dynamic data

Dynamic Data is based on the concept of creating and manipulating observable change sets. An observable change set is a collection of changes which have been applied to the underlying data source. The change set is always preceded with any data which is already in the underlying collection.

The primary method of creating observable change sets is to connect to an instances of the cache. There are alternative methods to produce observable change sets however, depending on the data source.

Connect to a Cache

The most common means of creating an observable change set is by calling Connect on an observable cache

var myObservableChangeSet = myDynamicDataSource.Connect();

The observable change set always begins with any items which are in the cache when the subscription is made.

Create an Observable Change Set from an Rx Observable

Given either of the following Rx observables:

IObservable<T> myObservable;
IObservable<IEnumerable<T>> myObservable;

an observable change set can be created like by calling .ToObservableChangeSet like this:

var myObservableChangeSet = myObservable.ToObservableChangeSet(t=> t.key);

Chain together operators

Since dynamic data is based on the IObservable interface and each operators returns an observable change set, operators can be chained together and flexibly composed.

For example if you have an observable cache containing trades you can sequentially apply multiple operators,

var myFluentOperation = myTrades
                    .Filter(trade=>trade.Status == TradeStatus.Live) 
                    .Transform(trade => new TradeProxy(trade))
                    .Sort(SortExpressionComparer<TradeProxy>.Descending(t => t.Timestamp))
                    .Subscribe(changes=>//do something with the result...);  

The result is a true observable. When values are added, replaced or removed from myTrades, the result will always reflect these changes.

Create a Derived Cache

Any observable change set can be converted into an observable cache by calling the AsObservableCache method

var readOnlyCache = myTrades
            .Filter(trade=>trade.Status == TradeStatus.Live) 
            .AsObservableCache();

which produces a self-maintaining cache of live trades.

Operator overview

Dynamic data is built on top of Rx but since Rx does not deal with collection changes, there are specific operators which account for adds, updates and removes for a large number of operations. There are detailed in the following sections.

Restriction Operators

These operators reduce the amount of changes and hence underlying data when applied.

OPERATOR DESCRIPTION
ExpireAfter Time based expiry for a cache
Filter The semantic equivalent of Rx.Where. There is an overload to allow the predicate to be changed
IgnoreUpdateWhen Ignores an update when the condition is met.
IncludeUpdateWhen Only includes the update when the condition is met.
LimitSizeTo Limits the size of the source cache to the specified limit, providing a notification when an item is removed due to the size limit being exceeded
NotEmpty Prevent empty changesets. By default this is applied by dynamic data operators so if only required for custom operators
Page Applies paging to the the underlying data source. Parameters are provided and changed by providing an observable of page requests
SkipInitial Defer the subscription until there is underlying data has loaded and skip first set of changes
Top Limits the size of the result set to the specified number of items
Virtualise Resulting changes only includes only a virtualised subset of the underlying data. Parameters are provided and changed by providing an observable of virtual requests
WhereReasonsAre Includes changes for the specified reasons only
WhereReasonsAreNot Excludes changes for the specified reasons

Transformative Operators

OPERATOR DESCRIPTION
AsObservableCache Converts an observable change set into a read only observable cache / hides the edit methods of the source cache.
Convert Light weight conversion / casting of a type as specified by a value selector
DistinctValues Return a changeset of distinct values as specified by a value selector
Group Semantic equivalent of Rx.GroupBy operator
QueryWhenChanged The latest copy of the underlying data is exposed for querying i) after each modification to the underlying data ii) upon subscription. The method accepts a result selector which accepts allows any resulting observable to be returned
RemoveKey Removes the key from each change. This changes the change set from a cache change set to a list changeset
Sort Allow sort operators using a specified comparer. There is an overload to allow the sort to be changed
ToCollection Converts the changeset into a fully formed collection. Each change in the source results in a new read only collection
Transform The semantic equivalent of Rx.Select
TransformMany Equivalent to a select many transform
TransformSafe The semantic equivalent of Rx.Select. Provides and overload to handle any errors encountered when converting an item
TransformToTree Transforms the object to a fully recursive tree, create a hiearchy based on the pivot function

Join Operators

The following operators apply boolean logic logic between two or more observable change sets.

OPERATOR DESCRIPTION
And Applies a logical And between the underlying data in two or more observable change sets
Or Applies a logical Or between the underlying data in two or more observable change sets
Xor Applies a logical Xor between the underlying data in two or more observable change sets
Except Takes the results of the first observable change set and ignore any items produced by one or more other change sets

Aggregation Operators

The aggregation operators are also new to Dynamic Data version 4. They produce light weight continuous aggregation of the underlying observable change set.

OPERATOR DESCRIPTION
Count Counts the number of items in the underlying data
Sum The sum of values matching the specified value selector
Avg The average value matching the specified value selector
Maximum The maximum value matching the specified value selector
Minimum The minimum value matching the specified value selector
StdDev The standard deviation of all the values matching the specified value selector

Inspection of Items Operators

These operators concentrate on applying logic to individual items within the change set.

OPERATOR DESCRIPTION
DisposeMany Dispose each item when removed from the stream. Also disposes all items when the stream is completed.
ForEachChange Provides a callback for each individual item change
MergeMany Dynamically merges the observable which is selected from each item in the stream, and unmerges the item when it is no longer part of the stream.
OnItemRemoved Callback for each item as and when it is being removed from the stream. This is not the same as restricting using WhereReasonsAre as an item is also considered as being removed when a subscribed stream has been disposed
SubscribeMany Subscribes to each item when it is added to the stream and unsubcribes when it is removed. All items will be unsubscribed when the stream is disposed
TrueForAny Produces a boolean observable indicating whether the resulting value of whether any of the values from the specified observable matches the equality condition
TrueForAll Produces a boolean observable indicating whether the resulting value of whether all of the values from the specified observable matches the equality condition
Watch Returns an observable of any changes for a single item which matches a specified key
WatchValue Overload of watch which returns the changed value only
WhenAnyChanged Property changed observation of each item in the underlying collection which provides a notification when any item has changed. The notification includes the sender and value of the changed property
WhenAnyValueChanged Property changed observation of each item in the underlying collection which provides a notification when any item has changed. The notification only includes the value of the changed property

Additional Operators

OPERATOR DESCRIPTION
Adapt Injects side effects using IChangeSetAdaptor interface
Bind Ensures a specified observable collection reflects the underling data
Batch This is a short cut to applying a standard Rx time based buffer followed by FlattenBufferResult (see below)
BufferIf Conditional buffer which accepts a boolean observable to turn buffering on or off
ChangeKey Changes the unique item key
Clone Populate a dictionary instance with the latest changes
DeferUntilLoaded Delay subscription until the source is populated with data
FlattenBufferResult A changeset is already a collection and therefore becomes a nested collection when Rx buffer operatons are applied. Use this to flatten the result back to a single change set
PopulateInto Populates a source cache instance with the latest changes
RefCount Change set equivalent to Publish().RefCount(). The source is cached so long as there is at least 1 subscriber

7 thoughts on “Observable Cache At A Glance

  1. Hello,
    thanks for the article.
    in your exemple you are filtering on status == live. But if i change this value on one object somewhere else in my code(so it is no longer live), the collection is not updated (object not removed).
    How could i modify the sample for this purpose?

    Like

    • The best way is to create a new object with the new status and update the source cache. This will propagate the change.

      There is an example of this here https://github.com/RolandPheasant/Dynamic.Trader

      If you mean you want to apply the filter when a property changes it is doable but requires some plumbing. It would require an example for illustration – if you need more details contact me on https://gitter.im/RolandPheasant/DynamicData

      Like

      • Thanks for the quick reply!
        I was “afraid” it was what i needed to do. Though i’m not creating a copy i’m just sending it again to cacheSource.AddOrUpdate() thanks to SouceCache.
        But it is a little less clean as i have to think to do it every time i update a property that can be used in a filter.

        The alternative solution i found to do it only once per prorety is to do a property observer for each:
        _cacheSource.Connect().WhereReasonsAre(ChangeReason.Add).WhenPropertyChanged(x => x.Popular)
        .Subscribe(x => {
        _cacheSource.AddOrUpdate(x.Sender);
        });
        Not sure it is very good for perfomances but it works 🙂

        Like

      • In your solution, it will work and you do not need the wherereasonsare clause for it to work.

        There are alternative and functionally pure solutions which make use of the overloads of the filter() operator. I cannot post an example for a could days.

        If you sign into the dynamic data gitter page and ping me, I will be able to directly contact you when I post a solution

        Like

  2. Hi Roland,
    I’m a private stock trader who is looking for a new in-memory approach implementing a realtime producer-consumer resp. observer pattern. At the moment I’m gathering stock ticks resp. changes by a timer and writing each stock tick to a distinct file on hard disk. I’m doing this because I do subscribe on the file system watcher’s events on file creation. Task Parallel Library TPL is engaged to avoid losing any tick. This weird approach was chosen because I did struggle with concurrent collections at all, also when it comes to ui updating.

    Is rx resp. dynamic data capable to overcome my weird create-file-listen-to-file-system-watcher-within-tpl-flow-approach ?

    Where to start?

    Thanks!

    Like

Leave a comment