Recently I’ve come across the need to use an ambient logical context that would automatically flow with my program logic as it jumps from thread to thread. The .NET Framework has support for this when you are working with Threads and Tasks. It is not well documented, but it Just Works…mostly.
However, my application is a fusion of Rx.Net, Tasks, and Threads, and there is no readily available information on what support (if any) Rx has for this facility. This post attempts to rectify that.
Note: All of my work was done on Windows 7 x64, .NET Framework 4.5, Rx.Net 2.2.4.
In the server application I am working on, clients connect to the server via a WebSocket and then issue commands and queries through that socket. Each client connects on behalf of a specific user and so any requests from that socket are run on behalf of that user. I have the following requirements:
When writing synchronous code, one usually solves this problem by putting the contextual data in thread-local storage. But this doesn’t suffice when writing asynchronous code: asynchronous code often hops threads, and thread-local storage doesn’t hop with it.
Ever wonder how your Thread.CurrentPrincipal stays correct as your Tasks jump from thread to thread? It turns out that the .NET Framework has a facility for storing ambient contextual information that should follow the execution flow across threads. It is called the ExecutionContext and it is not well documented. But it is this facility that is transparently flowing your context from thread to thread to follow your logical execution context. Stephen Toub has a good blog article describing it.
One of the things contained by the ExecutionContext is the LogicalCallContext. This is essentially a property bag of arbitrary ambient contextual data that should flow with logical execution paths. It supports stack-like scoping of data, so that a child task can change contextual data and the change will only be seen by that child and its children; the parent task will not see it. Stephen Cleary wrote a detailed article about it.
So it seems like LogicalCallContext is the opportune place to store my ambient context. It flows with Async/Await and TPL. It flows with threads. But does it flow with IObservables? Since I can find no mention of it anywhere, I suspect it doesn’t…
I’d like to be able to write code something like this:
For the rest of this blog, I will be using this class to read/set my contextual data within the LogicalCallContext. To keep things simple, we will just use a string as our contextual data. There’s no reason we couldn’t use an immutable RequestContext object instead.
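A minimal sketch of such a helper, assuming .NET Framework (where `CallContext` lives in `System.Runtime.Remoting.Messaging`). The class name `AmbientContext` and the slot key are hypothetical names chosen for illustration:

```csharp
using System.Runtime.Remoting.Messaging; // CallContext (.NET Framework only)

// Hypothetical helper: the class name and slot key are illustrative assumptions.
public static class AmbientContext
{
    private const string Slot = "AmbientContext.Current";

    public static string Current
    {
        // Returns null when nothing has been stored in the logical call context.
        get { return (string)CallContext.LogicalGetData(Slot); }

        // Stores the value in the logical call context, so it flows with
        // the ExecutionContext across awaits, tasks, and threads.
        set { CallContext.LogicalSetData(Slot, value); }
    }
}
```

Storing an immutable value such as a string matters here: every logical branch that inherits the context shares the same reference, so a mutable object could be changed by one branch and observed by another.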
Here is an example NUnit test that shows how the context is saved when we await a task and restored when our continuation runs. It also shows that the child scope can be modified without affecting the calling scope.
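A sketch of the kind of check such a test performs, assuming .NET Framework 4.5 or later. To stay dependency-free it uses a plain `Check` method instead of NUnit attributes and assertions; `AmbientContext`, `ChildAsync`, and `RunAsync` are hypothetical names:

```csharp
using System;
using System.Runtime.Remoting.Messaging; // CallContext (.NET Framework only)
using System.Threading.Tasks;

// Hypothetical helper backed by the logical call context.
public static class AmbientContext
{
    public static string Current
    {
        get { return (string)CallContext.LogicalGetData("AmbientContext.Current"); }
        set { CallContext.LogicalSetData("AmbientContext.Current", value); }
    }
}

public static class AwaitFlowSketch
{
    private static async Task<string> ChildAsync()
    {
        AmbientContext.Current = "child"; // change made inside the child scope
        await Task.Delay(10);             // the continuation may resume on another thread
        return AmbientContext.Current;    // the child's context is restored for the continuation
    }

    public static async Task RunAsync()
    {
        AmbientContext.Current = "parent";
        string seenByChild = await ChildAsync();

        Check("child", seenByChild);             // the child saw its own value after the await
        Check("parent", AmbientContext.Current); // the caller's value was not overwritten
    }

    private static void Check(string expected, string actual)
    {
        if (expected != actual)
            throw new InvalidOperationException("Expected " + expected + " but saw " + actual);
    }
}
```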
On .NET 4.5, this unit test passes. We’ve just verified what Stephen Cleary wrote in his blog.
So how does Rx behave?
Let’s start with this simple function, which will observe an observable and yield the results as a task.
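One plausible shape for such a function, written as a sketch rather than the exact code from this post. It assumes a hypothetical `AmbientContext` helper backed by the logical call context, sets its own context to "observe", records the ambient value seen by each notification callback, and appends the value seen once the await completes:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;              // Select, ToList, and awaiting an IObservable
using System.Runtime.Remoting.Messaging; // CallContext (.NET Framework only)
using System.Threading.Tasks;

// Hypothetical helper backed by the logical call context.
public static class AmbientContext
{
    public static string Current
    {
        get { return (string)CallContext.LogicalGetData("AmbientContext.Current"); }
        set { CallContext.LogicalSetData("AmbientContext.Current", value); }
    }
}

public static class ObserveSketch
{
    // Assumed signature; the recorded strings are the ambient values seen by the callbacks.
    public static async Task<IList<string>> Observe<T>(IObservable<T> source)
    {
        AmbientContext.Current = "observe"; // the observer's own logical context

        // The Select callback runs once per notification and records the context it sees.
        IList<string> seen = await source
            .Select(_ => AmbientContext.Current)
            .ToList();

        seen.Add(AmbientContext.Current); // the context seen by the continuation after the await
        return seen;
    }
}
```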
What value should this Task produce, assuming that the source Observable produces 3 items then completes?
As it turns out, the answer is: it depends on whether source is hot or cold!
What I want and expect is for Observables to work similarly to Tasks. With TPL, we write code that runs as a continuation when the task completes. This continuation code observes the task, and it runs in its own logical context, independent of whatever context the task itself is running in.
I think of this as the Observer Context. When an observer subscribes to an observable (with or without adding extra operators like Select), the observer’s callbacks execute with the observer’s logical context.
Thus, I’d expect to see the following results:
For this unit test, I use Observable.Interval(…).Take(3) as my cold observable.
We get exactly what we want: “global”, “observe”, “observe”, “observe”, “observe”.
Here, I’ve modified the unit test to pass a Subject in as a hot observable. The unit test then pumps 3 values through the subject (using Observable.Interval, which we just showed will capture our context).
So in this version, our “global” context is pushing values through the hot observable to be observed by our observer.
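A sketch of how such a hot-observable test might be set up, under the assumption that `Observe` has the shape used here (it sets its context to "observe", records the ambient value per notification, and appends the value seen after the await). The names `HotObservableSketch`, `Context`, and `RunHot` are hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Runtime.Remoting.Messaging; // CallContext (.NET Framework only)
using System.Threading.Tasks;

public static class HotObservableSketch
{
    // Hypothetical accessor for the ambient string stored in the logical call context.
    private static string Context
    {
        get { return (string)CallContext.LogicalGetData("ctx"); }
        set { CallContext.LogicalSetData("ctx", value); }
    }

    // Assumed shape of the Observe function.
    private static async Task<IList<string>> Observe(IObservable<long> source)
    {
        Context = "observe";
        IList<string> seen = await source.Select(_ => Context).ToList();
        seen.Add(Context);
        return seen;
    }

    public static async Task<IList<string>> RunHot()
    {
        Context = "global";
        var subject = new Subject<long>();

        // Awaiting an observable subscribes right away, so Observe is attached
        // to the subject before any value is pushed.
        Task<IList<string>> pending = Observe(subject);

        // The pump is subscribed here, in the "global" context, and forwards
        // three values plus the completion signal into the subject.
        Observable.Interval(TimeSpan.FromMilliseconds(10)).Take(3).Subscribe(subject);

        return await pending;
    }
}
```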
In this version, we get: “global”, “global”, “global”, “global”, “observe”.
Our observer’s callbacks were executed with the global ExecutionContext, not our local observer context. More specifically, the callbacks were executed in the context in which the source data was produced.
How can the same Observe method produce different results depending upon the temperature of the observable?
Answering that question requires understanding how the Subscribe call really works.
When we Subscribe to an observable, we create a standard synchronous call stack that walks up the observable chain to its source. This entire code execution path executes within the Observer Context, just like any other synchronous call stack. (What about SubscribeOn? That does indeed make the Subscribe process asynchronous, but most of the Rx Schedulers use some standard .NET facility to introduce the asynchrony, and thus the ExecutionContext flows through the same as if it were synchronous.)
For a cold observable, the Subscribe call stack eventually (usually) triggers work which produces calls to OnNext, OnError, and OnCompleted (perhaps it spins up a Task to download a file, or runs a long computation, or just loops through an array). This work, whether it be a Task or a Thread or something scheduled with an Rx Scheduler, is initiated while in the context running during the Subscribe call (i.e. the Observer Context). So whenever it calls one of the On* methods, this call is also in the same context (since context flows). Now starts the synchronous call stack of On* calls as the data works its way back down the observable chain to the observer. All of that code runs in the observer context.
Thus, for a cold observable, the Observer Context flows up through the chain during subscribe, gets captured by the cold observable’s side effects, then flows back down to the observer during each notification and we basically get the desired behavior.
But for hot observables, the Subscribe call eventually ends with nothing more than adding the observer to a list of observers. No side effects are triggered. No Tasks are spawned. No jobs queued. No On* notifications scheduled. Thus the observer context is never captured by the ultimate notification source. When that notification source eventually sends down a notification, the chain of On* calls will run in whatever context produced the notification. This is why, in our example, the observer callbacks ran in the “global” context.
Thus, for a hot observable, the Notification Source context flows down to the observer.
It sort of makes sense. Hot observables typically broadcast their notifications to a group of observers, each of which will have its own context. How could the single source be running in multiple observer contexts?
Now, imagine a scenario like this:
    var source = Observable.Merge(cold1, hot1, hot2, hot3, hot4, cold2);
    await Observe(source);
We have 4 different hot observables, each possibly running in a different context, as well as 2 cold observables that will be running in the observer’s context. They will arrive in any order. In this example, each notification will likely be running in a different context. Oh my.
In most cases, Rx could simply take care of this for us so that ExecutionContext just works. Most of the other .NET asynchronous APIs have taken this approach. It would make things less surprising. This is a difficult topic and most developers shouldn’t need to understand it. Just having it work intuitively would save everyone a lot of heartache.
Imagine the moment when the Subscribe call reaches the source hot observable. The code that adds the new observer to the list of observers is running in that observer’s context. What if the observable captured that ExecutionContext and stored it alongside the observer? Then, whenever the observable broadcast a new notification, it could iterate through the list of observers and issue each notification within that observer’s captured ExecutionContext.
Now, the notification source would run under its own context, but whenever it starts the call chain to notify an observer, it would switch to that observer’s context and issue all of the On* notification calls with that context.
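To make the idea concrete, here is a simplified sketch of a hot source that behaves this way. It is a hypothetical stand-in, not how Rx's `Subject<T>` is implemented; the name `ContextCapturingSource` is invented for illustration, and only `OnNext` is shown (error and completion would follow the same pattern):

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical, simplified hot source; NOT the Rx implementation.
public sealed class ContextCapturingSource<T> : IObservable<T>
{
    private sealed class Entry : IDisposable
    {
        private readonly ContextCapturingSource<T> owner;
        public readonly IObserver<T> Observer;
        public readonly ExecutionContext Context; // null when context flow is suppressed

        public Entry(ContextCapturingSource<T> owner, IObserver<T> observer, ExecutionContext context)
        {
            this.owner = owner;
            Observer = observer;
            Context = context;
        }

        // Unsubscribing removes the observer from the owner's list.
        public void Dispose()
        {
            lock (owner.gate) { owner.entries.Remove(this); }
        }
    }

    private readonly object gate = new object();
    private readonly List<Entry> entries = new List<Entry>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        // This runs on the subscriber's call stack, so Capture sees the observer's context.
        var entry = new Entry(this, observer, ExecutionContext.Capture());
        lock (gate) { entries.Add(entry); }
        return entry;
    }

    public void OnNext(T value)
    {
        Entry[] snapshot;
        lock (gate) { snapshot = entries.ToArray(); }

        foreach (var entry in snapshot)
        {
            var observer = entry.Observer;
            if (entry.Context == null)
                observer.OnNext(value); // nothing captured: run in the producer's context
            else
                // Run the callback under a copy of the context captured at subscription time.
                ExecutionContext.Run(entry.Context.CreateCopy(), _ => observer.OnNext(value), null);
        }
    }
}
```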
Is there a way to do this without changing the way Rx works?
Yes. What if we write our own Rx operator which can be used to capture the observer context during subscription and use that context during notifications?
Here is what I came up with:
So, this is an extension method that can be used with any observable to capture the observer context. All notifications issued from that point in the chain will be called in the ExecutionContext in force when the subscribe call reached that part of the chain.
Here we use Observable.Create to let us hook into the subscription process.
Here we capture the current ExecutionContext. We then check if we did not capture a context (Capture will return null if execution context flow has been suppressed). In this case, we just short-circuit and subscribe the observer to the source without any additional work.
Here we create a CompositeDisposable to hold both our captured context and the subscription to source.
Whenever we are notified with a new value, we create a copy of the context that we can use to issue this notification (ExecutionContext instances are only usable once…because they might be mutated during the call and we do not want to capture those mutations). We use the Run method to call observer.OnNext.
The error notification works similarly. However, we can take advantage of the fact that there will be no notifications after this, so we can use our original captured context instead of making yet another copy.
And the same for the completion notification callback.
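Putting the steps above together, a sketch of such an operator might look like the following. It is a reconstruction from the walkthrough, not necessarily identical to the original code; the method name `FlowObserverExecutionContext` is the one this post uses, while the containing class name is an assumption:

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

public static class ObserverContextExtensions // hypothetical class name
{
    public static IObservable<T> FlowObserverExecutionContext<T>(this IObservable<T> source)
    {
        // Observable.Create lets us hook into the subscription process.
        return Observable.Create<T>(observer =>
        {
            // Capture returns null if execution context flow has been suppressed.
            ExecutionContext context = ExecutionContext.Capture();
            if (context == null)
                return source.Subscribe(observer);

            // Holds both the captured context and the subscription to the source.
            var subscription = new CompositeDisposable(context);

            subscription.Add(source.Subscribe(
                // Each value is delivered under a fresh copy of the captured context.
                value => ExecutionContext.Run(context.CreateCopy(), _ => observer.OnNext(value), null),
                // Terminal notifications can use the original captured context,
                // because nothing follows them.
                error => ExecutionContext.Run(context, _ => observer.OnError(error), null),
                () => ExecutionContext.Run(context, _ => observer.OnCompleted(), null)));

            return subscription;
        });
    }
}
```

Usage is a single call at the point in the chain where the observer's context should take over, for example `subject.FlowObserverExecutionContext()` before handing the subject to an observer.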
We can modify our previous Hot Observable unit test to call our new extension method so that the hot observable captures the observer context:
And now we get the desired results: “global”, “observe”, “observe”, “observe”, “observe”.
I personally think Rx should handle this itself. This routine only works if you remember to call it on a hot observable, which means developers will forget, and the resulting bugs will be hard to track down.
Also, since ExecutionContext carries security state (the SecurityContext, which holds any impersonated identity and Security Permissions, as well as the CurrentPrincipal), it is a security risk to let the context flow from the source to the observer. The source may be using special security privileges or identity impersonation to gain access to secure resources needed to produce the notifications. Letting random low-privileged observers submit callbacks which run under the elevated permissions is just asking for trouble.
Here’s an example service that uses impersonation to access the source data, but uses observer context flow to ensure observers do not get to run their code in this privileged context.
Most of the schedulers just work because they use regular .NET async APIs, and so the context is captured correctly. I tested ObserveOn and SubscribeOn with TaskPoolScheduler, ThreadPoolScheduler, and NewThreadScheduler. None of those combinations had any effect on my results. I did not test any of the other schedulers.
I don’t expect ImmediateScheduler to cause any troubles.
I think CurrentThreadScheduler potentially has issues due to its trampoline. If you schedule an action while an action is already running (possibly under a different context), then your action just gets put in a work queue. When that other action completes, it takes the next action from the queue and runs it, which will cause your action to run under the same context that the other action was running under. However I’ve not worked out if this is an actual issue in practice.
I’m not sure about the UI Dispatcher scheduler variants.
Observable.FromEvent* methods act like hot observables and so need the FlowObserverExecutionContext treatment. All of the Subjects are effectively hot. All Connectable Observables are hot. Most of the other observables, like Range, Generate, Timer, Interval, etc., are cold.
I’m sure there are a few gotchas still hiding in the woodshed. Please let me know if you find any.
So, now we know how context flows with async/await and TPL, and I’ve shown how we can make Rx play nicely with this flow. I now have the tools necessary to get my ambient RequestContext flowing throughout my system.