ZooKeeper Usage 5: C# IObservable
Sebastian Good

This is a multi-part series:

1. ephemeral nodes, 2. observable<zookeeper>, 3. where is zookeeper?, 4. .net api and tasks, 5. c# iobservable

One of ZooKeeper’s nice features is the ability to set up a watch on a node, and be updated whenever it changes. The Java API (usable from .NET) is straightforward, if a bit verbose.

    // on ZooKeeper client
    void getData(
      String path,
      Watcher watcher,
      AsyncCallback.DataCallback cb,
      Object ctx)

    // interface DataCallback
    void processResult(
      int rc, String path, Object ctx,
      byte[] data, Stat stat)

    // interface Watcher
    void process(WatchedEvent event)

You can getData for a node (or just check that it exists), and register a Watcher that will be pinged if the node changes.

Like many remote observables, ZooKeeper only gives you one update. If you want to get more updates, you have to re-watch the node. (Oracle continuous query notifications work the same way.) It’s a defensive mechanism so that the server isn’t swamped with lots of callback registrations if clients go missing.
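To make the one-shot rule concrete, here is a small in-memory sketch. `OneShotNode` and `OneShotDemo` are hypothetical names invented for illustration; nothing here is the ZooKeeper API. The sketch only mimics the "fire once, then forget" behavior and compares a client that registers once with one that re-registers on every notification.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a node with one-shot watches (not the ZooKeeper API).
sealed class OneShotNode
{
    private List<Action> _watchers = new List<Action>();
    private string _value = "";

    // Read the current value and register a watch that fires at most once.
    public string GetData(Action watcher)
    {
        _watchers.Add(watcher);
        return _value;
    }

    public void SetData(string value)
    {
        _value = value;
        // Swap the list out first: every registered watch is consumed by this change.
        var fired = _watchers;
        _watchers = new List<Action>();
        foreach (var watcher in fired) { watcher(); }
    }
}

static class OneShotDemo
{
    // Returns how many notifications each style of client received.
    public static (int naive, int rearmed) Run()
    {
        var node = new OneShotNode();
        int naive = 0, rearmed = 0;

        // Naive client: registers once, so it hears about one change only.
        node.GetData(() => naive++);

        // Re-arming client: every notification registers the next watch.
        void Rewatch() { rearmed++; node.GetData(Rewatch); }
        node.GetData(Rewatch);

        node.SetData("a");
        node.SetData("b");
        node.SetData("c");
        return (naive, rearmed);
    }
}
```

After three changes, the naive client has been notified once and the re-arming client three times. That re-arming loop is the chore we would like to hide from callers.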

As before, we wanted to translate this very imperative, fairly verbose model into a more composable one. We want a query that looks more like:

IObservable<Maybe<byte[]>> WatchData(string path)

The IObservable pattern returns a sequence of results that may terminate in success or with an error.
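The signature leans on a Maybe type that this post never shows. The following is a minimal sketch of what such a type might look like, assuming only the members used in this post (`Maybe.Return`, `Maybe.Empty<T>` and `IsEmpty`). `HasValue` and `Value` are assumptions added for illustration, and the real type may well differ.

```csharp
using System;

// Hypothetical stand-in for the Maybe type used in this post.
public readonly struct Maybe<T>
{
    private readonly T _value;

    // False for default(Maybe<T>), which doubles as the empty case.
    public bool HasValue { get; }

    public bool IsEmpty => !HasValue;

    internal Maybe(T value)
    {
        _value = value;
        HasValue = true;
    }

    // Assumed accessor: throws rather than handing back a meaningless default.
    public T Value => HasValue
        ? _value
        : throw new InvalidOperationException("Maybe is empty");
}

public static class Maybe
{
    // Wraps a value that is present.
    public static Maybe<T> Return<T>(T value) => new Maybe<T>(value);

    // Represents "no node here".
    public static Maybe<T> Empty<T>() => default;
}
```

Using a struct means an uninitialized `Maybe<byte[]>` is simply empty, so there is no null to worry about on top of the empty case.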

What are the right semantics?

As with our previous getData example, there are some fairly obvious semantics: we should raise Maybe.Empty if the node does not exist (or is deleted), and Maybe.Return<byte[]> if the node comes into existence or changes values. But what about starting and ending the stream?

We decided there was no obvious reason the observable stream should end, since nodes can be deleted and recreated at will. We can leave that decision to the client of this API, who could TakeUntil(m => m.IsEmpty) if she felt like it, or unsubscribe when the user no longer cared to observe the value (such as a web socket watching a progress bar).

How should the stream start? A typical observable stream only emits values when something changes, such as a new stock quote or a key being pressed. But it would be frustrating to have to do a getData call first and then follow it with a watchData if you needed both the current value of something and its subsequent changes. So we decided that the stream would immediately emit the current result of getData (either empty or a byte array) and then follow it with any changes.

Implementation

We have lots to do at once.

Let’s do it all from one object and kick everything off in the constructor, as before:

    class ZookeeperNodeDataChangeWatcher :
      Watcher,
      org.apache.zookeeper.AsyncCallback.DataCallback,
      org.apache.zookeeper.AsyncCallback.StatCallback,
      IDisposable

    ZookeeperNodeDataChangeWatcher(
      IObserver<Maybe<byte[]>> observer,
      string nodePath,
      org.apache.zookeeper.ZooKeeper zooKeeper)
    {
      _observer = observer;
      _nodePath = nodePath;
      _zookeeper = zooKeeper;
      /*bool?*/ _nodeExists = null;
      GetData();
    }

Just as with Tasks, there’s no natural IObservable in the ZooKeeper API that we can work with. Instead, we implement our own. In general I’ve noticed that directly using an ISubject — tempting in this case — is an anti-pattern. It usually results in a bunch of race conditions and lifetime management issues that are complicated. It’s best if you just take an IObserver and call methods on it directly, managing the IObservable implementation by using the Rx framework’s Observable.Create. Assuming the watcher works correctly, you use it quite simply, like this:

    IObservable<Maybe<byte[]>> WatchData(string path) {
      return Observable
        .Create<Maybe<byte[]>>(observer =>
          new ZookeeperNodeDataChangeWatcher(observer, path, _zooKeeper));
    }

Every time we call WatchData, we return an object which can start a query. The query won’t actually start until someone subscribes to the observable, triggering the call to the lambda supplied, which creates the ZookeeperNodeDataChangeWatcher we’ll describe in a bit.
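The lazy start is easy to see in isolation. This sketch assumes the System.Reactive package is referenced and uses no ZooKeeper at all. `LazySubscriptionSketch` and its counter are hypothetical names; the lambda passed to Observable.Create stands in for constructing the watcher.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

static class LazySubscriptionSketch
{
    // Returns how often the subscribe lambda had run before and after subscribing twice.
    public static (int before, int after) Demo()
    {
        int starts = 0;

        // Building the observable runs nothing; the lambda is only stored.
        IObservable<string> query = Observable.Create<string>(observer =>
        {
            starts++;                          // the "query" starts here
            observer.OnNext("current value");  // stand-in for the first getData result
            return Disposable.Empty;           // nothing to clean up in this sketch
        });

        int before = starts;

        // Each subscription runs the lambda again, independently of the other.
        using (query.Subscribe(v => Console.WriteLine(v))) { }
        using (query.Subscribe(v => Console.WriteLine(v))) { }

        return (before, starts);
    }
}
```

The counter is 0 until the first Subscribe and 2 after both. For WatchData, this means every subscriber gets its own watcher and its own set of ZooKeeper watches.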

Managing the state machine

We want to both read data and register for watches. The getData call reads the data and registers a watch. The exists call simply establishes a watch on a node; it doesn’t matter why we watch it, we will get events whenever it changes. As we did previously, we pass this as the callback object for both calls, which will be used for both the asynchronous result and the watch updates.

    void GetData() {
      try {
        _zookeeper.getData(_nodePath, this, this, null);
      } catch (Exception e) {
        _observer.OnError(e);
      }
    }

    void WatchAgain() {
      try {
        _zookeeper.exists(_nodePath, this, this, null);
      } catch (Exception e) {
        _observer.OnError(e);
      }
    }

It’s a slightly tedious state machine to manage, but we just soldier through the cases. First, this callback comes from our first async getData, i.e. as soon as we start.

    public void processResult(
      int i, string str, object obj,
      byte[] barr, Stat s)
    {
      int returnCode = i;
      byte[] data = barr;
      string path = str;

      if (IsDisposed) { return; }
      else if (returnCode == KeeperException.Code.OK.intValue())
      {
        // node exists; notify observer of data
        // API has already registered a watch
        _nodeExists = true;
        _observer.OnNext(Maybe.Return(data));
      }
      else if (returnCode == KeeperException.Code.NONODE.intValue())
      {
        // node doesn't exist. have to explicitly add a watch
        _nodeExists = false;
        WatchAgain();
        _observer.OnNext(Maybe.Empty<byte[]>());
      } else {
        // did we lose our connection?
        _observer.OnError(
          KeeperException.create(KeeperException.Code.get(i)));
      }
    }

Now, whether we’ve found data or an empty node, we are subscribed for further events. When the node was missing, we subscribed by calling the exists API, which also needs an asynchronous callback:

    void processResult(int i, string str, object obj, Stat s) {
      if (IsDisposed) { return; }
      int returnCode = i;
      if (returnCode == KeeperException.Code.OK.intValue() ||
          returnCode == KeeperException.Code.NONODE.intValue() ||
          returnCode == KeeperException.Code.NODEEXISTS.intValue())
      {
        // in the case that a node has come into existence,
        // but it didn't previously exist, we want to trigger
        // a call to GetData(), just as we did when we constructed
        // the observer.
        bool nodeExistsNow = s != null;
        bool? oldExists = _nodeExists;
        _nodeExists = nodeExistsNow;
        if (nodeExistsNow && nodeExistsNow != oldExists) { GetData(); }
        return;
      } else {
        _observer.OnError(KeeperException.create(
          KeeperException.Code.get(returnCode)));
      }
    }

And finally, after doing all that work to sign up for watches, here’s where we’re told something has happened:

    void process(org.apache.zookeeper.WatchedEvent e) {
      if (IsDisposed) { return; }
      if (e == null || _nodePath != e.getPath()) {
        // we were notified about something we weren't watching
        // sign up again
        WatchAgain();
        return;
      }

      var type = e.getType();
      if (type == Watcher.Event.EventType.NodeCreated ||
          type == Watcher.Event.EventType.NodeDataChanged)
      {
        // the node has some new info; let's go ask for it
        GetData();
      }
      else if (type == Watcher.Event.EventType.NodeDeleted) {
        // the node was deleted, push out that fact, but
        // then wait for changes; it might come again
        _nodeExists = false;
        _observer.OnNext(Maybe.Empty<byte[]>());
        WatchAgain();
      } else {
        // other events are harmless; we'll watch some more
        WatchAgain();
      }
    }

While none of this code is particularly complicated, it comes in so many varieties in the underlying ZK API that it’s easy to get confused and implement lots of callbacks. By registering all callbacks on one object and re-subscribing for watches on new events, we roll up enormous complexity into a very simple and composable interface. In a sense, ZooKeeper has spilled its guts into its API, and we’ve tried to stitch them back together.

Disposable?

What about the various calls to IsDisposed in the code above? What was that about? Observable.Create expects to receive an IDisposable object to do its work. The created Observable will call the Dispose method when the caller unsubscribes. This lets us stop re-registering watches and so stop taking up server resources. As you see above, this is straightforward to do in some cases: if we have been disposed, we just don’t register for another watch when we get a watch update. But what if a watch is already outstanding and we haven’t received a callback yet?

There’s no API to unregister interest in a watch. It’s a known bug/feature request, but wasn’t released when we wrote the code above. This means excessive querying of nodes with this feature will result in a memory leak in the server and client. We managed that with some IOC Container trickery, which we should really describe sometime as well.
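The Dispose and IsDisposed members themselves aren't shown in this post, so here is a minimal sketch of the bookkeeping they imply. `RewatchGuard` and `OnWatchFired` are hypothetical names; the `rewatch` delegate stands in for WatchAgain, and marking the flag `volatile` is our assumption, since unsubscription may happen on a different thread from the callbacks.

```csharp
using System;

// Hypothetical sketch: once disposed, callbacks become no-ops and no new watch is set.
sealed class RewatchGuard : IDisposable
{
    private readonly Action _rewatch;
    private volatile bool _disposed;

    public RewatchGuard(Action rewatch)
    {
        _rewatch = rewatch;
    }

    public bool IsDisposed => _disposed;

    // Called when the subscriber unsubscribes.
    public void Dispose()
    {
        _disposed = true;
    }

    // Stand-in for a watch callback: re-register only while still subscribed.
    public void OnWatchFired()
    {
        if (IsDisposed) { return; }
        _rewatch();
    }
}
```

Note what this does not do: a watch that was already registered before Dispose still lives on the server until it fires. The flag only guarantees that the late callback is ignored and not renewed.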

Thread safety?

You’ll also notice there’s no code to manage multi-threaded access to the member variables of our watcher, or calls to the supplied IObserver. This lovely state of affairs is due to the design choices we and the ZooKeeper team made:

Style Matters

Our C# programmers found this interface much easier to work with than the primitives exposed by the Java API, mostly because it rolled up common behaviors in composable monads like Task and IObservable. This is one of the reasons a simple SWIG-like generated interface doesn’t always do the trick when using APIs from a different language. The verbose, callable-object paradigm that ZK found convenient to expose to C and Java programmers wasn’t much like what modern C# programmers expect, and made it hard to re-use the massive leverage available in libraries like the TPL and Reactive Extensions. It takes extra work to make language-specific bindings in this way, but the work is worth it.

Someone in the node.js ecosystem did a similar wrapper (zkplus) for ZooKeeper there. He put it best:

If you haven’t worked with the ZooKeeper API directly, and question the value of this project, do so, then get back to me.