Problem Statement:
Given a text file, output every word that appears in the file and how many times it appears.
In a typical, non-distributed environment, one would solve this problem by processing the file, perhaps a line at a time, and tracking word counts in a data structure like a map.
Here we will see how we can solve this problem the Spark way. Well, Spark ways. We will see how we can solve it with:
- RDD
- DataSet
- DataSet with SQL
RDD
Let’s first jump into the code.
We first start by creating a Spark Config, setting the Spark Application Name and the Master as local to run locally. When you run this on a cluster, you need to change the Master value.
Then we create a Spark Context with JavaSparkContext, which is the Java-specific way to create a new Spark Context.
The Spark Context acts as a gateway to the Spark world. It is created on the driver and helps the driver talk to the underlying cluster for resource management and task scheduling.
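Since the full listing is embedded elsewhere, here is a minimal sketch of this setup; the class and application names are just illustrative placeholders:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class WordCountRdd {
    public static void main(String[] args) {
        // App name is an assumed placeholder; Master is local for a local run.
        SparkConf conf = new SparkConf()
                .setAppName("WordCountRdd")
                .setMaster("local[*]"); // change the Master value on a cluster
        JavaSparkContext sc = new JavaSparkContext(conf);

        // ... transformations go here (sketched step by step below) ...

        sc.close();
    }
}
```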
Once we have the SparkContext, we use textFile to read a text file from the given path. This reads the file into an RDD where each RDD item is a line of the file.
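Continuing inside the same main, the read could look like this sketch (the path is a placeholder):

```java
// Requires: org.apache.spark.api.java.JavaRDD
// Each element of this RDD is one line of the input file.
JavaRDD<String> lines = sc.textFile("path/to/input.txt");
```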
Then we use flatMapToPair to convert each line of the file into words. The resulting JavaPairRDD has one item per word, with the word as its first element and a count of 1 as its second element.
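A sketch of that step; splitting on whitespace is an assumption, the original may tokenize differently:

```java
// Requires: org.apache.spark.api.java.JavaPairRDD,
// org.apache.spark.api.java.function.PairFlatMapFunction, scala.Tuple2, java.util.*
JavaPairRDD<String, Integer> wordsWithOne = lines.flatMapToPair(
        (PairFlatMapFunction<String, String, Integer>) line -> {
            List<Tuple2<String, Integer>> pairs = new ArrayList<>();
            for (String word : line.split("\\s+")) {
                pairs.add(new Tuple2<>(word, 1)); // each word paired with a count of 1
            }
            return pairs.iterator();
        });
```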
There are transformations similar to flatMapToPair (a short contrast is sketched after this list):
- map: transformation which converts each RDD item to a different item, much like the Java Stream map function.
- mapToPair: like map, but instead of a JavaRDD it produces JavaPairRDD items, each holding two sub-items like a tuple.
- flatMap: transformation which converts each RDD item to 0 or more different items.
- flatMapToPair: like flatMap, but instead of a JavaRDD it produces 0 or more JavaPairRDD items, each holding two sub-items like a tuple.
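To make the map vs. flatMap distinction concrete, a small illustrative snippet (an aside, not part of the original word count code):

```java
// map: exactly one output item per input line (here, the line's length).
JavaRDD<Integer> lineLengths = lines.map(String::length);

// flatMap: zero or more output items per input line (here, the individual words).
JavaRDD<String> justWords = lines.flatMap(line ->
        Arrays.asList(line.split("\\s+")).iterator());
```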
Then we use reduceByKey to perform a sum-of-counts reduction on the pair RDD. The PairRDD returned from the last flatMapToPair transformation has every count set to 1; we now reduce by the word and sum up the counts to get the total count for each word.
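A sketch of the reduction; printing via collect is just for illustration and assumes the data fits on the driver:

```java
// Sum the per-word counts; Integer::sum serves as the reduce function.
JavaPairRDD<String, Integer> wordCounts = wordsWithOne.reduceByKey(Integer::sum);

// Bring the result to the driver and print it (fine for a small file).
wordCounts.collect().forEach(pair ->
        System.out.println(pair._1() + " : " + pair._2()));
```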
There is a transformation similar to reduceByKey called groupByKey, but there is a subtle difference in how they shuffle data internally.
groupByKey, when performed on an RDD, causes the entire data on all partitions to be shuffled across all executors in order to bring all the data for each key into a single new partition. This is a lot of data shuffling and causes a performance bottleneck.
reduceByKey also causes data to be shuffled, but before the shuffle the reduce operation is performed on the data in each partition, per key. This results in less data being shuffled, because only the intermediate per-partition results are shuffled, not all the records.
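For comparison, a groupByKey version would look roughly like this sketch; it gives the same result but shuffles every individual record before summing:

```java
// groupByKey shuffles all (word, 1) records, then we add the counts up afterwards.
JavaPairRDD<String, Integer> groupedCounts = wordsWithOne
        .groupByKey()                        // JavaPairRDD<String, Iterable<Integer>>
        .mapValues(counts -> {
            int total = 0;
            for (Integer c : counts) {
                total += c;
            }
            return total;
        });
```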
The following diagram shows the behavioral difference between reduceByKey and groupByKey.
DataSet
DataSet/DataFrame is the newer way to write Spark code; it is friendlier and more flexible. It is also more SQL-friendly, offering SQL-like methods to manipulate data.
Code is here:
The code starts off by setting some properties (a sketch follows this list):
- Hadoop home directory: because I was running the code on a Windows machine.
- Apache logs set to warn level to reduce the noise.
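A sketch of that setup; the Hadoop path is a placeholder, and the log4j 1.x API shown is an assumption about which logging setup is on the classpath:

```java
// Needed on Windows so Spark can locate winutils.exe (path is a placeholder).
System.setProperty("hadoop.home.dir", "C:\\hadoop");

// Quiet down Spark/Hadoop internals (log4j 1.x style, bundled with older Spark versions).
org.apache.log4j.Logger.getLogger("org.apache").setLevel(org.apache.log4j.Level.WARN);
```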
Next we create a SparkSession, which is used where we used SparkContext in the RDD approach. SparkSession incorporates the legacy SparkContext and the other contexts, and performs everything SparkContext was used for. Just like the context, the SparkSession also takes the app name and master configuration.
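A minimal sketch of creating the session; the application name is again an assumed placeholder:

```java
// Requires: org.apache.spark.sql.SparkSession
SparkSession spark = SparkSession.builder()
        .appName("WordCountDataset") // assumed name
        .master("local[*]")          // change when running on a cluster
        .getOrCreate();
```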
Then we use the SparkSession to read a text file from the given path into a DataSet of Rows, where each Row represents a line from the file.
Every Row DataSet needs a schema describing its fields and their types. The DataSet of Rows returned after reading a file has a schema with a single value field of type String.
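Reading the file could look like this sketch (path is a placeholder):

```java
// Requires: org.apache.spark.sql.Dataset, org.apache.spark.sql.Row
// One Row per line, with a single "value" column of type String.
Dataset<Row> lines = spark.read().text("path/to/input.txt");
lines.printSchema(); // root |-- value: string (nullable = true)
```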
Next we need to split each line into words. That transformation should return another DataSet of Rows, but this new Row DataSet needs two columns (fields):
- word: String column to hold the word
- count: Long column to hold the count, initialized to 1
For this transformation we use flatMap, converting each line into words, each with a count of 1. For this method we need to provide metadata describing how to encode these values when they are transferred between executors.
There are built-in Encoders for well-known types like String, Long, etc. We can also build one for a specific class with Encoders.bean() and Encoders.kryo().
But for this simple use case we don't need to create a new class. We can just create a RowEncoder from a schema, which we define with StructType by mentioning the column names and types. In our case that is word as String and count as Long, as defined in the code.
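A sketch of that transformation; RowEncoder.apply is the Spark 2.x/3.x way to build the encoder (newer releases expose Encoders.row(schema) instead), and whitespace splitting is again an assumption:

```java
// Requires: org.apache.spark.sql.types.*, org.apache.spark.sql.RowFactory,
// org.apache.spark.sql.catalyst.encoders.RowEncoder,
// org.apache.spark.api.java.function.FlatMapFunction, java.util.*
StructType schema = new StructType()
        .add("word", DataTypes.StringType)
        .add("count", DataTypes.LongType);

Dataset<Row> words = lines.flatMap(
        (FlatMapFunction<Row, Row>) row -> {
            List<Row> out = new ArrayList<>();
            for (String word : row.getString(0).split("\\s+")) {
                out.add(RowFactory.create(word, 1L)); // word with an initial count of 1
            }
            return out.iterator();
        },
        RowEncoder.apply(schema));
```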
Once we have the Row DataSet with word and count, we use the SQL-like methods on DataSet to group by word and sum-aggregate count, using the groupBy and agg methods.
We can refer to columns like word and count with the functions.col helper method for building Column objects. There are overloaded methods that take string names of columns or column indexes.
After this aggregation we get the result we want, which we print to the output with show.
Then we close the Spark Session.
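Putting the aggregation, the output, and the shutdown together, a sketch could be:

```java
// Requires: org.apache.spark.sql.functions (for col and sum)
Dataset<Row> counts = words
        .groupBy(functions.col("word"))
        .agg(functions.sum(functions.col("count")).as("count"));

counts.show();   // prints the word counts to the console
spark.close();   // stops the underlying SparkContext
```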
This DataSet way of writing Spark code is the newer one, but behind the scenes it still uses RDDs. DataSet operations are far friendlier than RDD operations.
DataSet with SQL
This approach is also based on DataSet, but it explicitly runs SQL queries on the DataSet instead of using DataSet methods.
Reading the data and the basic transformations are the same as in the previous approach.
Once we have the data in DataSet Rows with word and a count of 1, we create a temp view from it with createOrReplaceTempView, using words as the table name.
After that we just run SQL through the SparkSession to get the data we want.
This SQL query returns a Dataset of Rows with the correct schema.
Then we print the result and close the session.
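A sketch of this approach, reusing the words Dataset from the previous section; the exact query text is an assumption:

```java
// Register the Dataset as a temporary view named "words".
words.createOrReplaceTempView("words");

// Run the aggregation as plain SQL through the SparkSession.
Dataset<Row> counts = spark.sql(
        "SELECT word, SUM(`count`) AS `count` FROM words GROUP BY word");

counts.show();   // print the result
spark.close();   // close the session
```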
Conclusion
Of the three approaches discussed above, the second one with DataSet is the one I personally prefer, as it is friendlier and not a black box like the third.
The SQL way is best suited when we want to run ad hoc queries on the data, or to build queries from user-configured inputs, as it is easier to convert user requirements into SQL than into RDD or DataSet manipulation code.