Spark Read CSV file into DataFrame

Apache Spark provides a DataFrame API that offers an easy and efficient way to read a CSV file into a DataFrame. DataFrames are distributed collections of data organized into named columns. Use spark.read.csv("path") to read a CSV file. Spark supports files delimited by pipe, comma, tab, or any other separator character.

In this tutorial, you will learn how to read a single file, multiple files, and all files from a local directory into Spark DataFrame, apply some transformations, and finally write DataFrame back to a CSV file using Scala.

Note: Out of the box, Spark supports reading CSV, JSON, TEXT, Parquet, Avro, ORC, and many other file formats into a Spark DataFrame.

Spark Read CSV file into DataFrame

Spark reads CSV files in parallel, leveraging its distributed computing capabilities. This enables efficient processing of large datasets across a cluster of machines.

Using spark.read.csv("path") or spark.read.format("csv").load("path"), you can read a CSV file into a Spark DataFrame. Both methods take a file path as an argument.

The CSV file used in this article can be found on GitHub. You can download it with the command below.

 
// Test CSV file (use the raw URL so wget fetches the file itself rather than the GitHub HTML page)
wget https://raw.githubusercontent.com/spark-examples/spark-scala-examples/3ea16e4c6c1614609c2bd7ebdffcee01c0fe6017/src/main/resources/zipcodes.csv

Note: Spark uses lazy evaluation, which means that the actual reading of data doesn’t happen until an action is triggered. This allows for optimizations in the execution plan.
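For instance, in the minimal sketch below (using the test file from this article), defining the DataFrame triggers at most a small metadata read; the full file scan happens only once an action such as count() or show() is invoked.

// No full scan yet: reading is deferred
val lazyDf = spark.read.csv("src/main/resources/zipcodes.csv")

// The action triggers the actual read of the file
lazyDf.count()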

 
// Import
import org.apache.spark.sql.SparkSession

// Create SparkSession
val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExamples.com")
      .getOrCreate()

// Read CSV file into DataFrame
val df = spark.read.csv("src/main/resources/zipcodes.csv")
df.printSchema()

Here, spark is a SparkSession object, read returns a DataFrameReader, and csv() is a method of the DataFrameReader class.

This example reads the data into a DataFrame with default column names: "_c0" for the first column, "_c1" for the second, and so on. By default, the data type of all these columns is String.

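Since the file is read without a header or schema, printSchema() prints the generic column names with string types, along these lines (output abbreviated):

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 ...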

When you use the format("csv") method, you can also specify the data source by its fully qualified class name, but for built-in sources you can simply use their short names (csv, json, parquet, jdbc, text, etc.). For example:

 
// Using format()
val df2 = spark.read.format("csv").load("src/main/resources/zipcodes.csv")
df2.printSchema()

Spark Read CSV with Header

If the CSV file has a header record with column names, you need to enable it explicitly using option("header", true). Without this option, the API treats the header row as a regular data record while reading the CSV file.

 
// Specify header to true to get column names from CSV file
val df3 = spark.read.option("header",true)
   .csv("src/main/resources/zipcodes.csv")
df3.printSchema()

Note that it still reads all columns as strings (StringType) by default.


Read with Schema using inferSchema

Read CSV with Schema – Use option("inferSchema", true) to have Spark automatically detect each column's type based on the data. The default value of this option is false, in which case every column is read as a string.

Note: This option requires reading the data again to infer the schema.

 
// Use the inferSchema option to get the right data types
val df4 = spark.read.option("inferSchema",true)
  .csv("src/main/resources/zipcodes.csv")

You should see the appropriate data types assigned to the columns.

You can use the options() method to specify multiple options at a time.

 
// Use multiple options together
val options = Map("inferSchema"->"true","delimiter"->",","header"->"true")
val df5 = spark.read.options(options)
      .csv("src/main/resources/zipcodes.csv")
df5.printSchema()
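With header and inferSchema enabled together, printSchema() should now show the real column names with inferred types, roughly as follows for the test file (abbreviated):

root
 |-- RecordNumber: integer (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 ...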

Read CSV with Custom Schema

While reading CSV files into a Spark DataFrame, you can either infer the schema automatically or specify it explicitly. Specifying the schema avoids the overhead of schema inference and guarantees accurate data types.

If you know the schema of the file ahead of time and do not want to use the inferSchema option, define the column names and types yourself and pass them via the schema() method.

 
// Import types
import org.apache.spark.sql.types._

// Read with custom schema
val schema = new StructType()
      .add("RecordNumber",IntegerType,true)
      .add("Zipcode",IntegerType,true)
      .add("ZipCodeType",StringType,true)
      .add("City",StringType,true)
      .add("State",StringType,true)
      .add("LocationType",StringType,true)
      .add("Lat",DoubleType,true)
      .add("Long",DoubleType,true)
      .add("Xaxis",IntegerType,true)
      .add("Yaxis",DoubleType,true)
      .add("Zaxis",DoubleType,true)
      .add("WorldRegion",StringType,true)
      .add("Country",StringType,true)
      .add("LocationText",StringType,true)
      .add("Location",StringType,true)
      .add("Decommisioned",BooleanType,true)
      .add("TaxReturnsFiled",StringType,true)
      .add("EstimatedPopulation",IntegerType,true)
      .add("TotalWages",IntegerType,true)
      .add("Notes",StringType,true)

// Read CSV file with custom schema
val df_with_schema = spark.read.format("csv")
      .option("header", "true")
      .schema(schema)
      .load("src/main/resources/zipcodes.csv")
df_with_schema.printSchema()
df_with_schema.show(false)

Read Multiple CSV files

Using the spark.read.csv() method, you can also read multiple CSV files at once; just pass all the file paths as separate arguments, for example:

 
// Read multiple files
val df8 = spark.read.csv("path1", "path2", "path3")

Read all CSV files in a directory

We can read all CSV files in a directory into a DataFrame just by passing the directory as the path to the csv() method.

 
// Read all files from a directory
val df9 = spark.read.csv("folder/path")
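The path also accepts glob patterns, so you can restrict the read to CSV files only; a small sketch with an assumed folder layout:

// Read only files ending in .csv from the directory
val df10 = spark.read.csv("folder/path/*.csv")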

Caching & Persistence

After reading the CSV file, you can persist the DataFrame in memory or on disk using Spark's caching mechanisms. This improves the performance of subsequent operations by avoiding redundant reads from the file, so it is a good practice to persist/cache a DataFrame that is reused across multiple actions.

 
// Caching & Persistence
val df6 = spark.read.option("inferSchema",true)
  .csv("src/main/resources/zipcodes.csv")

//  Cache DataFrame
val df7 = df6.cache()
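When the cached data is no longer needed, release it so the memory can be reused (a one-line sketch):

// Remove the DataFrame from cache
df7.unpersist()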

Using SQL to Query

In Spark SQL, you can use the spark.sql() method to execute SQL queries on DataFrames. To query a DataFrame, first register it as a temporary view, then run the query:


// Create Temporary table
df7.createOrReplaceTempView("ZipCodes")

// Query table
spark.sql("select RecordNumber, Zipcode, ZipcodeType, City, State from ZipCodes")
     .show()

Options while reading CSV file

The Spark CSV data source provides numerous options for reading, such as setting the delimiter, handling the header, customizing null values, and more. Below are some of the most important options, explained with examples. They are specified using the option() or options() method.

delimiter

The delimiter option specifies the column delimiter of the CSV file. By default it is the comma (,) character, but it can be set to pipe (|), tab, space, or any other character using this option.

 
// Using delimiter option
val df2 = spark.read.options(Map("delimiter"->","))
  .csv("src/main/resources/zipcodes.csv")

inferSchema

The default value of this option is false; when set to true, Spark automatically infers column types based on the data. Note that this requires reading the data one more time to infer the schema.

 
// Using inferSchema option
val df2 = spark.read.options(Map("inferSchema"->"true","delimiter"->","))
  .csv("src/main/resources/zipcodes.csv")

header

This option is used to read the first line of the CSV file as column names. By default, the value of this option is false, and all column types are assumed to be strings.

 
// Using header
val df2 = spark.read.options(Map("inferSchema"->"true","delimiter"->",","header"->"true"))
  .csv("src/main/resources/zipcodes.csv")

quotes

When a column value contains the character that is used to split columns, use the quote option to specify the quote character; by default it is the double quote (") and delimiters inside quotes are ignored. Using this option, you can set it to any other character.
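A minimal sketch, assuming field values in the test file are wrapped in double quotes:

// Using the quote option; commas inside quoted values are not treated as delimiters
val dfQuote = spark.read.options(Map("quote"->"\"", "delimiter"->",", "header"->"true"))
  .csv("src/main/resources/zipcodes.csv")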

nullValue

Using the nullValue option, you can specify a string in the CSV file that should be read as null. For example, you may want a date column with the value "1900-01-01" to be set to null in the DataFrame.
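A short sketch for that case, assuming the literal string 1900-01-01 appears in the data:

// Using the nullValue option: matching values become null in the DataFrame
val dfNull = spark.read.options(Map("nullValue"->"1900-01-01", "header"->"true"))
  .csv("src/main/resources/zipcodes.csv")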

dateFormat

The dateFormat option sets the format used to parse the input DateType and TimestampType columns. Older Spark versions support java.text.SimpleDateFormat patterns; Spark 3.0 and later use java.time.format.DateTimeFormatter patterns.
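A hedged sketch; the pattern below is an assumption about how dates are laid out in the input file:

// Using the dateFormat option (effective together with a date-typed schema or inferSchema)
val dfDate = spark.read.options(Map("dateFormat"->"MM-dd-yyyy", "inferSchema"->"true", "header"->"true"))
  .csv("src/main/resources/zipcodes.csv")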

charset

Pay attention to the character encoding of the CSV file, especially when dealing with internationalization. Spark's CSV reader lets you specify an encoding option to handle different character sets. By default it uses UTF-8, but it can be set to any other valid charset name.
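For example, a sketch reading a file assumed to be encoded in ISO-8859-1:

// Using the charset (alias: encoding) option
val dfEnc = spark.read.options(Map("charset"->"ISO-8859-1", "header"->"true"))
  .csv("src/main/resources/zipcodes.csv")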

Note: Besides the above options, Spark CSV dataset also supports many other options, please refer to this article for details.

Data Cleansing and Transformation

After reading the CSV file, incorporate necessary data cleansing and transformation steps in Spark to handle missing values, outliers, or any other data quality issues specific to your use case.
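As a brief, hedged illustration using column names from the custom schema above, you might drop rows missing a key field and fill a default elsewhere:

// Drop rows where Zipcode is null, then fill missing populations with 0
val cleaned = df_with_schema
  .na.drop(Seq("Zipcode"))
  .na.fill(Map("EstimatedPopulation" -> 0))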

Once you have created a DataFrame from the CSV file, you can apply transformations and actions to it. Please refer to the link for more details.

Write Spark DataFrame to CSV file

Use the DataFrame's write() method, which returns a DataFrameWriter, together with csv() to write a Spark DataFrame to a CSV file. Note that Spark writes the output as a directory containing one part file per partition rather than a single CSV file. For a detailed example, refer to Writing Spark DataFrame to CSV File using Options.

 
// Write DataFrame to CSV file
df2.write.option("header","true")
 .csv("/tmp/spark_output/zipcodes")

Options

While writing a CSV file, you can use several options; for example, header to write the DataFrame column names as a header record and delimiter to specify the delimiter in the CSV output file.

 
// Using write options
df2.write.options(Map("header"->"true", "delimiter"->","))
 .csv("/tmp/spark_output/zipcodes")
 

Other available options include quote, escape, nullValue, dateFormat, and quoteMode.

Saving modes

Spark DataFrameWriter also has a mode() method to specify the SaveMode; its argument is either one of the strings below or a constant from the SaveMode class.

overwrite – This mode is used to overwrite the existing file; alternatively, you can use SaveMode.Overwrite.

append – To add the data to the existing file, alternatively, you can use SaveMode.Append.

ignore – Ignores write operation when the file already exists, alternatively you can use SaveMode.Ignore.

errorifexists – This is the default option; if the file already exists, the write returns an error. Alternatively, you can use SaveMode.ErrorIfExists.

 
// Import
import org.apache.spark.sql.SaveMode

// Using Saving modes
df2.write.mode(SaveMode.Overwrite).csv("/tmp/spark_output/zipcodes")
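Equivalently, the mode can be passed as a string:

// Using a string save mode
df2.write.mode("overwrite").csv("/tmp/spark_output/zipcodes")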

Conclusion:

In this tutorial, you have learned how to read a single CSV file, multiple CSV files, and all files from a local folder into a Spark DataFrame, how to use options to change the default behavior, and how to write the DataFrame back to CSV files using different save options.


Happy Learning !!

Naveen Nelamali

Naveen Nelamali (NNK) is a Data Engineer with 20+ years of experience in transforming data into actionable insights. Over the years, he has honed his expertise in designing, implementing, and maintaining data pipelines with frameworks like Apache Spark, PySpark, Pandas, R, Hive and Machine Learning. Naveen's journey in the field of data engineering has been one of continuous learning, innovation, and a strong commitment to data integrity. In this blog, he shares his experiences with data as he comes across them. Follow Naveen @ LinkedIn and Medium


This Post Has 21 Comments

  1. jsp

    Hi,
    Where can i find the data files like zipcodes.csv

  2. K.T. Wong

    Hi,

Great website, and extremely helpful. My appreciation and gratitude 😉

    I want to ingest data from a folder containing csv files, but upon ingestion I want one column containing the filename of the data that is being ingested. Any ideas on how to accomplish this?

    Regards,
    K.T. Wong

    1. NNK

      Hi Wong, Thanks for your kind words.
      After reading a CSV file into DataFrame use the below statement to add a new column. please comment if this works.

import org.apache.spark.sql.functions.lit
df.withColumn("fileName", lit("file-name"))

  3. Bindhu

    Hello,

    I want to rename a part of file name in a folder. example: XXX_07_08 to XXX_0700008. Please guide

    1. NNK

      In order to rename file name you have to use hadoop file system API

  4. Anonymous

    Hi, nice article! I am wondering how to read from CSV file which has more than 22 columns and create a data frame using this data

  5. Anonymous

    hi there. I did the schema and got the appropriate types bu i cannot use the describe function.

    1. NNK

      May I know where are you using the describe function?

  6. Syahirah

    i get it can read multiple files, but may i know if the CSV files have the same attributes/column or not? is it possible to have multiple files such as CSV1 is personal data, CSV2 is the call usage, CSV3 is the data usage and combined it together to put in dataframe.

    1. NNK

      When you reading multiple CSV files from a folder, all CSV files should have the same attributes and columns. You can’t read different CSV files into the same DataFrame.

  7. Boris

    Thank you for the information and explanation!

    1. NNK

      Thanks Boris for reading.

  8. Suraj Nepram

    all the column values are coming as null when csv is read with schema
val df_with_schema = spark.read.format("csv")
.option("header", "true")
.schema(schema)
.load("zipcodes.csv")
df_with_schema.printSchema()
df_with_schema.show(false)

    How do I fix this? I am using a window system. reading the csv without schema works fine

  9. Ashwin s

    Hi NNK,
    I’m getting an error while trying to read a csv file from github using above mentioned process.
    “Py4JJavaError: An error occurred while calling o100.csv.
    : java.io.IOException: No FileSystem for scheme: ……..”.
    Kindly help.Thanks in Advance.

    1. NNK

      Hi Dhinesh, By default Spark-CSV can’t handle it, however, you can do it by custom code as mentioned below.

      1) Read the CSV file using spark-csv as if there is no header
      2) use filter on DataFrame to filter out header row
      3) used the header row to define the columns of the DataFrame
      4) finally assign the columns to DataFrame

      Hope it gives you some idea.

  10. Divyesh

    Hello NNK,

    Huge fan of the website. I was trying to read multiple csv files located in different folders as:

    spark.read.csv([“path_1″,”path_2″,”path_3”], header = True)

    and was successfully able to do that. However, when running the program from spark-submit says that spark module not found.

    Can you help.

    1. NNK

      Thanks Divyesh for your comments. Could you please share your complete stack trace error? If you have already resolved the issue, please comment here, others would get benefit from your solution.

      Thanks again. Happy Learning !!