
Order Matters: Handling Message Sequencing in Spark

Summary:
Messages coming into your Spark stream processor may not arrive in the order you expect. Learn how to handle the unexpected with Spark, Databricks, and JanusGraph, DataStax, Neo4j, or Microsoft Cosmos DB.

Handling Out of Order Events with Spark Streaming

So you've decided to use Apache Spark Streaming to update your real-time analytics system. It's in charge of handling events published by upstream services and collating them into a database. However, as with any asynchronous system, events may arrive in a different order than they were published. This could be due to network outages, overloaded systems, or a variety of other causes. Regardless of the reason, how do you ensure that asynchronous events with dependent data are processed in the order you expect? For instance, what if one event contains a foreign key to data in another event? You would need to ensure that the referenced event is persisted first. In this blog post, we'll explore how to handle this scenario using Spark Streaming.

When using Spark Streaming, messages are divided into batches according to a configured batch duration. If you specify a batch duration of 10 seconds, all messages received by Spark within each 10-second interval are collected into one batch to be processed by your streaming job. For this example, let's assume you are handling 2 types of events: a CreateOrder event and an AddItem event. Before you can process an AddItem event, you'll need to ensure that the corresponding CreateOrder event has been processed and persisted.
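To make the batching concrete, here is a minimal plain-Java sketch of how arrival time decides which batch an event lands in. It does not use Spark; the Event class, the toBatches helper, and the sample timestamps are all hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class BatchWindowSketch {

    /** Hypothetical event: a type name, an order number, and an arrival time in milliseconds. */
    static final class Event {
        final String type;
        final String orderNumber;
        final long arrivalMillis;

        Event(String type, String orderNumber, long arrivalMillis) {
            this.type = type;
            this.orderNumber = orderNumber;
            this.arrivalMillis = arrivalMillis;
        }

        @Override
        public String toString() {
            return type + "#" + orderNumber;
        }
    }

    /** Groups events by arrival window; the key is arrival time divided by the batch duration. */
    static Map<Long, List<Event>> toBatches(List<Event> events, long batchMillis) {
        Map<Long, List<Event>> batches = new TreeMap<>();
        for (Event e : events) {
            long window = e.arrivalMillis / batchMillis;
            batches.computeIfAbsent(window, k -> new ArrayList<>()).add(e);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Event> arrivals = Arrays.asList(
                new Event("CreateOrder", "100", 1_000L),
                new Event("AddItem", "100", 4_000L),
                new Event("AddItem", "101", 9_500L),      // arrives before its order
                new Event("CreateOrder", "101", 12_000L));

        // With a 10-second batch duration, order 101's item and order end up in different batches.
        toBatches(arrivals, 10_000L).forEach((window, batch) ->
                System.out.println("batch " + window + ": " + batch));
    }
}
```

With a 10-second duration, the AddItem for order 101 falls into the first batch and its CreateOrder into the second, which is exactly the out-of-order case the rest of this post deals with.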

batched events

Within each batch, a given order number will be in one of 3 states:

  • A CreateOrder event arrives with no corresponding AddItem events in the batch
  • An AddItem event arrives with no corresponding CreateOrder event in the batch
  • Both a CreateOrder and corresponding AddItem events arrive in the batch

To handle all 3 of these states, you'll need to do a JavaPairDStream::fullOuterJoin on the 2 streams.

JavaPairDStream<String, CreateOrder> createOrderStream = getCreateOrderPairStreamFromContext();
JavaPairDStream<String, List<AddOrderItem>> addItemStream = getAddItemPairStreamFromContext();

JavaPairDStream<String, Tuple2<Optional<CreateOrder>, Optional<List<AddOrderItem>>>> joinedStream =
        createOrderStream.fullOuterJoin(addItemStream);
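To see how a full outer join maps onto the 3 states, here is a small plain-Java sketch that joins two keyed maps the same way. It is only an illustration of the join semantics, not Spark's implementation; the Joined class, the fullOuterJoin helper, and the sample data are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.TreeSet;

final class FullOuterJoinSketch {

    /** One joined row; either side may be missing, as in a full outer join. */
    static final class Joined<L, R> {
        final Optional<L> left;
        final Optional<R> right;

        Joined(Optional<L> left, Optional<R> right) {
            this.left = left;
            this.right = right;
        }

        /** Names which of the three states this row is in. */
        String state() {
            if (left.isPresent() && right.isPresent()) {
                return "order and items";
            }
            return left.isPresent() ? "order only" : "items only";
        }
    }

    /** Joins two keyed collections, keeping every key that appears on either side. */
    static <L, R> Map<String, Joined<L, R>> fullOuterJoin(Map<String, L> left, Map<String, R> right) {
        TreeSet<String> keys = new TreeSet<>(left.keySet());
        keys.addAll(right.keySet());

        Map<String, Joined<L, R>> joined = new TreeMap<>();
        for (String key : keys) {
            joined.put(key, new Joined<>(
                    Optional.ofNullable(left.get(key)),
                    Optional.ofNullable(right.get(key))));
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<String, String> orders = new HashMap<>();
        orders.put("100", "CreateOrder#100");
        orders.put("102", "CreateOrder#102");

        Map<String, List<String>> items = new HashMap<>();
        items.put("101", Arrays.asList("widget"));
        items.put("102", Arrays.asList("gadget", "gizmo"));

        fullOuterJoin(orders, items).forEach((orderNumber, row) ->
                System.out.println(orderNumber + " -> " + row.state()));
    }
}
```

An inner join would drop orders 100 and 101 entirely, and a left or right join would drop one of them, which is why the full outer join is the one that preserves all 3 states.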

Additionally, I’m going to map the values to the following POJO class for clarity.

// JoinedOrderEvent.java
package com.experoinc.demo;

import com.experoinc.demo.event.AddOrderItem;
import com.experoinc.demo.event.CreateOrder;
import lombok.NonNull;
import org.apache.spark.api.java.Optional;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class JoinedOrderEvent implements Serializable {

    private static final long serialVersionUID = 1L;

    private final Optional<CreateOrder> order;
    private final Optional<List<AddOrderItem>> items;

    public JoinedOrderEvent(CreateOrder order, List<AddOrderItem> items) {
        this.order = Optional.ofNullable(order);
        this.items = Optional.ofNullable(items);
    }

    public JoinedOrderEvent addItems(@NonNull Collection<AddOrderItem> additionalItems)
    {
        List<AddOrderItem> items = this.items.orElse(new ArrayList<>(additionalItems.size()));
        items.addAll(additionalItems);

        return new JoinedOrderEvent(this.order.orElse(null), items);
    }

    public Optional<CreateOrder> getOrder() {
        return order;
    }

    public Optional<List<AddOrderItem>> getItems() {
        return items;
    }

    public boolean hasOrder() {
        return order.isPresent();
    }

    public boolean hasItems() {
        return items.isPresent();
    }
}

The stream can be mapped to this class as follows:

JavaPairDStream<String, JoinedOrderEvent> joinedOrderStream = joinedStream
        .mapToPair((pair) -> {
            String orderNumber = pair._1;
            CreateOrder order = pair._2._1.orNull();
            List<AddOrderItem> items = pair._2._2.orNull();

            return Tuple2.apply(orderNumber, new JoinedOrderEvent(order, items));
        });

full outer joined stream

If you were to process each stream of events independently, you could not guarantee that the CreateOrder event would be processed before the AddItem event, even when the 2 events arrived in the same batch. A race condition would exist: different Spark workers could handle the work of saving the individual streams to the database and process them at different rates. Now that you have the 2 streams joined, you can ensure that messages received within the same batch are processed together in the correct order. However, you'll still need to handle the scenario where an AddItem message arrives in an earlier batch than its corresponding CreateOrder event. In the example below, you'll notice that an AddItem event referencing order number 101 arrives in a batch before the CreateOrder event for that order number.

out of order events

To handle this, you'll use the JavaPairDStream::mapWithState function. The mapWithState function allows you to process each tuple in a JavaPairDStream and store calculated values in memory by the key of the pair in the stream. It also requires the use of checkpointing to occasionally persist the state for fault tolerance. You’ll need to provide a StateSpec instance to mapWithState to define the behavior of how each pair will add, update, or remove data from state as well as what data to emit to be consumed downstream.

The way to create a StateSpec instance is to use the static StateSpec.function factory method. It requires an implementation of a Spark function interface. In our case, we'll create an implementation of the interface Function3<KeyType, Optional<ValueType>, State<StateType>, MappedType>, where:

  • KeyType represents the type of key, in our case the order number String
  • ValueType represents the type of Value to be mapped, in our case the JoinedOrderEvent
  • StateType represents the type of object to be stored in state, in our case a List<AddOrderItem>
  • MappedType represents the type of object to be emitted each time this function is called, in our case an Optional<JoinedOrderEvent> holding the values to be persisted after each iteration. If the output is Optional::absent(), then nothing should be persisted.

This function will handle the following scenarios:

  • If a CreateOrder event exists in this batch, join it against any AddOrderItem entries in the state, clear the state for that order number, and emit the combined events.
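  • If an AddItem event exists in this batch with no corresponding CreateOrder event and the order has not been persisted yet, add the AddItem event to the state and emit Optional::absent()
  • If an AddItem event exists in this batch with no corresponding CreateOrder event but the order has already been persisted, emit the items together with any held in state and clear the state for that order number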

Here is an abstract implementation of that interface:

// AbstractJoinedOrderStateFunc.java
package com.experoinc.demo;

import com.experoinc.demo.event.AddOrderItem;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.streaming.State;

import java.util.List;

public abstract class AbstractJoinedOrderStateFunc implements Function3<
        String,
        Optional<JoinedOrderEvent>,
        State<List<AddOrderItem>>,
        Optional<JoinedOrderEvent>> {

    @Override
    public Optional<JoinedOrderEvent> call(
            String orderNumber,
            Optional<JoinedOrderEvent> optionalEvent,
            State<List<AddOrderItem>> cachedItemState) throws Exception {

        /*
        This will only be triggered if the key (orderNumber) was evicted. Eviction settings are
        configurable per the StateSpec implementation. In our case, this will not occur
         */
        if (!optionalEvent.isPresent()) {
            return Optional.absent();
        }

        JoinedOrderEvent event = optionalEvent.get();

        Optional<JoinedOrderEvent> returnedEvent;
        // no order in joined event - right outer join
        if(!event.hasOrder()) {
            returnedEvent = handleRightJoin(orderNumber, event, cachedItemState);
        }
        // no items in joined event - left outer join
        else if(!event.hasItems()) {
            returnedEvent = handleLeftJoin(event, cachedItemState);
        }
        // inner join
        else {
            returnedEvent = handleInnerJoin(event, cachedItemState);
        }

        return returnedEvent;
    }

    /**
     * Handle the case where we have a list of {@link AddOrderItem} instances in the batch with
     * no corresponding {@link com.experoinc.demo.event.CreateOrder}
     *
     * If the order already exists in the database, we can go ahead and process the items.
     * If the order doesn't exist, we need to store the items in state until an order comes along.
     *
     * @param orderNumber The order number the events are associated with
     * @param event The event containing the items
     * @param itemState The state object to add/remove items from
     * @return The {@link JoinedOrderEvent} to emit downstream
     */
    private Optional<JoinedOrderEvent> handleRightJoin(
            String orderNumber,
            JoinedOrderEvent event,
            State<List<AddOrderItem>> itemState)
    {
        // combine items in this event with any items currently stored in state
        List<AddOrderItem> allItems = event.getItems().get();
        if(itemState.exists()) {
            allItems.addAll(itemState.get());
        }

        /*
        If the order exists in the db, remove all the items from state and emit an event with all
        the items to be persisted.
         */
        if(orderAlreadyPersisted(orderNumber)) {
            itemState.remove();
            return Optional.of(new JoinedOrderEvent(null, allItems));
        }
        /*
        If the order does not exist in the db, update the state to include the items from this event
         */
        else {
            itemState.update(allItems);
            return Optional.absent();
        }
    }

    /**
     * Handle the case where we have a {@link com.experoinc.demo.event.CreateOrder} event but no
     * corresponding items in the batch
     *
     * @param event The event with the order
     * @param itemState The state object to add / remove items from
     * @return The {@link JoinedOrderEvent} to emit downstream
     */
    private Optional<JoinedOrderEvent> handleLeftJoin(
            JoinedOrderEvent event,
            State<List<AddOrderItem>> itemState)
    {
        /*
        There were items that corresponded with this order, include them in the joined event to emit
        and remove them from the state
         */
        if(itemState.exists()) {
            event = event.addItems(itemState.get());
            itemState.remove();
        }

        return Optional.of(event);
    }

    /**
     * Handle the case where we have a {@link com.experoinc.demo.event.CreateOrder} event and
     * a list of {@link AddOrderItem} events in the same batch
     *
     * @param event The event with the order and items
     * @param itemState The state object to add / remove items from
     * @return The {@link JoinedOrderEvent} to emit downstream
     */
    private Optional<JoinedOrderEvent> handleInnerJoin(
            JoinedOrderEvent event,
            State<List<AddOrderItem>> itemState)
    {
        /*
        There were items that corresponded with this order, include them with the items that already
        existed in the joined event and remove them from the state
         */
        if(itemState.exists()) {
            event = event.addItems(itemState.get());
            itemState.remove();
        }

        return Optional.of(event);
    }

    /**
     * Determine if an order with this order number is already persisted.
     *
     * @param orderNumber The order number to check against the database
     * @return true if the order already exists; false otherwise
     */
    protected abstract boolean orderAlreadyPersisted(String orderNumber);
}

Now you’ll include it in your stream processing:

AbstractJoinedOrderStateFunc stateFunc = createJoinedOrderStateFuncImpl();
JavaMapWithStateDStream<String, JoinedOrderEvent, List<AddOrderItem>, Optional<JoinedOrderEvent>> stateStream =
        joinedOrderStream.mapWithState(StateSpec.function(stateFunc));

stateStream.filter(Optional::isPresent)
    .map(Optional::get)
    .foreachRDD((rdd) -> { /* persistence processing */ });

Here is a GIF representing the process:

events in same batch
events in different batch
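The different-batch case can also be traced in code. The following is a simplified plain-Java simulation of the buffering logic, not the Spark implementation: a HashMap stands in for the state that mapWithState keeps per key, a HashSet stands in for the database lookup, and items are plain strings. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

final class PendingItemsSketch {

    // Stands in for the per-key state that mapWithState keeps between batches.
    private final Map<String, List<String>> pendingItems = new HashMap<>();
    // Stands in for the database lookup behind orderAlreadyPersisted.
    private final Set<String> persistedOrders = new HashSet<>();

    /**
     * Processes one joined record for an order number.
     * Returns the items to persist, or Optional.empty() if they were buffered instead.
     */
    Optional<List<String>> process(String orderNumber, boolean hasOrder, List<String> items) {
        // Previously buffered items first, then the items from this batch.
        List<String> all = new ArrayList<>(
                pendingItems.getOrDefault(orderNumber, Collections.<String>emptyList()));
        all.addAll(items);

        if (hasOrder || persistedOrders.contains(orderNumber)) {
            // The order is available: release everything and clear the buffer.
            pendingItems.remove(orderNumber);
            persistedOrders.add(orderNumber);
            return Optional.of(all);
        }
        // No order yet: hold the items until a later batch brings it.
        pendingItems.put(orderNumber, all);
        return Optional.empty();
    }

    boolean hasPending(String orderNumber) {
        return pendingItems.containsKey(orderNumber);
    }

    public static void main(String[] args) {
        PendingItemsSketch sketch = new PendingItemsSketch();
        // Batch 1: AddItem for order 101 arrives first and is buffered.
        System.out.println(sketch.process("101", false, Arrays.asList("widget")));
        // Batch 2: CreateOrder for 101 arrives; the buffered item is released with it.
        System.out.println(sketch.process("101", true, Collections.<String>emptyList()));
        // Batch 3: a late AddItem finds the order persisted and passes straight through.
        System.out.println(sketch.process("101", false, Arrays.asList("gadget")));
    }
}
```

One simplification to keep in mind: the sketch marks an order as persisted the moment it is emitted, whereas in the streaming job that only becomes true once the downstream persistence step has written it.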

At this point, even if the events arrive out of order, you can process them as though they had arrived serially.
