From Spark in Action, Second Ed. by Jean Georges Perrin
You’ve probably seen a simple use case where Spark ingests data from a CSV file, performs a simple operation, and then stores the result in a database. In this article, you’re going to see what happens behind the scenes.
Walking through your application
This article explores how Spark works behind the scenes. To start, we’ll look a little more closely at the first typical operation: the connection to the master. After this non-functional step, let’s walk through the ingestion, the transformation, and, finally, the publishing of the data in the RDBMS.
Connecting to a master
For every Spark application, the first operation is to connect to the Spark master and get a Spark session. This is an operation you’ll do every time. This is illustrated by the code fragment of Listing 1 and Figure 1.
In this context, you’re connecting to Spark in local mode.
Listing 1 Getting a Spark session
SparkSession spark = SparkSession.builder()
    .appName("CSV to DB")
    .master("local")
    .getOrCreate();
Method chaining makes Java more compact
In recent years, more and more Java APIs have adopted method chaining, as in SparkSession.builder().appName(…).master(…).getOrCreate(). You may have seen code where more intermediate objects were created, a bit like:
Object1 o1 = new Object1();
Object2 o2 = o1.getObject2();
Spark’s API uses a lot of method chaining.
Method chaining makes your code more compact and more readable. One major drawback is debugging: imagine a NullPointerException (NPE) in the middle of your chain; you’ll spend more time working out which call produced it.
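To make the trade-off concrete, here is a minimal sketch that uses only the JDK’s StringBuilder, not the Spark API. The class name ChainingDemo and its two methods are made up for this illustration. Both methods build the same string; the second one gives you a separate statement per step, which is where you would set a breakpoint when a chain misbehaves.

```java
// Illustration only: uses java.lang.StringBuilder, not the Spark API.
class ChainingDemo {

  // Chained style: each append() returns the builder, so the calls line up.
  static String chained(String lname, String fname) {
    return new StringBuilder()
        .append(lname)
        .append(", ")
        .append(fname)
        .toString();
  }

  // Unchained style: one statement per step, easier to inspect in a debugger.
  static String stepByStep(String lname, String fname) {
    StringBuilder sb = new StringBuilder();
    sb.append(lname);
    sb.append(", ");
    sb.append(fname);
    String result = sb.toString();
    return result;
  }

  public static void main(String[] args) {
    System.out.println(chained("Perrin", "Jean Georges"));    // Perrin, Jean Georges
    System.out.println(stepByStep("Perrin", "Jean Georges")); // Perrin, Jean Georges
  }
}
```

When a chain fails, temporarily rewriting it in the second style is often the quickest way to find the call that returned null.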
Local mode isn’t a cluster but it’s much easier
To let you run the example in this article without setting up a full cluster, I specified local as the value for the master: you’re running Spark in local mode.
If you had a cluster, you’d give the address of the cluster instead.
For the sake of building your mental model, assume you have a cluster instead of the local mode.
All the illustrations in this article represent a timeline. At t0, you start your application (your main() function) and at t1, you get your session.
The first step is always to connect to a master. You can now ask Spark to load the CSV file.
Loading or ingesting the CSV file
Loading, ingesting, reading are synonyms for what you’re going to do now: ask Spark to load the data contained in the CSV file. Spark can use distributed ingestion through the different nodes of the cluster.
As you can imagine, the Spark master, like all good masters, doesn’t do much itself: it relies on slaves or workers. You’ll find both wordings in the Spark documentation; I suggest you pick one, depending on your level of political correctness. I’ll overcome my inherently French indecisiveness and pick one for this article: worker.
In our scenario, you have three workers. Distributed ingestion means you’ll ask our three workers to ingest at the same time.
At t2, the master tells the workers to load the file, as coded in listing 2. Natural questions follow: if you have three workers, which one loads the file? And if they load it simultaneously, how do they know where to start and finish? Spark ingests the CSV file in a distributed way. For this to work, every worker must be able to reach the file: it must be on a shared drive, on a distributed file system, or shared through a file-sharing mechanism like Dropbox, Box, Nextcloud/Owncloud, etc.
In this context, a partition is a dedicated area in the worker’s memory.
Listing 2 Reading the authors file
Dataset<Row> df = spark.read()
Let’s take a second to look at our CSV file (see listing 3). It’s a simple file with two columns: lname for the last name and fname for the first name. The first line of the file is a header. The file contains six more lines, which become six rows in our dataframe.
Listing 3 A good ol’ CSV file
The workers create tasks to read the file. Each worker has access to the node’s memory and assigns a memory partition to the task.
At t4, each task continues by reading a part of the CSV file as illustrated in figure 4. As the task is ingesting its rows, it stores them in a dedicated partition.
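To picture how several tasks can share one file without overlapping, here is a small sketch in plain Java. It is not Spark’s actual splitting logic, only one plausible scheme: each task gets a contiguous range of records. The class IngestionSketch and the method assign() are hypothetical names, and the record and task counts simply mirror the six rows and three workers of this scenario.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: a simplified model, not how Spark computes its splits.
class IngestionSketch {

  // Splits record numbers 1..recordCount into contiguous chunks, one per task.
  static List<List<Integer>> assign(int recordCount, int taskCount) {
    List<List<Integer>> partitions = new ArrayList<>();
    for (int t = 0; t < taskCount; t++) {
      int start = t * recordCount / taskCount;     // first index (inclusive)
      int end = (t + 1) * recordCount / taskCount; // last index (exclusive)
      List<Integer> partition = new ArrayList<>();
      for (int r = start; r < end; r++) {
        partition.add(r + 1); // record numbers are 1-based
      }
      partitions.add(partition);
    }
    return partitions;
  }

  public static void main(String[] args) {
    List<List<Integer>> partitions = assign(6, 3);
    for (int i = 0; i < partitions.size(); i++) {
      System.out.println("partition " + (i + 1) + " -> records " + partitions.get(i));
    }
  }
}
```

Because each task knows its own start and end, no coordination between tasks is needed while they read.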
Why should you care about partitions and their location?
When your operations are simple, like concatenating two fields into a third one, Spark is fast.
Spark can join data between datasets and perform aggregations, exactly as you do those operations with your relational database. Now imagine that you’re joining data in the first partition of worker #1 with the second partition of worker #2: all that data must be transferred, which is a costly operation.
Figure 4 shows the record being copied from the CSV file to the partition during the ingestion process, within the R ➧ P (Record to Partition) box. The memory box, in purple, shows which records are in which partition on each worker. In this example, record #1, which contains Blaise Pascal, is in the first partition of the first worker.
Transforming your data
After the data has been loaded, at t5, you can process the records. Your operation is fairly simple: add a new column, called name, to the dataframe. The full name (column name) is a concatenation of the last name (column lname), a comma, a space, and the first name (column fname). For example, Jean Georges (first name) and Perrin (last name) become Perrin, Jean Georges. Listing 4 describes the process and figure 6 illustrates it.
Spark is lazy.
As Seth Rogen said: “I am lazy, but for some reason, I am so paranoid that I end up working hard.” This is how Spark acts. At this time, you told Spark to concatenate the fields, but it didn’t do anything.
Spark is lazy: it only works when asked. Spark stacks up all your requests and, when it needs to, it optimizes the operations and does the hard work. Like Seth, when you ask nicely, Spark works hard.
In this situation, you’re using the withColumn() method, which is a transformation. Spark only starts processing when it sees an action, such as the write() method in listing 5.
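If you want to see laziness without a Spark installation, the JDK’s Stream API behaves in a comparable way: intermediate operations such as map() only record what to do, and nothing runs until a terminal operation such as collect(). The sketch below is an analogy, not Spark code; the class LazyDemo and its call counter are invented for the illustration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: Java streams, not Spark, but the same "wait for an action" idea.
class LazyDemo {

  // Counts how many times the mapping function actually runs.
  static final AtomicInteger calls = new AtomicInteger();

  static Stream<String> describe(List<String> names) {
    // map() is an intermediate operation: it only records what to do.
    return names.stream().map(n -> {
      calls.incrementAndGet();
      return n.toUpperCase();
    });
  }

  public static void main(String[] args) {
    Stream<String> pending = describe(List.of("pascal", "perrin"));
    System.out.println("after map(): " + calls.get() + " calls");     // 0 calls
    // collect() is a terminal operation: now the work really happens.
    List<String> result = pending.collect(Collectors.toList());
    System.out.println("after collect(): " + calls.get() + " calls"); // 2 calls
    System.out.println(result);                                       // [PASCAL, PERRIN]
  }
}
```

In Spark’s vocabulary, map() plays the role of the transformation and collect() the role of the action.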
Listing 4 Adding a column to our dataframe
// Needs: import static org.apache.spark.sql.functions.*;
df = df.withColumn(
    "name",
    concat(df.col("lname"), lit(", "), df.col("fname")));
You’re now ready for the last operation: saving the result in the database.
Saving the work done in our dataframe to a database
It’s about time to save your result in the database, after ingesting the CSV file and transforming the data in the dataframe. The code responsible for this operation is in listing 5 and illustrated in figure 6.
Listing 5 Saving the data to the database
String dbConnectionUrl = "jdbc:postgresql://localhost/spark_labs";
Properties prop = new Properties();
// The driver, user, and password are set on prop here.
df.write()
    .mode(SaveMode.Overwrite)
    .jdbc(dbConnectionUrl, "ch02", prop);
As you’re certainly familiar with JDBC (Java Database Connectivity), you’ve probably noticed that Spark expects similar information:
- a JDBC connection URL
- the name of a driver
- a user
- and a password
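As a hedged sketch of how those last three items could be packaged, the helper below fills a java.util.Properties object with the driver, user, and password keys that the JDBC connection reads. The class JdbcSettings and its build() method are hypothetical, and the credentials in main() are placeholders, not values from the original listing. The driver class name is the one shipped with the PostgreSQL JDBC driver.

```java
import java.util.Properties;

// Illustration only: builds the Properties object, does not open a connection.
class JdbcSettings {

  // Collects the driver name and credentials under the usual JDBC keys.
  static Properties build(String user, String password) {
    Properties prop = new Properties();
    prop.setProperty("driver", "org.postgresql.Driver"); // PostgreSQL JDBC driver class
    prop.setProperty("user", user);
    prop.setProperty("password", password);
    return prop;
  }

  public static void main(String[] args) {
    // Placeholder credentials: replace them with your own.
    Properties prop = build("your_user", "your_password");
    System.out.println(prop.getProperty("driver")); // org.postgresql.Driver
  }
}
```

The connection URL stays separate from these properties and is passed as its own argument.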
The write() method returns a DataFrameWriter object on which you can chain a mode() method to specify how to write: here, you’ll overwrite the data in the table.
Figure 7 represents your application’s full mental model. It’s important to remember:
- The whole dataset never hit our application (driver): the dataset was split between the partitions on the workers, not on the driver.
- The entire processing took place in the workers.
- The workers saved the data in their partition to the database. In this scenario, you had four partitions; this means four connections to the database when you saved the data. Imagine a similar scenario with 200k tasks trying first to connect to the database and then inserting data. A fine-tuned database server refuses too many connections, which requires more control in the application. A solution to this load issue involves repartitioning and the options allowed when exporting to a database.
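To see why the number of partitions drives the number of connections, here is a plain-Java model, assuming one connection per partition as described above. It is not Spark’s implementation; CoalesceSketch and its coalesce() method are made-up names that only mimic the idea of merging partitions before the write. In Spark itself, Dataset offers repartition() and coalesce() for this purpose.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: models partitions as lists, one database connection each.
class CoalesceSketch {

  // Merges the given partitions into at most `target` partitions, round-robin.
  static <T> List<List<T>> coalesce(List<List<T>> partitions, int target) {
    if (target < 1) {
      throw new IllegalArgumentException("target must be at least 1");
    }
    List<List<T>> merged = new ArrayList<>();
    for (int i = 0; i < Math.min(target, partitions.size()); i++) {
      merged.add(new ArrayList<>());
    }
    for (int i = 0; i < partitions.size(); i++) {
      merged.get(i % merged.size()).addAll(partitions.get(i));
    }
    return merged;
  }

  public static void main(String[] args) {
    // Four partitions holding record numbers, as in the scenario above.
    List<List<Integer>> four =
        List.of(List.of(1, 2), List.of(3), List.of(4, 5), List.of(6));
    List<List<Integer>> two = coalesce(four, 2);
    System.out.println("connections before: " + four.size()); // 4
    System.out.println("connections after: " + two.size());   // 2
    System.out.println(two);                                  // [[1, 2, 4, 5], [3, 6]]
  }
}
```

The same rows reach the database either way; only the number of simultaneous writers changes.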
And that’s where we will stop for now.
About the Author:
An experienced consultant and entrepreneur passionate about all things data, Jean Georges Perrin was the first IBM Champion in France, an honor he’s now held for ten consecutive years. Jean Georges has also managed many teams of software and data engineers.
Originally published at freecontent.manning.com.