PySpark provides csv("path")
on DataFrameReader to read a CSV file into PySpark DataFrame and dataframeObj.write.csv("path")
to save or write to the CSV file. In this tutorial, you will learn how to read a single file, multiple files, all files from a local directory into DataFrame, applying some transformations, and finally writing DataFrame back to CSV file using PySpark example.
PySpark supports reading a CSV file with a pipe, comma, tab, space, or any other delimiter/separator files.
Note: PySpark out of the box supports reading files in CSV, JSON, and many more file formats into PySpark DataFrame.
Table of contents:
- PySpark Read CSV file into DataFrame
- Options while reading CSV file
- Read CSV files with a user-specified schema
- Applying DataFrame transformations
- Write DataFrame to CSV file
1. PySpark Read CSV File into DataFrame
Using csv("path")
or format("csv").load("path")
of DataFrameReader, you can read a CSV file into a PySpark DataFrame, These methods take a file path to read from as an argument. When you use format("csv")
method, you can also specify the Data sources by their fully qualified name, but for built-in sources, you can simply use their short names (csv
,json
, parquet
, jdbc
, text
e.t.c).
Refer to the dataset zipcodes.csv at GitHub.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]") \
    .appName("SparkByExamples.com") \
    .getOrCreate()
df = spark.read.csv("/tmp/resources/zipcodes.csv")
df.printSchema()
Using the fully qualified data source name, you can alternatively do the following.
df = spark.read.format("csv")
.load("/tmp/resources/zipcodes.csv")
// or
df = spark.read.format("org.apache.spark.sql.csv")
.load("/tmp/resources/zipcodes.csv")
df.printSchema()
This example reads the data into DataFrame columns "_c0" for the first column, "_c1" for the second, and so on. By default, the data type of all these columns is treated as String.
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
1.1 Using Header Record For Column Names
If your input file has a header with column names, you need to explicitly set the header option to True using option("header", True). Without this, the API treats the header row as a data record.
df2 = spark.read.option("header",True) \
.csv("/tmp/resources/zipcodes.csv")
As mentioned earlier, PySpark reads all columns as strings (StringType) by default. In later sections I will explain how to use the header record for column names and how to infer the schema (inferSchema) to derive column types based on the data.
1.2 Read Multiple CSV Files
Using the read.csv() method, you can also read multiple CSV files; just pass all the file paths as a list, for example:
df = spark.read.csv(["path1", "path2", "path3"])
1.3 Read all CSV Files in a Directory
We can read all CSV files from a directory into a DataFrame just by passing the directory as a path to the csv() method.
df = spark.read.csv("Folder path")
2. Options While Reading CSV File
The PySpark CSV data source provides multiple options for working with CSV files. Below are some of the most important options, explained with examples.
You can either chain multiple option(key, value) calls or use the alternative options(**options) method.
2.1 delimiter
The delimiter option is used to specify the column delimiter of the CSV file. By default it is the comma (,) character, but it can be set to any character, such as pipe (|), tab (\t), or space, using this option.
df3 = spark.read.options(delimiter=',') \
.csv("C:/apps/sparkbyexamples/src/pyspark-examples/resources/zipcodes.csv")
2.2 inferSchema
The default value of this option is False. When it is set to True, column types are automatically inferred based on the data. Note that this requires reading the data one more time to infer the schema.
df4 = spark.read.options(inferSchema='True',delimiter=',') \
.csv("src/main/resources/zipcodes.csv")
Alternatively, you can also write this by chaining the option() method.
df4 = spark.read.option("inferSchema",True) \
.option("delimiter",",") \
.csv("src/main/resources/zipcodes.csv")
2.3 header
This option is used to read the first line of the CSV file as column names. By default, the value of this option is False, and all column types are assumed to be strings.
df3 = spark.read.options(header='True', inferSchema='True', delimiter=',') \
.csv("/tmp/resources/zipcodes.csv")
2.4 quote
When a column value contains the delimiter character used to split the columns, use the quote option to specify the quote character. By default it is the double quote ("), and delimiters inside quoted values are ignored; using this option you can set any character.
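For example, a minimal sketch reusing the zipcodes.csv path from the earlier examples (the quote character shown here is the default, so this merely makes the setting explicit):
# Values wrapped in the quote character may contain the delimiter
df_quoted = spark.read.options(quote='"', delimiter=',', header='True') \
    .csv("/tmp/resources/zipcodes.csv")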
2.5 nullValue
Using the nullValue option, you can specify a string in the CSV that should be read as null. For example, you can use it if you want a date column with the value "1900-01-01" to be set to null on the DataFrame.
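A minimal sketch, again reusing the zipcodes.csv path from the earlier examples:
# Read the literal string "1900-01-01" as null
df_nulls = spark.read.option("nullValue", "1900-01-01") \
    .option("header", True) \
    .csv("/tmp/resources/zipcodes.csv")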
2.6 dateFormat
The dateFormat option is used to set the format of input DateType and TimestampType columns. It supports all java.text.SimpleDateFormat formats.
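For example, a minimal sketch (the MM/dd/yyyy pattern is assumed purely for illustration; the option takes effect for columns read as DateType, e.g. through a user-specified schema):
# Hypothetical: parse date columns whose values look like 01/31/2019
df_dates = spark.read.options(header='True', dateFormat='MM/dd/yyyy') \
    .csv("/tmp/resources/zipcodes.csv")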
Note: Besides the above options, PySpark CSV API also supports many other options, please refer to this article for details.
3. Reading CSV Files with a User-Specified Custom Schema
If you know the schema of the file ahead of time and do not want to use the inferSchema option for column names and types, you can specify user-defined column names and types using the schema() method.
from pyspark.sql.types import StructType, StringType, IntegerType, DoubleType, BooleanType

schema = StructType() \
.add("RecordNumber",IntegerType(),True) \
.add("Zipcode",IntegerType(),True) \
.add("ZipCodeType",StringType(),True) \
.add("City",StringType(),True) \
.add("State",StringType(),True) \
.add("LocationType",StringType(),True) \
.add("Lat",DoubleType(),True) \
.add("Long",DoubleType(),True) \
.add("Xaxis",IntegerType(),True) \
.add("Yaxis",DoubleType(),True) \
.add("Zaxis",DoubleType(),True) \
.add("WorldRegion",StringType(),True) \
.add("Country",StringType(),True) \
.add("LocationText",StringType(),True) \
.add("Location",StringType(),True) \
.add("Decommisioned",BooleanType(),True) \
.add("TaxReturnsFiled",StringType(),True) \
.add("EstimatedPopulation",IntegerType(),True) \
.add("TotalWages",IntegerType(),True) \
.add("Notes",StringType(),True)
df_with_schema = spark.read.format("csv") \
.option("header", True) \
.schema(schema) \
.load("/tmp/resources/zipcodes.csv")
4. Applying DataFrame Transformations
Once you have created a DataFrame from the CSV file, you can apply all the transformations and actions that DataFrames support. Please refer to the link for more details.
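As a short sketch of what such transformations can look like, assuming the DataFrame was read with the custom schema from section 3 (the State value "TX" is chosen purely for illustration):
from pyspark.sql.functions import col

# Select a few columns, keep only Texas records, and sort by population
df_tx = df_with_schema.select("Zipcode", "City", "State", "EstimatedPopulation") \
    .filter(col("State") == "TX") \
    .orderBy(col("EstimatedPopulation").desc())
df_tx.show(5)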
5. Write PySpark DataFrame to CSV File
Use the csv("path") method of the PySpark DataFrameWriter object, accessed via df.write, to write a PySpark DataFrame to a CSV file. Note that Spark writes the output as a directory at the given path containing one or more part files, not as a single CSV file.
df.write.option("header",True) \
.csv("/tmp/spark_output/zipcodes")
5.1 Options
While writing a CSV file, you can use several options. For example, header to output the DataFrame column names as a header record, and delimiter to specify the delimiter on the CSV output file.
df2.write.options(header='True', delimiter=',') \
.csv("/tmp/spark_output/zipcodes")
Other available options are quote, escape, nullValue, dateFormat, and quoteMode.
5.2 Saving modes
PySpark DataFrameWriter also has a mode() method to specify the saving mode.
- overwrite – used to overwrite the existing file.
- append – adds the data to the existing file.
- ignore – ignores the write operation when the file already exists.
- error – the default option; returns an error when the file already exists.
df2.write.mode('overwrite').csv("/tmp/spark_output/zipcodes")
# you can also use this
df2.write.format("csv").mode('overwrite').save("/tmp/spark_output/zipcodes")
6. PySpark Read CSV Complete Example
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
df = spark.read.csv("/tmp/resources/zipcodes.csv")
df.printSchema()
df2 = spark.read.option("header",True) \
.csv("/tmp/resources/zipcodes.csv")
df2.printSchema()
df3 = spark.read.options(header='True', delimiter=',') \
.csv("/tmp/resources/zipcodes.csv")
df3.printSchema()
schema = StructType() \
.add("RecordNumber",IntegerType(),True) \
.add("Zipcode",IntegerType(),True) \
.add("ZipCodeType",StringType(),True) \
.add("City",StringType(),True) \
.add("State",StringType(),True) \
.add("LocationType",StringType(),True) \
.add("Lat",DoubleType(),True) \
.add("Long",DoubleType(),True) \
.add("Xaxis",IntegerType(),True) \
.add("Yaxis",DoubleType(),True) \
.add("Zaxis",DoubleType(),True) \
.add("WorldRegion",StringType(),True) \
.add("Country",StringType(),True) \
.add("LocationText",StringType(),True) \
.add("Location",StringType(),True) \
.add("Decommisioned",BooleanType(),True) \
.add("TaxReturnsFiled",StringType(),True) \
.add("EstimatedPopulation",IntegerType(),True) \
.add("TotalWages",IntegerType(),True) \
.add("Notes",StringType(),True)
df_with_schema = spark.read.format("csv") \
.option("header", True) \
.schema(schema) \
    .load("/tmp/resources/zipcodes.csv")
df_with_schema.printSchema()
df2.write.option("header",True) \
.csv("/tmp/spark_output/zipcodes123")
7. Conclusion
In this tutorial, you have learned how to read a CSV file, multiple CSV files, and all files from a local folder into a PySpark DataFrame, how to use multiple options to change the default behavior, and how to write the DataFrame back to CSV files using different save options.
Happy Learning !!