Spark Create DataFrame with Examples

  • Post author:
  • Post category:Apache Spark

In Spark, DataFrame can be create using createDataFrame() and toDF() methods, this post explains different approaches of creating DataFrame in Spark like creating it from an RDD, List, Seq, TXT, CSV, JSON, XML files, Database e.t.c using Scala example.

DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.

-Databricks

First, let’s import spark implicits as it needed for our examples ( for example when we want to use .toDF() function) and create the data for our examples.


import spark.implicits._
val columns = Seq("language","users_count")
val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))

1. Create Spark DataFrame from RDD

One easy way to create Spark DataFrame is from an existing RDD. first, let’s create an RDD from a collection Seq by calling parallelize(). I wil use this “rdd” object for all our examples below.


val rdd = spark.sparkContext.parallelize(data)

1.a) Using toDF() functions

Once we have an RDD, let’s use toDF() to create DataFrame in Spark. By default, it creates column names as “_1” and “_2” as we have two columns for each row.


val dfFromRDD1 = rdd.toDF()
dfFromRDD1.printSchema()

Since we have not given the names to the DataFrame, by default it create with _1 and _2 and so on for column names.


root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)

toDF() has another signature that takes arguments for column names as shown below.


val dfFromRDD1 = rdd.toDF("language","users_count")
dfFromRDD1.printSchema()

Here, we are passing column names toDF() method.


root
 |-- language: string (nullable = true)
 |-- users: string (nullable = true)

By default, the datatype of these columns infers to the type of data. We can change this behavior by supplying schema – where we can specify a column name, data type and nullable for each field/column.

1.b) Using Spark createDataFrame() from SparkSession

Using createDataFrame() from SparkSession is another way to create and it takes rdd object as an argument. and chain with toDF() to specify names to the columns.


val dfFromRDD2 = spark.createDataFrame(rdd).toDF(columns:_*)

1.c) Using createDataFrame() with the Row type

createDataFrame() has another signature which takes the RDD[Row] type and schema for column names as arguments. To use this first we need to convert our “rdd” object from RDD[T] to RDD[Row].


val schema = StructType( Array(StructField("language", StringType,true),
                             StructField("language", StringType,true)))
val rowRDD = rdd.map(attributes => Row(attributes._1, attributes._2))
val dfFromRDD3 = spark.createDataFrame(rowRDD,schema)

2. Create Spark DataFrame from List and Seq Collection

In this section, we will see several approaches to create Spark DataFrame from collection Seq[T] or List[T]. These examples would be similar to what we have seen in the above section with RDD, but we use “data” object instead of “rdd” object.

2.a) Using toDF() on List or Seq collection

toDF() on collection (Seq, List) object creates a DataFrame. make sure importing import spark.implicits._ to use toDF()


val dfFromData1 = data.toDF() 

2.b) Using createDataFrame() from SparkSession

Calling createDataFrame() from SparkSession is another way to create and it takes collection object (Seq or List) as an argument. and chain with toDF() to specify names to the columns.


//From Data (USING createDataFrame)
var dfFromData2 = spark.createDataFrame(data).toDF(columns:_*)

2.c) Using createDataFrame() with the Row type

createDataFrame() has another signature in Spark which takes the collection of Row type and schema for column names as arguments. To use this first we need to convert our “data” object from Seq[T] to Seq[Row].


//From Data (USING createDataFrame and Adding schema using StructType)
val data = Seq(Row("Java", "20000"), 
               Row("Python", "100000"), 
               Row("Scala", "3000"))
var dfFromData3 = spark.createDataFrame(rowData,schema)

3. Creating Spark DataFrame from CSV

Here, will see how to create from a CSV file.


val df2 = spark.read.csv("/src/resources/file.csv")

4. Creating from text (TXT) file

Here, will see how to create from a TXT file.


val df2 = spark.read
.text("/src/resources/file.txt")

5. Creating from JSON file

Here, will see how to create from a JSON file.


val df2 = spark.read
.json("/src/resources/file.json")

6. Creating from an XML file

To create DataFrame by parse XML, we should use DataSource "com.databricks.spark.xml" spark-xml api from Databricks.

<dependency>
     <groupId>com.databricks</groupId>
     <artifactId>spark-xml_2.11</artifactId>
     <version>0.6.0</version>
 </dependency>

val df = spark.read
      .format("com.databricks.spark.xml")
      .option("rowTag", "person")
      .xml("src/main/resources/persons.xml")

7. Creating from Hive


val hiveContext = new org.apache.spark.sql.hive.HiveContext(spark.sparkContext)
val hiveDF = hiveContext.sql(“select * from emp”)

8. Creating from the Database table (RDBMS)

8.a) From Mysql table

Make sure you have MySQL library as a dependency in your pom.xml file or MySQL jars in your classpath.


val df_mysql = spark.read.format(“jdbc”)
   .option(“url”, “jdbc:mysql://localhost:port/db”)
   .option(“driver”, “com.mysql.jdbc.Driver”)
   .option(“dbtable”, “tablename”) 
   .option(“user”, “user”) 
   .option(“password”, “password”) 
   .load()

8.b) From DB2 table

Make sure you have DB2 library as a dependency in your pom.xml file or DB2 jars in your classpath.


val df_db2 = spark.read.format(“jdbc”)
   .option(“url”, “jdbc:db2://localhost:50000/dbname”)
   .option(“driver”, “com.ibm.db2.jcc.DB2Driver”)
   .option(“dbtable”, “tablename”) 
   .option(“user”, “user”) 
   .option(“password”, “password”) 
   .load()

Similarly, we can create DataFrame in Spark from most of the relational databases which I’ve not covered here and I will leave this to you to explore.

9. Create DataFrame from HBase table

To create Spark DataFrame from the HBase table, we should use DataSource defined in Spark HBase connectors. for example use DataSource “org.apache.spark.sql.execution.datasources.hbase” from Hortonworks or use “org.apache.hadoop.hbase.spark“from spark HBase connector.


    val hbaseDF = sparkSession.read
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .load()

Detail example explained at Generating DataFrame from HBase table

10. Other sources (Avro, Parquet, Kafka)

We can also create DataFrame from Avro, Parquet, HBase and reading data from Kafka which I’ve explained in the below articles, I would recommend reading these when you have time.

The complete code can be downloaded from GitHub

Happy Learning !!

NNK

SparkByExamples.com is a BigData and Spark examples community page, all examples are simple and easy to understand and well tested in our development environment using Scala and Maven.

This Post Has One Comment

  1. Rach

    hi,
    In creating df from hive: i hive we must have multiple data bases, so how can we connected to the particular database?

    in sql we are adding the option to connect to the DB but in hive how can we set the specific database to work with tables?

Leave a Reply