PySpark Read CSV file into DataFrame

PySpark provides csv("path") on DataFrameReader to read a CSV file into PySpark DataFrame and dataframeObj.write.csv("path") to save or write to the CSV file. In this tutorial, you will learn how to read a single file, multiple files, all files from a local directory into DataFrame, applying some transformations, and finally writing DataFrame back to CSV file using PySpark example.

PySpark supports reading a CSV file with a pipe, comma, tab, space, or any other delimiter/separator files.

Note: PySpark out of the box supports reading files in CSV, JSON, and many more file formats into PySpark DataFrame. 

Table of contents:

1. PySpark Read CSV File into DataFrame

Using csv("path") or format("csv").load("path") of DataFrameReader, you can read a CSV file into a PySpark DataFrame, These methods take a file path to read from as an argument. When you use format("csv") method, you can also specify the Data sources by their fully qualified name, but for built-in sources, you can simply use their short names (csv,jsonparquetjdbctext e.t.c). 

Refer dataset zipcodes.csv at GitHub

 
val spark = SparkSession.builder().master("local[1]")
          .appName("SparkByExamples.com")
          .getOrCreate()
df = spark.read.csv("/tmp/resources/zipcodes.csv")
df.printSchema()

Using fully qualified data source name, you can alternatively do the following.

 
df = spark.read.format("csv")
                  .load("/tmp/resources/zipcodes.csv")
//       or
df = spark.read.format("org.apache.spark.sql.csv")
                  .load("/tmp/resources/zipcodes.csv")
df.printSchema()

This example reads the data into DataFrame columns "_c0" for the first column and "_c1" for the second and so on. and by default data type for all these columns is treated as String.

 
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)

1.1 Using Header Record For Column Names

If you have a header with column names on your input file, you need to explicitly specify True for header option using <a href="#header">option("header",True)</a> not mentioning this, the API treats header as a data record.

 
df2 = spark.read.option("header",True) \
     .csv("/tmp/resources/zipcodes.csv")

As mentioned earlier, PySpark reads all columns as a string (StringType) by default. I will explain in later sections on how to read the schema (inferschema) from the header record and derive the column type based on the data.

1.2 Read Multiple CSV Files

Using the read.csv() method you can also read multiple csv files, just pass all file names by separating comma as a path, for example : 

 
df = spark.read.csv("path1,path2,path3")

1.3 Read all CSV Files in a Directory

 We can read all CSV files from a directory into DataFrame just by passing directory as a path to the csv() method.

 
df = spark.read.csv("Folder path")

2. Options While Reading CSV File

PySpark CSV dataset provides multiple options to work with CSV files. Below are some of the most important options explained with examples.

You can either use chaining option(self, key, value) to use multiple options or use alternate options(self, **options) method.

2.1 delimiter

delimiter option is used to specify the column delimiter of the CSV file. By default, it is comma (,) character, but can be set to any character like pipe(|), tab (\t), space using this option.

 
df3 = spark.read.options(delimiter=',') \
  .csv("C:/apps/sparkbyexamples/src/pyspark-examples/resources/zipcodes.csv")

2.2 inferSchema

The default value set to this option is False when setting to true it automatically infers column types based on the data. Note that, it requires reading the data one more time to infer the schema.

 
df4 = spark.read.options(inferSchema='True',delimiter=',') \
  .csv("src/main/resources/zipcodes.csv")

Alternatively you can also write this by chaining option() method.

 
df4 = spark.read.option("inferSchema",True) \
                .option("delimiter",",") \
  .csv("src/main/resources/zipcodes.csv")

This option is used to read the first line of the CSV file as column names. By default the value of this option is False , and all column types are assumed to be a string.

 
df3 = spark.read.options(header='True', inferSchema='True', delimiter=',') \
  .csv("/tmp/resources/zipcodes.csv")

2.4 quotes

When you have a column with a delimiter that used to split the columns, use quotes option to specify the quote character, by default it is ” and delimiters inside quotes are ignored. but using this option you can set any character.

2.5 nullValues

Using nullValues option you can specify the string in a CSV to consider as null. For example, if you want to consider a date column with a value "1900-01-01" set null on DataFrame.

2.6 dateFormat

dateFormat option to used to set the format of the input DateType and TimestampType columns. Supports all java.text.SimpleDateFormat formats.

Note: Besides the above options, PySpark CSV API also supports many other options, please refer to this article for details.

3. Reading CSV files with a user-specified custom schema

If you know the schema of the file ahead and do not want to use the inferSchema option for column names and types, use user-defined custom column names and type using schema option.

 
schema = StructType() \
      .add("RecordNumber",IntegerType(),True) \
      .add("Zipcode",IntegerType(),True) \
      .add("ZipCodeType",StringType(),True) \
      .add("City",StringType(),True) \
      .add("State",StringType(),True) \
      .add("LocationType",StringType(),True) \
      .add("Lat",DoubleType(),True) \
      .add("Long",DoubleType(),True) \
      .add("Xaxis",IntegerType(),True) \
      .add("Yaxis",DoubleType(),True) \
      .add("Zaxis",DoubleType(),True) \
      .add("WorldRegion",StringType(),True) \
      .add("Country",StringType(),True) \
      .add("LocationText",StringType(),True) \
      .add("Location",StringType(),True) \
      .add("Decommisioned",BooleanType(),True) \
      .add("TaxReturnsFiled",StringType(),True) \
      .add("EstimatedPopulation",IntegerType(),True) \
      .add("TotalWages",IntegerType(),True) \
      .add("Notes",StringType(),True)
      
df_with_schema = spark.read.format("csv") \
      .option("header", True) \
      .schema(schema) \
      .load("/tmp/resources/zipcodes.csv")

4. Applying DataFrame transformations

Once you have created DataFrame from the CSV file, you can apply all transformation and actions DataFrame support. Please refer to the link for more details. 

5. Write PySpark DataFrame to CSV file

Use the write() method of the PySpark DataFrameWriter object to write PySpark DataFrame to a CSV file.

 
df.write.option("header",True) \
 .csv("/tmp/spark_output/zipcodes")

5.1 Options

While writing a CSV file you can use several options. for example, header to output the DataFrame column names as header record and delimiter to specify the delimiter on the CSV output file.

 
df2.write.options(header='True', delimiter=',') \
 .csv("/tmp/spark_output/zipcodes")
 

Other options available quote,escape,nullValue,dateFormat,quoteMode .

5.2 Saving modes

PySpark DataFrameWriter also has a method mode() to specify saving mode.

overwrite – mode is used to overwrite the existing file.

append – To add the data to the existing file.

ignore – Ignores write operation when the file already exists.

error – This is a default option when the file already exists, it returns an error.

 
df2.write.mode('overwrite').csv("/tmp/spark_output/zipcodes")
//you can also use this
df2.write.format("csv").mode('overwrite').save("/tmp/spark_output/zipcodes")

6. PySpark Read CSV Complete Example

 
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

df = spark.read.csv("/tmp/resources/zipcodes.csv")

df.printSchema()

df2 = spark.read.option("header",True) \
     .csv("/tmp/resources/zipcodes.csv")
df2.printSchema()
   
df3 = spark.read.options(header='True', delimiter=',') \
  .csv("/tmp/resources/zipcodes.csv")
df3.printSchema()

schema = StructType() \
      .add("RecordNumber",IntegerType(),True) \
      .add("Zipcode",IntegerType(),True) \
      .add("ZipCodeType",StringType(),True) \
      .add("City",StringType(),True) \
      .add("State",StringType(),True) \
      .add("LocationType",StringType(),True) \
      .add("Lat",DoubleType(),True) \
      .add("Long",DoubleType(),True) \
      .add("Xaxis",IntegerType(),True) \
      .add("Yaxis",DoubleType(),True) \
      .add("Zaxis",DoubleType(),True) \
      .add("WorldRegion",StringType(),True) \
      .add("Country",StringType(),True) \
      .add("LocationText",StringType(),True) \
      .add("Location",StringType(),True) \
      .add("Decommisioned",BooleanType(),True) \
      .add("TaxReturnsFiled",StringType(),True) \
      .add("EstimatedPopulation",IntegerType(),True) \
      .add("TotalWages",IntegerType(),True) \
      .add("Notes",StringType(),True)
      
df_with_schema = spark.read.format("csv") \
      .option("header", True) \
      .schema(schema) \
      .load(/tmp/resources/zipcodes.csv")
df_with_schema.printSchema()

df2.write.option("header",True) \
 .csv("/tmp/spark_output/zipcodes123")

7. Conclusion:

In this tutorial, you have learned how to read a CSV file, multiple CSV files and all files from a local folder into PySpark DataFrame, using multiple options to change the default behavior and write CSV files back to DataFrame using different save options.

Happy Learning !!

References:

NNK

SparkByExamples.com is a Big Data and Spark examples community page, all examples are simple and easy to understand and well tested in our development environment Read more ..

Leave a Reply

This Post Has 8 Comments

  1. Anonymous

    where is zipcode CSV file ?

  2. Anonymous

    Really very helpful pyspark example..Thanks for the details!!

  3. Anonymous

    Very much helpful!! Thank you for the article!!

    1. NNK

      Thank you!!

  4. Anonymous

    Thank you so much for this article 🙂

    1. NNK

      Glad you like it.

  5. Anonymous

    Thanks for the example. could you please explain how to define/initialise the “spark” in the above example (e.g. spark.read.csv)?

PySpark Read CSV file into DataFrame
Photo by Markus Spiske on Unsplash