Spark – Read multiple text files into single RDD?

Spark Core provides the textFile() and wholeTextFiles() methods in the SparkContext class, which are used to read single or multiple text or CSV files into a single Spark RDD. Using these methods we can also read all files from a directory and files matching a specific pattern.

textFile() – Reads single or multiple text or CSV files and returns a single Spark RDD[String].

wholeTextFiles() – Reads single or multiple files and returns a single RDD[Tuple2[String, String]], where the first value (_1) in the tuple is the file name and the second value (_2) is the content of the file.
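The difference between the two return types is easiest to see side by side. The snippet below is a minimal sketch, not one of the numbered examples in this article; it assumes a SparkSession named spark and the sample files introduced next.


// Illustrative sketch of the two return types (assumes `spark` and the sample files below)
val lines: org.apache.spark.rdd.RDD[String] =
  spark.sparkContext.textFile("c:/tmp/files/text01.txt")           // one element per line of text

val filesWithContent: org.apache.spark.rdd.RDD[(String, String)] =
  spark.sparkContext.wholeTextFiles("c:/tmp/files/text01.txt")     // (fileName, fileContent) pairs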

In this article, let’s see some examples of both of these methods using Scala.

Before we start, let’s assume we have the following file names and file contents in the folder “c:/tmp/files”; these files are used to demonstrate the examples below.

File Name     File Contents
text01.txt    One,1
text02.txt    Two,2
text03.txt    Three,3
text04.txt    Four,4
invalid.txt   Invalid,I

1. Spark Read all text files from a directory into a single RDD

In Spark, passing the path of a directory to the textFile() method reads all text files in it and creates a single RDD. Make sure the directory does not contain a nested directory; if Spark finds one, the process fails with an error.


// Spark Read all text files from a directory into a single RDD
val rdd = spark.sparkContext.textFile("C:/tmp/files/*")
rdd.foreach(f=>{
    println(f)
})

This example reads all files from a directory, creates a single RDD and prints the contents of the RDD.


// Output:
Invalid,I
One,1
Two,2
Three,3
Four,4

If you are running on a cluster, you should first collect the data in order to print it on the console, as shown below.


// Collect the data
rdd.collect.foreach(f=>{
    println(f)
})

Let’s see a similar example with the wholeTextFiles() method. Note that it returns an RDD[Tuple2], where the first value (_1) in the tuple is the file name and the second value (_2) is the content of the file.


// Using wholeTextFiles() to load the data
val rddWhole = spark.sparkContext.wholeTextFiles("C:/tmp/files/*")
rddWhole.foreach(f=>{
    println(f._1+"=>"+f._2)
})

Yields below output.


// Output:
file:/C:/tmp/files/invalid.txt=>Invalid,I
file:/C:/tmp/files/text01.txt=>One,1
file:/C:/tmp/files/text02.txt=>Two,2
file:/C:/tmp/files/text03.txt=>Three,3
file:/C:/tmp/files/text04.txt=>Four,4

2. Spark Read multiple text files into a single RDD

When you know the names of the multiple files you would like to read, just pass all the file names separated by commas in order to create a single RDD.


// Read multiple text files (comma separated) into a single RDD
val rdd3 = spark.sparkContext.textFile("C:/tmp/files/text01.txt,C:/tmp/files/text02.txt")
rdd3.foreach(f=>{println(f)})

This reads the text01.txt and text02.txt files and outputs the content below.


// Output:
One,1
Two,2

3. Read all text files matching a pattern to single RDD

The textFile() method also accepts pattern matching and wildcard characters. For example, the snippet below reads all files that start with “text” and have the extension “.txt”, and creates a single RDD.


// Read Textfiles matching a pattern
val rdd2 = spark.sparkContext.textFile("C:/tmp/files/text*.txt")
rdd2.foreach(f=>{println(f)})

Yields below output.


// Output:
One,1
Two,2
Three,3
Four,4

4. Read files from multiple directories into single RDD

It also supports reading a combination of individual files and multiple directories.


// Read files from multiple directories into Single RDD
val rdd2 = spark.sparkContext.textFile("C:/tmp/dir1/*,C:/tmp/dir2/*,c:/tmp/files/text01.txt")
rdd2.foreach(f=>{println(f)})

Yields below output.


// Output:
One,1
Two,2
Invalid,I
One,1
Two,2
Three,3
Four,4

5. Reading text files from nested directories into Single RDD

textFile() and wholeTextFiles() return an error when they find a nested folder. Hence, first create a list of file paths by traversing all nested folders (using Scala, Java, or Python), then pass all file names as a comma-separated string in order to create a single RDD, as shown in the sketch below.
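The following is a minimal sketch of that approach, assuming the nested folders live under the same “C:/tmp/files” root used above; the listFilesRecursively helper and the “.txt” filter are illustrative assumptions, not part of the original example.


// Build a comma-separated list of file paths by recursively walking a local folder (illustrative sketch)
import java.io.File

def listFilesRecursively(dir: File): Seq[File] = {
  val entries = Option(dir.listFiles).map(_.toSeq).getOrElse(Seq.empty)
  val (dirs, files) = entries.partition(_.isDirectory)
  files ++ dirs.flatMap(listFilesRecursively)
}

// Keep only ".txt" files and join their paths with commas
val allPaths = listFilesRecursively(new File("C:/tmp/files"))
  .filter(_.getName.endsWith(".txt"))
  .map(_.getPath)
  .mkString(",")

// Pass the comma-separated list to textFile() to get a single RDD
val rddNested = spark.sparkContext.textFile(allPaths)
rddNested.foreach(f => println(f))

This sketch uses only the local file system API; for HDFS or other distributed storage you would list the files with the corresponding file system client instead.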

6. Reading all text files separately and union to create a Single RDD

You can also read each text file into a separate RDD and union them all to create a single RDD.


// Union multiple text files into a single RDD
val rdd1 = spark.sparkContext.textFile("C:/tmp/files/text01.txt")
val rdd2 = spark.sparkContext.textFile("C:/tmp/files/text02.txt")
val rdd3 = Seq(rdd1,rdd2)
val finalRdd = spark.sparkContext.union(rdd3)
finalRdd.foreach(x=>{println(x)})

We will see the below output after executing the above snippet.


// Output:
One,1
Two,2

7. Reading multiple CSV files into RDD

Spark RDDs don’t have a method to read CSV file formats, hence we use the textFile() method to read a CSV file like any other text file into an RDD and split each record on a comma, pipe, or any other delimiter.


// Read multiple CSV files into RDD
val rdd5 = spark.sparkContext.textFile("C:/tmp/files/*")
val rdd6 = rdd5.map(f=>{
    f.split(",")
  })

rdd6.foreach(f => {
  println("Col1:"+f(0)+",Col2:"+f(1))
  })

Here, we read all CSV files in a directory into an RDD, apply a map transformation to split each record on the comma delimiter, and the map returns another RDD, “rdd6”, after the transformation. Finally, we iterate over rdd6 and read each column by its index.

Note: You can’t update an RDD as RDDs are immutable. This example yields the below output.


// Output:
Col1:Invalid,Col2:I
Col1:One,Col2:1
Col1:Two,Col2:2
Col1:Three,Col2:3
Col1:Four,Col2:4
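The same pattern applies to the other delimiters mentioned above. Below is a small sketch for pipe-delimited files at a hypothetical path (not part of the sample folder); note that split() takes a regular expression, so the pipe character must be escaped.


// Sketch for pipe-delimited files (hypothetical path and data)
val pipeRdd = spark.sparkContext.textFile("C:/tmp/pipe-files/*")
val pipeCols = pipeRdd.map(_.split("\\|"))   // escape the pipe, since split() expects a regex
pipeCols.foreach(f => println("Col1:" + f(0) + ",Col2:" + f(1)))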

8. Complete code


package com.sparkbyexamples.spark.rdd

import org.apache.spark.sql.SparkSession

object ReadMultipleFiles extends App {

  val spark:SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  println("read all text files from a directory to single RDD")
  val rdd = spark.sparkContext.textFile("C:/tmp/files/*")
  rdd.foreach(f=>{
    println(f)
  })

  println("read text files base on wildcard character")
  val rdd2 = spark.sparkContext.textFile("C:/tmp/files/text*.txt")
  rdd2.foreach(f=>{
    println(f)
  })

  println("read multiple text files into a RDD")
  val rdd3 = spark.sparkContext.textFile("C:/tmp/files/text01.txt,C:/tmp/files/text02.txt")
  rdd3.foreach(f=>{
    println(f)
  })

  println("Read files and directory together")
  val rdd4 = spark.sparkContext.textFile("C:/tmp/files/text01.txt,C:/tmp/files/text02.txt,C:/tmp/files/*")
  rdd4.foreach(f=>{
    println(f)
  })


  val rddWhole = spark.sparkContext.wholeTextFiles("C:/tmp/files/*")
  rddWhole.foreach(f=>{
    println(f._1+"=>"+f._2)
  })

  val rdd5 = spark.sparkContext.textFile("C:/tmp/files/*")
  val rdd6 = rdd5.map(f=>{
    f.split(",")
  })

  rdd6.foreach(f => {
    println("Col1:"+f(0)+",Col2:"+f(1))
  })

}

This complete code is also available on GitHub for reference.

Happy Learning !!
