So you've decided to use Apache Spark Streaming to update your real-time analytics system. It's in charge of handling events published by upstream services and collating them into a database. However, as with any asynchronous system, events can arrive in a different order than the one in which they were published. This could be due to network outages, overloaded systems, or a variety of other causes. Regardless of the reason, how do you ensure that asynchronous events with dependent data are processed in the order you expect? For instance, what if one event contains a foreign key to data in another event? You would need to ensure that the referenced event is persisted first. In this blog post, we'll explore how to handle this scenario using Spark Streaming.
When using Spark Streaming, messages are divided into batches according to the batch duration you configure. If you specify a batch duration of 10 seconds, all messages received by Spark within those 10 seconds will be grouped into a batch to be processed by your streaming job. For this example, let's assume you are handling 2 types of events: a CreateOrder event and an AddItem event. To be able to process an AddItem event, you'll need to ensure that the corresponding CreateOrder event has already been processed and persisted.
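To make the batching idea concrete, here is a minimal plain-Java sketch, with no Spark on the classpath, that groups events into 10-second windows by the time they were received. The `Event` class, its fields, and `toBatches` are hypothetical names used only for illustration. In a real job the batch duration is passed when the streaming context is created (for example `new JavaStreamingContext(conf, Durations.seconds(10))`) and Spark does the grouping for you.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class BatchingSketch {
    /** Hypothetical event: its type, the order it refers to, and when it was received. */
    static final class Event {
        final String type;
        final int orderNumber;
        final long receivedAtMillis;

        Event(String type, int orderNumber, long receivedAtMillis) {
            this.type = type;
            this.orderNumber = orderNumber;
            this.receivedAtMillis = receivedAtMillis;
        }
    }

    /** Groups events into consecutive windows of batchMillis, keyed by window index. */
    static Map<Long, List<Event>> toBatches(List<Event> events, long batchMillis) {
        Map<Long, List<Event>> batches = new TreeMap<>();
        for (Event event : events) {
            long batchIndex = event.receivedAtMillis / batchMillis;
            batches.computeIfAbsent(batchIndex, k -> new ArrayList<>()).add(event);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("AddItem", 101, 4_000),
                new Event("CreateOrder", 100, 7_000),
                new Event("CreateOrder", 101, 12_000));
        // The AddItem for order 101 lands in an earlier batch than its CreateOrder.
        toBatches(events, 10_000).forEach((index, batch) ->
                System.out.println("batch " + index + ": " + batch.size() + " event(s)"));
    }
}
```

Notice that nothing about the batch boundary respects the dependency between events: the AddItem for order 101 falls into the first window while its CreateOrder falls into the second.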
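For any given order number, each batch of events will be in one of 3 states: it contains only the CreateOrder event, only an AddItem event, or both.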
To handle all 3 of these states, you'll need to do a JavaPairDStream::fullOuterJoin on the 2 streams.
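As a rough illustration of what the join produces, the sketch below performs a full outer join over two plain Java maps keyed by order number. It assumes at most one event per key on each side, and uses `Map.Entry` with `java.util.Optional` as a stand-in for the tuple of optionals that Spark's `fullOuterJoin` returns. The method names `fullOuterJoin` and `describe` here are illustrative helpers, not Spark API.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

final class FullOuterJoinSketch {
    /** Joins two keyed maps; each side of the resulting pair is empty when that key was missing. */
    static Map<Integer, Map.Entry<Optional<String>, Optional<String>>> fullOuterJoin(
            Map<Integer, String> createOrders, Map<Integer, String> addItems) {
        Set<Integer> keys = new TreeSet<>(createOrders.keySet());
        keys.addAll(addItems.keySet());
        Map<Integer, Map.Entry<Optional<String>, Optional<String>>> joined = new LinkedHashMap<>();
        for (Integer key : keys) {
            joined.put(key, new SimpleImmutableEntry<>(
                    Optional.ofNullable(createOrders.get(key)),
                    Optional.ofNullable(addItems.get(key))));
        }
        return joined;
    }

    /** Names which of the three states a joined pair is in. */
    static String describe(Map.Entry<Optional<String>, Optional<String>> pair) {
        boolean hasOrder = pair.getKey().isPresent();
        boolean hasItem = pair.getValue().isPresent();
        if (hasOrder && hasItem) {
            return "both";
        }
        return hasOrder ? "CreateOrder only" : "AddItem only";
    }

    public static void main(String[] args) {
        Map<Integer, String> createOrders = Map.of(100, "order-100", 102, "order-102");
        Map<Integer, String> addItems = Map.of(101, "widget", 102, "gadget");
        fullOuterJoin(createOrders, addItems).forEach((orderNumber, pair) ->
                System.out.println(orderNumber + " -> " + describe(pair)));
    }
}
```

A full outer join is the right fit because an inner join would silently drop the two one-sided states, which are exactly the ones that need special handling.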
Additionally, I’m going to map the values to the following POJO class for clarity.
The stream can be mapped to this class as follows:
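A minimal sketch of what such a class and mapping might look like is shown here. The class name `OrderEvents`, its fields, and the `toOrderEvents` helper are all hypothetical, and events are represented as plain strings to keep the example short. In a Spark job, a conversion like this would typically be applied to the joined stream with something like `mapValues`.

```java
import java.io.Serializable;
import java.util.Optional;

final class OrderEventsSketch {
    /** Hypothetical POJO holding whatever arrived for one order number in one batch. */
    static final class OrderEvents implements Serializable {
        private final int orderNumber;
        private final String createOrder; // null when no CreateOrder event arrived
        private final String addItem;     // null when no AddItem event arrived

        OrderEvents(int orderNumber, String createOrder, String addItem) {
            this.orderNumber = orderNumber;
            this.createOrder = createOrder;
            this.addItem = addItem;
        }

        int getOrderNumber() { return orderNumber; }
        String getCreateOrder() { return createOrder; }
        String getAddItem() { return addItem; }
        boolean hasCreateOrder() { return createOrder != null; }
        boolean hasAddItem() { return addItem != null; }
    }

    /** Converts one joined pair into the POJO, unwrapping the optionals. */
    static OrderEvents toOrderEvents(int orderNumber,
                                     Optional<String> createOrder,
                                     Optional<String> addItem) {
        return new OrderEvents(orderNumber, createOrder.orElse(null), addItem.orElse(null));
    }

    public static void main(String[] args) {
        OrderEvents events = toOrderEvents(101, Optional.empty(), Optional.of("widget"));
        System.out.println("order " + events.getOrderNumber()
                + " hasCreateOrder=" + events.hasCreateOrder()
                + " hasAddItem=" + events.hasAddItem());
    }
}
```

The sketch uses nullable fields rather than `java.util.Optional` fields because `java.util.Optional` is not `Serializable`, and anything kept in checkpointed state needs to be.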
If you were to process each stream of events independently, you could not guarantee that a CreateOrder event would be processed before an AddItem event arriving in the same batch. A race condition would exist: different Spark workers could save the individual streams to the database and process them at different rates. Now that you have the 2 streams joined, you can ensure that messages received within the same batch are processed together in the correct order. However, you'll still need to handle the scenario where an AddItem message arrives in a batch prior to the corresponding CreateOrder event. For example, an AddItem event referencing order number 101 might arrive in a batch before the CreateOrder event for that order number.
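The sketch below illustrates both halves of that paragraph under simplified assumptions. `FakeDb` is a hypothetical in-memory stand-in for a database that enforces the foreign key, and `persist` is an illustrative helper that saves one joined record with the parent first. When both events share a batch, the ordering problem disappears. When only the AddItem has arrived, there is nothing safe to do with it yet.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

final class InBatchOrderingSketch {
    /** Hypothetical in-memory stand-in for a database with a foreign key from items to orders. */
    static final class FakeDb {
        final Set<Integer> orders = new HashSet<>();
        final List<String> items = new ArrayList<>();

        void saveOrder(int orderNumber) {
            orders.add(orderNumber);
        }

        void saveItem(int orderNumber, String item) {
            if (!orders.contains(orderNumber)) {
                throw new IllegalStateException("AddItem references missing order " + orderNumber);
            }
            items.add(orderNumber + ":" + item);
        }
    }

    /** Saves one joined record, parent first; returns false when the item must be held back. */
    static boolean persist(FakeDb db, int orderNumber,
                           Optional<String> createOrder, Optional<String> addItem) {
        if (createOrder.isPresent()) {
            db.saveOrder(orderNumber);
        }
        if (addItem.isPresent()) {
            if (!db.orders.contains(orderNumber)) {
                return false; // the order has not been seen yet, so the item cannot be saved
            }
            db.saveItem(orderNumber, addItem.get());
        }
        return true;
    }

    public static void main(String[] args) {
        FakeDb db = new FakeDb();
        // Both events in the same batch: the order is saved before the item.
        System.out.println(persist(db, 100, Optional.of("order-100"), Optional.of("widget")));
        // Only the AddItem arrived: it has to wait for a later batch.
        System.out.println(persist(db, 101, Optional.empty(), Optional.of("gadget")));
    }
}
```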
To handle this, you'll use the JavaPairDStream::mapWithState function. The mapWithState function allows you to process each tuple in a JavaPairDStream and store calculated values in memory by the key of the pair in the stream. It also requires the use of checkpointing to occasionally persist the state for fault tolerance. You’ll need to provide a StateSpec instance to mapWithState to define the behavior of how each pair will add, update, or remove data from state as well as what data to emit to be consumed downstream.
The way to create a StateSpec instance is to use the static factory method StateSpec.function. It requires an implementation of one of Spark's Java function interfaces. In our case we’ll create an implementation of the interface Function3<KeyType, Optional<ValueType>, State<StateType>, MappedType>.
This function will handle the following scenarios:
Here is an abstract implementation of that interface:
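As one possible shape for that logic, here is a plain-Java sketch that runs without Spark. The nested `State` class is a simplified stand-in that mimics the `exists`, `get`, `update`, and `remove` methods of Spark's `State`, and `OrderState` and `apply` are hypothetical names. The sketch assumes three cases: a CreateOrder arrives and releases any items that were waiting for it, an AddItem arrives after its order and passes straight through, or an AddItem arrives first and is parked in state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

final class OrderStateFunctionSketch {
    /** Simplified stand-in for Spark's State<S>: holds at most one value per key. */
    static final class State<S> {
        private S value;
        boolean exists() { return value != null; }
        S get() { return value; }
        void update(S newValue) { value = newValue; }
        void remove() { value = null; }
    }

    /** Hypothetical per-order state: whether the order was seen, plus items still waiting for it. */
    static final class OrderState {
        boolean orderCreated;
        final List<String> pendingItems = new ArrayList<>();
    }

    /** Returns the events that are safe to persist now, in the order they should be persisted. */
    static List<String> apply(int orderNumber, Optional<String> createOrder,
                              Optional<String> addItem, State<OrderState> state) {
        OrderState current = state.exists() ? state.get() : new OrderState();
        List<String> toPersist = new ArrayList<>();

        if (createOrder.isPresent()) {
            // The parent row goes first, followed by any items that arrived too early.
            toPersist.add("CreateOrder " + orderNumber);
            current.orderCreated = true;
            for (String item : current.pendingItems) {
                toPersist.add("AddItem " + orderNumber + " " + item);
            }
            current.pendingItems.clear();
        }
        if (addItem.isPresent()) {
            if (current.orderCreated) {
                toPersist.add("AddItem " + orderNumber + " " + addItem.get());
            } else {
                current.pendingItems.add(addItem.get()); // park it until the order shows up
            }
        }
        state.update(current);
        return toPersist;
    }

    public static void main(String[] args) {
        State<OrderState> state = new State<>();
        System.out.println(apply(101, Optional.empty(), Optional.of("widget"), state));
        System.out.println(apply(101, Optional.of("order-101"), Optional.empty(), state));
    }
}
```

Note that this sketch never removes state, so it would grow without bound in a long-running job. A real implementation would likely need a timeout or an explicit end-of-order signal to decide when a key can be dropped.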
Now you’ll include it in your stream processing:
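In an actual job, this step would look roughly like calling `mapWithState(StateSpec.function(...))` on the joined stream, with a checkpoint directory set on the streaming context. To show the effect without a cluster, the plain-Java sketch below simulates it: `run` plays the role of `mapWithState` by keeping one state object per order number across batches, and `update` is a compact stand-in for the state function. `Row`, `OrderState`, `update`, and `run` are all hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StreamWiringSketch {
    /** Joined row for one order in one batch; a null field means that event did not arrive. */
    static final class Row {
        final String createOrder;
        final String addItem;

        Row(String createOrder, String addItem) {
            this.createOrder = createOrder;
            this.addItem = addItem;
        }
    }

    /** Per-order state carried from one batch to the next. */
    static final class OrderState {
        boolean created;
        final List<String> pending = new ArrayList<>();
    }

    /** Compact stand-in for the state function: emits only what is safe to persist now. */
    static List<String> update(int orderNumber, Row row, OrderState state) {
        List<String> out = new ArrayList<>();
        if (row.createOrder != null) {
            state.created = true;
            out.add("CreateOrder " + orderNumber);
            for (String item : state.pending) {
                out.add("AddItem " + orderNumber + " " + item);
            }
            state.pending.clear();
        }
        if (row.addItem != null) {
            if (state.created) {
                out.add("AddItem " + orderNumber + " " + row.addItem);
            } else {
                state.pending.add(row.addItem);
            }
        }
        return out;
    }

    /** Stand-in for mapWithState: finds the state for each key, applies update, collects output. */
    static List<String> run(List<Map<Integer, Row>> batches) {
        Map<Integer, OrderState> stateByOrder = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (Map<Integer, Row> batch : batches) {
            for (Map.Entry<Integer, Row> entry : batch.entrySet()) {
                OrderState state =
                        stateByOrder.computeIfAbsent(entry.getKey(), k -> new OrderState());
                emitted.addAll(update(entry.getKey(), entry.getValue(), state));
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Map<Integer, Row>> batches = List.of(
                Map.of(101, new Row(null, "widget")),     // batch 1: AddItem arrives first
                Map.of(101, new Row("order-101", null))); // batch 2: CreateOrder arrives late
        System.out.println(run(batches)); // prints [CreateOrder 101, AddItem 101 widget]
    }
}
```

Even though the AddItem arrived a full batch early, the emitted sequence puts the CreateOrder first, which is the order a downstream database write needs.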
Here is a GIF representing the process:
At this point, you've guaranteed that even if the events arrive out of order, you can process them as though they had arrived in order.