In my previous post I introduced the observable list which was released in version 4 of Dynamic Data.
For a working demonstration of Dynamic Data see Dynamic Trader.
Although Dynamic Data has always had an observable cache, I have never posted an ‘at a glance’ offering, so I have put together this post for completeness. The observable cache is the big brother of the observable list. It has existed for years, is very well tried and tested, and a lot of trouble has been taken to ensure good performance can be easily achieved. This document briefly outlines the usage and behaviour of the observable cache.
In my next post I will examine the differences between the observable cache and observable list, and the circumstances in which one should be favoured over the other. I will also examine how performance can be fine tuned.
Create and maintain a cache
Create an observable cache like this:
var myCache = new SourceCache<TObject,TKey>(t => key);
The cache can be amended by calling its edit methods. For example:
myCache.Clear();
myCache.AddOrUpdate(myItems);
The Clear and AddOrUpdate methods above will each produce a distinct change notification. In order to increase efficiency when making multiple amendments, the cache provides a means of batch editing. This is achieved using the .BatchUpdate method, which ensures only a single change notification is produced.
myCache.BatchUpdate(innerCache =>
{
    innerCache.Clear();
    innerCache.AddOrUpdate(myItems);
});
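As a minimal sketch of the difference between separate and batched edits, the following assumes the DynamicData NuGet package. In recent releases the batch method is named Edit rather than BatchUpdate (a reader raises this in the comments below), so the sketch uses Edit. The tuple item type and the notification counter are purely illustrative.

```csharp
using System;
using DynamicData;

// Hypothetical item type: an (Id, Name) tuple keyed on Id.
var myCache = new SourceCache<(int Id, string Name), int>(t => t.Id);
var myItems = new[] { (1, "first"), (2, "second") };

// Count the change sets (notifications) published by the cache.
var notifications = 0;
using var subscription = myCache.Connect().Subscribe(_ => notifications++);

// Two separate edits: each publishes its own change set.
myCache.AddOrUpdate((0, "stale"));
myCache.AddOrUpdate(myItems);
var afterSeparateEdits = notifications;

// One batched edit: Clear and AddOrUpdate are published together.
myCache.Edit(innerCache =>
{
    innerCache.Clear();
    innerCache.AddOrUpdate(myItems);
});
var afterBatchedEdit = notifications - afterSeparateEdits;

Console.WriteLine($"separate: {afterSeparateEdits}, batched: {afterBatchedEdit}");
```

Comparing the two counters shows how many notifications a subscriber receives in each case.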
If myCache is to be exposed publicly, it can be made read only using .AsObservableCache, which hides the edit methods:
IObservableCache<TObject,TKey> readonlyCache = myCache.AsObservableCache();
The cache is observed by calling myCache.Connect() like this:
IObservable<IChangeSet<TObject,TKey>> myCacheObservable = myCache.Connect();
This creates an observable change set for which there are dozens of operators. The changes are transmitted as an Rx observable, so they are fluent and composable.
Observing dynamic data
Dynamic Data is based on the concept of creating and manipulating observable change sets. An observable change set is a collection of changes which have been applied to the underlying data source. The change set is always preceded by any data which is already in the underlying collection.
The primary method of creating observable change sets is to connect to an instance of the cache. However, there are alternative methods of producing observable change sets, depending on the data source.
Connect to a Cache
The most common means of creating an observable change set is by calling Connect on an observable cache:
var myObservableChangeSet = myDynamicDataSource.Connect();
The observable change set always begins with any items which are in the cache when the subscription is made.
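The initial-load behaviour can be sketched as follows, assuming the DynamicData NuGet package. The tuple item type and the counter names are hypothetical; the Adds property of the change set reports how many additions it contains.

```csharp
using System;
using DynamicData;

// Hypothetical item type: an (Id, Name) tuple keyed on Id.
var cache = new SourceCache<(int Id, string Name), int>(t => t.Id);

// This item is in the cache before anyone subscribes.
cache.AddOrUpdate((1, "loaded before subscribing"));

var addsSeen = 0;
using var subscription = cache.Connect()
    .Subscribe(changes => addsSeen += changes.Adds);

// The pre-existing item has already been delivered to the subscriber.
var addsAfterSubscribe = addsSeen;

// Later edits arrive as further change sets.
cache.AddOrUpdate((2, "added later"));
var addsAfterEdit = addsSeen;

Console.WriteLine($"on subscribe: {addsAfterSubscribe}, after edit: {addsAfterEdit}");
```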
Create an Observable Change Set from an Rx Observable
Given either of the following Rx observables:
IObservable<T> myObservable;
IObservable<IEnumerable<T>> myObservable;
an observable change set can be created by calling .ToObservableChangeSet like this:
var myObservableChangeSet = myObservable.ToObservableChangeSet(t=> t.key);
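A small sketch of this conversion, assuming the DynamicData and System.Reactive NuGet packages. The Subject standing in for a real feed, the (Symbol, Price) tuple and the variable names are all hypothetical. The key selector means a second value with the same key is treated as an update rather than a new item.

```csharp
using System;
using System.Reactive.Subjects;
using DynamicData;

// Hypothetical feed of price ticks; a Subject stands in for a real source.
var ticks = new Subject<(string Symbol, decimal Price)>();

// Key each tick on its symbol and materialise the result as a cache.
using var prices = ticks
    .ToObservableChangeSet(t => t.Symbol)
    .AsObservableCache();

ticks.OnNext(("ABC", 10.0m));
ticks.OnNext(("XYZ", 20.0m));
ticks.OnNext(("ABC", 10.5m)); // same key, so the earlier ABC tick is replaced

var latestAbc = prices.Lookup("ABC");
Console.WriteLine($"{prices.Count} symbols, ABC at {latestAbc.Value.Price}");
```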
Chain together operators
Since Dynamic Data is based on the IObservable interface and each operator returns an observable change set, operators can be chained together and flexibly composed.
For example, if you have an observable cache containing trades, you can sequentially apply multiple operators:
var myFluentOperation = myTrades
    .Connect()
    .Filter(trade => trade.Status == TradeStatus.Live)
    .Transform(trade => new TradeProxy(trade))
    .Sort(SortExpressionComparer<TradeProxy>.Descending(t => t.Timestamp))
    .Subscribe(changes => { /* do something with the result */ });
The result is a true observable. When values are added, replaced or removed from myTrades, the result will always reflect these changes.
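To illustrate how a chain reacts to changes in the source, here is a minimal sketch assuming the DynamicData NuGet package. The (Id, Status) tuple, the string projection and the log list are hypothetical stand-ins for a real trade type and proxy.

```csharp
using System;
using System.Collections.Generic;
using DynamicData;

// Hypothetical trade type: an (Id, Status) tuple keyed on Id.
var trades = new SourceCache<(int Id, string Status), int>(t => t.Id);
var log = new List<string>();

using var subscription = trades.Connect()
    .Filter(t => t.Status == "Live")   // keep live trades only
    .Transform(t => $"Trade {t.Id}")   // project each trade to a display string
    .Subscribe(changes =>
    {
        // Record the reason and value of every change that reaches the end of the chain.
        foreach (var change in changes)
            log.Add($"{change.Reason}: {change.Current}");
    });

trades.AddOrUpdate((1, "Live"));    // passes the filter
trades.AddOrUpdate((2, "Closed"));  // never passes the filter
trades.AddOrUpdate((1, "Closed"));  // no longer live, so it leaves the result

Console.WriteLine(string.Join(Environment.NewLine, log));
```

Note that the third call replaces trade 1 in the source cache with a new value; the filter responds to that replacement, not to a mutation of the original object.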
Create a Derived Cache
Any observable change set can be converted into an observable cache by calling the AsObservableCache method
var readOnlyCache = myTrades
    .Connect()
    .Filter(trade => trade.Status == TradeStatus.Live)
    .AsObservableCache();
which produces a self-maintaining cache of live trades.
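The self-maintaining behaviour can be sketched as follows, assuming the DynamicData NuGet package. The (Id, Status) tuple and the variable names are hypothetical.

```csharp
using System;
using DynamicData;

// Hypothetical trade type: an (Id, Status) tuple keyed on Id.
var trades = new SourceCache<(int Id, string Status), int>(t => t.Id);

// Derived, read-only cache which tracks live trades only.
using var liveTrades = trades.Connect()
    .Filter(t => t.Status == "Live")
    .AsObservableCache();

trades.AddOrUpdate(new[] { (1, "Live"), (2, "Closed"), (3, "Live") });
var liveAfterLoad = liveTrades.Count;

// Replacing trade 3 with a closed version removes it from the derived cache.
trades.AddOrUpdate((3, "Closed"));
var liveAfterClose = liveTrades.Count;

Console.WriteLine($"live after load: {liveAfterLoad}, after close: {liveAfterClose}");
```

The derived cache holds a subscription to its source, so it should be disposed when no longer needed.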
Operator overview
Dynamic Data is built on top of Rx, but since Rx does not deal with collection changes, Dynamic Data provides its own operators which account for adds, updates and removes across a large number of operations. These are detailed in the following sections.
Restriction Operators
These operators reduce the number of changes, and hence the amount of underlying data, when applied.
OPERATOR | DESCRIPTION |
---|---|
ExpireAfter | Time based expiry for a cache |
Filter | The semantic equivalent of Rx.Where. There is an overload to allow the predicate to be changed |
IgnoreUpdateWhen | Ignores an update when the condition is met. |
IncludeUpdateWhen | Only includes the update when the condition is met. |
LimitSizeTo | Limits the size of the source cache to the specified limit, providing a notification when an item is removed due to the size limit being exceeded |
NotEmpty | Prevents empty changesets. By default this is applied by Dynamic Data operators, so it is only required for custom operators |
Page | Applies paging to the underlying data source. Parameters are provided and changed by providing an observable of page requests |
SkipInitial | Defers the subscription until the underlying data has loaded and skips the first set of changes |
Top | Limits the size of the result set to the specified number of items |
Virtualise | Resulting changes include only a virtualised subset of the underlying data. Parameters are provided and changed by providing an observable of virtual requests |
WhereReasonsAre | Includes changes for the specified reasons only |
WhereReasonsAreNot | Excludes changes for the specified reasons |
Transformative Operators
OPERATOR | DESCRIPTION |
---|---|
AsObservableCache | Converts an observable change set into a read only observable cache / hides the edit methods of the source cache. |
Convert | Light weight conversion / casting of a type as specified by a value selector |
DistinctValues | Return a changeset of distinct values as specified by a value selector |
Group | Semantic equivalent of Rx.GroupBy operator |
QueryWhenChanged | The latest copy of the underlying data is exposed for querying i) after each modification to the underlying data ii) upon subscription. The method accepts a result selector which allows any resulting observable to be returned |
RemoveKey | Removes the key from each change. This changes the change set from a cache change set to a list changeset |
Sort | Sorts using a specified comparer. There is an overload to allow the sort to be changed |
ToCollection | Converts the changeset into a fully formed collection. Each change in the source results in a new read only collection |
Transform | The semantic equivalent of Rx.Select |
TransformMany | Equivalent to a select many transform |
TransformSafe | The semantic equivalent of Rx.Select. Provides an overload to handle any errors encountered when converting an item |
TransformToTree | Transforms the object to a fully recursive tree, creating a hierarchy based on the pivot function |
Join Operators
The following operators apply boolean logic between two or more observable change sets.
OPERATOR | DESCRIPTION |
---|---|
And | Applies a logical And between the underlying data in two or more observable change sets |
Or | Applies a logical Or between the underlying data in two or more observable change sets |
Xor | Applies a logical Xor between the underlying data in two or more observable change sets |
Except | Takes the results of the first observable change set and ignores any items produced by one or more other change sets |
Aggregation Operators
The aggregation operators are also new to Dynamic Data version 4. They produce light weight continuous aggregation of the underlying observable change set.
OPERATOR | DESCRIPTION |
---|---|
Count | Counts the number of items in the underlying data |
Sum | The sum of values matching the specified value selector |
Avg | The average value matching the specified value selector |
Maximum | The maximum value matching the specified value selector |
Minimum | The minimum value matching the specified value selector |
StdDev | The standard deviation of all the values matching the specified value selector |
Inspection of Items Operators
These operators concentrate on applying logic to individual items within the change set.
OPERATOR | DESCRIPTION |
---|---|
DisposeMany | Dispose each item when removed from the stream. Also disposes all items when the stream is completed. |
ForEachChange | Provides a callback for each individual item change |
MergeMany | Dynamically merges the observable which is selected from each item in the stream, and unmerges the item when it is no longer part of the stream. |
OnItemRemoved | Callback for each item as and when it is being removed from the stream. This is not the same as restricting using WhereReasonsAre as an item is also considered as being removed when a subscribed stream has been disposed |
SubscribeMany | Subscribes to each item when it is added to the stream and unsubscribes when it is removed. All items will be unsubscribed when the stream is disposed |
TrueForAny | Produces a boolean observable indicating whether any of the values from the specified observable match the equality condition |
TrueForAll | Produces a boolean observable indicating whether all of the values from the specified observable match the equality condition |
Watch | Returns an observable of any changes for a single item which matches a specified key |
WatchValue | Overload of watch which returns the changed value only |
WhenAnyChanged | Property changed observation of each item in the underlying collection which provides a notification when any item has changed. The notification includes the sender and value of the changed property |
WhenAnyValueChanged | Property changed observation of each item in the underlying collection which provides a notification when any item has changed. The notification only includes the value of the changed property |
Additional Operators
OPERATOR | DESCRIPTION |
---|---|
Adapt | Injects side effects using IChangeSetAdaptor interface |
Bind | Ensures a specified observable collection reflects the underlying data |
Batch | This is a short cut to applying a standard Rx time based buffer followed by FlattenBufferResult (see below) |
BufferIf | Conditional buffer which accepts a boolean observable to turn buffering on or off |
ChangeKey | Changes the unique item key |
Clone | Populate a dictionary instance with the latest changes |
DeferUntilLoaded | Delay subscription until the source is populated with data |
FlattenBufferResult | A changeset is already a collection and therefore becomes a nested collection when Rx buffer operations are applied. Use this to flatten the result back to a single change set |
PopulateInto | Populates a source cache instance with the latest changes |
RefCount | Change set equivalent to Publish().RefCount(). The source is cached so long as there is at least 1 subscriber |
Hello,
thanks for the article.
In your example you are filtering on status == live. But if I change this value on one object somewhere else in my code (so it is no longer live), the collection is not updated (object not removed).
How could I modify the sample for this purpose?
The best way is to create a new object with the new status and update the source cache. This will propagate the change.
There is an example of this here https://github.com/RolandPheasant/Dynamic.Trader
If you mean you want to apply the filter when a property changes it is doable but requires some plumbing. It would require an example for illustration – if you need more details contact me on https://gitter.im/RolandPheasant/DynamicData
Thanks for the quick reply!
I was “afraid” that was what I needed to do. Though I’m not creating a copy, I’m just sending it again to cacheSource.AddOrUpdate(), thanks to SourceCache.
But it is a little less clean, as I have to remember to do it every time I update a property that can be used in a filter.
The alternative solution I found, which only needs doing once per property, is to add a property observer for each:
_cacheSource.Connect().WhereReasonsAre(ChangeReason.Add).WhenPropertyChanged(x => x.Popular)
.Subscribe(x => {
_cacheSource.AddOrUpdate(x.Sender);
});
Not sure it is very good for performance but it works 🙂
Your solution will work, and you do not need the WhereReasonsAre clause for it to work.
There are alternative and functionally pure solutions which make use of the overloads of the Filter() operator. I cannot post an example for a couple of days.
If you sign into the dynamic data gitter page and ping me, I will be able to directly contact you when I post a solution
Hi Roland. Did you post a solution/example for this?
myCache.BatchUpdate is not working. There is no such method. Should it be replaced with Edit?
Hi Roland,
I’m a private stock trader who is looking for a new in-memory approach to implementing a realtime producer-consumer or observer pattern. At the moment I’m gathering stock ticks (changes) with a timer and writing each stock tick to a distinct file on hard disk. I’m doing this because I subscribe to the file system watcher’s events on file creation. The Task Parallel Library (TPL) is used to avoid losing any tick. This weird approach was chosen because I struggled with concurrent collections in general, and also when it comes to UI updating.
Are Rx and Dynamic Data capable of replacing my weird create-file-listen-to-file-system-watcher-within-tpl-flow approach?
Where to start?
Thanks!