Net Dots

.Net and programming in general

Making list navigation responsive using Reactive Extensions-part 2 (SampleResponsive)

In the previous article in this series we used standard RX operator Throttle to reduce the frequency of details view updates while navigating between records in a list. We have identified two problems with this  approach:

  • A “single” navigation does not cause the details to start refreshing immediately, creating an illusion that the system is sluggish.
  • While we navigating continuously, details are not refreshed, so we do not have a good idea where in the list we are located currently.

The second point can be addressed by using Sample instead of Throttle. Think of Sample as of a strobe light – it flashes periodically revealing the last event that happened just before. If we replace Throttle with Sample in out example from previous article, we would have responsive solution with some “feedback” but unfortunately our biggest problem of not “reacting immediately” still stands.

We need a new observable extension, that follows these rules:

  • “Front” event or an event that happens after some period of inactivity is passed through immediately.
  • Events that come frequently are filtered out but some are passed through periodically
  • The last event before inactivity is always passed through.

Sounds complex? The good news, there is a “real world” apparatus implementing these rules so it can serve as a reference model for us. Here:

ak47

 

Think of it. Always ready to fire (the way we like our details navigation), firing at fixed intervals when trigger is held. Let’s copy the model and use in our "SampleResponsive” extension. Machine gun works by using the power of a shot (recoil or gas) to load the next round into the chamber. In our case, we need somehow mix the external events (trigger presses) with internal process (reloading) and this combination of events (fire) will be our target observable, to which we would subscribe and which would also feed the internal process. In other words, our system will contain a positive feedback loop. What is interesting, the RX logo itself contains a reference to this concept:

RXLogo

Interesting, isn’t it? Let’s start then. To “stitch” the loop together, we must use a Subject, otherwise we have no way to feed the stream of events back to itself. The Subject, being both IObserver and IObservable, can be subscribed on and subscribed to. Let’s call it fire:

                 
                 var  fire = new  Subject <T>();
  

Now we need to emulate the process of “reloading” which we will create with a simple Delay extension, assuming for simplicity it takes a half of a second to reload the next round:

 
                 var  whenCanFire = fire.Delay(TimeSpan .FromSeconds(0.5));
 

This whenCanFire observable signifies a collection of moments when a fresh round in the chamber becomes available. Now, wait a second… To start firing, we need to have the first round ready, otherwise we will not be able to start this chain of events. Let’s feed the first round manually:

 
                 var  whenCanFire = fire
                     .Delay(TimeSpan .FromSeconds(0.5))
                     .StartWith(default (T));
  

Now we can bring together these two sequences – one when trigger is pressed (source parameter in our SampleResponsive extension) and another is whenCanFire from above. What RX function we can use for this task? Unfortunately, the entire library contains only two “parallel combinators” – Zip and CombineLatest. All other extensions are either “non-parallel”, meaning that combined observables play different roles; or not “combinators”, meaning that they do not bring pairs of events together (Merge is an example). CombineLatest is probably the closest to what we want, but if we use it, our “machine gun” would act like one with a trigger that “stuck” – one press of the trigger and it would fire continuously without additional presses. We need to combine only one press with one round, consuming both in the firing process. Let’s call this new observable extension CombineVeryLatest and we will create it in the next article. For now, let’s just assume it has the same signature as CombineLatest with the difference that each element in the pair can be used only once. We use our new extension to join the external and internal processes and to create the positive feedback loop:

 
                 var  subscription = src
                     .CombineVeryLatest(whenCanFire, (x, flag) => x)
                     .Subscribe(fire);
  

Bringing it all together, here is our extension:

 
         public  static  IObservable <T> SampleResponsive<T>(
                 this  IObservable <T> source, TimeSpan  delay)
         {
             var  fire = new  Subject <T>();
 
             var  whenCanFire = fire
                 .Delay(delay)
                 .StartWith(default (T));
 
             var  subscription = source
                 .CombineVeryLatest(whenCanFire, (x, flag) => x)
                 .Subscribe(fire);
 
             return  fire.Finally(subscription.Dispose);
         }
  

We still have some work to do. Observables come in two “flavors” or rather “temperatures” – cold and hot. “Cold” ones perform all subscription activity separately for each subscriber, “Hot” observables do it once, when first subscription appears. What temperature is ours? It is hard to say, SampleResponsive combines a Subject which is a Hot observable with CombineVeryLatest, which is a Cold observable – maybe we can call ours “lukewarm”. This is not good – f.e. the above implementation would create the subject and internal subscription always, even if nobody subscribes to it. So where should we go temperature-wise – to hot or to cold? Let’s ask ourselves – if we have two subscribers  are we Ok to receive the same set of events for each? I think yes, so the decision is made to make our extension Hot:

 
         public  static  IObservable <T> SampleResponsive<T>(
                 this  IObservable <T> source, TimeSpan  delay)
         {
             return  source.Publish(src =>
             {
 
                 var  fire = new  Subject <T>();
 
                 var  whenCanFire = fire
                     .Delay(delay)
                     .StartWith(default (T));
 
                 var  subscription = src
                     .CombineVeryLatest(whenCanFire, (x, flag) => x)
                     .Subscribe(fire);
 
                 return  fire.Finally(subscription.Dispose);
             });
         }
  

Now all “internals” – Subject, subscription, etc. will be created only once when the first subscriber checks in. Let’s use it, replacing the Throttle in our subscription (see previous article) with SampleResponsive:

 
             _whenItemSelected
 //                .Throttle(TimeSpan.FromSeconds(0.4)) 
                 .SampleResponsive(TimeSpan .FromSeconds(0.4))
                 .Do(_ => ReadDetailData())
                 .Subscribe(_ => InvokePropertyChanged("Details" ));
  

Running the application, we can see that we are back to square one – it acts sluggish again. Looking at the debug output, we can see our “detail reading” process happens on the main UI thread. There are two ways to deal with it and demand “task pool processing”. One, we can use ObserveOn operator:

 
             _whenItemSelected
 //                .Throttle(TimeSpan.FromSeconds(0.4)) 
                 .SampleResponsive(TimeSpan .FromSeconds(0.4))
                 .ObserveOn(Scheduler .TaskPool)
                 .Do(_ => ReadDetailData())
                 .Subscribe(_ => InvokePropertyChanged("Details" ));
  

Another (and this is preferred approach, since it both gives developer a greater flexibility and enables RX TestScheduler unit testing) is to introduce a scheduler parameter to our extension. Doing this is quite easy – you just squeeze in the scheduler to all elements that can take it (in our case, Subject and Delay):

 
         public  static  IObservable <T> SampleResponsive<T>(
                 this  IObservable <T> source, TimeSpan  delay, IScheduler  scheduler = null )
         {
             return  source.Publish(src =>
             {
                 var  fire = new  Subject <T>(scheduler ?? Scheduler .Immediate);
 
                 var  whenCanFire = fire
                     .Delay(delay, scheduler ?? Scheduler .TaskPool)
                     .StartWith(default (T));
 
                 var  subscription = src
                     .CombineVeryLatest(whenCanFire, (x, flag) => x)
                     .Subscribe(fire);
 
                 return  fire.Finally(subscription.Dispose);
             });
         }
  

It is good to have .Net4 default parameter as the scheduler – but it makes you responsible for providing the right defaults when the parameter is missing. You might notice that the default scheduler used above is different for the Subject and for the Delay. Of course you can make separate statements for default and non-default, but your code would look ugly:

 
                 var  whenCanFire = scheduler == null 
                     ? fire.Delay(delay).StartWith(default (T))
                     : fire.Delay(delay, scheduler).StartWith(default (T));
  

Another better way is to use Reflector and look at the Reactive libraries IL code to find out what defaults different functions use. Here is a little “cheat sheet” showing RX functions and objects grouped by their default scheduler:

Scheduler.ThreadPool: (you can use also Scheduler.TaskPool as a default parameter for these)

Delay

BufferWithTime

GenerateWithTime

Interval

Sample

Start

Throttle

TimeInterval

TimeOut

Timer

TimeStamp

ToAsync

Scheduler.CurrentThread:

Generate

Repeat

Replay

Subscribe (to enumerable)

ToObservable

ReplaySubject constructor

Scheduler.Immediate:

Merge

Prune

Publish

Return

StartWith

Throw

Subject constructor

BehaviorSubject constructor

AsyncSubject constructor

 

Now our extension is ready. Here’s the code using it:

 
             _whenItemSelected
                 .SampleResponsive(TimeSpan .FromSeconds(0.4), Scheduler .TaskPool)
                 .Do(_ => ReadDetailData())
                 .Subscribe(_ => InvokePropertyChanged("Details" ));
  

Now it works as planned – you can navigate fast and smooth, and details are displayed after a minimal delay if you’re changing selection once. In the next article in the series, we will look at the code for CombineVeryLatest.

Advertisements

December 13, 2010 Posted by | Uncategorized | 1 Comment