Introduction

On a recent client engagement where we had to load and process data from several data sources, we were tasked with a broader mandate: develop a wholesale data loading strategy for a suite of NoSQL use cases.

This project posed some interesting and unique data ingestion challenges. As most readers already know, ETL is a data integration process in which source data is Extracted, Transformed, and Loaded into a target system. Developing ETL pipelines requires creating a source data definition, which subsequently gets transformed and loaded into the target. We’ve all done this exercise dozens of times over the years, moving CSVs into some SQL database.

But what happens if you don’t know what the source data looks like? Or what happens if its shape changes slightly every once in a while?

Furthermore, what happens if you have hundreds of data sources like that? That’s the predicament we found ourselves in - hundreds of data sources that we didn’t know the shape of, and that might evolve over time.

Oh, and the kicker: we had to load them all in a week. Here they are… Ready, set, go!

How would you get ready for such an engagement?

Challenges

So let’s summarize the requirements and what we could depend on:

  1. We had to deal with around 450 data sources.
  2. The data source types varied, including flat files (comma-, pipe-, and tab-delimited) and relational database tables.
  3. A preview or sample wasn’t available for some of the data sources prior to load, which prevented us from determining the structure of their fields.
  4. The update schedule for these data sources varied: some changed weekly, some monthly, and some never.
  5. Once data was available on FTP, the load had to happen immediately and the data staged to the database.

Potential Technology Options

Fortunately, we had a bit of flexibility in how we solved this problem. Our client gave us some latitude on the toolset we would choose, so we opted to evaluate a few options:

1) Pentaho Data Integration (PDI) - provides ETL capabilities that facilitate the process of capturing, cleansing, and storing data. (https://help.pentaho.com/Documentation/7.1/0D0/Pentaho_Data_Integration)

2) Apache Spark - an open-source, distributed, general-purpose cluster-computing framework. (https://spark.apache.org/)

3) Apache NiFi - provides an easy-to-use, powerful, and reliable system to process and distribute data. (https://nifi.apache.org/)

Due to time constraints, we did not have time to evaluate any other options.

What did we pick and why?

The key deciding factor was that we could not depend on knowing ahead of time what formats the data sources would come in. The number of data sources was also very dynamic. We had to have a solution that could be malleable at runtime.

We ultimately selected Apache Spark as the technology option for the ETL data pipeline. (Or more accurately, we used DataStax Enterprise Analytics, which is the Spark implementation in the DataStax Enterprise stack.)

What attracted us to Apache Spark was its ability to infer a schema from the source data, create tables on the fly using that inferred schema, and load data in a distributed manner during the ingestion phase of the pipeline. Apache Spark’s ability to work across clusters and its excellent data processing APIs also helped solidify it as the best selection.

While NiFi can infer a schema on the fly, it cannot create tables on the fly, so it could not load data sources it had not yet encountered. Pentaho can neither infer schemas nor create tables on the fly.

What We Built - the ETL Data Pipeline

Data sources consist of structured and unstructured data in text files and relational database tables. The data ingestion Spark process loads each data source into its corresponding table in a Cassandra keyspace (schema). Another set of Spark processes then transforms the ingested data into a set of domain tables.
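
As an illustration of that second stage, a transformation job might read a staged table back from Cassandra, reshape it, and write the result to a domain table. The sketch below is only indicative: the keyspace, table, and column names are hypothetical, and the reshaping is a placeholder for whatever domain logic a given table needs.

// Hypothetical staged-to-domain transformation (table and column names are placeholders)
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("StagedToDomain").getOrCreate()

// Read a previously ingested (staged) table from Cassandra
val stagedDF = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "staging", "table" -> "customer_raw"))
  .load()

// Example reshaping: normalize a key column and drop duplicates
val domainDF = stagedDF
  .select(trim(col("customer_id")).as("customer_id"), col("first_name"), col("last_name"))
  .dropDuplicates("customer_id")

// Write the reshaped data to a pre-created domain table
domainDF.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "domain", "table" -> "customer"))
  .mode(SaveMode.Append)
  .save()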

Using Spark for Data Ingestion

We wrote a generic Spark job in Scala to read files or database tables into corresponding target tables in a Cassandra database schema.

The process includes:

1) Process Input Parameters - Parameters are passed in for the specific data source that’s being loaded. These include the target keyspace name, the target table name, a single file name, a list of file names or a wildcard, the list of primary key columns, and the file delimiter.

*The schema is inferred automatically and, by default, the first column would be assigned as the primary key, which may not be the actual primary key, so a primary key list parameter is provided to specify one or more columns as the key.
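
As a rough sketch, the job parameters could be modeled as a simple case class populated from the arguments passed to spark-submit. The field names and example values below are illustrative, not the exact parameter names we used.

// Illustrative container for the job's input parameters (names and defaults are hypothetical)
case class IngestParams(
  keyspace: String,                // target Cassandra keyspace
  table: String,                   // target table name
  filePaths: Seq[String],          // one or more file names, or a wildcard pattern
  primaryKeyColumns: Seq[String],  // columns to use as the primary key
  delimiter: String = ",",         // field delimiter for flat files
  dateFormat: String = "yyyy-MM-dd")

// e.g. built from the arguments passed to the job
val params = IngestParams(
  keyspace = "staging",
  table = "customer_raw",
  filePaths = Seq("dsefs:///data/customer_*.txt"),
  primaryKeyColumns = Seq("customer_id"),
  delimiter = "|")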

2) Load data sources into DataFrame - Files or database tables are read into a Spark DataFrame using SparkSession’s read API, with inferSchema turned on for files.

For loading a delimited file the code is as follows:

val fileDF = spark.read
  .option("header", "true")          // first row contains the column names
  .option("dateFormat", dateFormat)  // date format supplied as a job parameter
  .option("inferSchema", "true")     // infer column types from the data
  .option("delimiter", delimiter)    // comma, pipe or tab, depending on the data source
  .csv(filePath)

*Files are uploaded to DSEFS (the DSE file system) using a CLI script.
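
Because the table definition is driven entirely by what Spark infers, it can be worth inspecting the inferred schema before the Cassandra table is created. This is just a sanity check, not a required step of the job, and the inferredColumns value below is only for illustration.

// Print the schema Spark inferred from the file (column names and types)
fileDF.printSchema()

// The field names can also be checked programmatically, e.g. to validate the primary key parameter
val inferredColumns = fileDF.schema.fieldNames.toSeq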

For loading a table from a relational database (Oracle in this case), the code is as follows:

spark.read
  .format("jdbc")                                // read via JDBC
  .option("url", jdbcUrl)                        // JDBC connection URL
  .option("dbtable", tablename)                  // source table name
  .option("user", dbUser)
  .option("password", dbPassword)
  .option("driver", "oracle.jdbc.OracleDriver")  // Oracle JDBC driver class
  .load()
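
For large relational tables, the JDBC read can also be split across the cluster so the data is pulled in parallel. The partition column and bounds below are placeholders; in practice they would come from the actual source table.

// Optional: partition the JDBC read so the table is loaded in parallel
spark.read
  .format("jdbc")
  .option("url", jdbcUrl)
  .option("dbtable", tablename)
  .option("user", dbUser)
  .option("password", dbPassword)
  .option("driver", "oracle.jdbc.OracleDriver")
  .option("partitionColumn", "id")   // a numeric, date, or timestamp column (placeholder)
  .option("lowerBound", "1")         // minimum value of the partition column (placeholder)
  .option("upperBound", "1000000")   // maximum value of the partition column (placeholder)
  .option("numPartitions", "8")      // number of parallel reads
  .load()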

3) Create Cassandra table with inferred schema - A table is created with the inferred schema using the Spark Cassandra Connector’s DataFrame function createCassandraTable:

// Requires the Spark Cassandra Connector implicits: import com.datastax.spark.connector._
fileDF.createCassandraTable(
  cassandraschema,                              // target keyspace
  tableName,                                    // target table name
  partitionKeyColumns = Some(Seq(pklist: _*)))  // primary key columns passed in as a parameter

*pklist - a list of one or more primary key fields passed in as a parameter

*fileDF - the DataFrame loaded from a file

4) Load data into Cassandra table - The Spark DataFrame is loaded into the Cassandra table using the DataFrame write API:

fileDF.write
  .format("org.apache.spark.sql.cassandra")                    // Spark Cassandra Connector data source
  .options(Map("keyspace" -> keyspace, "table" -> tablename))
  .mode(SaveMode.Append)                                       // append rows to the newly created table
  .save()
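
Putting the pieces together, the generic ingestion job looks roughly like the sketch below. It follows the four steps above for the flat-file case; the argument handling is deliberately simplified and the object name is ours, not the production job’s.

// Minimal end-to-end sketch of the generic ingestion job (simplified argument handling)
import com.datastax.spark.connector._   // adds createCassandraTable to DataFrames
import org.apache.spark.sql.{SaveMode, SparkSession}

object GenericIngest {
  def main(args: Array[String]): Unit = {
    // 1) Process input parameters (a real job would parse and validate these properly)
    val Array(keyspace, tablename, filePath, delimiter, dateFormat, pkCsv) = args
    val pklist = pkCsv.split(",").toSeq

    val spark = SparkSession.builder.appName("GenericIngest").getOrCreate()

    // 2) Read the delimited file with schema inference
    val fileDF = spark.read
      .option("header", "true")
      .option("dateFormat", dateFormat)
      .option("inferSchema", "true")
      .option("delimiter", delimiter)
      .csv(filePath)

    // 3) Create the Cassandra table from the inferred schema
    fileDF.createCassandraTable(
      keyspace,
      tablename,
      partitionKeyColumns = Some(pklist))

    // 4) Append the data into the newly created table
    fileDF.write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> keyspace, "table" -> tablename))
      .mode(SaveMode.Append)
      .save()

    spark.stop()
  }
}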

Conclusion

You should use Spark when you need to infer schemas from source data you haven’t seen before, create target tables on the fly, and load data in a distributed manner across a cluster.

You should use NiFi when you need an easy-to-use, reliable system to route and distribute data between well-understood sources and targets; it can infer schemas on the fly but cannot create tables for sources it has not yet encountered.

You should use Pentaho when your sources and targets are fully defined up front and you mainly need conventional ETL capabilities for capturing, cleansing, and storing data, since it can neither infer schemas nor create tables on the fly.

Watch for part 2 of the Data Pipeline blog, which discusses data ingestion using Apache NiFi integrated with Apache Spark (via Apache Livy) and Kafka.

Questions about this blog? Contact us: info@experoinc.com
