Leveraging the power of Apache Spark for reading from relational databases
We all know Spark is powerful at processing massive amounts of data. But how do we leverage its power when legacy data lives in relational databases?
Apache Spark provides a DataFrame API for reading from and writing to relational databases over JDBC, just like it does for other sources (Kafka, Cassandra, etc.).
Reading from a relational database using the Spark DataFrame API
Note: Make sure that you include the JDBC driver dependency for your database in your project.
E.g. in this case, with sbt: libraryDependencies += "mysql" % "mysql-connector-java" % "8.0.18"
Let's say we have a cluster of 3 executors, each with 2 cores.
You can easily test this from spark-shell too if you don’t feel like setting up a project :)
spark-shell --master yarn --executor-memory 8G --executor-cores 2 --num-executors 3 --packages "mysql:mysql-connector-java:8.0.18"
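A minimal read might look like the sketch below. The JDBC URL, table name and credentials are placeholders; adjust them for your own database.

val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/mydb")  // placeholder connection URL
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .option("dbtable", "employee")                      // placeholder table name
  .option("user", "username")
  .option("password", "password")
  .load()

df.count()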
Now let's take a look behind the scenes at how this query gets executed from Spark to the database.
One of the tasks on an executor will open a JDBC connection to the database and submit the query:
select * from table
The interesting thing to note here is that the rest of the tasks on this executor, and all the tasks on the other executors, will sit idle and do nothing.
This fact is also confirmed by the Spark UI: irrespective of how many executors or cores you have, only one task was launched to read from JDBC.
This defeats the purpose of parallel processing that Spark provides.
Spark provides additional parameters to enable multiple parallel reads from the table based on a partition column:
numPartitions → controls how many partitions the resulting DataFrame will have and, therefore, how many parallel JDBC connections Spark will open for you.
partitionColumn → the column Spark uses to split the query across those JDBC connections.
lowerBound → the lowest value of the partition column.
upperBound → the highest value of the partition column.
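Putting these parameters together, a partitioned read could look roughly like this. The column name id and the bounds 1 and 6 are purely illustrative, chosen to match the partition ranges shown below.

val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/mydb")  // placeholder connection URL
  .option("dbtable", "employee")                      // placeholder table name
  .option("user", "username")
  .option("password", "password")
  .option("partitionColumn", "id")  // column used to split the reads (illustrative)
  .option("numPartitions", "3")     // number of partitions / parallel JDBC connections
  .option("lowerBound", "1")        // lowest value of the partition column
  .option("upperBound", "6")        // highest value of the partition column
  .load()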
Now when we run a count action on this DataFrame, Spark will initiate 3 DB connections for us and split the query into three partitions, one per connection.
The first value is the partition number, the second is the start value of the partition column for that partition, and the third is the end value:
partition(0,1,2)
partition(1,3,4)
partition(2,5,6)
So each partition will get its own query, something like this:
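As an illustration only, assuming the partition column is a numeric id with lowerBound 1 and upperBound 6 as above, the three queries end up roughly equivalent to:

select * from table where id < 3 or id is null
select * from table where id >= 3 and id < 5
select * from table where id >= 5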
In case you're interested, this is how Spark decides the JDBC partition boundaries:
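The real logic lives in Spark's JDBCRelation.columnPartition; the snippet below is only a simplified sketch of the stride-based splitting, for intuition rather than a copy of the actual source.

// Simplified sketch: deriving per-partition WHERE clauses from
// lowerBound, upperBound and numPartitions (approximation only)
def columnPartitions(column: String,
                     lowerBound: Long,
                     upperBound: Long,
                     numPartitions: Int): Seq[String] = {
  val stride = upperBound / numPartitions - lowerBound / numPartitions
  var currentValue = lowerBound
  (0 until numPartitions).map { i =>
    val lower = if (i == 0) None else Some(s"$column >= $currentValue")
    currentValue += stride
    val upper = if (i == numPartitions - 1) None else Some(s"$column < $currentValue")
    (lower, upper) match {
      case (None, Some(u))    => s"$u or $column is null" // first partition also collects NULLs
      case (Some(l), None)    => l                         // last partition is open-ended
      case (Some(l), Some(u)) => s"$l AND $u"
      case _                  => "1=1"                     // degenerate single-partition case
    }
  }
}

// columnPartitions("id", 1, 6, 3) produces roughly:
//   "id < 3 or id is null", "id >= 3 AND id < 5", "id >= 5"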
Let's take a look at the Spark UI:
We can see that Spark launched 3 tasks this time to read the table.
This is what it looks like:
Some Tips:
- Choose the numPartitions parameter wisely, so that it doesn't overwhelm or slow down your database.
- Choose a partitionColumn with high cardinality and an even value distribution, so that it doesn't create skew across partitions; otherwise you'll notice that some tasks take much longer than others.
- The dbtable parameter doesn't need to be a table; it can be a view or a subquery as well (see the short example below). However, I wouldn't suggest passing a complex query there; rather, bring the data into Spark and do the transformations there, so the work can be distributed across many tasks.
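For example, a filtered subquery can be passed as dbtable like this (the alias is required by most databases; the table and column names are only placeholders):

val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/mydb")  // placeholder connection URL
  .option("dbtable", "(select id, name from employee where active = 1) as t")  // subquery instead of a table
  .option("user", "username")
  .option("password", "password")
  .load()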