In this article, you will learn different ways to create a DataFrame in PySpark (Spark with Python), e.g., creating a DataFrame from an RDD, a list of data, files such as TXT, CSV, and JSON, databases, etc.
> DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. (Databricks)
- Create PySpark DataFrame from RDD
- Create PySpark DataFrame from a list of data
- Create PySpark DataFrame from CSV file
- Create PySpark DataFrame from TXT file
- Create PySpark DataFrame from JSON file
- Other sources (Avro, Parquet, etc.)
First, let’s create the data and the column names for the DataFrame we are going to create.
columns = ["language", "users_count"]
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
1. Create PySpark DataFrame from RDD
One easy way to create a PySpark DataFrame is from an existing RDD. First, let’s create an RDD from a Python list by calling the parallelize() function on SparkContext. We will need this “rdd” object for all the examples below.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
rdd = spark.sparkContext.parallelize(data)
1.a) Using toDF() function
Once we have an RDD, let’s use toDF() to create a DataFrame in PySpark. By default, it names the columns “_1” and “_2”, since each row has two values.
dfFromRDD1 = rdd.toDF()
dfFromRDD1.printSchema()
root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
toDF() has another signature which takes arguments for custom column names as shown below.
dfFromRDD1 = rdd.toDF(columns)
dfFromRDD1.printSchema()
root
 |-- language: string (nullable = true)
 |-- users_count: string (nullable = true)
By default, the data type of each column is inferred from the data. We can change this behavior by supplying a schema, where we specify a column name, data type, and nullable flag for each field/column.
1.b) Using createDataFrame() from SparkSession
dfFromRDD2 = spark.createDataFrame(rdd).toDF(*columns)
2. Create PySpark DataFrame from a List Collection
In this section, we will see several approaches to creating a PySpark DataFrame from a list. These examples are similar to those in the RDD section above, but we use the “data” list instead of the “rdd” object.
2.a) Using createDataFrame() from SparkSession
createDataFrame() from SparkSession is another way: it takes a list object as an argument, and we chain it with toDF() to specify the column names.
dfFromData2 = spark.createDataFrame(data).toDF(*columns)
2.b) Using createDataFrame() with the Row type
createDataFrame() has another signature in PySpark which takes a collection of Row objects and a schema (or list of column names) as arguments. To use it, we first need to convert our “data” list into a list of Row objects.
from pyspark.sql import Row

rowData = map(lambda x: Row(*x), data)
dfFromData3 = spark.createDataFrame(rowData, columns)
3. Creating PySpark DataFrame from CSV
Here, we will see how to create a DataFrame from a CSV file.
df2 = spark.read.csv("/src/resources/file.csv")
4. Creating from text (TXT) file
Here, we will see how to create a DataFrame from a TXT file. Note that spark.read.text() returns a DataFrame with a single string column named “value”, one row per line of the file.
df2 = spark.read.text("/src/resources/file.txt")
5. Creating from JSON file
Here, we will see how to create a DataFrame from a JSON file.
df2 = spark.read.json("/src/resources/file.json")
Similarly, we can create DataFrames in PySpark from most relational databases, which I have not covered here; I will leave that for you to explore.
6. Other sources (Avro, Parquet, Kafka)
We can also create DataFrames from Avro and Parquet files, from HBase, and by reading data from Kafka, which I have explained in the articles below. I recommend reading them when you have time.
- Creating DataFrame from Parquet source
- Creating DataFrame from Avro source
- Creating DataFrame by Streaming data from Kafka
The complete code can be downloaded from GitHub.
Happy Learning !!