Building a Distributed Data Ingestion Pipeline

Summary:
Building a data ingestion pipeline using Spark, Kafka, DataStax, NiFi, and Pentaho.

Introduction

On a recent client engagement that required loading and processing data from several data sources, we were given a broader mandate: develop a wholesale data loading strategy for a suite of NoSQL use cases.

This project posed some interesting and unique data ingestion challenges. As most probably already know, ETL is a data integration process in which source data is Extracted, Transformed and Loaded into a target system. Developing an ETL pipeline requires creating a source data definition, which is then transformed and loaded into the target. We’ve all done this exercise dozens of times over the years, moving CSVs into some SQL database.

But what happens if you don’t know what the source data looks like? Or what happens if what it looks like changes slightly every once in a while?

Furthermore, what happens if you have hundreds of data sources like that? That’s the predicament we found ourselves in - hundreds of data sources whose shape we didn’t know and that might or might not evolve over time.

Oh, and the kicker is that we had to load them all in a week. Here they are... Ready, Set, Go!

How would you get ready for such an engagement?

Challenges

So let’s summarize the requirements and what we could depend on:

  1. We had to deal with around 450 data sources.
  2. The data source types varied and included flat files (comma-, pipe- and tab-delimited) and relational database tables.
  3. A preview or sample of some of the data sources wasn’t available prior to load, which prevented us from determining the data structure of the fields in advance.
  4. The update schedule for these data sources varied: some changed weekly, some monthly, and some never.
  5. Once data was available on FTP, the load had to happen immediately and the data had to be staged to the database.

Potential Technology Options

Fortunately, we had a bit of flexibility as to how we needed to solve this problem. Our client gave us some latitude on the toolset we would choose. We opted to evaluate a few:

     1) Pentaho Data Integration (PDI) - provides ETL capabilities that facilitate the process of capturing, cleansing, and storing data (https://bit.ly/2CmCfwe)

  • Requires manual configuration of all data sources
  • All the target tables have to be pre-created
  • Provides a management GUI for administering the data flow

     2) Apache Spark - an open-source, distributed, general-purpose cluster-computing framework (https://spark.apache.org/)

  • Data is distributed and run across clusters
  • Open source
  • Can infer schema on file read
  • Can create tables on the fly
  • No manual source-to-target mapping is needed, thanks to schema inference

     3) Apache NiFi - provides an easy-to-use, powerful and reliable system to process and distribute data (https://nifi.apache.org/)

  • Runs across clusters and is distributed
  • Open source
  • Requires manual configuration of each data source
  • Requires pre-creation of tables
  • Provides a management GUI for administering the data flow

Due to time constraints, we did not have time to evaluate

What did we pick and why?

The key deciding factor was that we could not depend on knowing ahead of time what formats the data sources would come in. The number of data sources was also very dynamic. We needed a solution that could be malleable at runtime.

We ultimately selected Apache Spark as the technology option for the ETL data pipeline. (Or more accurately, we used DataStax Enterprise Analytics, which is the Spark implementation in the DataStax Enterprise stack.)

What attracted us to Apache Spark was its ability to infer a schema from the source data, create tables on the fly from that inferred schema, and load the data in a distributed manner during the ingestion phase of the pipeline. Spark’s ability to work across clusters and its excellent data processing APIs helped solidify it as the best selection.

While NiFi can infer the schema on the fly, it could not create tables on the fly. Therefore it could not load data sources it had not yet encountered. Pentaho can neither infer schema nor create tables on the fly.

What We Built - the ETL Data Pipeline

Data sources consist of structured and unstructured data in text files and relational database tables. The Data Ingestion Spark process loads each data source into its corresponding table in a Cassandra keyspace (schema). Another set of Spark processes transforms the ingested data into a set of domain tables.

Using Spark for Data Ingestion

We wrote a generic Spark job in Scala to read files or database tables into corresponding target tables in a Cassandra database schema.

The process includes:

1) Process Input Parameters - Parameters are passed in for the specific data source being loaded. They include the target keyspace name, the target table name, a single file name, a list of file names or a wildcard, the list of primary key columns, and the file delimiter.

*The job infers the schema automatically and assigns the first column as the primary key. Because that may not be the actual primary key, a primary key list parameter is provided to specify one or more columns as the key.
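
As a rough illustration of step 1, here is a minimal sketch of how such parameters could be collected into a single value. The IngestParams case class, the --name=value flag style, and the flag names themselves are assumptions made for this example; they are not taken from the actual job.

```scala
// Hypothetical parameter holder; the field names are illustrative only.
final case class IngestParams(
    keyspace: String,
    table: String,
    filePaths: Seq[String],   // a single file, a list of files, or a wildcard pattern
    primaryKeys: Seq[String], // empty means "fall back to the first column"
    delimiter: String
)

object IngestParams {
  // Parses arguments of the form --name=value (an assumed convention).
  def parse(args: Array[String]): IngestParams = {
    val kv: Map[String, String] = args
      .map(_.stripPrefix("--").split("=", 2))
      .collect { case Array(k, v) => k -> v }
      .toMap

    def required(name: String): String =
      kv.getOrElse(name, throw new IllegalArgumentException("Missing --" + name))

    // Comma-separated values become a trimmed list; a missing flag becomes an empty list.
    def list(name: String): Seq[String] =
      kv.get(name).toSeq.flatMap(_.split(",")).map(_.trim).filter(_.nonEmpty)

    IngestParams(
      keyspace = required("keyspace"),
      table = required("table"),
      filePaths = list("files"),
      primaryKeys = list("pk"),
      delimiter = kv.getOrElse("delimiter", ",")
    )
  }
}
```

Keeping the primary key list optional mirrors the note above: when no key columns are supplied, the caller can fall back to the first column.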

2) Load data sources into DataFrame - Files or database tables are read into a Spark DataFrame using SparkSession’s read API, with inferSchema turned on for files.

For loading a delimited file the code is as follows:

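// Read a delimited file and let Spark infer the column types from the data
val fileDF = spark.read
  .option("header", "true")        // first line holds the column names
  .option("dateFormat", dateFormat)
  .option("inferSchema", "true")
  .option("delimiter", delimiter)  // comma, pipe or tab, passed in as a parameter
  .csv(filePath)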

*Files are uploaded to DSEFS (DSE file system) using a CLI script.
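
To give a feel for what schema inference means, the sketch below picks the narrowest type that fits every value in a column and widens it when a later row needs more room. It is a deliberately simplified stand-in written in plain Scala, not Spark's actual algorithm (which also handles nulls, decimals, timestamps and booleans); the ColType hierarchy and the InferSketch object are hypothetical names introduced for this example.

```scala
import scala.util.Try

// Simplified column types; Spark's real inference covers many more.
sealed trait ColType
case object IntCol extends ColType
case object LongCol extends ColType
case object DoubleCol extends ColType
case object StringCol extends ColType

object InferSketch {
  // Narrowest type that can hold a single raw value.
  def typeOf(value: String): ColType =
    if (Try(value.toInt).isSuccess) IntCol
    else if (Try(value.toLong).isSuccess) LongCol
    else if (Try(value.toDouble).isSuccess) DoubleCol
    else StringCol

  // Ordering from narrowest to widest type.
  private def rank(t: ColType): Int = t match {
    case IntCol    => 0
    case LongCol   => 1
    case DoubleCol => 2
    case StringCol => 3
  }

  // Keep whichever of the two types is wider.
  def widen(a: ColType, b: ColType): ColType = if (rank(a) >= rank(b)) a else b

  // Assumes every row has one value per header column.
  def inferSchema(header: Seq[String], rows: Seq[Seq[String]]): Seq[(String, ColType)] =
    header.zipWithIndex.map { case (name, i) =>
      val colType = rows.map(r => typeOf(r(i))).reduceOption(widen).getOrElse(StringCol)
      name -> colType
    }
}
```

The practical consequence is the one described above: no one has to write a source definition by hand, but the inferred types depend on the values that happen to be in the file.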

For loading a table from a relational database (Oracle in this case) the code is as follows:

// Read a relational table over JDBC
val tableDF = spark.read
  .format("jdbc")
  .option("url", jdbcUrl)
  .option("dbtable", tablename)
  .option("user", dbUser)
  .option("password", dbPassword)
  .option("driver", "oracle.jdbc.OracleDriver")
  .load()

3) Create Cassandra table with inferred schema - A table is created with the inferred schema using the Spark Cassandra Connector’s DataFrame function createCassandraTable:

import com.datastax.spark.connector._  // adds createCassandraTable to DataFrame
fileDF.createCassandraTable(
  cassandraschema,
  tableName,
  partitionKeyColumns = Some(Seq(pklist: _*)))

*pklist - list of one or more primary key fields passed in as parameter

*fileDF - the DataFrame loaded from a file
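
To make the key handling concrete, here is a small plain-Scala sketch of the idea behind step 3: pick the partition key (the supplied list, or the first column when none is given) and render a table definition from the column names and types. This is not the connector's code; CreateTableSketch, its method names, and the exact CQL text it produces are assumptions for illustration only.

```scala
object CreateTableSketch {
  // Use the caller's key list when given; otherwise fall back to the first column.
  def partitionKey(columns: Seq[String], pklist: Seq[String]): Seq[String] = {
    require(columns.nonEmpty, "At least one column is required")
    val missing = pklist.filterNot(k => columns.contains(k))
    val missingText = missing.mkString(", ")
    require(missing.isEmpty, "Unknown key column(s): " + missingText)
    if (pklist.nonEmpty) pklist else columns.take(1)
  }

  // columns holds (name, CQL type) pairs, e.g. ("id", "bigint").
  def createTableCql(
      keyspace: String,
      table: String,
      columns: Seq[(String, String)],
      pklist: Seq[String]): String = {
    val keys = partitionKey(columns.map(_._1), pklist).mkString(", ")
    val cols = columns.map { case (name, cqlType) => name + " " + cqlType }.mkString(", ")
    s"CREATE TABLE IF NOT EXISTS $keyspace.$table ($cols, PRIMARY KEY (($keys)))"
  }
}
```

Validating the key list against the inferred columns up front is a design choice of this sketch; it surfaces a mistyped key column before any table is created.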

4) Load data into Cassandra table - A Spark DataFrame is loaded into a Cassandra table using the DataFrame write API:

import org.apache.spark.sql.SaveMode
fileDF.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> keyspace, "table" -> tablename))
  .mode(SaveMode.Append)
  .save()


Conclusion

You should use Spark when…

  • Data ingestion needs to be processed in a clustered and distributed environment
  • Data source schema needs to be inferred
  • Tables need to be created on the fly based on inferred schemas
  • Maximum flexibility is needed: you write code in Scala or Java, free of the constraints of a drag-and-drop GUI environment such as NiFi or Pentaho.

You should use NiFi when...

  • Data ingestion needs to be processed in a clustered and distributed environment
  • A GUI environment and an automated ETL tool are preferred
  • Data source schema needs to be inferred but destination tables can be pre-created

You should use Pentaho when...

  • Data ingestion needs to be processed in a clustered and distributed environment
  • A GUI environment and an automated ETL tool are preferred
  • A small set of data sources is to be loaded
  • File data structure is known prior to load, so that a schema is available for creating the target table.

Learn more about Apache Spark by attending our Online Meetup - Speed Dating With Cassandra

Watch for part 2 of the Data Pipeline blog that discusses data ingestion using Apache NiFi integrated with Apache Spark (using Apache Livy) and Kafka.  

Questions about this blog? Contact us: info@experoinc.com
