Reactive Extensions and ExecutionContext
Brandon Wallace

Recently I’ve come across the need to use an ambient logical context that would automatically flow with my program logic as it jumps from thread to thread.  The .NET Framework has support for this when you are working with Threads and Tasks.  It is not well documented, but it Just Works…mostly.

However, my application is a fusion of Rx.Net, Tasks, and Threads and there is no information readily available on what support (if any) Rx has for this facility.  This post attempts to rectify that.

Note: All of my work was done on Windows 7 x64, .NET Framework 4.5, Rx.Net 2.2.4.

The Problem

In the server application I am working on, clients connect to the server via a WebSocket and then issue commands and queries through that socket.  Each client connects on behalf of a specific user and so any requests from that socket are run on behalf of that user.  I have the following requirements:

 

LogicalCallContext & ExecutionContext

When writing synchronous code, you usually solve this problem by putting your contextual data in thread-local storage.  But this doesn’t suffice when writing asynchronous code.  Asynchronous code often hops threads, and thread-local storage doesn’t hop with it.
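The difference can be sketched in a few lines. This is an illustration only, not code from the server application. It assumes .NET 4.6 or later, where AsyncLocal<T> is available as a stand-in for an ambient slot that flows with the ExecutionContext; the class, field, and method names are hypothetical.

```csharp
using System;
using System.Threading;

public static class ThreadLocalVsAmbient
{
    // Thread-local slot: every thread gets its own, initially empty, copy.
    [ThreadStatic]
    private static string _threadLocalUser;

    // Assumption: AsyncLocal<T> (.NET 4.6+) stands in for an ambient value
    // that travels with the ExecutionContext.
    private static readonly AsyncLocal<string> AmbientUser = new AsyncLocal<string>();

    public static string[] Demo()
    {
        _threadLocalUser = "alice";
        AmbientUser.Value = "alice";

        string seenThreadLocal = null;
        string seenAmbient = null;

        // Thread.Start flows the current ExecutionContext to the new thread,
        // but the [ThreadStatic] field is not part of that context.
        var worker = new Thread(() =>
        {
            seenThreadLocal = _threadLocalUser ?? "not set";
            seenAmbient = AmbientUser.Value ?? "not set";
        });
        worker.Start();
        worker.Join();

        return new[] { seenThreadLocal, seenAmbient };
    }
}
```

Calling ThreadLocalVsAmbient.Demo() should return "not set" for the thread-local slot and "alice" for the ambient slot.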

Ever wonder how your Thread.CurrentPrincipal stays correct as your Tasks jump from thread to thread? It turns out that the .NET Framework has a facility for storing ambient contextual information that should follow the execution flow across threads.  It is called the ExecutionContext and it is not well documented.  But it is this facility that is transparently flowing your context from thread to thread to follow your logical execution context.  Stephen Toub has a good blog article describing it.

One of the things contained by the ExecutionContext is the LogicalCallContext.  This is essentially a property bag of arbitrary ambient contextual data that should flow with logical execution paths.  It supports stack-like scoping of data (so that a child task can change contextual data that will only be seen by that child and its children; the parent task will not see the changes).  Stephen Cleary wrote a detailed article about it.

So it seems like LogicalCallContext is the opportune place to store my ambient context.  It flows with Async/Await and TPL.  It flows with threads.  But does it flow with IObservables?  Since I can find no mention of it anywhere, I suspect it doesn’t…

My Goal

I’d like to be able to write code something like this:

private IDisposable OnNewConnection(IClient connection)
{
  Context.SetUser(GetUser(connection));
  Context.SetConnectionId(GetConnectionId(connection));

  // Context.User and Context.ConnectionId
  // are now available for the rest of the code in this
  // method, including the lambdas.
  // It is even available as context for any logging calls
  // that occur from *within* the IClient code which
  // is handling the socket communications
  return connection.Subscribe(request =>
  {
    Context.SetRequestId(GetRequestId(request));

    // RequestId is now available from here down.
    // which means the biLayer has access to it.
    _biLayer.Handle(request);
  });
}

MyContext

For the rest of this blog, I will be using this class to read/set my contextual data within the LogicalCallContext.  To keep things simple, we will just use a string as our contextual data. There’s no reason we couldn’t use an immutable RequestContext object instead.

using System.Runtime.Remoting.Messaging; // namespace of CallContext

public static class MyContext
{
  private static readonly string _KEY = "my-context";

  public static string Value
  {
    get { return CallContext.LogicalGetData(_KEY) as string ?? "not set"; }
    set { CallContext.LogicalSetData(_KEY, value); }
  }
}

// Usage
MyContext.Value = "some value";
Console.WriteLine("context={0}", MyContext.Value);

Async/Await Example

Here is an example NUnit test that shows how the context is saved when we await a task and restored when our continuation runs. It also shows how the child scope can be modified without affecting the calling scope.

// Captures the current context value
// then changes the context to a new value
// then waits for the job to finish
// then returns a string showing all the values received.
private async Task<string> Observe(Task<string> job)
{
  var initialValue = MyContext.Value; // should be "global"
  MyContext.Value = "observe"; // callers should not see this
  var jobResult = await job; // should return "global" since the job was started outside our scope
  var finalValue = MyContext.Value; // should be "observe"
  return string.Join("-", initialValue, jobResult, finalValue);
}

// Waits for delay to finish, then returns the context value
private async Task<string> GetContext(Task delay)
{
  await delay;
  return MyContext.Value;
}

[Test]
public async Task ContextFlowsCorrectlyWithTasks()
{
  MyContext.Value = "global";
  var delay = new TaskCompletionSource<int>();
  var result = Observe(GetContext(delay.Task));
  Assert.That(MyContext.Value, Is.EqualTo("global"), "Observe() did not change the outer context");
  delay.TrySetResult(0); // trigger things to run.
  Assert.That(await result, Is.EqualTo("global-global-observe"), "GetContext() returned outer context, and Observe() has its own private inner context that started off equal to the outer");
  Assert.That(MyContext.Value, Is.EqualTo("global"), "Observe() did not change the outer context");
}

On .NET 4.5, this unit test passes. We’ve just verified what Stephen Cleary wrote in his blog.

Reactive Extensions

So how does Rx behave?

An Example Observer

Let’s start with this simple function, which will observe an observable and yield the results as a task.

private async Task<string[]> Observe<T>(IObservable<T> source)
{
  // capture the initial context data
  // as an array for easy concatenation later
  var initialContext = new[] { MyContext.Value };

  // change the context data in our local scope
  MyContext.Value = "observe";

  // Each time a value arrives,
  // replace it with whatever the current
  // context is.  Collect these in
  // an array
  var results = await source.Select(_ => MyContext.Value).ToArray();

  // Now see what our context is.
  var finalContext = new[] { MyContext.Value };

  // return it all.
  return initialContext.Concat(results).Concat(finalContext).ToArray();
}

What value should this Task produce, assuming that the source Observable produces 3 items then completes?

As it turns out, the answer is: it depends on whether source is hot or cold!

What I expected/wanted

What I want and expect is for Observables to work similarly to Tasks. With TPL, we write code which runs as a continuation when the task completes. This continuation code observes the task. The continuation code runs in its own logical context that is independent of whatever context the task is running in.

I think of this as the Observer Context. When an observer subscribes to an observable (with or without adding extra operators like Select), the observer’s callbacks execute with the observer’s logical context.

Thus, I’d expect to see the following results: “global”, “observe”, “observe”, “observe”, “observe”.

What we get for Cold Observables

For this unit test, I use Observable.Interval(…).Take(3) as my cold observable.

[Test]
public async Task ColdObservablesUseObserverContextForCallbacks()
{
  MyContext.Value = "global";
  var cold = Observable.Interval(TimeSpan.FromMilliseconds(1)).Take(3);
  var t = Observe(cold);
  Assert.That(MyContext.Value, Is.EqualTo("global"), "Observe() did not change the outer context");
  var result = await t;
  Assert.That(result[0], Is.EqualTo("global"), "Observe() started out seeing global context");
  Assert.That(result[4], Is.EqualTo("observe"), "observer set private inner context to different value");
  Assert.That(result[1] + result[2] + result[3], Is.EqualTo("observeobserveobserve"), "observer callbacks ran under private inner context");
  Assert.That(MyContext.Value, Is.EqualTo("global"), "Observe() did not change the outer context");
}

We get exactly what we want: “global”, “observe”, “observe”, “observe”, “observe”.

What we get for Hot Observables

Here, I’ve modified the unit test to pass a Subject in as a hot observable. The unit test then pumps 3 values through the subject (using Observable.Interval — we just proved that it will capture our context).

So in this version, our “global” context is pushing values through the hot observable to be observed by our observer.

[Test]
public async Task HotObservablesUseObserverContextForCallbacks()
{
  MyContext.Value = "global";
  var hot = new Subject<long>();
  var t = Observe(hot);
  Assert.That(MyContext.Value, Is.EqualTo("global"), "Observe() did not change the outer context");
  Observable.Interval(TimeSpan.FromMilliseconds(1)).Take(3).Subscribe(hot);
  var result = await t;
  Assert.That(result[0], Is.EqualTo("global"), "Observe() started out seeing global context");
  Assert.That(result[4], Is.EqualTo("observe"), "observer set private inner context to different value");
  Assert.That(result[1] + result[2] + result[3], Is.EqualTo("observeobserveobserve"), "observer callbacks ran under private inner context");
  Assert.That(MyContext.Value, Is.EqualTo("global"), "Observe() did not change the outer context");
}

In this version, we get: “global”, “global”, “global”, “global”, “observe”.

Our observer’s callbacks were executed with the global ExecutionContext, not our local observer context. More specifically, the callbacks were executed in the context in which the source data was produced.

What is going on here?

How can the same Observe method produce different results depending upon the temperature of the observable?

Answering that question requires understanding how the Subscribe call really works.

When we Subscribe to an observable, we create a standard synchronous call stack that walks up the observable chain to its source. This entire code execution path executes within the Observer Context, just like any other synchronous call stack. (What about SubscribeOn? That does indeed make the Subscribe process asynchronous, but most of the Rx Schedulers use some standard .NET facility to introduce the asynchronicity and thus the ExecutionContext flows through the same as if it were synchronous).

For a cold observable, the Subscribe call stack eventually (usually) triggers work which produces calls to OnNext/OnError/OnCompleted (perhaps it spins up a Task to download a file, or runs a long computation, or just loops through an array). This work, whether it be a Task or a Thread or something scheduled with an Rx Scheduler, is initiated while in the context running during the Subscribe call (i.e. the Observer Context). So whenever it calls one of the On* methods, this call is also in the same context (since context flows). Now starts the synchronous call stack of On* calls as the data works its way back down the observable chain to the observer. All of that code runs in the Observer Context.

Thus, for a cold observable, the Observer Context flows up through the chain during subscribe, gets captured by the cold observable’s side effects, then flows back down to the observer during each notification and we basically get the desired behavior.

But for hot observables, the Subscribe call eventually ends with nothing more than adding the observer to a list of observers. No side effects are triggered. No Tasks are spawned. No jobs queued. No On* notifications scheduled. Thus the observer context is never captured by the ultimate notification source. When that notification source eventually sends down a notification, the chain of On* calls will run in whatever context produced the notification. This is why, in our example, the observer callbacks ran in the “global” context.

Thus, for a hot observable, the Notification Source context flows down to the observer.
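To make the mechanics concrete, here is a minimal sketch of a hand-rolled hot source built only on the BCL interfaces. It is not how Subject<T> is implemented; NaiveHotSource, ContextRecorder, and Ambient are hypothetical names, and Ambient uses AsyncLocal<T> (assuming .NET 4.6 or later) as a stand-in for MyContext. The sketch is not thread-safe.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for MyContext (assumes AsyncLocal<T>, .NET 4.6+).
public static class Ambient
{
    private static readonly AsyncLocal<string> Slot = new AsyncLocal<string>();
    public static string Value
    {
        get { return Slot.Value ?? "not set"; }
        set { Slot.Value = value; }
    }
}

// Hypothetical bare-bones hot source: Subscribe only records the observer.
public sealed class NaiveHotSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        // Nothing is started or scheduled here, so the subscriber's
        // ExecutionContext is never captured.
        _observers.Add(observer);
        return new Removal(() => _observers.Remove(observer));
    }

    // Callbacks run synchronously in whatever context the publisher has.
    public void Publish(T value)
    {
        foreach (var observer in _observers.ToArray())
        {
            observer.OnNext(value);
        }
    }

    private sealed class Removal : IDisposable
    {
        private readonly Action _remove;
        public Removal(Action remove) { _remove = remove; }
        public void Dispose() { _remove(); }
    }
}

// Minimal observer that records the ambient value seen by each OnNext.
public sealed class ContextRecorder<T> : IObserver<T>
{
    public readonly List<string> Seen = new List<string>();
    public void OnNext(T value) { Seen.Add(Ambient.Value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class NaiveHotSourceDemo
{
    public static string Run()
    {
        var source = new NaiveHotSource<int>();
        var recorder = new ContextRecorder<int>();

        Ambient.Value = "observe";   // context at subscription time
        source.Subscribe(recorder);

        Ambient.Value = "global";    // context of the notification source
        source.Publish(1);

        return recorder.Seen[0];
    }
}
```

NaiveHotSourceDemo.Run() should return "global": the callback sees the publisher's context, because nothing remembered the context that was current when Subscribe was called.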

It sort of makes sense. Hot observables typically broadcast their notifications to a group of observers, each of which will have its own context. How could the single source be running in multiple observer contexts?

Now, imagine a scenario like this:

var source = Observable.Merge(cold1, hot1, hot2, hot3, hot4, cold2);
await Observe(source);

We have 4 different hot observables, each possibly running in a different context, as well as 2 cold observables that will be running in the observer’s context. They will arrive in any order. In this example, each notification will likely be running in a different context. Oh my.

Could Rx just implicitly handle this for us?

In most cases, Rx could simply take care of this for us so that ExecutionContext just works. Most of the other .NET asynchronous APIs have taken this approach. It would make things less surprising. This is a difficult topic and most developers don’t need to understand it. Just having it work intuitively would save everyone a lot of heartache.

Imagine the moment when the Subscribe call reaches the source hot observable. The code that adds the new observer to the list of observers is running in that observer’s context. What if the observable captured that ExecutionContext and stored it alongside the observer? Then, whenever the observable broadcast a new notification, it could iterate through the list of observers and issue each notification within that observer’s captured ExecutionContext.

Now, the notification source would run under its own context, but whenever it starts the call chain to notify an observer, it would switch to that observer’s context and issue all of the On* notification calls with that context.
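As a thought experiment, the idea might look like the sketch below. It is a hypothetical hot source built on the BCL interfaces, not a proposal for how Rx's own Subject<T> should be changed. ContextCapturingSource, CapturingRecorder, and AmbientValue are made-up names, AmbientValue uses AsyncLocal<T> (assuming .NET 4.6 or later) in place of MyContext, and thread safety is ignored.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for MyContext (assumes AsyncLocal<T>, .NET 4.6+).
public static class AmbientValue
{
    private static readonly AsyncLocal<string> Slot = new AsyncLocal<string>();
    public static string Current
    {
        get { return Slot.Value ?? "not set"; }
        set { Slot.Value = value; }
    }
}

// Hypothetical hot source that remembers each observer's ExecutionContext.
public sealed class ContextCapturingSource<T> : IObservable<T>
{
    private sealed class Entry
    {
        public IObserver<T> Observer;
        public ExecutionContext Context; // null when context flow is suppressed
    }

    private readonly List<Entry> _entries = new List<Entry>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        // Subscribe runs in the observer's context, so capture it here.
        var entry = new Entry { Observer = observer, Context = ExecutionContext.Capture() };
        _entries.Add(entry);
        return new Removal(() => _entries.Remove(entry));
    }

    public void Publish(T value)
    {
        foreach (var entry in _entries.ToArray())
        {
            if (entry.Context == null)
            {
                entry.Observer.OnNext(value); // nothing captured: use publisher's context
                continue;
            }

            // Use a copy per notification, then switch to the observer's context.
            using (var copy = entry.Context.CreateCopy())
            {
                ExecutionContext.Run(copy, _ => entry.Observer.OnNext(value), null);
            }
        }
    }

    private sealed class Removal : IDisposable
    {
        private readonly Action _remove;
        public Removal(Action remove) { _remove = remove; }
        public void Dispose() { _remove(); }
    }
}

// Minimal observer that records the ambient value seen by each OnNext.
public sealed class CapturingRecorder<T> : IObserver<T>
{
    public readonly List<string> Seen = new List<string>();
    public void OnNext(T value) { Seen.Add(AmbientValue.Current); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class ContextCapturingDemo
{
    public static string Run()
    {
        var source = new ContextCapturingSource<int>();
        var recorder = new CapturingRecorder<int>();

        AmbientValue.Current = "observe";  // context at subscription time
        source.Subscribe(recorder);

        AmbientValue.Current = "global";   // context of the notification source
        source.Publish(1);

        // Callback context, then the publisher's context after the call.
        return recorder.Seen[0] + "/" + AmbientValue.Current;
    }
}
```

ContextCapturingDemo.Run() should return "observe/global": the callback ran in the observer's captured context, and the publisher's own context was restored once ExecutionContext.Run returned.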

FlowObserverExecutionContext()

Is there a way to do this without changing the way Rx works?

Yes. What if we write our own Rx operator which can be used to capture the observer context during subscription and use that context during notifications?

Here is what I came up with:

public static IObservable<T> FlowObserverExecutionContext<T>(this IObservable<T> source)
{
  return Observable.Create<T>(observer =>
  {
    // Capture the observer's execution context
    var context = ExecutionContext.Capture();
    if (context == null)
    {
      // Context flow is suppressed.
      return source.Subscribe(observer);
    }

    try
    {
      var observerContext = context;
      var subscription = new SingleAssignmentDisposable();
      var disposables = new CompositeDisposable(subscription, observerContext);
      subscription.Disposable = source.Subscribe(
        value =>
        {
          // contexts are only usable once.  So create a copy for each onNext notification
          using (var c = observerContext.CreateCopy())
          {
            // Run the notification with this context
            ExecutionContext.Run(c, _ => observer.OnNext(value), null);
          }
        },
        error =>
        {
          // OnError or OnComplete get called at most once, so we can use the original context copy
          ExecutionContext.Run(observerContext, _ => observer.OnError(error), null);
        },
        () =>
        {
          // OnError or OnComplete get called at most once, so we can use the original context copy
          ExecutionContext.Run(observerContext, o => ((IObserver<T>)o).OnCompleted(), observer);
        });

      context = null; // prevent it from being disposed in finally block below
      return disposables;
    }
    finally
    {
      if (context != null)
      {
        context.Dispose();
      }
    }
  });
}

Explanation

So, this is an extension method that can be used with any observable to capture the observer context. All notifications issued from that point in the chain will be called in the ExecutionContext in force when the subscribe call reached that part of the chain.

return Observable.Create<T>(observer =>
{

Here we use Observable.Create to let us hook into the subscription process.

// Capture the observer's execution context
var context = ExecutionContext.Capture();
if (context == null)
{
  // Context flow is suppressed.
  return source.Subscribe(observer);
}

Here we capture the current ExecutionContext. We then check if we did not capture a context (Capture will return null if execution context flow has been suppressed). In this case, we just short-circuit and subscribe the observer to the source without any additional work.

var observerContext = context;
var subscription = new SingleAssignmentDisposable();
var disposables = new CompositeDisposable(subscription, observerContext);
subscription.Disposable = source.Subscribe(

Here we create a CompositeDisposable to hold both our captured context and the subscription to source.

value =>
{
  // contexts are only usable once.  So create a copy for each onNext notification
  using (var c = observerContext.CreateCopy())
  {
    // Run the notification with this context
    ExecutionContext.Run(c, _ => observer.OnNext(value), null);
  }
},

Whenever we are notified with a new value, we create a copy of the context that we can use to issue this notification (ExecutionContext instances are only usable once…because they might be mutated during the call and we do not want to capture those mutations). We use the Run method to call observer.OnNext.
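The isolation described here can be checked with a small stand-alone sketch. It is an illustration under assumptions, not part of the operator: it uses AsyncLocal<T> (assuming .NET 4.6 or later) instead of MyContext, and the class and method names are hypothetical. Whether the per-call copy is strictly required may depend on the runtime version; the sketch keeps it to mirror the operator.

```csharp
using System;
using System.Threading;

public static class CopyPerNotificationDemo
{
    // Assumption: AsyncLocal<T> (.NET 4.6+) stands in for MyContext.
    private static readonly AsyncLocal<string> Slot = new AsyncLocal<string>();

    public static string[] Run()
    {
        Slot.Value = "observe";
        var captured = ExecutionContext.Capture();
        if (captured == null)
        {
            throw new InvalidOperationException("Execution context flow is suppressed.");
        }

        string first = null;
        string second = null;

        // First "notification": reads the value, then mutates it.
        using (var copy = captured.CreateCopy())
        {
            ExecutionContext.Run(copy, _ =>
            {
                first = Slot.Value;
                Slot.Value = "mutated by callback";
            }, null);
        }

        // Second "notification": runs under a fresh copy of the captured context.
        using (var copy = captured.CreateCopy())
        {
            ExecutionContext.Run(copy, _ => { second = Slot.Value; }, null);
        }

        // Values seen by each callback, then the caller's own value afterwards.
        return new[] { first, second, Slot.Value };
    }
}
```

All three entries returned by CopyPerNotificationDemo.Run() should be "observe": the mutation made inside the first callback is visible neither to the second callback nor to the caller.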

error =>
{
  // OnError or OnComplete get called at most once, so we can use the original context copy
  ExecutionContext.Run(observerContext, _ => observer.OnError(error), null);
},

The error notification works similarly. However, we can take advantage of the fact that there will be no notifications after this, so we can use our original captured context instead of making yet another copy.

() =>
{
  // OnError or OnComplete get called at most once, so we can use the original context copy
  ExecutionContext.Run(observerContext, o => ((IObserver<T>)o).OnCompleted(), observer);
});

And the same for the completion notification callback.

Unit Test

We can modify our previous Hot Observable unit test to call our new extension method so that the hot observable captures the observer context:

[Test]
public async Task HotObservablesUseObserverContextForCallbacks()
{
  MyContext.Value = "global";
  var hot = new Subject<long>();
  var t = Observe(hot.FlowObserverExecutionContext());
  Assert.That(MyContext.Value, Is.EqualTo("global"), "Observe() did not change the outer context");
  Observable.Interval(TimeSpan.FromMilliseconds(1)).Take(3).Subscribe(hot);
  var result = await t;
  Assert.That(result[0], Is.EqualTo("global"), "Observe() started out seeing global context");
  Assert.That(result[4], Is.EqualTo("observe"), "observer set private inner context to different value");
  Assert.That(result[1] + result[2] + result[3], Is.EqualTo("observeobserveobserve"), "observer callbacks ran under private inner context");
  Assert.That(MyContext.Value, Is.EqualTo("global"), "Observe() did not change the outer context");
}

And now we get the desired results: “global”, “observe”, “observe”, “observe”, “observe”.

So, should Rx do this for us?

I personally think so. This routine only works if you remember to call it on a hot observable, which means developers will forget, and the resulting bugs will be hard to debug.

Also, since ExecutionContext contains the Security Context (which contains the CurrentPrincipal as well as any Security Permissions), it is a security risk to let the context flow from the source to the observer. The source may be using special security privileges or identity impersonation to gain access to secure resources needed to produce the notifications. Letting random low-privileged observers submit callbacks which run under the elevated permissions is just asking for trouble.

Here’s an example service that uses impersonation to access the source data, but uses observer context flow to ensure observers do not get to run their code in this privileged context.

public class MyService
{
  private IConnectableObservable<IEvent> _topic;

  public MyService(WindowsIdentity serviceAccount)
  {
    // use some privileged context
    // to access the source data
    serviceAccount.Impersonate();
    IObservable<IEvent> eventSource = // ...;

    // use Publish to broadcast the
    // single source to multiple observers
    _topic = eventSource.Publish();
  }

  internal IDisposable Start() { return _topic.Connect(); }

  public IObservable<IEvent> Topic
  {
    get
    {
      // flow the observer context
      // we do not want the
      // low-privileged observers
      // to have their code running
      // in our high-privileged
      // context.
      // Then filter out any events
      // the observer context is not
      // allowed to see.
      return _topic
        .FlowObserverExecutionContext()
        .Where(e => IsAllowedToSee(CurrentPrincipal, e));
    }
  }
}

Any other Rx gotchas

Most of the schedulers just work because they use regular .NET async APIs and so the context is captured correctly. I tested ObserveOn/SubscribeOn with TaskPoolScheduler/ThreadPoolScheduler/NewThreadScheduler. None of those combinations had any effect on my results. I did not test any of the other schedulers.

I don’t expect ImmediateScheduler to cause any troubles.

I think CurrentThreadScheduler potentially has issues due to its trampoline.  If you schedule an action while an action is already running (possibly under a different context), then your action just gets put in a work queue.  When that other action completes, it takes the next action from the queue and runs it, which will cause your action to run under the same context that the other action was running under.  However I’ve not worked out if this is an actual issue in practice.

I’m not sure about the UI Dispatcher scheduler variants.

Observable.FromEvent* methods act like hot observables and so need the FlowObserverExecutionContext treatment. All of the Subjects are effectively hot. All Connectable Observables are hot. Most of the other observables, like Range, Generate, Timer, Interval, etc are cold.

I’m sure there are a few gotchas still hiding in the woodshed. Please let me know if you find any.

Conclusion

So, now we know how context flows with async/await and TPL. And I’ve shown how we can make Rx play nicely with this flow. So now I have the tools necessary to get my ambient RequestContext flowing throughout my system.
