Spark Read Text File from AWS S3 bucket

In this Spark tutorial, you will learn how to use the sparkContext.textFile() and sparkContext.wholeTextFiles() methods to read a text file from Amazon AWS S3 into an RDD, and the spark.read.text() and spark.read.textFile() methods to read from Amazon AWS S3 into a DataFrame or Dataset.

Using these methods we can also read all files from a directory and files with a specific pattern on the AWS S3 bucket.

Amazon S3 bucket and dependency

To interact with Amazon AWS S3 from Spark, we need a third-party library (hadoop-aws). This library offers three generations of S3 file system connectors.

Generation   | Usage  | Description
First (s3)   | s3://  | Also called classic; a filesystem for reading from or storing objects in Amazon S3. It has been deprecated, and the second or third generation is recommended instead.
Second (s3n) | s3n:// | Uses native S3 objects, which makes it easy to use with Hadoop and other file systems. This is also not the recommended option.
Third (s3a)  | s3a:// | A replacement for s3n that supports larger files and improves performance.

In this example, we will use the latest, third-generation connector, s3a://. Below are the Hadoop and AWS dependencies you need for Spark to read and write files in Amazon AWS S3 storage.

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <version>3.0.0</version>
        </dependency>

You can find more details about these dependencies and use the version that is suitable for you. Regardless of which connector you use, the steps to read from and write to Amazon S3 are exactly the same; only the URI scheme (s3a:// in this article) changes.


import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExamples.com")
      .getOrCreate()
// Replace the value with your AWS access key (you can find this on the IAM service)
spark.sparkContext
     .hadoopConfiguration.set("fs.s3a.access.key", "awsaccesskey value")
// Replace the value with your AWS secret key (you can find this on the IAM service)
spark.sparkContext
     .hadoopConfiguration.set("fs.s3a.secret.key", "aws secretkey value")
spark.sparkContext
      .hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")
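Hard-coded keys are easy to leak when code is shared. As a minimal sketch, assuming the keys are exported in the conventional AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables, the settings above could be built from the environment instead. The S3aSettings object and its fromEnv method are hypothetical names used for illustration; they are not part of Spark or Hadoop.

```scala
object S3aSettings {
  // Hypothetical helper: maps environment-style variables to the fs.s3a.* properties.
  // Returns Left with a message when a variable is missing, so no key lives in source code.
  def fromEnv(env: Map[String, String]): Either[String, Map[String, String]] =
    (env.get("AWS_ACCESS_KEY_ID"), env.get("AWS_SECRET_ACCESS_KEY")) match {
      case (Some(accessKey), Some(secretKey)) =>
        Right(Map(
          "fs.s3a.access.key" -> accessKey,
          "fs.s3a.secret.key" -> secretKey,
          "fs.s3a.endpoint"   -> "s3.amazonaws.com"
        ))
      case (None, _) => Left("AWS_ACCESS_KEY_ID is not set")
      case (_, None) => Left("AWS_SECRET_ACCESS_KEY is not set")
    }
}
```

With a SparkSession named spark, you could call S3aSettings.fromEnv(sys.env) and, for a Right result, pass each key and value to spark.sparkContext.hadoopConfiguration.set(key, value).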

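If you are using the s3n:// file system, set the following properties instead. Note that s3n was removed in Hadoop 3.x, so this applies only when you use a Hadoop 2.x hadoop-aws dependency.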


spark.sparkContext
     .hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "awsAccessKeyId value")
spark.sparkContext
     .hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "awsSecretAccessKey value")
spark.sparkContext
      .hadoopConfiguration.set("fs.s3n.endpoint", "s3.amazonaws.com")

1. Spark read a text file from S3 into RDD

We can read a single text file, multiple files, or all files from a directory on an S3 bucket into a Spark RDD by using the two functions below, both provided by the SparkContext class.

Before we start, let’s assume we have the following file names and file contents in the folder “csv” on the S3 bucket. I use these files here to explain the different ways to read text files.

File Name   | File Contents
text01.txt  | One,1
            | Eleven,11
text02.txt  | Two,2
text03.txt  | Three,3
text04.txt  | Four,4
invalid.txt | Invalid,I

1.1 textFile() – Read text file from S3 into RDD

The sparkContext.textFile() method reads a text file from S3 or from any other Hadoop-supported file system. It takes the path as its first argument and, optionally, a minimum number of partitions as its second argument.


  println("##spark read text files from a directory into RDD")
  val rddFromFile = spark.sparkContext.textFile("s3a://sparkbyexamples/csv/text01.txt")
  println(rddFromFile.getClass)

  println("##Get data Using collect")
  rddFromFile.collect().foreach(f=>{
    println(f)
  })

Here, it reads every line of the "text01.txt" file as an element of the RDD and prints the output below.


##spark read text files from a directory into RDD
class org.apache.spark.rdd.MapPartitionsRDD
##Get data Using collect
One,1
Eleven,11
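The optional second argument is not used in the example above. The sketch below shows it, assuming a SparkSession is already configured for S3 as described earlier; TextFileMinPartitions and read are hypothetical names. The value is only a lower-bound hint, so the actual partition count is decided by the underlying Hadoop input format.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object TextFileMinPartitions {
  // Reads a text file and asks for at least `minPartitions` partitions.
  // `path` can be any Hadoop-supported URI, for example
  // "s3a://sparkbyexamples/csv/text01.txt" (bucket name taken from this article).
  def read(spark: SparkSession, path: String, minPartitions: Int): RDD[String] =
    spark.sparkContext.textFile(path, minPartitions)
}
```

For example, TextFileMinPartitions.read(spark, "s3a://sparkbyexamples/csv/text01.txt", 4).getNumPartitions lets you check how many partitions were actually created.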

1.2 wholeTextFiles() – Read text files from S3 into RDD of Tuple.

sparkContext.wholeTextFiles() reads a text file into PairedRDD of type RDD[(String,String)] with the key being the file path and value being contents of the file. This method also takes the path as an argument and optionally takes a number of partitions as the second argument.

Let’s see a similar example with wholeTextFiles() method.


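  println("##read whole text files")
  val rddWhole = spark.sparkContext.wholeTextFiles("s3a://sparkbyexamples/csv/text01.txt")
  // Each element is a (file path, file contents) pair
  rddWhole.foreach(f => println(f._1 + "=>" + f._2))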
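Because wholeTextFiles() keeps the file path as the key, it is handy when each line needs to remember which file it came from. A minimal sketch, assuming the same S3 configuration as above; WholeTextFilesLines and linesByFile are hypothetical names:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object WholeTextFilesLines {
  // Turns (path, content) pairs into (fileName, line) pairs, one per non-empty line.
  def linesByFile(spark: SparkSession, path: String): RDD[(String, String)] =
    spark.sparkContext.wholeTextFiles(path).flatMap { case (filePath, content) =>
      // Keep only the part after the last "/" as the file name
      val fileName = filePath.substring(filePath.lastIndexOf('/') + 1)
      content.split("\r?\n").filter(_.nonEmpty).map(line => (fileName, line)).toSeq
    }
}
```

Keep in mind that each file is loaded as a single string, so this approach suits many small files better than a few very large ones.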

  

1.3 Reading multiple files at a time

When you know the names of the files you would like to read, pass all the file paths separated by commas. To read every file in a folder, pass just the folder path. Both methods mentioned above support this and return a single RDD.


  println("##read multiple text files into a RDD")
  val rdd4 = spark.sparkContext.textFile("s3a://sparkbyexamples/csv/text01.txt," +
    "s3a://sparkbyexamples/csv/text02.txt")
  rdd4.foreach(f=>{
    println(f)
  })

This reads the text01.txt and text02.txt files.


##read multiple text files into a RDD
One,1
Eleven,11
Two,2
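When the list of file names comes from elsewhere in your program, the comma-separated string can be built rather than typed by hand. A small sketch; S3Paths and commaSeparated are hypothetical names, and the bucket and folder are the ones assumed throughout this article:

```scala
object S3Paths {
  // Hypothetical helper: builds the comma-separated path string that textFile() accepts.
  def commaSeparated(bucket: String, folder: String, fileNames: Seq[String]): String =
    fileNames.map(name => s"s3a://$bucket/$folder/$name").mkString(",")
}
```

The result of S3Paths.commaSeparated("sparkbyexamples", "csv", Seq("text01.txt", "text02.txt")) can be passed directly to spark.sparkContext.textFile().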

1.4 Read all text files matching a pattern

The textFile() and wholeTextFiles() methods also accept pattern matching and wildcard characters. For example, the snippet below reads all files whose names start with “text” and have the extension “.txt”, and creates a single RDD.


  println("##read text files base on wildcard character")
  val rdd3 = spark.sparkContext.textFile("s3a://sparkbyexamples/csv/text*.txt")
  rdd3.foreach(f=>{
    println(f)
  })

Yields below output.


##read text files base on wildcard character
One,1
Eleven,11
Two,2
Three,3
Four,4
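Besides *, the path patterns follow Hadoop's glob syntax, which also includes ? for a single character and [a-b] for a character range. The sketch below shows both; GlobPatterns and its method names are hypothetical, and dir would be something like "s3a://sparkbyexamples/csv" with the S3 configuration from above.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object GlobPatterns {
  // Character range: matches text01.txt and text02.txt only
  def readFirstTwo(spark: SparkSession, dir: String): RDD[String] =
    spark.sparkContext.textFile(s"$dir/text0[1-2].txt")

  // ? matches exactly one character: text01.txt, text02.txt, ... but not invalid.txt
  def readSingleDigit(spark: SparkSession, dir: String): RDD[String] =
    spark.sparkContext.textFile(s"$dir/text0?.txt")
}
```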

1.5 Read files from multiple directories on S3 bucket into single RDD

A path can also combine files, directories, and wildcards. For example, the snippet below reads every file in the csv directory into a single RDD.


  println("##read all text files from a directory to single RDD")
  val rdd2 = spark.sparkContext.textFile("s3a://sparkbyexamples/csv/*")
  rdd2.foreach(f=>{
    println(f)
  })

Yields below output


##read all text files from a directory to single RDD
Invalid,I
One,1
Eleven,11
Two,2
Three,3
Four,4

1.6 Reading text files from nested directories into Single RDD

textFile() and wholeTextFiles() return an error when they find a nested folder. Hence, first create a list of file paths by traversing all nested folders (in Scala, Java, or Python), and then pass all the file names separated by commas to create a single RDD. I will leave it to you to research and come up with an example.
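As one possible starting point, the sketch below uses Hadoop's FileSystem.listFiles() with the recursive flag to collect the file paths and then joins them with commas. NestedTextFiles and its method names are hypothetical, and the sketch assumes that no file path contains a comma.

```scala
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ArrayBuffer

object NestedTextFiles {
  // Lists every file under `root`, descending into sub-folders.
  def listFiles(spark: SparkSession, root: String): Seq[String] = {
    val fs = FileSystem.get(new URI(root), spark.sparkContext.hadoopConfiguration)
    val iterator = fs.listFiles(new Path(root), true) // true = recursive
    val files = ArrayBuffer.empty[String]
    while (iterator.hasNext) files += iterator.next().getPath.toString
    files.toList
  }

  // Reads all listed files into one RDD; an empty listing yields an empty RDD.
  def readNested(spark: SparkSession, root: String): RDD[String] = {
    val files = listFiles(spark, root)
    if (files.isEmpty) spark.sparkContext.emptyRDD[String]
    else spark.sparkContext.textFile(files.mkString(","))
  }
}
```

Here root would be a folder such as "s3a://sparkbyexamples/csv". On very large folder trees the listing itself can be slow, so it may be worth filtering the paths before reading.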

1.7 Reading all text files separately and union to create a Single RDD

You can also read each text file into a separate RDD and union them all to create a single RDD. Again, I will leave this to you to explore.
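If you want a hint, a minimal sketch of this approach could look like the following. It relies on SparkContext.union(), which accepts a sequence of RDDs; UnionTextFiles and readAndUnion are hypothetical names.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object UnionTextFiles {
  // Reads each path into its own RDD, then combines them into a single RDD.
  def readAndUnion(spark: SparkSession, paths: Seq[String]): RDD[String] = {
    val rdds: Seq[RDD[String]] = paths.map(path => spark.sparkContext.textFile(path))
    spark.sparkContext.union(rdds)
  }
}
```

Reading files separately like this is mostly useful when each RDD needs its own transformation before the union; otherwise the comma-separated path form shown earlier is simpler.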

1.8 Spark RDD with S3 Complete code


package com.sparkbyexamples.spark.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object ReadTextFiles extends App {

  val spark:SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()
  // Replace the value with your AWS access key (you can find this on the IAM service)
  spark.sparkContext
    .hadoopConfiguration.set("fs.s3a.access.key", "awsaccesskey value")
  // Replace the value with your AWS secret key (you can find this on the IAM service)
  spark.sparkContext
    .hadoopConfiguration.set("fs.s3a.secret.key", "aws secretkey value")
  spark.sparkContext
    .hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")
  spark.sparkContext.setLogLevel("ERROR")

  println("##spark read text files from a directory into RDD")
  val rddFromFile = spark.sparkContext.textFile("s3a://sparkbyexamples/csv/text01.txt")
  println(rddFromFile.getClass)

  println("##Get data Using collect")
  rddFromFile.collect().foreach(f=>{
    println(f)
  })

  println("##read multiple text files into a RDD")
  val rdd4 = spark.sparkContext.textFile("s3a://sparkbyexamples/csv/text01.txt," +
    "s3a://sparkbyexamples/csv/text02.txt")
  rdd4.foreach(f=>{
    println(f)
  })

  println("##read text files base on wildcard character")
  val rdd3 = spark.sparkContext.textFile("s3a://sparkbyexamples/csv/text*.txt")
  rdd3.foreach(f=>{
    println(f)
  })

  println("##read all text files from a directory to single RDD")
  val rdd2 = spark.sparkContext.textFile("s3a://sparkbyexamples/csv/*")
  rdd2.foreach(f=>{
    println(f)
  })

  println("##read whole text files")
  val rddWhole:RDD[(String,String)] = spark.sparkContext.wholeTextFiles("s3a://sparkbyexamples/csv/text01.txt")
  println(rddWhole.getClass)
  rddWhole.foreach(f=>{
    println(f._1+"=>"+f._2)
  })
}

This complete code is also available at GitHub for reference

2. Spark read text file into DataFrame and Dataset

Using spark.read.text() and spark.read.textFile(), we can read a single text file, multiple files, or all files from a directory on an S3 bucket into a Spark DataFrame or Dataset. Let’s see examples in Scala.

Note: These methods don’t take an argument to specify the number of partitions.

2.1 text() – Read text file from S3 into DataFrame

The spark.read.text() method reads a text file from S3 into a DataFrame. As with RDDs, we can also use this method to read multiple files at a time, read files matching a pattern, and read all files from a directory.


    //returns DataFrame
    val df:DataFrame = spark.read.text("s3a://sparkbyexamples/csv/text01.txt")
    df.printSchema()
    df.show(false)

Yields below output


root
 |-- value: string (nullable = true)

+---------+
|value    |
+---------+
|One,1    |
|Eleven,11|
+---------+

As you can see, each line in the text file becomes a record in the DataFrame with just one column, “value”. If you want to convert it into multiple columns, you can use a map transformation with the split method, as the example below demonstrates.


    //converting to columns by splitting
    import spark.implicits._
    val df2 = df.map(f=>{
      val elements = f.getString(0).split(",")
      (elements(0),elements(1))
    })
    df2.printSchema()
    df2.show(false)

This splits every record in the DataFrame by the delimiter and converts the result into a Dataset of Tuple2, with columns _1 and _2.


root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)


+------+---+
|_1    |_2 |
+------+---+
|One   |1  |
|Eleven|11 |
+------+---+
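The tuple columns _1 and _2 are not very descriptive. As an alternative sketch, the built-in split() column function can produce named columns without a map transformation. The column names "name" and "number" and the SplitColumns object are assumptions made for illustration.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, split}

object SplitColumns {
  // Splits the single "value" column on "," into two named string columns.
  def toNamedColumns(df: DataFrame): DataFrame = {
    val parts = split(col("value"), ",")
    df.select(parts.getItem(0).as("name"), parts.getItem(1).as("number"))
  }
}
```

Calling SplitColumns.toNamedColumns(df) on the DataFrame read above returns a DataFrame whose two columns are both of type string.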

2.2 textFile() – Read text file from S3 into Dataset

The spark.read.textFile() method returns a Dataset[String]. Like text(), we can also use this method to read multiple files at a time, read files matching a pattern, and read all files from a directory on an S3 bucket into a Dataset.


    // returns Dataset[String]
    val ds:Dataset[String] = spark.read.textFile("s3a://sparkbyexamples/csv/text01.txt")
    ds.printSchema()
    ds.show(false)

Yields below output


root
 |-- value: string (nullable = true)

+---------+
|value    |
+---------+
|One,1    |
|Eleven,11|
+---------+

Now let’s convert each element in Dataset into multiple columns by splitting with delimiter “,”


    //converting to columns by splitting
    import spark.implicits._
    val ds2 = ds.map(f=> {
     val elements = f.split(",")
      (elements(0),elements(1))
    })

    ds2.printSchema()
    ds2.show(false)

Yields below output. This splits all elements in a Dataset by delimiter and converts into a Dataset[Tuple2]


root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)

+------+---+
|_1    |_2 |
+------+---+
|One   |1  |
|Eleven|11 |
+------+---+
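The map above assumes every line has two fields. A line such as “Invalid,I” from invalid.txt splits fine but cannot be read as a number, and a line without a comma would throw an exception. The sketch below parses lines defensively into a case class; Entry and EntryParser are hypothetical names.

```scala
import scala.util.Try

// Hypothetical record type for the "name,number" lines in the sample files
final case class Entry(name: String, number: Int)

object EntryParser {
  // Returns None for lines that do not have exactly two fields
  // or whose second field is not an integer.
  def parse(line: String): Option[Entry] =
    line.split(",", -1) match {
      case Array(name, number) =>
        Try(number.trim.toInt).toOption.map(n => Entry(name.trim, n))
      case _ => None
    }
}
```

With import spark.implicits._ in scope, something like ds.flatMap(line => EntryParser.parse(line).toList) should give a Dataset[Entry] that silently drops the malformed lines.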

2.3 Spark with AWS S3 complete example


package com.sparkbyexamples.spark.dataframe

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object FromTextFile {

  def main(args:Array[String]):Unit= {

    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExamples.com")
      .getOrCreate()
    // Replace the value with your AWS access key (you can find this on the IAM service)
    spark.sparkContext
      .hadoopConfiguration.set("fs.s3a.access.key", "awsaccesskey value")
    // Replace the value with your AWS secret key (you can find this on the IAM service)
    spark.sparkContext
      .hadoopConfiguration.set("fs.s3a.secret.key", "aws secretkey value")
    spark.sparkContext
      .hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")

    //returns DataFrame
    val df:DataFrame = spark.read.text("s3a://sparkbyexamples/csv/text01.txt")
    df.printSchema()
    df.show(false)

    //converting to columns by splitting
    import spark.implicits._
    val df2 = df.map(f=>{
      val elements = f.getString(0).split(",")
      (elements(0),elements(1))
    })

    df2.printSchema()
    df2.show(false)

    // returns Dataset[String]
    val ds:Dataset[String] = spark.read.textFile("s3a://sparkbyexamples/csv/text01.txt")
    ds.printSchema()
    ds.show(false)

    //converting to columns by splitting
    import spark.implicits._
    val ds2 = ds.map(f=> {
     val elements = f.split(",")
      (elements(0),elements(1))
    })

    ds2.printSchema()
    ds2.show(false)
  }
}

This complete code is also available at GitHub for reference.

Conclusion

In this tutorial, you have learned how to read a text file from AWS S3 into a DataFrame and an RDD by using the different methods available in SparkContext and Spark SQL. You have also learned how to read multiple text files at once, how to read files by pattern matching, and how to read all files from a folder.

Happy Learning !!

NNK
