On a recent client engagement where we had to load and process data from several data sources, we were tasked with a broader mandate to develop a wholesale data loading strategy for a suite of NoSQL purposes.
This project posed some interesting and unique data ingestion challenges. As most probably already know, ETL is a data integration process in which source data is Extracted, Transformed and Loaded into a target system. Developing ETL pipelines, requires one to create of a source data definition which subsequently gets transformed and loaded into the the target. We’ve all done this exercise dozens of times moving CSVs into some SQL database for years.
But what happens if you don’t know what the source data looks like? Or what happens if what it looks like changes slightly every once-in-a-while?
Furthermore, what happens if you have 100s of data sources like that? That’s the predicament we found ourselves in - 100s of data sources that we didn’t know the shape of, that may or may not evolve over time.
Oh, and the kicker is that we have to load them all in a week. Here they are….Ready, Set, Go!
How would you get ready for such an engagement?
So let’s summarize the requirements and what we could depend on:
Fortunately, we had a bit of flexibility as to how we needed to solve this problem. Our client gave us some latitude on the toolset we would choose. We opted to evaluate a few:
1) Pentaho Data Integration (PDI) - provides the ETL capabilities that facilitates the process of capturing, cleansing, and storing data (https://bit.ly/2CmCfwe)
2) Apache Spark - open-source distributed general-purpose cluster-computing framework. ( https://spark.apache.org/)
3) Apache NiFi - provides an easy to use, powerful and reliable system to process and distribute data. (https://nifi.apache.org/)
Due to time constraints, we did not have time to evaluate
The key deciding factor was the fact that we could not depend on knowing up front what formats the data sources would come in ahead of time. The number of data sources was also very dynamic. We had to have a solution that could be malleable at runtime.
We ultimately selected Apache Spark as the technology option for the ETL Data pipeline. (Or more accurately, we used DataStax Enterprise Analytics which is the Spark implementation in the Datastax Enterprise stack.)
What attracted us to Apache Spark was the ability to infer schema based on source data and create tables on the fly using the inferred schema and load data in a distributed manner during the data ingestion phase of the data pipeline. Also Apache Spark’s ability to work across clusters and the excellent data processing APIs help solidify it as the best selection.
While NiFi can infer the schema on the fly, it could not create tables on the fly. Therefore it could not load data sources it had not yet encountered. Pentaho can neither infer schema nor create tables on the fly.
Data sources consist of structured and unstructured data in text files and relational database tables. The Data Ingestion Spark process loads each data source into their corresponding tables in a Cassandra keyspace (schema). Another set of spark processes transform the ingested data into a set of domain tables.
We wrote a generic Spark job written in Scala, to read files or database tables into corresponding target tables in a Cassandra database schema.
The process includes:
1) Process Input Parameters - Parameters are passed in for the specific data source that’s being loaded. Some of the parameters include target keyspace name ,target table name, single file name or a list of file names, or wild card, list of primary key columns, file delimiter.
*Infers schema automatically and assigns the first column as the primary key, which may not be the actual primary key, so a primary key list parameter is provided to specify one or more columns as the key
2) Load data sources into DataFrame - Files or database tables are read into a Spark DataFrame using SparkSession’s read API with inferSchema turned on for the file.
For loading a delimited file the code is as follows:
*Files are uploaded to DSEFS (DSE file system) using CLI script.
For loading a table from a relational database (Oracle in this case) the code is as follows:
3) Create Cassandra table with inferred schema - A table is created with the inferred schema using the Spark Connector’s DataFrame function createCassandraTable,
*pklist - list of one or more primary key fields passed in as parameter
*fileDF - the DataFrame loaded from a file
4) Load data into Cassandra table - A Spark DataFrame is loaded into a Cassandra table using the DataFrame write API,
You should use Spark when…
You should use Nifi when...
You should use Pentaho when ….
Learn more about Apache Spark by attending our Online Meetup - Speed Dating With Cassandra
Watch for part 2 of the Data Pipeline blog that discusses data ingestion using Apache NiFi integrated with Apache Spark (using Apache Livy) and Kafka.
Questions about this blog? Contact us: firstname.lastname@example.org