Spark Create DataFrame with Examples

In Spark, the createDataFrame() and toDF() methods are used to create a DataFrame manually. With these methods you can create a Spark DataFrame from an existing RDD, DataFrame, Dataset, List, or Seq data object. Here, I will explain them with Scala examples.

You can also create a DataFrame from different sources like Text, CSV, JSON, XML, Parquet, Avro, ORC, Binary files, RDBMS Tables, Hive, HBase, and many more.

DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.

-Databricks

First, let's create a SparkSession and the sample data used throughout the examples. Later we import spark.implicits._, which is needed for features such as the .toDF() function.


// Import SparkSession
import org.apache.spark.sql.SparkSession

// Create SparkSession and Prepare Data
val spark:SparkSession = SparkSession.builder()
   .master("local[1]").appName("SparkByExamples.com")
   .getOrCreate()

// Create data
val columns = Seq("language","users_count")
val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))

1. Spark Create DataFrame from RDD

One easy way to create a Spark DataFrame manually is from an existing RDD. First, let's create an RDD from the Seq collection by calling parallelize() on the SparkContext.

I will be using this rdd object for all our examples below.


// Create an RDD from the local Seq
val rdd = spark.sparkContext.parallelize(data)

Here, sparkContext.parallelize(data) creates an RDD (Resilient Distributed Dataset) from a local Scala collection. It splits the data into partitions that are distributed across the Spark cluster, so the elements of the RDD can be processed in parallel. It is a simple way to bring local data into Spark's distributed computing model.
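To see the partitioning at work, the sketch below passes an explicit number of partitions (the numSlices argument) to parallelize() and checks it with getNumPartitions. It assumes a local SparkSession; the value 2 is an arbitrary choice for illustration.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]").appName("SparkByExamples.com")
  .getOrCreate()

val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))

// Split the local Seq into 2 partitions; each partition is processed by its own task
val rdd2 = spark.sparkContext.parallelize(data, 2)
println(rdd2.getNumPartitions) // 2

// glom() turns each partition into an array, which makes the split visible
rdd2.glom().collect().foreach(partition => println(partition.mkString(", ")))
```

Without the second argument, Spark uses the context's default parallelism, which for local[1] is one partition.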


1.1 Using toDF() function

Use toDF() on an RDD object to create a DataFrame in Spark. By default, it names the columns “_1” and “_2”, because each row is a tuple with two elements.


// Create DataFrame from RDD
import spark.implicits._
val dfFromRDD1 = rdd.toDF()
dfFromRDD1.show()
dfFromRDD1.printSchema()

// Output:
//+------+------+
//|    _1|    _2|
//+------+------+
//|  Java| 20000|
//|Python|100000|
//| Scala|  3000|
//+------+------+

//root
// |-- _1: string (nullable = true)
// |-- _2: string (nullable = true)

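Since an RDD has no column names, converting it to a DataFrame gives you the default column names _1, _2, and so on, taken from the tuple fields. The column data types come from the tuple element types; here both columns are String because the sample data holds strings. Use the DataFrame printSchema() method to print the schema to the console.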
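To illustrate that the types follow the tuple elements, the sketch below repeats the conversion with the counts stored as Int instead of String. The data is the same sample data with the quotes removed from the numbers; everything else is standard Spark API.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType}

val spark = SparkSession.builder()
  .master("local[1]").appName("SparkByExamples.com")
  .getOrCreate()
import spark.implicits._

// Same rows, but the second tuple element is an Int
val typedRDD = spark.sparkContext.parallelize(
  Seq(("Java", 20000), ("Python", 100000), ("Scala", 3000)))

// Column names still default to _1 and _2, but _2 is now an integer column
val typedDF = typedRDD.toDF()
typedDF.printSchema()
```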

toDF() has another signature that assigns column names; it takes a variable number of column-name arguments, as shown below.


// Create DataFrame with custom column names
val dfFromRDD1 = rdd.toDF("language","users_count")
dfFromRDD1.show()
dfFromRDD1.printSchema()

// Output:
//+--------+-----------+
//|language|users_count|
//+--------+-----------+
//|    Java|      20000|
//|  Python|     100000|
//|   Scala|       3000|
//+--------+-----------+

//root
// |-- language: string (nullable = true)
// |-- users_count: string (nullable = true)

Remember, here we only assigned column names; the data types are still String because the source tuples hold strings.

To control the data types, supply a schema, where you specify the column name, data type, and nullability of each field/column (see section 1.3).
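Besides supplying a schema up front, you can also change a column's type after the DataFrame exists by casting it. A minimal sketch, assuming the same sample data and column names as above:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType

val spark = SparkSession.builder()
  .master("local[1]").appName("SparkByExamples.com")
  .getOrCreate()
import spark.implicits._

val columns = Seq("language", "users_count")
val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
val stringDF = data.toDF(columns: _*)

// Replace users_count with an Int version of itself
val castDF = stringDF.withColumn("users_count", col("users_count").cast(IntegerType))
castDF.printSchema()

// Numeric comparisons now work on the integer column
castDF.filter(col("users_count") > 10000).show()
```

Casting is convenient for a quick fix; an explicit schema avoids the extra conversion step when you control how the DataFrame is built.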

1.2 Using Spark createDataFrame() from SparkSession

Using createDataFrame() from SparkSession is another way to create a DataFrame. This signature also takes the rdd object as an argument; chain it with toDF() to specify the column names.


// Using createDataFrame()
val dfFromRDD2 = spark.createDataFrame(rdd).toDF(columns:_*)

Here, toDF(columns: _*) assigns column names to the DataFrame from the columns sequence. The `: _*` notation tells Scala to pass the elements of a Seq as individual arguments to a varargs parameter, so toDF receives "language" and "users_count" as two separate arguments.
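The `: _*` expansion is plain Scala, not Spark-specific. The sketch below uses a hypothetical helper, joinNames, whose varargs parameter mirrors the shape of toDF(colNames: String*), to show that expanding a Seq and listing the arguments by hand are equivalent.

```scala
// Hypothetical helper with a varargs parameter, standing in for toDF(colNames: String*)
def joinNames(names: String*): String = names.mkString("|")

val columns = Seq("language", "users_count")

// columns: _* expands the Seq into individual arguments
val expanded = joinNames(columns: _*)

// Equivalent to writing the arguments out by hand
val handWritten = joinNames("language", "users_count")

println(expanded) // language|users_count
```

Passing `columns` without `: _*` would not compile here, because a Seq[String] is not a String.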

1.3 Using createDataFrame() with the Row type

createDataFrame() has another signature that takes an RDD[Row] and a schema (column names and data types) as arguments. To use it, we first need to convert our “rdd” object from RDD[T] to RDD[Row] and define the schema using StructType and StructField.


// Additional Imports
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.Row

// Create StructType Schema
val schema = StructType( Array(
                 StructField("language", StringType,true),
                 StructField("users_count", StringType,true)
             ))

// Use map() transformation to get Row type
val rowRDD = rdd.map(attributes => Row(attributes._1, attributes._2))
val dfFromRDD3 = spark.createDataFrame(rowRDD,schema)

Here, attributes._1 and attributes._2 represent the first and second components of each element in the original RDD. The transformation maps each element of rdd to a Row object with two fields, essentially converting a pair of attributes into a structured row.
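The schema also lets you choose real types. A minimal sketch, assuming you want users_count as an integer: declare IntegerType in the schema and convert the value while building each Row, because createDataFrame expects every Row value to match the type declared for its field.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder()
  .master("local[1]").appName("SparkByExamples.com")
  .getOrCreate()

val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
val rdd = spark.sparkContext.parallelize(data)

val typedSchema = StructType(Array(
  StructField("language", StringType, true),
  StructField("users_count", IntegerType, true)))

// Convert the String count to Int so the Row matches IntegerType
val typedRowRDD = rdd.map { case (language, users) => Row(language, users.toInt) }
val dfFromRDD4 = spark.createDataFrame(typedRowRDD, typedSchema)
dfFromRDD4.printSchema()
```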

2. Create Spark DataFrame from List and Seq Collection

In this section, we will see several approaches to creating a Spark DataFrame from a Seq[T] or List[T] collection. These examples are similar to those in the previous section with RDD, but we use the “data” object instead of the “rdd” object.

2.1 Using toDF() on List or Seq collection

Calling toDF() on a collection (Seq, List) object creates a Spark DataFrame. Make sure to import spark.implicits._ before using toDF(); in Scala, this import enables the implicit conversions to Spark's Dataset and DataFrame API.


// Import implicits
import spark.implicits._

// Create DF from data object
val dfFromData1 = data.toDF() 

Here, val dfFromData1 = data.toDF() creates a DataFrame (dfFromData1) from the local Seq data. The toDF() method converts the collection into a DataFrame and assigns the default column names _1 and _2. The import statement is necessary for the implicit conversion to work.
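As with the RDD version, toDF() on a collection also accepts column names. A short sketch comparing the default names with explicit ones, assuming the same sample data and columns:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]").appName("SparkByExamples.com")
  .getOrCreate()
import spark.implicits._

val columns = Seq("language", "users_count")
val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))

val dfDefault = data.toDF()          // default names from the tuple fields
val dfNamed = data.toDF(columns: _*) // names taken from the columns Seq

println(dfDefault.columns.mkString(", ")) // _1, _2
println(dfNamed.columns.mkString(", "))   // language, users_count
```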

2.2 Using createDataFrame() from SparkSession

Calling createDataFrame() from SparkSession is another way to create a Spark DataFrame. It takes a collection object (Seq or List) as an argument, and you can chain it with toDF() to specify the column names.


// From Data (USING createDataFrame)
val dfFromData2 = spark.createDataFrame(data).toDF(columns:_*)


2.3 Using createDataFrame() with the Row type

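createDataFrame() has another signature in Spark that takes a java.util.List of Row objects and a schema as arguments. To pass a Scala Seq, convert it explicitly with asJava from scala.collection.JavaConverters._. Older examples use the implicit scala.collection.JavaConversions._ instead, but that import is deprecated and was removed in Scala 2.13.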


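// Import (on Scala 2.13+, scala.jdk.CollectionConverters._ is the preferred equivalent)
import scala.collection.JavaConverters._

// From Data (USING createDataFrame and Adding schema using StructType)
val rowData = Seq(Row("Java", "20000"),
               Row("Python", "100000"),
               Row("Scala", "3000"))
// asJava converts the Seq[Row] into the java.util.List[Row] this signature expects
val dfFromData3 = spark.createDataFrame(rowData.asJava, schema)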
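If you would rather avoid Java collection conversions altogether, one alternative is to parallelize the Seq[Row] into an RDD[Row] and use the (RDD[Row], StructType) signature from section 1.3. A sketch, assuming the same two-column string schema:

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder()
  .master("local[1]").appName("SparkByExamples.com")
  .getOrCreate()

val schema = StructType(Array(
  StructField("language", StringType, true),
  StructField("users_count", StringType, true)))

val rowData = Seq(Row("Java", "20000"), Row("Python", "100000"), Row("Scala", "3000"))

// Distribute the Seq[Row] as an RDD[Row], then apply the schema
val dfFromData4 = spark.createDataFrame(spark.sparkContext.parallelize(rowData), schema)
dfFromData4.show()
```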

3. Create Spark DataFrame from CSV

In all the above examples, you have learned how to create a DataFrame from RDD and collection objects. In real-world projects, these are used less often. Hence, in this and the following sections, you will learn how to create a DataFrame from data sources like CSV, text, JSON, Avro, etc.

By default, Spark provides an API to read delimited files, such as comma-, pipe-, and tab-separated files, along with options for handling files with or without a header, double quotes, data types, etc.

For a detailed example, refer to the article on creating a DataFrame from a CSV file.


// Create Spark DataFrame from CSV
val df2 = spark.read.csv("/resources/file.csv")
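Without options, spark.read.csv() names the columns _c0, _c1, ... and reads every value as a string. The sketch below shows the header, delimiter, and inferSchema options mentioned above. To keep it self-contained, it writes a small pipe-delimited file to a temporary location, which stands in for /resources/file.csv.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]").appName("SparkByExamples.com")
  .getOrCreate()

// Temporary sample file: a header line followed by pipe-separated rows
val csvPath = Files.createTempFile("languages", ".csv")
Files.write(csvPath,
  "language|users_count\nJava|20000\nPython|100000\nScala|3000\n".getBytes(StandardCharsets.UTF_8))

val csvDF = spark.read
  .option("header", "true")      // use the first line as column names
  .option("delimiter", "|")      // fields are separated by a pipe instead of a comma
  .option("inferSchema", "true") // scan the data to infer column types
  .csv(csvPath.toString)

csvDF.printSchema()
csvDF.show()
```

inferSchema requires an extra pass over the data; for large files, passing an explicit schema with .schema(...) is usually cheaper.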

4. Creating from text (TXT) file

Use spark.read.text() to read a text file and create a DataFrame from it.


// Creating from text (TXT) file
val df2 = spark.read
    .text("/resources/file.txt")
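spark.read.text() returns one row per line in a single string column named value. A sketch of reading a text file and splitting each line into columns, assuming comma-separated content written to a temporary file as a stand-in for /resources/file.txt:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, split}

val spark = SparkSession.builder()
  .master("local[1]").appName("SparkByExamples.com")
  .getOrCreate()

val txtPath = Files.createTempFile("languages", ".txt")
Files.write(txtPath, "Java,20000\nPython,100000\nScala,3000\n".getBytes(StandardCharsets.UTF_8))

// Each line becomes one row in the "value" column
val textDF = spark.read.text(txtPath.toString)
textDF.printSchema()

// Split each line on commas to get separate columns
val splitDF = textDF.select(
  split(col("value"), ",").getItem(0).as("language"),
  split(col("value"), ",").getItem(1).as("users_count"))
splitDF.show()
```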

5. Creating from JSON file

Here, we will see how to create a DataFrame from a JSON file.


val df2 = spark.read
    .json("/resources/file.json")
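By default, spark.read.json() expects JSON Lines, meaning one complete JSON object per line, and infers the schema from the data. The sketch below writes such a file to a temporary location, standing in for /resources/file.json. For a file that holds a single JSON document or array spread over several lines, add .option("multiLine", "true").

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]").appName("SparkByExamples.com")
  .getOrCreate()

// One JSON object per line
val jsonLines = Seq(
  """{"language":"Java","users_count":20000}""",
  """{"language":"Python","users_count":100000}""",
  """{"language":"Scala","users_count":3000}""").mkString("\n")

val jsonPath = Files.createTempFile("languages", ".json")
Files.write(jsonPath, jsonLines.getBytes(StandardCharsets.UTF_8))

// Whole-number JSON values are inferred as long
val jsonDF = spark.read.json(jsonPath.toString)
jsonDF.printSchema()
jsonDF.show()
```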

6. Creating from an XML file

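To create a DataFrame by parsing XML, use the "com.databricks.spark.xml" data source from the Databricks spark-xml library. Add it as a dependency, choosing the artifact suffix (here _2.11) that matches your Scala version.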


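<!-- Maven dependency for spark-xml (add to pom.xml) -->
<dependency>
     <groupId>com.databricks</groupId>
     <artifactId>spark-xml_2.11</artifactId>
     <version>0.6.0</version>
 </dependency>

// Creating from an XML file
// With format(...), call load(); the .xml(...) shortcut needs import com.databricks.spark.xml._
val df = spark.read
      .format("com.databricks.spark.xml")
      .option("rowTag", "person")
      .load("src/main/resources/persons.xml")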

7. Creating from Hive


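HiveContext has been deprecated since Spark 2.0. Instead, build the SparkSession with enableHiveSupport() and run the query through spark.sql().


// Creating from Hive
// Assumes spark was created with SparkSession.builder().enableHiveSupport()
val hiveDF = spark.sql("select * from emp")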
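Hive usually holds several databases, so you may need to choose which one a query runs against. A sketch of two common approaches, assuming a hypothetical database named demo_db. It uses Spark's default catalog so it runs locally; with a Hive-enabled SparkSession the same calls apply to the Hive metastore.

```scala
import org.apache.spark.sql.SparkSession

// For a real Hive metastore, add .enableHiveSupport() (requires spark-hive on the classpath)
val spark = SparkSession.builder()
  .master("local[1]").appName("SparkByExamples.com")
  .getOrCreate()

// demo_db is a hypothetical database name used only for illustration
spark.sql("CREATE DATABASE IF NOT EXISTS demo_db")

// Option 1: switch the current database, equivalent to spark.sql("USE demo_db");
// unqualified table names such as emp then resolve against demo_db
spark.catalog.setCurrentDatabase("demo_db")
println(spark.catalog.currentDatabase) // demo_db

// Option 2: qualify the table name with its database instead, for example:
// val empDF = spark.sql("select * from demo_db.emp")
// val empDF = spark.table("demo_db.emp")
```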

8. Spark Create DataFrame from RDBMS Database

8.1 From Mysql table

Make sure you have the MySQL JDBC driver as a dependency in your pom.xml file or the MySQL jars in your classpath.


// From Mysql table
// Connector/J 8.x and later use the driver class com.mysql.cj.jdbc.Driver
val df_mysql = spark.read.format("jdbc")
   .option("url", "jdbc:mysql://localhost:port/db")
   .option("driver", "com.mysql.jdbc.Driver")
   .option("dbtable", "tablename")
   .option("user", "user")
   .option("password", "password")
   .load()

8.2 From DB2 table

Make sure you have the DB2 JDBC driver as a dependency in your pom.xml file or the DB2 jars in your classpath.


// From DB2 table
val df_db2 = spark.read.format("jdbc")
   .option("url", "jdbc:db2://localhost:50000/dbname")
   .option("driver", "com.ibm.db2.jcc.DB2Driver")
   .option("dbtable", "tablename")
   .option("user", "user")
   .option("password", "password")
   .load()

Similarly, you can create a Spark DataFrame from most other relational databases through their JDBC drivers. I have not covered them here and will leave them for you to explore.

9. Create DataFrame from HBase table

To create a Spark DataFrame from an HBase table, use the DataSource defined in a Spark HBase connector: for example, “org.apache.spark.sql.execution.datasources.hbase” from the Hortonworks Spark-HBase Connector (SHC), or “org.apache.hadoop.hbase.spark” from the Apache HBase Spark connector.


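// Create DataFrame from HBase table (Hortonworks SHC connector)
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

// catalog: a JSON string mapping the HBase table and column families to DataFrame columns
val hbaseDF = spark.read
   .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
   .format("org.apache.spark.sql.execution.datasources.hbase")
   .load()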

A detailed example is explained in Generating DataFrame from HBase table

10. Other sources (Avro, Parquet, Kafka)

We can also create a Spark DataFrame from Avro and Parquet files, and by reading data from Kafka. I've explained these in separate articles, which I recommend reading when you have time.
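Parquet support is built into Spark, so a quick round trip needs no extra dependency (Avro, by contrast, needs the separate spark-avro module). A sketch that writes a small DataFrame to a temporary directory and creates a new DataFrame by reading it back; Parquet stores the schema with the data, so the column types survive.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]").appName("SparkByExamples.com")
  .getOrCreate()
import spark.implicits._

val df = Seq(("Java", 20000), ("Python", 100000), ("Scala", 3000))
  .toDF("language", "users_count")

// Write to a not-yet-existing subdirectory of a fresh temp directory
val parquetDir = Files.createTempDirectory("languages").resolve("parquet").toString
df.write.parquet(parquetDir)

// Create a DataFrame from the Parquet files; names and types come from the file metadata
val parquetDF = spark.read.parquet(parquetDir)
parquetDF.printSchema()
```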

The complete code can be downloaded from GitHub

Conclusion

In this article, you have learned different ways to create a Spark DataFrame, for example: 1. From an RDD: call toDF() on it, use spark.createDataFrame(rdd), or use spark.createDataFrame(rowRDD, schema) with an RDD[Row] and a StructType. 2. From local collections: call toDF() on a Seq or List, use spark.createDataFrame(data), or use spark.createDataFrame(rows, schema) with a java.util.List[Row]. 3. From existing DataFrames: derive new ones with operations like union or join. 4. From external sources: load data from CSV, text, JSON, Parquet, XML, JDBC databases, Hive, and HBase using spark.read or spark.sql().

Happy Learning !!

Naveen (NNK)

Naveen (NNK) is a Data Engineer with 20+ years of experience in transforming data into actionable insights. Over the years, he has honed his expertise in designing, implementing, and maintaining data pipelines with frameworks like Apache Spark, PySpark, Pandas, R, Hive, and Machine Learning. Naveen's journey in the field of data engineering has been one of continuous learning, innovation, and a strong commitment to data integrity. In this blog, he shares his experiences with data as he comes across them. Follow Naveen on LinkedIn.
