ARTICLE

Consuming records with Spark

From Spark in Action, Second Edition by Jean Georges Perrin

This article explores consuming records in files with Spark.

_________________________________________________________________

Take 42% off Spark in Action. Just enter slperrin into the discount code box at checkout at manning.com.
_________________________________________________________________

Consuming the records

Let’s imagine that you have a folder full of records (of some kind) and you want to ingest them with Spark. What do you do? You’ll first look at how the records are displayed, then dig into the code.

The application is ReadLinesFromFileStreamApp.java. It’s lab #100. It ingests the records, stores them in a dataframe, and shows the result on the console. Figure 1 illustrates this process.


The output of the application is in listing 1.

Listing 1 Output from ReadLinesFromFileStreamApp

2018-11-16 14:13:54.924 -DEBUG --- [           main] tart(ReadLinesFromFileStreamApp.java:29): -> start()
-------------------------------------------
Batch: 0 #A
-------------------------------------------
+----------------------------------+
|value                             | #B
+----------------------------------+
|Mara,Jamison,Acy,52,492-23-4955   | #C
|Ariel,Raegan,Abend,104,007-31-2841| #C
|Kynlee,Ari,Bevier,106,439-70-9576 | #C
+----------------------------------+
only showing top 3 rows

-------------------------------------------
Batch: 1 #D
-------------------------------------------
+----------------------------------+
|value                             |
+----------------------------------+
|Conrad,Alex,Gutwein,34,998-04-4584|
|Aldo,Adam,Ballard,6,996-95-8983   |
+----------------------------------+

2018-11-16 14:14:59.648 -DEBUG --- [           main] tart(ReadLinesFromFileStreamApp.java:58): <- start()

A Batch #0

B The dataframe has only one column, called “value”

C Three rows of your dataframe

D Batch #1

To start the ingestion application, you can run it directly in your IDE (Eclipse, in my case) or via Maven. To use Maven, open another terminal, go to the directory where you cloned the project, and run:

$ cd /Users/jgp/Workspaces/eclipse-2018-09/net.jgp.books.sparkWithJava.ch10
$ mvn clean compile
$ mvn exec:exec@lab100

Note that here, you’re using exec:exec, not exec:java. By using exec:exec, Maven is starting a new JVM to run your application. This way, you can pass arguments to the JVM. I needed this to pass the logger’s property file as a command line argument. Listing 2 shows you the section of the pom.xml responsible for the execution of the application.

Listing 2 Section of pom.xml to execute lab #100, ReadLinesFromFileStreamApp


<execution>
  <id>lab100</id>
  <configuration>
    <executable>java</executable>
    <arguments>
      <argument>-Dlog4j.configuration=file:src/main/java/log4j.properties</argument>
      <argument>-classpath</argument>
      <classpath />
      <argument>net.jgp.books.sparkInAction.ch10.lab100ReadStream.ReadLinesFromFileStreamApp</argument>
    </arguments>
  </configuration>
</execution>
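
As a small illustration of what the -D argument in listing 2 does: a value passed to the java executable as -Dname=value becomes a JVM system property, which any code running in that JVM can read with System.getProperty(). The sketch below is plain Java. The class name LoggerConfigLocator and the fallback parameter are hypothetical; they aren't part of the book's repository.

```java
/** Hypothetical helper, for illustration only; not part of the lab's code. */
final class LoggerConfigLocator {
  // Name of the property set by -Dlog4j.configuration=... in listing 2.
  static final String PROPERTY = "log4j.configuration";

  private LoggerConfigLocator() {
  }

  /** Returns the value passed with -D, or the given fallback if the property is absent. */
  static String configLocation(String fallback) {
    return System.getProperty(PROPERTY, fallback);
  }
}
```

When the application is launched as in listing 2, LoggerConfigLocator.configLocation("none") would return file:src/main/java/log4j.properties; without the -D argument, it would return the fallback.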

Let’s analyze the code of the ReadLinesFromFileStreamApp application, in the net.jgp.books.sparkInAction.ch10.lab100ReadStream package, in listing 3. I know that a big block of imports at the beginning of a source file isn’t always appealing, but as the underlying framework (in this case, Apache Spark) evolves, I like to make sure that you’re using the right packages.

Listing 3 ReadLinesFromFileStreamApp.java

package net.jgp.books.sparkInAction.ch10.lab100ReadStream;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.slf4j.Logger; #A
import org.slf4j.LoggerFactory; #A

import net.jgp.books.sparkInAction.ch10.x.utils.streaming.lib.StreamingUtils;

public class ReadLinesFromFileStreamApp {
  private static transient Logger log = LoggerFactory.getLogger(
      ReadLinesFromFileStreamApp.class); #B

  public static void main(String[] args) {
    ReadLinesFromFileStreamApp app = new ReadLinesFromFileStreamApp();
    app.start();
  }

  private void start() {
    log.debug("-> start()"); #C

    SparkSession spark = SparkSession.builder() #D
        .appName("Read lines over a file stream") #D
        .master("local") #D
        .getOrCreate(); #D

    Dataset<Row> df = spark
        .readStream() #E
        .format("text") #F
        .load(StreamingUtils.getInputDirectory()); #G

    StreamingQuery query = df
        .writeStream() #H
        .outputMode(OutputMode.Update()) #I
        .format("console") #J
        .option("truncate", false) #K
        .option("numRows", 3) #K
        .start(); #L

    try {
      query.awaitTermination(60000); #M
    } catch (StreamingQueryException e) {
      log.error(
          "Exception while waiting for query to end {}.",
          e.getMessage(),
          e);
    }

    log.debug("<- start()"); #N
  }
}

A Logging imports for SLF4J

B Initialization of the logger

C Log, at debug level, that you’re entering the start() method

D Creating a Spark session, as you did previously

E Specify that you’re going to read from a stream

F The file’s format is text

G And this is the directory to read from

H You’re now ready to write in a stream

I As an update

J The output being the console

K With options, records aren’t truncated and, at most, three will be displayed

L And you start

M Wait for data to come in; here, for at most one minute

N Log message for leaving the start() method

From this point on in this article, I’ll use logging (through the SLF4J package) rather than println. Logging is an industry standard, whereas println may scare some of us (for example, when it dumps information on the console that you wouldn’t want the user to see). To keep the code clear, I won’t describe the initialization of logging in each example. In the repository, you’ll find the initialization for each example (otherwise, it wouldn’t work, right?).

Whether you plan on using streaming or batch data processing, there’s no difference when creating a Spark session.

Once your session is created, you can ask it to read from a stream using the readStream() method. Depending on the type of stream, it requires additional parameters. Here, you’re reading text files (as specified with the format() method) from a directory (as defined by the load() method). Note that format()’s parameter is a String value, not an enum, but nothing forbids you from having a little utility class somewhere (with, for example, constants).
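
Such a utility class could look like the following minimal sketch. The class name StreamFormats and the constant names are hypothetical (they aren't in the book's repository); the values are the format strings used in this article.

```java
/** Hypothetical utility class; the names are illustrative, not from the book's repository. */
final class StreamFormats {
  // Format for reading raw lines of text.
  static final String TEXT = "text";
  // Format for reading CSV files.
  static final String CSV = "csv";
  // Output format (sink) that displays the data on the console.
  static final String CONSOLE = "console";

  private StreamFormats() {
    // Constants only: no instances needed.
  }
}
```

With this class on the classpath, the call in listing 3 could be written .format(StreamFormats.TEXT), which lets the compiler catch a mistyped constant name where it couldn't catch a mistyped string.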

So far, it’s fairly easy, no? You start a session and, to build a dataframe, you read from a stream. In a stream, data may or may not be there and may or may not come. The application needs to wait for data to arrive, a bit like a server waits for requests. You write your data through the dataframe’s writeStream() method and the StreamingQuery object.

You first define your streaming query object from the dataframe used as your stream. The query starts to fill a result table, as illustrated in figure 2. The result table grows as the data comes in.


To build your query, you need to specify:

  • The output mode (see appendix N for a list of output modes). In this lab, you’re displaying only the updates between two receptions of data.
  • The format, which says what you’re going to do with the data you receive. In this lab, you display it on the console (and not through logging). In the literature, you’ll often see the term sink used for the output format. Appendix N also describes the different sinks.
  • The options. When displaying to the console, truncate set to false means that the records won’t be truncated to a certain length, and numRows specifies that you display, at most, three records. It’s the equivalent of calling show(3, false) on a dataframe in non-streaming (batch) mode.

After specifying the output mode, format, and options, you can start the query.

Now your application needs to wait for data to come in. It does this through the awaitTermination() method on the query. awaitTermination() is a blocking method. It’s available with and without a parameter: without a parameter, the method waits forever; with a parameter, you specify how long the method waits, in milliseconds. In these labs, I consistently used one minute (60000 ms).
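
Because the timeout is a raw number of milliseconds, you may prefer to derive it from a named unit rather than hardcode 60000. Here is a minimal sketch in plain Java; the class name LabTimeouts and its constant are hypothetical and aren't part of the labs.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical holder for the timeout used in the labs; illustration only. */
final class LabTimeouts {
  // One minute, expressed in milliseconds.
  static final long ONE_MINUTE_MS = TimeUnit.MINUTES.toMillis(1);

  private LabTimeouts() {
  }
}
```

The call in listing 3 could then read query.awaitTermination(LabTimeouts.ONE_MINUTE_MS), which makes the intended duration visible at the call site.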

You’ve achieved your first ingestion from a stream. In the next section, you’ll extract a record from the stream, not only a raw line of data.

Getting records, not lines

In the previous example, you ingested lines, like “Conrad,Alex,Gutwein,34,998-04-4584”. Although the data is in Spark, it isn’t convenient to use. Let’s turn the raw line into a record.

Getting an output such as in listing 4, with a clear separation of records, is a fairly easy operation.

Listing 4 Output from ReadRecordFromFileStreamApp showing structured records


-------------------------------------------
Batch: 0
-------------------------------------------
+---------+--------+------------+---+-----------+
|    fname|   mname|       lname|age|        ssn|
+---------+--------+------------+---+-----------+
|  Daniela|    Lara|     Clayton| 65|853-73-5075|
|     Niko|   Romeo|     Dunmore| 37|400-54-1312|
|   Austin|   Aliya|     Thierry| 44|988-42-0723|
|  Taliyah|  Caiden|       Hyson| 47|781-05-7373|
|  Roselyn|   Juelz|     Whidbee|102|463-55-3667|
|    Amani| Brendan|      Massey|110|576-90-3460|

This lab is lab #110, in the net.jgp.books.sparkInAction.ch10.lab110ReadRecordFromStream package. The app is ReadRecordFromFileStreamApp.

You can execute this lab directly in Eclipse (or your favorite IDE) or on the command line via:

$ mvn clean compile install
$ mvn exec:exec@lab110

The record ingestion application, in listing 5, is a bit different from the raw line ingestion application in listing 3: you need to tell Spark that you want records and specify the schema.

Listing 5 ReadRecordFromFileStreamApp.java


SparkSession spark = SparkSession.builder()
    .appName("Read records over a file stream")
    .master("local")
    .getOrCreate();

StructType recordSchema = new StructType() #A
    .add("fname", "string")
    .add("mname", "string")
    .add("lname", "string")
    .add("age", "integer")
    .add("ssn", "string");

Dataset<Row> df = spark
    .readStream()
    .format("csv") #B
    .schema(recordSchema) #C
    .load(StreamingUtils.getInputDirectory());

StreamingQuery query = df
    .writeStream()
    .outputMode(OutputMode.Update())
    .format("console")
    .start();

A Schema definition

B The records come from CSV files

C Specify the schema

The schema must match the one used by the record generator or, on your own system, the actual schema of your data.
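
To make concrete what the schema expresses, here is a plain-Java sketch (no Spark involved) that maps one raw line to the five typed fields declared in recordSchema. It is not Spark’s CSV parser and doesn’t claim to behave like it: the PersonRecord class is hypothetical, it doesn’t handle quoted fields, and rejecting a mismatched line with an exception is a choice made for this illustration.

```java
/** Hypothetical class, for illustration only: one record with the five fields of recordSchema. */
final class PersonRecord {
  final String fname;
  final String mname;
  final String lname;
  final int age;
  final String ssn;

  private PersonRecord(String fname, String mname, String lname, int age, String ssn) {
    this.fname = fname;
    this.mname = mname;
    this.lname = lname;
    this.age = age;
    this.ssn = ssn;
  }

  /** Splits a comma-separated line into the five typed fields; rejects any other shape. */
  static PersonRecord parse(String line) {
    // The -1 limit keeps trailing empty fields, so they count toward the total.
    String[] fields = line.split(",", -1);
    if (fields.length != 5) {
      throw new IllegalArgumentException(
          "Expected 5 fields but found " + fields.length + ": " + line);
    }
    // The schema declares age as an integer; parseInt rejects non-numeric values.
    int age = Integer.parseInt(fields[3].trim());
    return new PersonRecord(fields[0], fields[1], fields[2], age, fields[4]);
  }
}
```

For example, PersonRecord.parse("Conrad,Alex,Gutwein,34,998-04-4584") yields a record whose age is the integer 34, whereas a line with only three fields is rejected.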

And that’s it! You’ve ingested data into Spark successfully.

If you want to learn more about the book, check it out on liveBook here and see this slide deck.
