Handling Out of Order Events with Spark Streaming
So you've decided to use Apache Spark Streaming to update your real-time analytics system. It's in charge of handling events published by upstream services and collating them into a database. However, as with any asynchronous system, events may arrive in a different order than the one in which they were published. This could be due to network outages, overloaded systems, or a variety of other causes. Regardless of the reason, how do you ensure that asynchronous events with dependent data are processed in the order you expect? For instance, what if one event contains a foreign key to data in another event? You would need to ensure that the referenced event is persisted first. In this blog post, we'll explore how to handle this scenario using Spark Streaming.
When using Spark Streaming, messages are divided into batches according to a configured batch duration. If you specify a batch duration of 10 seconds, all messages received by Spark within each 10-second interval are grouped into a batch to be processed by your streaming job. For this example, let's assume you are handling 2 types of events, a CreateOrder event and an AddItem event. Before you can process an AddItem event, you'll need to ensure that the corresponding CreateOrder event has been processed and persisted.
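To make the batching concrete, here is a minimal plain-Java sketch of how arrival times fall into fixed-length batches. It only imitates the idea: Spark does this bucketing internally once you pass a batch duration such as Durations.seconds(10) to the JavaStreamingContext constructor. The class and method names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: Spark performs this grouping for you.
class BatchBucketing {

    // Groups arrival times (milliseconds since the stream started) into fixed-length batches.
    static Map<Long, List<Long>> bucket(List<Long> arrivalMillis, long batchDurationMillis) {
        Map<Long, List<Long>> batches = new TreeMap<>();
        for (long arrival : arrivalMillis) {
            // Integer division selects the interval the message arrived in.
            long batchIndex = arrival / batchDurationMillis;
            batches.computeIfAbsent(batchIndex, k -> new ArrayList<>()).add(arrival);
        }
        return batches;
    }

    public static void main(String[] args) {
        // Three arrivals with a 10-second batch duration.
        System.out.println(bucket(Arrays.asList(1_000L, 9_999L, 10_000L), 10_000L));
    }
}
```

The first two messages land in the same batch, while the third arrives just late enough to fall into the next one. This is exactly how two related events can end up being processed separately.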
Within each batch, every order number will be in one of 3 states:
- A CreateOrder event arrives with no corresponding AddItem events in the batch
- An AddItem event arrives with no corresponding CreateOrder event in the batch
- Both a CreateOrder and corresponding AddItem events arrive in the batch
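To handle all 3 of these states, you'll need to key both streams by order number and do a JavaPairDStream::fullOuterJoin on the 2 streams. Each joined value is a pair of Optionals, one per stream, with at least one side present.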
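As a rough illustration of what the full outer join yields for one batch, here is a plain-Java sketch that joins two maps keyed by order number. It is not the Spark API: the Joined class, the state names, and the String payloads are assumptions made for the example, and it assumes at most one event per order number on each side.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

// Plain-Java stand-in for the join; the class and method names here are hypothetical.
class FullOuterJoinSketch {

    // One joined value: either side may be absent, but never both.
    static final class Joined {
        final Optional<String> createOrder;
        final Optional<String> addItem;

        Joined(Optional<String> createOrder, Optional<String> addItem) {
            this.createOrder = createOrder;
            this.addItem = addItem;
        }

        // Names which of the 3 states this order number is in for the batch.
        String state() {
            if (createOrder.isPresent() && addItem.isPresent()) {
                return "BOTH";
            }
            return createOrder.isPresent() ? "CREATE_ORDER_ONLY" : "ADD_ITEM_ONLY";
        }
    }

    // Joins two batches keyed by order number, keeping keys found on either side.
    static Map<String, Joined> fullOuterJoin(Map<String, String> createOrders,
                                             Map<String, String> addItems) {
        Set<String> orderNumbers = new TreeSet<>(createOrders.keySet());
        orderNumbers.addAll(addItems.keySet());
        Map<String, Joined> joined = new LinkedHashMap<>();
        for (String orderNumber : orderNumbers) {
            joined.put(orderNumber, new Joined(
                    Optional.ofNullable(createOrders.get(orderNumber)),
                    Optional.ofNullable(addItems.get(orderNumber))));
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<String, String> createOrders = new HashMap<>();
        createOrders.put("100", "CreateOrder(100)");
        createOrders.put("102", "CreateOrder(102)");
        Map<String, String> addItems = new HashMap<>();
        addItems.put("101", "AddItem(101)");
        addItems.put("102", "AddItem(102)");
        // Prints one line per order number with its state in this batch.
        fullOuterJoin(createOrders, addItems)
                .forEach((orderNumber, value) ->
                        System.out.println(orderNumber + " -> " + value.state()));
    }
}
```

In this sample batch, order 100 has only a CreateOrder, order 101 has only an AddItem, and order 102 has both, which covers the 3 states listed above.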
Additionally, I’m going to map the values to the following POJO class for clarity.
The stream can be mapped to this class as follows:
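A minimal sketch of what such a class and mapping could look like is shown here. The field names, the String payloads standing in for real event classes, and the fromJoin factory are all assumptions made for illustration. The class is Serializable because Spark needs to serialize values that end up in checkpointed state.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical POJO: field names and String payloads are assumptions for illustration.
class JoinedOrderEvent implements Serializable {
    private final String orderNumber;
    private final String createOrder;    // null when no CreateOrder arrived in the batch
    private final List<String> addItems; // empty when no AddItem arrived in the batch

    JoinedOrderEvent(String orderNumber, String createOrder, List<String> addItems) {
        this.orderNumber = orderNumber;
        this.createOrder = createOrder;
        this.addItems = Collections.unmodifiableList(new ArrayList<>(addItems));
    }

    // Converts one full-outer-join result (a pair of Optionals) into the POJO.
    static JoinedOrderEvent fromJoin(String orderNumber,
                                     Optional<String> createOrder,
                                     Optional<String> addItem) {
        List<String> items = new ArrayList<>();
        addItem.ifPresent(items::add);
        return new JoinedOrderEvent(orderNumber, createOrder.orElse(null), items);
    }

    String getOrderNumber() { return orderNumber; }

    Optional<String> getCreateOrder() { return Optional.ofNullable(createOrder); }

    List<String> getAddItems() { return addItems; }
}
```

In a Spark job, a conversion like fromJoin would typically be applied to each joined tuple with a call such as mapValues or mapToPair on the joined stream.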
If you were to process each stream of events independently, you could not guarantee that a CreateOrder event would be processed before an AddItem event that arrived in the same batch. A race condition would exist, because different Spark workers could save the individual streams to the database and process them at different rates. Now that you have the 2 streams joined, you can ensure that messages received within the same batch are processed together in the correct order. However, you'll still need to handle the scenario where an AddItem message arrives in a batch prior to its corresponding CreateOrder event. For example, an AddItem event referencing order number 101 might arrive in a batch before the CreateOrder event for that order number.
To handle this, you'll use the JavaPairDStream::mapWithState function. The mapWithState function allows you to process each tuple in a JavaPairDStream and keep calculated values in memory, stored under the key of the pair. It also requires checkpointing to be enabled so that the state is periodically persisted for fault tolerance. You’ll need to provide a StateSpec instance to mapWithState to define how each pair will add, update, or remove data from state, as well as what data to emit to be consumed downstream.
The way to create a StateSpec instance is to use the static StateSpec.function factory method. It requires an implementation of one of Spark's Java function interfaces. In our case we’ll create an implementation of the interface Function3<KeyType, Optional<ValueType>, State<StateType>, MappedType>. Spark passes the value wrapped in an Optional and the stored data wrapped in a State handle.
- KeyType represents the type of key, in our case the order number String
- ValueType represents the type of Value to be mapped, in our case the JoinedOrderEvent
- StateType represents the type of object to be stored in state, in our case it will be a List of the AddItem events that are still waiting for their CreateOrder event
- MappedType represents the type of object to be emitted each time this function is called, in our case we will use an Optional to represent the values to be persisted after each iteration. If the output is Optional::absent(), then nothing should be persisted.
This function will handle the following scenarios:
- If a CreateOrder event exists in this batch, join it against any AddItem entries in the state, clear the state for that order number, and emit the combined events.
- If an AddItem event exists in this batch with no corresponding CreateOrder event, add the AddItem event to the state and emit Optional::absent().
Here is an abstract implementation of that interface:
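A minimal sketch of the mapping logic, written in plain Java so it runs without a Spark dependency, might look like this. KeyedState is a hypothetical stand-in for Spark's State handle, mirroring only its exists, get, update, and remove operations. The nested JoinedOrderEvent and its String payloads are likewise assumptions, and java.util.Optional is used in place of Spark's Optional, so Optional.empty() plays the role of Optional::absent().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

class OrderStateSketch {

    // Hypothetical stand-in for Spark's per-key State handle.
    static final class KeyedState<S> {
        private S value;
        boolean exists() { return value != null; }
        S get() { return value; }
        void update(S newValue) { value = newValue; }
        void remove() { value = null; }
    }

    // Hypothetical joined event: createOrder is null when absent from the batch.
    static final class JoinedOrderEvent {
        final String createOrder;
        final List<String> addItems;

        JoinedOrderEvent(String createOrder, List<String> addItems) {
            this.createOrder = createOrder;
            this.addItems = addItems;
        }
    }

    // Same shape as the Function3 call: key, optional value, state handle, emitted result.
    static Optional<JoinedOrderEvent> call(String orderNumber,
                                           Optional<JoinedOrderEvent> value,
                                           KeyedState<List<String>> state) {
        if (!value.isPresent()) {
            return Optional.empty(); // nothing arrived for this key
        }
        JoinedOrderEvent event = value.get();
        // Start from AddItem events buffered in earlier batches, then add this batch's.
        List<String> pending = state.exists() ? new ArrayList<>(state.get()) : new ArrayList<>();
        pending.addAll(event.addItems);

        if (event.createOrder != null) {
            // The order exists now: clear the buffer and emit everything together.
            if (state.exists()) {
                state.remove();
            }
            return Optional.of(new JoinedOrderEvent(event.createOrder, pending));
        }
        // No CreateOrder yet: keep buffering and emit nothing.
        state.update(pending);
        return Optional.empty();
    }

    public static void main(String[] args) {
        KeyedState<List<String>> state = new KeyedState<>();
        JoinedOrderEvent early = new JoinedOrderEvent(null, Arrays.asList("AddItem(101)"));
        System.out.println(call("101", Optional.of(early), state).isPresent());
        JoinedOrderEvent create = new JoinedOrderEvent("CreateOrder(101)", Collections.<String>emptyList());
        System.out.println(call("101", Optional.of(create), state).get().addItems);
    }
}
```

The first call buffers the early AddItem and emits nothing. The second call sees the CreateOrder, drains the buffer, and emits the order together with the item that arrived before it.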
Now you’ll include it in your stream processing:
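In a real job, this step is a mapWithState call on the joined stream, passing StateSpec.function(...) with your mapping function, plus a checkpoint directory set on the streaming context. The plain-Java sketch below only imitates that wiring so the end-to-end behavior can be run without a cluster. The class name, the String payloads, and the in-memory map standing in for Spark's keyed state are all assumptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Imitates join + stateful mapping across batches; not the Spark API.
class OrderPipelineSimulation {

    // Buffered AddItem events per order number; survives across batches like mapWithState state.
    private final Map<String, List<String>> pendingAddItems = new HashMap<>();

    // Handles one batch: joins by order number, applies the state rules,
    // and returns the events to persist, CreateOrder first.
    List<String> processBatch(Map<String, String> createOrders, Map<String, String> addItems) {
        Set<String> orderNumbers = new TreeSet<>(createOrders.keySet());
        orderNumbers.addAll(addItems.keySet());

        List<String> toPersist = new ArrayList<>();
        for (String orderNumber : orderNumbers) {
            String createOrder = createOrders.get(orderNumber);
            String addItem = addItems.get(orderNumber);
            if (addItem != null) {
                pendingAddItems.computeIfAbsent(orderNumber, k -> new ArrayList<>()).add(addItem);
            }
            if (createOrder != null) {
                toPersist.add(createOrder);
                // Drain everything buffered for this order, including earlier batches.
                List<String> buffered = pendingAddItems.remove(orderNumber);
                if (buffered != null) {
                    toPersist.addAll(buffered);
                }
            }
        }
        return toPersist;
    }

    public static void main(String[] args) {
        OrderPipelineSimulation pipeline = new OrderPipelineSimulation();
        Map<String, String> none = Collections.emptyMap();
        // Batch 1: the AddItem for order 101 arrives before its CreateOrder.
        System.out.println(pipeline.processBatch(
                none, Collections.singletonMap("101", "AddItem(101)")));
        // Batch 2: the CreateOrder arrives and releases the buffered AddItem.
        System.out.println(pipeline.processBatch(
                Collections.singletonMap("101", "CreateOrder(101)"), none));
    }
}
```

The first batch persists nothing because the AddItem is held in state. The second batch persists the CreateOrder followed by the buffered AddItem, which is the order the database needs.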
Here is a GIF representing the process:
At this point, you've guaranteed that even if the events arrive out of order you can process them as though they came in serially.