In this Spark tutorial, you will learn how to use the sparkContext.textFile() and sparkContext.wholeTextFiles() methods to read a text file from Amazon AWS S3 into an RDD, and the spark.read.text() and spark.read.textFile() methods to read a text file from Amazon AWS S3 into a DataFrame.
Using these methods we can also read all files from a directory, as well as files matching a specific pattern, on an AWS S3 bucket.
Amazon S3 bucket and dependency
In order to interact with Amazon AWS S3 from Spark, we need to use a third-party library, and there are three generations of this library to choose from.
Generation | Usage | Description |
---|---|---|
First – s3 | s3:// | s3, also called the classic filesystem, for reading from and storing objects in Amazon S3. It has been deprecated; use the second- or third-generation library instead. |
Second – s3n | s3n:// | s3n uses native S3 objects and makes it easy to use S3 with Hadoop and other file systems. This is also not the recommended option. |
Third – s3a | s3a:// | s3a is a replacement for s3n that supports larger files and improves performance. |
In this example, we will use the latest and greatest third generation, s3a://. Below are the Hadoop and AWS dependencies you need in order for Spark to read/write files to Amazon AWS S3 storage.
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>3.0.0</version>
</dependency>
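If you build with sbt instead of Maven, the equivalent dependencies would look like the sketch below (assuming the same 3.0.0 version; adjust it to match your own Spark and Hadoop build).
// build.sbt (sketch): the same Hadoop artifacts as the Maven dependencies above
libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-common" % "3.0.0",
  "org.apache.hadoop" % "hadoop-client" % "3.0.0",
  "org.apache.hadoop" % "hadoop-aws" % "3.0.0"
)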
You can find more details about these dependencies and use the versions that suit your build. Regardless of which one you use, the steps to read/write to Amazon S3 are exactly the same except for the URI scheme (s3a:// in this article).
val spark: SparkSession = SparkSession.builder()
.master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()
// Replace Key with your AWS account key (You can find this on IAM service)
spark.sparkContext
.hadoopConfiguration.set("fs.s3a.access.key", "awsaccesskey value")
// Replace Key with your AWS secret key (You can find this on IAM service)
spark.sparkContext
.hadoopConfiguration.set("fs.s3a.secret.key", "aws secretkey value")
spark.sparkContext
.hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")
If you are using the s3n file system instead, set the following:
spark.sparkContext
.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "awsAccessKeyId value")
spark.sparkContext
.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "awsSecretAccessKey value")
spark.sparkContext
.hadoopConfiguration.set("fs.s3n.endpoint", "s3.amazonaws.com")
1. Spark read a text file from S3 into RDD
We can read a single text file, multiple files, and all files from a directory on an S3 bucket into a Spark RDD by using the two methods below, which are provided by the SparkContext class.
Before we start, let's assume we have the following file names and contents in the "csv" folder on the S3 bucket; I use these files throughout to explain the different ways to read text files, with examples.
File Name | File Contents |
---|---|
text01.txt | One,1 Eleven,11 |
text02.txt | Two,2 |
text03.txt | Three,3 |
text04.txt | Four,4 |
invalid.txt | Invalid,I |
1.1 textFile() – Read text file from S3 into RDD
The sparkContext.textFile() method is used to read a text file from S3 (or any other Hadoop-supported file system and many other data sources). It takes the path as an argument and optionally takes a minimum number of partitions as the second argument.
println("##spark read text files from a directory into RDD")
val rddFromFile = spark.sparkContext.textFile("s3a://sparkbyexamples/csv/text01.txt")
println(rddFromFile.getClass)
println("##Get data Using collect")
rddFromFile.collect().foreach(f=>{
println(f)
})
Here, it reads every line of the "text01.txt" file as an element of the RDD and prints the output below.
##spark read text files from a directory into RDD
class org.apache.spark.rdd.MapPartitionsRDD
##Get data Using collect
One,1
Eleven,11
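As mentioned above, textFile() also accepts an optional minimum number of partitions as its second argument; a small sketch (the partition count here is arbitrary):
// Sketch: read the same file with a minimum of 4 partitions (the number is arbitrary)
val rddPartitioned = spark.sparkContext.textFile("s3a://sparkbyexamples/csv/text01.txt", 4)
println(rddPartitioned.getNumPartitions)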
1.2 wholeTextFiles() – Read text files from S3 into RDD of Tuple.
sparkContext.wholeTextFiles() reads text files into a PairRDD of type RDD[(String, String)], where the key is the file path and the value is the file's contents. This method also takes the path as an argument and optionally takes a minimum number of partitions as the second argument.
Let's see a similar example with the wholeTextFiles() method.
println("##read whole text files")
val rddWhole = spark.sparkContext.wholeTextFiles("s3a://sparkbyexamples/csv/text01.txt")
1.3 Reading multiple files at a time
When you know the names of the multiple files you would like to read, just pass all the file names separated by commas (or a folder path if you want to read all files from a folder) to create an RDD. Both of the methods mentioned above support this.
println("##read multiple text files into a RDD")
val rdd4 = spark.sparkContext.textFile("s3a://sparkbyexamples/csv/text01.txt," +
"s3a://sparkbyexamples/csv/text02.txt")
rdd4.foreach(f=>{
println(f)
})
This reads the text01.txt and text02.txt files and produces the output below.
##read multiple text files into a RDD
One,1
Eleven,11
Two,2
1.4 Read all text files matching a pattern
The textFile() and wholeTextFiles() methods also accept pattern matching and wildcard characters. For example, the snippet below reads all files that start with "text" and have the ".txt" extension, and creates a single RDD.
println("##read text files base on wildcard character")
val rdd3 = spark.sparkContext.textFile("s3a://sparkbyexamples/csv/text*.txt")
rdd3.foreach(f=>{
println(f)
})
This yields the output below.
##read text files based on wildcard character
One,1
Eleven,11
Two,2
Three,3
Four,4
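Hadoop-style globs also support alternation with curly braces, in case you only want a subset of the files; a small sketch:
// Sketch: curly-brace alternation matches only text01.txt and text02.txt
val rddGlob = spark.sparkContext.textFile("s3a://sparkbyexamples/csv/text{01,02}.txt")
rddGlob.foreach(println)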
1.5 Read files from multiple directories on S3 bucket into single RDD
The textFile() method also supports reading a combination of individual files and multiple directories.
println("##read all text files from a directory to single RDD")
val rdd2 = spark.sparkContext.textFile("s3a://sparkbyexamples/csv/*")
rdd2.foreach(f=>{
println(f)
})
This yields the output below.
##read all text files from a directory to single RDD
Invalid,I
One,1
Eleven,11
Two,2
Three,3
Four,4
1.6 Reading text files from nested directories into Single RDD
textFile() and wholeTextFiles() return an error when they encounter a nested folder. Hence, first build a list of file paths (using Scala, Java, Python, or any other language) by traversing all nested folders, and then pass all the file names, separated by commas, to create a single RDD. I will leave it to you to research and come up with a full example.
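As a starting point, here is a minimal sketch of that approach in Scala, assuming the bucket used throughout this article and the s3a configuration shown earlier; it recursively lists the files under a folder using the Hadoop FileSystem API and joins the paths with commas before calling textFile().
// Sketch: collect all file paths under nested folders and read them as a single RDD
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.collection.mutable.ArrayBuffer

val fs = FileSystem.get(new URI("s3a://sparkbyexamples"), spark.sparkContext.hadoopConfiguration)
val files = ArrayBuffer[String]()
val it = fs.listFiles(new Path("s3a://sparkbyexamples/csv/"), true) // true = recurse into sub-folders
while (it.hasNext) {
  files += it.next().getPath.toString
}
val rddNested = spark.sparkContext.textFile(files.mkString(","))
rddNested.foreach(println)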
1.7 Reading all text files separately and union to create a Single RDD
You can also read each text file into a separate RDD and union all of them to create a single RDD. Again, I will leave this to you to explore.
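A minimal sketch of that idea, using the example file names from the table above:
// Sketch: read each file into its own RDD, then union them into one RDD
val names = Seq("text01.txt", "text02.txt", "text03.txt", "text04.txt")
val perFileRdds = names.map(name => spark.sparkContext.textFile(s"s3a://sparkbyexamples/csv/$name"))
val unionRdd = spark.sparkContext.union(perFileRdds)
unionRdd.foreach(println)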
1.8 Spark RDD with S3 Complete code
package com.sparkbyexamples.spark.rdd
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object ReadTextFiles extends App {
val spark:SparkSession = SparkSession.builder()
.master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()
// Replace Key with your AWS account key (You can find this on IAM service)
spark.sparkContext
.hadoopConfiguration.set("fs.s3a.access.key", "awsaccesskey value")
// Replace Key with your AWS secret key (You can find this on IAM service)
spark.sparkContext
.hadoopConfiguration.set("fs.s3a.secret.key", "aws secretkey value")
spark.sparkContext
.hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")
spark.sparkContext.setLogLevel("ERROR")
println("##spark read text files from a directory into RDD")
val rddFromFile = spark.sparkContext.textFile("s3a://sparkbyexamples/csv/text01.txt")
println(rddFromFile.getClass)
println("##Get data Using collect")
rddFromFile.collect().foreach(f=>{
println(f)
})
println("##read multiple text files into a RDD")
val rdd4 = spark.sparkContext.textFile("s3a://sparkbyexamples/csv/text01.txt," +
"s3a://sparkbyexamples/csv/text02.txt")
rdd4.foreach(f=>{
println(f)
})
println("##read text files base on wildcard character")
val rdd3 = spark.sparkContext.textFile("s3a://sparkbyexamples/csv/text*.txt")
rdd3.foreach(f=>{
println(f)
})
println("##read all text files from a directory to single RDD")
val rdd2 = spark.sparkContext.textFile("s3a://sparkbyexamples/csv/*")
rdd2.foreach(f=>{
println(f)
})
println("##read whole text files")
val rddWhole:RDD[(String,String)] = spark.sparkContext.wholeTextFiles("s3a://sparkbyexamples/csv/text01.txt")
println(rddWhole.getClass)
rddWhole.foreach(f=>{
println(f._1+"=>"+f._2)
})
}
This complete code is also available at GitHub for reference.
2. Spark read text file into DataFrame and Dataset
Using spark.read.text() and spark.read.textFile(), we can read a single text file, multiple files, and all files from a directory on an S3 bucket into a Spark DataFrame and Dataset. Let's see examples in Scala.
Note: These methods don’t take an argument to specify the number of partitions.
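If you do need a specific number of partitions, a common workaround is to repartition after the read; a short sketch (the partition count here is arbitrary):
// Sketch: text()/textFile() take no partition argument, so repartition afterwards if needed
val dfRepartitioned = spark.read.text("s3a://sparkbyexamples/csv/text01.txt").repartition(4)
println(dfRepartitioned.rdd.getNumPartitions)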
2.1 text() – Read text file from S3 into DataFrame
The spark.read.text() method is used to read a text file from S3 into a DataFrame. As with RDDs, we can also use this method to read multiple files at a time, read files matching a pattern, and read all files from a directory.
//returns DataFrame
val df:DataFrame = spark.read.text("s3a://sparkbyexamples/csv/text01.txt")
df.printSchema()
df.show(false)
This yields the output below.
root
|-- value: string (nullable = true)
+---------+
|value |
+---------+
|One,1 |
|Eleven,11|
+---------+
As you can see, each line of the text file becomes a record in the DataFrame with just one column, "value". If you want to convert it into multiple columns, you can use a map transformation together with the split method; the example below demonstrates this.
//converting to columns by splitting
import spark.implicits._
val df2 = df.map(f=>{
val elements = f.getString(0).split(",")
(elements(0),elements(1))
})
df2.printSchema()
df2.show(false)
This splits each element of the DataFrame by the delimiter and converts it into a DataFrame of Tuple2, yielding the output below.
root
|-- _1: string (nullable = true)
|-- _2: string (nullable = true)
+------+---+
|_1 |_2 |
+------+---+
|One |1 |
|Eleven|11 |
+------+---+
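If you prefer descriptive column names over _1 and _2, you can rename them with toDF(); a small sketch (the column names here are just examples):
// Sketch: rename the tuple columns to descriptive names (the names are arbitrary examples)
val df3 = df2.toDF("name", "number")
df3.printSchema()
df3.show(false)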
2.2 textFile() – Read text file from S3 into Dataset
The spark.read.textFile() method returns a Dataset[String]. Like text(), we can also use this method to read multiple files at a time, read files matching a pattern, and read all files from a directory on an S3 bucket into a Dataset.
// returns Dataset[String]
val ds:Dataset[String] = spark.read.textFile("s3a://sparkbyexamples/csv/text01.txt")
ds.printSchema()
ds.show(false)
This yields the output below.
root
|-- value: string (nullable = true)
+---------+
|value |
+---------+
|One,1 |
|Eleven,11|
+---------+
Now let's convert each element of the Dataset into multiple columns by splitting on the "," delimiter.
//converting to columns by splitting
import spark.implicits._
val ds2 = ds.map(f=> {
val elements = f.split(",")
(elements(0),elements(1))
})
ds2.printSchema()
ds2.show(false)
This yields the output below. It splits each element of the Dataset by the delimiter and converts it into a Dataset of Tuple2.
root
|-- _1: string (nullable = true)
|-- _2: string (nullable = true)
+------+---+
|_1 |_2 |
+------+---+
|One |1 |
|Eleven|11 |
+------+---+
2.3 Spark with AWS S3 complete example
package com.sparkbyexamples.spark.dataframe
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
object FromTextFile {
def main(args:Array[String]):Unit= {
val spark: SparkSession = SparkSession.builder()
.master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()
// Replace Key with your AWS account key (You can find this on IAM service)
spark.sparkContext
.hadoopConfiguration.set("fs.s3a.access.key", "awsaccesskey value")
// Replace Key with your AWS secret key (You can find this on IAM service)
spark.sparkContext
.hadoopConfiguration.set("fs.s3a.secret.key", "aws secretkey value")
spark.sparkContext
.hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")
//returns DataFrame
val df:DataFrame = spark.read.text("s3a://sparkbyexamples/csv/text01.txt")
df.printSchema()
df.show(false)
//converting to columns by splitting
import spark.implicits._
val df2 = df.map(f=>{
val elements = f.getString(0).split(",")
(elements(0),elements(1))
})
df2.printSchema()
df2.show(false)
// returns Dataset[String]
val ds:Dataset[String] = spark.read.textFile("s3a://sparkbyexamples/csv/text01.txt")
ds.printSchema()
ds.show(false)
//converting to columns by splitting
import spark.implicits._
val ds2 = ds.map(f=> {
val elements = f.split(",")
(elements(0),elements(1))
})
ds2.printSchema()
ds2.show(false)
}
}
This complete code is also available at GitHub for reference.
Conclusion
In this tutorial, you have learned how to read a text file from AWS S3 into an RDD and a DataFrame using the different methods available in SparkContext and Spark SQL. You also learned how to read multiple text files, read files by pattern matching, and read all files from a folder.
Happy Learning !!