Spark's saveAsTextFile() method writes the contents of an RDD to one or more text files (part files). In this article, we discuss the saveAsTextFile() method in detail, cover its configuration options, and illustrate its use with examples.

1. What is Spark saveAsTextFile() Method?

saveAsTextFile() is a method in Apache Spark’s RDD (Resilient Distributed Dataset) class that writes the content of an RDD to one or more text files in a given directory. The method takes a string argument that specifies the output directory where the text files will be written.

Here is the method signature in Scala:


// Method signatures
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit

The method writes each element of the RDD as a line in the text files. The format of each line is determined by the toString() method of the object, which can be overridden to provide a custom string representation.
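For example, the per-line format can be customized by overriding toString(). The sketch below uses a hypothetical Sale case class and assumes an existing SparkContext named sc:


// Hypothetical case class with a custom toString(), so each element
// is written as a pipe-delimited line instead of the default tuple format
case class Sale(date: String, product: String, price: Double) {
  override def toString: String = s"$date|$product|$price"
}

// saveAsTextFile() calls toString() on each element, so the output
// lines look like: 2022-01-01|Product A|10.0
sc.parallelize(Seq(Sale("2022-01-01", "Product A", 10.0)))
  .saveAsTextFile("output_directory")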

Note that the saveAsTextFile() method is an action, which means it triggers the execution of the RDD lineage and writes the data to disk.

2. Spark saveAsTextFile() method properties

The saveAsTextFile() method in Apache Spark provides a number of properties that can be used to customize the output format and compression of the text files.

Here are some commonly used properties:

1. File compression: You can compress the output text files using one of the compression codecs supported by Hadoop/Spark. The codec class is passed as the second argument to the saveAsTextFile() method. For example, to compress the output text files using the Snappy codec, you can use the following code:


// Compress the output text files using the Snappy codec
myRDD.saveAsTextFile("output_directory", 
     classOf[org.apache.hadoop.io.compress.SnappyCodec])

2. Output format: By default, saveAsTextFile() writes the output files in plain text format. To use a different output format, you can provide a custom implementation of the Hadoop OutputFormat interface and write the data with the saveAsNewAPIHadoopFile() method instead. Note that saveAsNewAPIHadoopFile() is defined on pair RDDs, so the RDD must consist of key/value pairs. Here's an example that sets a comma as the key/value separator:


// Imports
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

// Initialize Configuration
val conf = new Configuration()

// Use a comma as the key/value separator
conf.set("mapreduce.output.textoutputformat.separator", ",")

// Write a pair RDD of (Text, Text) using the comma separator
myRDD.saveAsNewAPIHadoopFile("output_directory",
    classOf[Text],
    classOf[Text],
    classOf[TextOutputFormat[Text, Text]],
    conf)

In this example, each key/value pair is written as a line with the key and value separated by a comma, giving a simple CSV-like layout.

3. The number of output files: By default, saveAsTextFile() writes one output file per partition of the RDD into the specified directory. You can control the number of output files by calling coalesce() or repartition() on the RDD before calling saveAsTextFile(). For example, to write the output to a single file, you can use the following code (note that coalescing to one partition funnels all the data through a single task, which can be slow for large datasets):


// Merge myRDD into single partition and write into output_directory
myRDD.coalesce(1).saveAsTextFile("output_directory")
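Conversely, repartition() can be used to increase the number of output files when more write parallelism is wanted; a sketch:


// Repartition into 4 partitions, producing 4 part files
myRDD.repartition(4).saveAsTextFile("output_directory")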

3. Examples of Spark saveAsTextFile()

So far we have covered what saveAsTextFile() does and its main options. Now let us see how to use this method in a Spark application. Here are a few examples of using the saveAsTextFile() method in Apache Spark:

Before we jump into the applications of saveAsTextFile(), let us create a sample RDD and reuse it in the examples below.

Create RDD:

You can create a sample RDD of tuples (date, product, price) using the following code snippet:


// Imports
import org.apache.spark.{SparkConf, SparkContext}

// Create a SparkConf object with the app name
val conf = new SparkConf().setAppName("Sales RDD Example")

// Create a SparkContext object
val sc = new SparkContext(conf)

// Create a sample RDD of sales data
val salesRDD = sc.parallelize(Seq(
  ("2022-01-01", "Product A", 10.0),
  ("2022-01-01", "Product B", 20.0),
  ("2022-01-02", "Product C", 30.0),
  ("2022-01-02", "Product A", 40.0),
  ("2022-01-03", "Product B", 50.0),
  ("2022-01-03", "Product C", 60.0)
))

// Print the first few elements of the RDD
salesRDD.take(3).foreach(println)

The output of the above snippet looks like this:


// Output RDD
(2022-01-01,Product A,10.0)
(2022-01-01,Product B,20.0)
(2022-01-02,Product C,30.0)

Example 1: Writing a simple RDD to a text file

This writes the sales RDD to a directory named “FileStore/output_directory” in plain text format using the Spark saveAsTextFile() method. Since I am using Databricks for this example, I have prefixed the directory name with “dbfs:”.


// Write the RDD to a text file 
salesRDD.saveAsTextFile("dbfs:/FileStore/output_directory")

Listing the output directory shows that the RDD is written as multiple text files.

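The directory can be inspected from a Databricks notebook with dbutils; a sketch (exact part-file names vary by run and partition count):


// List the output directory using Databricks dbutils
dbutils.fs.ls("dbfs:/FileStore/output_directory").foreach(f => println(f.name))

// Typically the listing shows a _SUCCESS marker plus one part file
// per partition, e.g. _SUCCESS, part-00000, part-00001, ...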

Example 2: Writing RDD into a compressed text file

This writes the sales RDD using the Spark saveAsTextFile() method into a directory named “dbfs:/FileStore/output_directory_compressed” in compressed GZIP format.


// Write the sales RDD to a compressed text file
salesRDD.saveAsTextFile("dbfs:/FileStore/output_directory_compressed", classOf[org.apache.hadoop.io.compress.GzipCodec])

Listing the output directory shows that the RDD is written as multiple compressed text files (.gz).
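Spark can read these compressed files back transparently, since textFile() recognizes the .gz extension and decompresses automatically:


// Read the compressed output back; GZIP files are decompressed on the fly
val readBack = sc.textFile("dbfs:/FileStore/output_directory_compressed")
readBack.take(3).foreach(println)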

Example 3: Writing a CSV file using a custom output format

In Spark, it is sometimes useful to produce more structured files for downstream processing. Besides writing plain text with saveAsTextFile(), we can write the RDD contents as CSV-style files by plugging a custom output format into saveAsNewAPIHadoopFile(). The custom output format below writes the sales RDD to a directory named “output_directory” with each key and value wrapped in quotes.


// Imports for the Hadoop new-API output classes
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

// Custom TextOutputFormat that wraps each key and value in double quotes
class CsvOutputFormat extends TextOutputFormat[Text, Text] {
  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[Text, Text] = {
    val inner = super.getRecordWriter(context)
    new RecordWriter[Text, Text] {
      override def write(key: Text, value: Text): Unit =
        inner.write(new Text("\"" + key + "\""), new Text("\"" + value + "\""))
      override def close(ctx: TaskAttemptContext): Unit = inner.close(ctx)
    }
  }
}

// Set the separator for the CSV output format
val conf = new Configuration()
conf.set("mapreduce.output.textoutputformat.separator", ",")

// saveAsNewAPIHadoopFile() requires a pair RDD, so map each
// (date, product, price) tuple to a (Text, Text) key/value pair
val pairRDD = salesRDD.map { case (date, product, price) =>
  (new Text(date), new Text(product + "," + price))
}

// Write the pair RDD using the custom CSV output format
pairRDD.saveAsNewAPIHadoopFile(
  "output_directory",
  classOf[Text],
  classOf[Text],
  classOf[CsvOutputFormat],
  conf
)

The resulting output files contain lines like the following, with the date as the quoted key and the product/price as the quoted value:


// Output:
"2022-01-01","Product A,10.0"
"2022-01-01","Product B,20.0"
"2022-01-02","Product C,30.0"
"2022-01-02","Product A,40.0"
"2022-01-03","Product B,50.0"
"2022-01-03","Product C,60.0"

4. Conclusion

In conclusion, the saveAsTextFile() method in Spark/PySpark is a simple and efficient way to save the contents of an RDD to a text file in a Hadoop-compatible file system. It writes the contents of the RDD as plain text, with one record per line.

This method can be useful for exporting data from Spark for further processing or analysis in other tools or systems. However, it has some limitations, such as not being able to handle nested data structures or different data types within a single RDD.

Furthermore, there are other file formats that can be used in Spark, such as Parquet, ORC, or Avro, which provide more efficient storage and querying capabilities for structured data. However, if the data is unstructured and simple, saveAsTextFile() is still a good option.
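As a quick illustration of the Parquet alternative mentioned above, the same sales data can be converted to a DataFrame and written as Parquet; a sketch, assuming a SparkSession is created for the job:


// Create a SparkSession and bring its implicits into scope for toDF()
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("Sales Parquet Example").getOrCreate()
import spark.implicits._

// Convert the RDD of tuples to a DataFrame with named columns
val salesDF = salesRDD.toDF("date", "product", "price")

// Write the DataFrame in Parquet format for efficient storage and querying
salesDF.write.parquet("output_directory_parquet")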


rimmalapudi

Data Engineer. I write about BigData Architecture, tools and techniques that are used to build Bigdata pipelines and other generic blogs.