Spark Word Count Explained with Example

In this section, I will explain a few RDD transformations with a word count example in Spark with Scala. Before we start, let’s create an RDD with some sample data. (The complete example at the end reads the same data from a text file, which is available on GitHub.)


// Imports
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Create SparkSession 
val spark = SparkSession.builder()
      .master("local[3]")
      .appName("SparkByExamples.com")
      .getOrCreate()

// Prepare Data
data = ["Project Gutenberg’s",
        "Alice’s Adventures in Wonderland",
        "Project Gutenberg’s",
        "Adventures in Wonderland",
        "Project Gutenberg’s"]

// Create RDD
val rdd = spark.sparkContext.parallelize(data)
Spark Word Count RDD Transformation

1. flatMap() Transformation

flatMap() transformation flattens the RDD after applying the function and returns a new RDD. In the example below, it first splits each record by space and then flattens the result. The resulting RDD holds a single word on each record.


val rdd2 = rdd.flatMap(f=>f.split(" "))
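
With the sample data above, rdd2 holds one word per record. As a quick sanity check (a sketch; collect() is only safe for small, local data), you could print it:


// Collect and print the flattened words (small data only)
rdd2.collect().foreach(println)
// Project
// Gutenberg’s
// Alice’s
// Adventures
// in
// Wonderland
// ... and so on for the remaining records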

2. map() Transformation

map() transformation is used to apply any complex operations, like adding a column or updating a column, etc.; the output of a map transformation always has the same number of records as its input.

In our word count example, we add a new column with a value of 1 for each word. The result is an RDD of key-value pairs, with the word of type String as the key and 1 of type Int as the value; pair-specific operations such as reduceByKey() become available on it through the implicit PairRDDFunctions conversion. For your understanding, I’ve declared the rdd3 variable with its type.


val rdd3:RDD[(String,Int)]= rdd2.map(m=>(m,1))
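
Peeking at the first few pairs confirms the shape of the data (a sketch; take() avoids collecting the whole RDD):


// Print the first three (word, 1) pairs
rdd3.take(3).foreach(println)
// (Project,1)
// (Gutenberg’s,1)
// (Alice’s,1)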

3. filter() Transformation

filter() transformation is used to keep only the records in an RDD that satisfy a predicate. In our example, we keep all words that start with “a”.


val rdd4 = rdd3.filter(a=> a._1.startsWith("a"))
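
Note that startsWith() is case-sensitive, so with the sample data above (where “Alice’s” and “Adventures” begin with an uppercase “A”) rdd4 would actually be empty. If you want a case-insensitive match, a variant like the following could be used (a sketch, not part of the original example):


// Hypothetical case-insensitive variant: lower-case each word before testing the prefix
val rdd4ci = rdd3.filter(a => a._1.toLowerCase.startsWith("a"))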

4. reduceByKey() Transformation

reduceByKey() merges the values for each key using the specified function. In our example, it sums the 1s attached to each word, so the resulting RDD contains unique words and their counts.


val rdd5 = rdd3.reduceByKey(_ + _)
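
With the sample data above, rdd5 should contain the following pairs (the order across partitions may vary):


// Print the aggregated counts (order may vary)
rdd5.collect().foreach(println)
// (Project,3)
// (Gutenberg’s,3)
// (Alice’s,1)
// (Adventures,2)
// (in,2)
// (Wonderland,2)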

5. sortByKey() Transformation

sortByKey() transformation is used to sort RDD elements by key. In our example, we first convert RDD[(String,Int)] to RDD[(Int,String)] using a map transformation, then apply sortByKey(), which sorts on the integer count. Finally, foreach with a println statement prints every word and its count as a key-value pair to the console.


val rdd6 = rdd5.map(a=>(a._2,a._1)).sortByKey()

//Print rdd6 result to console
rdd6.foreach(println)
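
sortByKey() sorts in ascending order by default; to list the most frequent words first, you could pass false for the ascending flag (a small variation on the example above):


// Descending sort: highest counts first
val rdd6Desc = rdd5.map(a => (a._2, a._1)).sortByKey(false)
rdd6Desc.foreach(println)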

6. Spark Word Count Example

Following is the complete word count example in Scala, combining the RDD transformations above with several actions.


import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object WordCountExample {

  def main(args:Array[String]): Unit = {

    val spark:SparkSession = SparkSession.builder()
      .master("local[3]")
      .appName("SparkByExamples.com")
      .getOrCreate()

    val sc = spark.sparkContext

    val rdd:RDD[String] = sc.textFile("src/main/resources/test.txt")
    println("initial partition count:"+rdd.getNumPartitions)

    val reparRdd = rdd.repartition(4)
    println("re-partition count:"+reparRdd.getNumPartitions)

    //rdd.coalesce(3)

    rdd.collect().foreach(println)

    // rdd flatMap transformation
    val rdd2 = rdd.flatMap(f=>f.split(" "))
    rdd2.foreach(f=>println(f))

    //Create a Tuple by adding 1 to each word
    val rdd3:RDD[(String,Int)]= rdd2.map(m=>(m,1))
    rdd3.foreach(println)

    //Filter transformation
    val rdd4 = rdd3.filter(a=> a._1.startsWith("a"))
    rdd4.foreach(println)

    //ReduceBy transformation
    val rdd5 = rdd3.reduceByKey(_ + _)
    rdd5.foreach(println)

    //Swap word,count and sortByKey transformation
    val rdd6 = rdd5.map(a=>(a._2,a._1)).sortByKey()
    println("Final Result")

    //Action - foreach
    rdd6.foreach(println)

    //Action - count
    println("Count : "+rdd6.count())

    //Action - first
    val firstRec = rdd6.first()
    println("First Record : "+firstRec._1 + ","+ firstRec._2)

    //Action - max
    val datMax = rdd6.max()
    println("Max Record : "+datMax._1 + ","+ datMax._2)

    //Action - reduce
    val totalWordCount = rdd6.reduce((a,b) => (a._1+b._1,a._2))
    println("dataReduce Record : "+totalWordCount._1)
    //Action - take
    val data3 = rdd6.take(3)
    data3.foreach(f=>{
      println("data3 Key:"+ f._1 +", Value:"+f._2)
    })

    //Action - collect
    val data = rdd6.collect()
    data.foreach(f=>{
      println("Key:"+ f._1 +", Value:"+f._2)
    })

    //Action - saveAsTextFile
    rdd5.saveAsTextFile("c:/tmp/wordCount")
    
  }
}
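
A note on the final action: saveAsTextFile() writes the result as a directory of part files (for example, c:/tmp/wordCount/part-00000), and it fails if the output directory already exists, so remove it before re-running the example.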

Conclusion

In this Spark RDD transformations tutorial, you have learned the different transformation functions and their usage with Scala examples, along with a GitHub project for quick reference.

Happy Learning !!
