Reactive UI Programming Part 1 – Converting Between KO and RxJs
Brandon Wallace

This is Part 1 of a series of articles on Reactive UI Programming with Knockout and Reactive Extensions for JavaScript. You can read my introduction here.

When working with RxJs and Knockout, there will be times when you have an Rx Observable and need a Knockout observable so you can bind it to a view. And there will be times when you have a Knockout observable and need an Rx Observable so you can merge several event sources or some other type of logic that is more easily solved by the RxJs linq operators.

There’s a small project on GitHub called Knockout-Rx which is a quick example of how you can convert between RxJs and Knockout. This example solution isn’t quite sufficient for everyday use, especially in a complex application with many loosely couple components where a piece of code can’t be sure exactly where a given observable has come from.

In this post I’ll walk through the process of creating two methods to easily move back and forth between the 2 libraries. If you don’t want to read the explanations and just want the code, you can grab it, along with some unit tests from this jsFiddle.

Knockout.toRx

I’ll start with converting from a Knockout object to an Rx Observable since this is a simpler task. The basic idea is simple. Given a ko.subscribable, ko.observable, or ko.computed, I want to call a function to create an Rx.Observable which will signal anytime the ko object takes on a new value. I have found that, when I convert a ko.observable or ko.computed, sometimes I want the resulting Rx Observable sequence to start with the current value of the ko object–but only if the value is not undefined. Other times, I don’t want the current value. So I’d like this function to accommodate both behaviors.

So, we start by defining the function as a member of ko.subscribable.fn:

ko.subscribable.fn.toRx = function (startWithCurrentValue) {    var source = this;    return Rx.Observable.createWithDisposable(function (observer) { /* ... */ });};

ko.subscribable.fn contains the set of functions that are available to all ko.subscribable instances. Since ko.observable is also a ko.subscribable and ko.computed is also a ko.observable, our toRx method will be available as a member method of every knockout object we create.

I could have declared toRx as a Knockout Extender, but I’m not terribly fond of the syntax. I much prefer var r = someKOObservable.toRx(); over var r = someKOObservable.extend({ toRx: false });. It just seems like a step removed from regular JavaScript. Generally speaking, Knockout was not written with JavaScript prototype inheritance in mind. It does not use prototypes to provide member methods to its objects, and instead copies the members from the ‘fn’ object to each new instance of a ko object. This leads to an n-squared problem – as the number of member methods grows, the time spent setting up the methods on each new ko observable instance grows. If you have several hundred methods on your “prototype” and are making 1000’s of instances of observables, then this can start to be a performance problem. The Knockout project implemented these extenders as a way to solve that problem. But it really feels like it is just working around the problem with the original decision to not leverage the JavaScript prototype. Thus I just added my method to the fn object.

When toRx is called, this will be the source ko object we want to turn into an Rx Observable. We store this in a source variable and then we use Rx.Observable.createWithDisposable() to construct and return an Rx Observable.

Within the subscribe function that Rx.Observable.createWithDisposable(), observer is the Rx.Observer that is subscribing to our observable. Whenever our source produces a new value, we want to call observer.onNext to notify the observer.

var onNext = observer.onNext.bind(observer),

I’m using Function.bind to define a function which takes a value and calls observer.onNext with that value.

subscription = source.subscribe(onNext),

I subscribe to our knockout source and pass our onNext function as the subscription callback. Now, whenever source gets a new value, our observer will get notified. I store the subscribe result in a subscription variable. We can call dispose on this when we want to unsubscribe fromr our source.

if (startWithCurrentValue && ko.isObservable(source)) {    currentValue = source();     if (currentValue !== undefined) {        onNext(currentValue);    }}

I check if the caller has requested that the sequence start with the current value of the source object, and also check that the source object is a ko.observable (ko.subscribables do not have a current value and so the startWithCurrentValue option does not apply to them). If we should send the current value, I get the current value and, if it is not undefined, tell the observer about it.

return subscription;

Finally, I return our subscription variable. Rx.Observable.createWithDisposable()expects us to return a Disposable which it can dispose of when the observer unsubscribes from the Rx Observable. It turns out the subscribe call to our knockout source is exactly what we need.

Here’s the complete method:

+ expand source

Rx.toKO

Now we will talk about a function which will create a Knockout object that updates in response to an Rx Observable. On the surface, this doesn’t sound any harder than going from KO to RX. But it turns out there are a number of subtleties that need to be considered.

What kind of KO object should be produce?

We only have 3 to choose from…

ko.subscribable

Well, technically, ko.subscribables are the objects which most closely map to an Rx.Observable and we could easily create one that signaled whenever our source Rx.Observable signaled. But should we? It turns out, this just isn’t very useful. The primary reason to turn an Rx Observable into a KO object is so that you can bind it to a view, or use it in a computed. ko.subscribables are not really useful for that.

ko.observable

This seems like it could certainly work. This is the target chosen by the Knockout-Rx project I mentioned earlier. Let’s see what toKO would look like if we chose ko.observable:

Rx.Observable.prototype.toKO = function () {    var source = this,        value = ko.observable(),        subscription = source.subscribe(value);    value.dispose = subscription.dispose.bind(subscription);    return value;}

So, this *could* work. But I have some issues with it:

And so that leaves us with…

ko.computed

Computeds offer ways to solve my 2 complaints. We can use the deferEvaluationoption to prevent our read function from being called until something actually reads us. And we can wait to subscribe to our source Observable until the read function is called the first time. We can also create the computed without a write function, which will make it a read only observable (ko.isWriteableObservable() will return false).

Function definition

Rx.Observable.prototype.toKO = function () {

Since RxJs embraces the JavaScript prototype, we can just add our method to Rx.Observable.prototype.

var source = this,    value = ko.observable(),

source is the Rx Observable instance which needs to be converted to KO.value will be the ko.observable which will hold the most recent value seen by our source. This will be the value returned by our computed.

subscription = new Rx.SingleAssignmentDisposable(),

This will eventually hold the disposable subscription to our source. We use this so that we can have something we can dispose of if the computed is disposed before we subscribe to source.

computed = ko.computed({    read: function () {        if (!subscription.disposable()) {

This is the start of our read function of our computed. The first time this method is called, our subscription variable will not yet have had a disposable assigned to it, and so we know we have not yet subscribed to our source.

    ko.computed(function () {        subscription.disposable(source.subscribe(value));    }).dispose();}

Here is where we actually subscribe to our source and assign the disposable subscription to our subscription disposable. We pass in our value ko.observable as the subscription callback. Rx Observables call this callback with a single parameters which is the value that was observed. This will effectively write the new value into our value observable.

Why do we wrap the entire call within a new ko.computed which we immediately dispose of? This is to prevent our computed from accidentally subscribing to any ko observables that happen to get evaluated during our call to source.subscribe. We have no idea our source observable is or how it has implemented its subscribe method. What if our source observable reads some ko observables when we subscribe to it? If it does, then through the magical knockout dependency detection mechanisms, our computed will detect those evaluations and take a dependency upon those observables. We don’t want that. By wrapping the call within a new computed, we cause the new computed to take the dependency instead of our computed. Once the call completes, we have what we want: a subscription to the source observable. We now dispose of the new computed we just made to release any dependencies it took.

    return value();},

Now that we’ve subscribed to the source, we just return our value observable. Knockout will setup a dependency so that everytime value changes our computed will produce the new value also. And we’ve subscribed our value to our source. So this means when our source produces a new value, our computed will produce that value also.

    deferEvaluation: true}),

Here we set the deferEvaluation option so that the read method is not called until someone actually tries to read our computed.

    dispose = computed.dispose; computed.dispose = function () {    subscription.dispose();    dispose.apply(this, arguments);};

Next we replaced the dispose method that our computed observable has with a new one that will dispose of the subscription to our source before calling the original dispose method. This way, when someone disposes of our computed, we will unsubscribe from the source observable.

    return computed;};

We now have a read only computed that will not subscribe to the source until it is necessary.

Here’s the complete method:

+ expand source

Conclusion

These two functions are all that is required to make Knockout and RxJs play nicely together. Now we can construct powerful observable chains that are composed of a mixture of Knockout observables and computeds and Rx observables. These functions are essential ingredients needed by my future code examples.

These functions, along with some unit tests, are available in this jsFiddle.

Next post I’ll start showing some actual usage examples.

RECENT POSTS FROM
THIS AUTHOR