You are currently viewing Spark Write DataFrame into Single CSV File (merge multiple part files)

In this article, I will explain how to save/write Spark DataFrame, Dataset, and RDD contents into a Single File (file format can be CSV, Text, JSON e.t.c) by merging all multiple part files into one file using Scala example.

Advertisements

By design, when you save an RDD, DataFrame, or Dataset, Spark creates a folder with the name specified in a path and writes data as multiple part files in parallel (one-part file for each partition). Each part file will have an extension of the format you write (for example .csv, .json, .txt e.t.c)


//Spark Read CSV File
val df = spark.read.option("header",true).csv("address.csv")
//Write DataFrame to address directory
df.write.csv("address")

This writes multiple part files in address directory.

Spark Save Single CSV File

Spark also create _SUCCESS and multiple hidden files along with the data part files, For example, for each part file, it creates a CRC file and additional _SUCCESS.CRC file as shown in the above picture.

Sometimes you may need to save your dataset as a single file without a directory, and remove all these hidden files, this can be done in several ways. The below examples explain this by using a CSV file.

1. Write a Single file using Spark coalesce() & repartition()

When you are ready to write a DataFrame, first use Spark repartition() and coalesce() to merge data from all partitions into a single partition and then save it to a file. This still creates a directory and write a single part file inside a directory instead of multiple part files.


df.coalesce(1).write.csv("address")
df.repartition(1).write.csv("address")

Both coalesce() and repartition() are Spark Transformation operations that shuffle the data from multiple partitions into a single partition. Use coalesce() as it performs better and uses lesser resources compared with repartition().

Note: You have to be very careful when using Spark coalesce() and repartition() methods on larger datasets as they are expensive operations and could throw OutOfMemory errors.

Using this approach, Spark still creates a directory and write a single partition file along with CRC files and _SUCCESS file. If you wanted to remove these use below Hadoop file system library code.


import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

// Copy the actual file from Directory and Renames to custom name
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)

val srcPath=new Path("c:/tmp/address")
val destPath= new Path("c:/tmp/address_merged.csv")
val srcFile=FileUtil.listFiles(new File("c:/tmp/address"))
           .filterNot(f=>f.getPath.endsWith(".csv"))(0)
//Copy the CSV file outside of Directory and rename 
FileUtil.copy(srcFile,hdfs,destPath,true,hadoopConfig)
//Removes CRC File that create from above statement
hdfs.delete(new Path(".address_merged.csv.crc"),true)
//Remove Directory created by df.write()
hdfs.delete(srcPath,true)

2. Write Single File using Hadoop FileSystem Library

Since Spark natively supports Hadoop, you can also use Hadoop File system library to merge multiple part files and write a single CSV file.


import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
val srcPath=new Path("address")
val destPath= new Path("address_merged.csv")
FileUtil.copyMerge(hdfs, srcPath, hdfs, destPath, true, hadoopConfig, null)

//Remove hidden CRC file if not needed.
hdfs.delete(new Path(".address_merged.csv.crc"),true)

Note: In Hadoop 3.0 and later versions, FileUtil.copyMerge() has been removed and recommends using -getmerge option of the HDFS command.

3. Merge Using HDFS getMerge()

If you are using Hadoop 3.0 version, use hadoop fs -getmerge HDFS command to merge all partition files into a single CSV file.

Unlike FileUtil.copyMerge(), this copies the merged file to local file system from HDFS. You have to copy the file back to HDFS if needed.


hadoop fs -getmerge  /address-tmp /address.csv

4. Write a Single File in Databricks

If you are using Databricks, you can still use Spark repartition() or coalesce() to write a single file and use dbutils API to remove the hidden CRC & _SUCCESS files and copy the actual file from a directory.


df.coalesce(1).write.csv("address-temp")

//Copy partfile from folder to a File and remove a Folder
//This remove all CRC files
val file_path= dbutils.fs.ls("address-temp/")
     .filter(file=>file.name.endsWith(".csv"))(0).path
dbutils.fs.cp(file_path,"address.csv")
dbutils.fs.rm("address-temp",recurse=true)

5. Complete Example

You can file complete example @ GitHub for reference


package com.sparkbyexamples.spark.dataframe.examples

import java.io.File

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}

object SaveSingleFile extends App{

  val spark:SparkSession = SparkSession.builder()
    .master("local[3]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  val df = spark.read.option("header",true)
    .csv("src/main/resources/address.csv")
  df.repartition(1)
    .write.mode(SaveMode.Overwrite).csv("/tmp/address")


  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)

  val srcPath=new Path("/tmp/address")
  val destPath= new Path("/tmp/address_merged.csv")
  val srcFile=FileUtil.listFiles(new File("c:/tmp/address"))
    .filterNot(f=>f.getPath.endsWith(".csv"))(0)
  //Copy the CSV file outside of Directory and rename
  FileUtil.copy(srcFile,hdfs,destPath,true,hadoopConfig)
  //Remove Directory created by df.write()
  hdfs.delete(srcPath,true)
  //Removes CRC File
  hdfs.delete(new Path("/tmp/.address_merged.csv.crc"),true)

  // Merge Using Haddop API
  df.repartition(1).write.mode(SaveMode.Overwrite)
    .csv("/tmp/address-tmp")
  val srcFilePath=new Path("/tmp/address-tmp")
  val destFilePath= new Path("/tmp/address_merged2.csv")
  FileUtil.copyMerge(hdfs, srcFilePath, hdfs, destFilePath, true, hadoopConfig, null)
  //Remove hidden CRC file if not needed.
  hdfs.delete(new Path("/tmp/.address_merged2.csv.crc"),true)
}

Conclusion

In this article, you have learned to save/write a Spark DataFrame into a Single file using coalesce(1) and repartition(1), how to merge multiple part files into a single file using FileUtil.copyMerge() function from the Hadoop File system library, Hadoop HDFS command hadoop fs -getmerge and many more.

Happy Learning !!

Leave a Reply

This Post Has One Comment

  1. Anonymous

    How to write into single text flle from partitioned file in azure databricks using pyspark