In Spark, the createDataFrame() and toDF() methods are used to create a DataFrame manually. Using these methods, you can create a Spark DataFrame from an existing RDD, DataFrame, Dataset, List, or Seq data object. In this article, I will explain these with Scala examples.
You can also create a DataFrame from different sources like Text, CSV, JSON, XML, Parquet, Avro, ORC, Binary files, RDBMS Tables, Hive, HBase, and many more.
DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.
-Databricks
- Spark Create DataFrame from RDD
- Create DataFrame from List and Seq collection
- Creating Spark DataFrame from CSV file
- Creating from TXT file
- Creating from JSON file
- Creating from an XML file
- Creating from HIVE
- Creating from RDBMS Database table
- Creating from HBase table
- Other sources (Avro, Parquet e.t.c)
First, let's create a SparkSession and import the Spark implicits, as they are needed for our examples (for example, when we want to use the .toDF() function), then create the sample data.
import org.apache.spark.sql.SparkSession

// Create a SparkSession and the sample data used throughout the examples
val spark: SparkSession = SparkSession.builder()
.master("local[1]").appName("SparkByExamples.com")
.getOrCreate()
import spark.implicits._

val columns = Seq("language", "users_count")
val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
1. Spark Create DataFrame from RDD
One easy way to create a Spark DataFrame manually is from an existing RDD. First, let's create an RDD from a collection Seq by calling parallelize().
I will be using this rdd object for all of the examples below.
// Spark Create DataFrame from RDD
val rdd = spark.sparkContext.parallelize(data)
1.1 Using toDF() function
Once we have an RDD, let's use toDF() to create a DataFrame in Spark. By default, it creates the column names "_1" and "_2", as we have two columns for each row.
val dfFromRDD1 = rdd.toDF()
dfFromRDD1.printSchema()
Since an RDD is schema-less, without column names and data types, converting from RDD to DataFrame gives you the default column names _1, _2 and so on, with String as the data type. Use DataFrame printSchema() to print the schema to the console.
root
|-- _1: string (nullable = true)
|-- _2: string (nullable = true)
toDF() has another signature that assigns column names; it takes a variable number of arguments for the column names, as shown below.
val dfFromRDD1 = rdd.toDF("language","users_count")
dfFromRDD1.printSchema()
This yields the output below. Remember, here we have only assigned column names; the data types are still all String.
root
|-- language: string (nullable = true)
|-- users_count: string (nullable = true)
By default, the data type of these columns is String. We can change this behavior by supplying a schema, where we specify a column name, data type, and nullability for each field/column.
1.2 Using Spark createDataFrame() from SparkSession
Using createDataFrame() from SparkSession is another way to create a DataFrame; it takes the rdd object as an argument. Chain it with toDF() to specify names for the columns.
val dfFromRDD2 = spark.createDataFrame(rdd).toDF(columns:_*)
1.3 Using createDataFrame() with the Row type
createDataFrame() has another signature which takes an RDD[Row] and a schema for the column names as arguments. To use this, first we need to convert our "rdd" object from RDD[T] to RDD[Row] and define a schema using StructType & StructField.
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.Row
val schema = StructType(Array(
StructField("language", StringType, true),
StructField("users", StringType, true)
))
val rowRDD = rdd.map(attributes => Row(attributes._1, attributes._2))
val dfFromRDD3 = spark.createDataFrame(rowRDD,schema)
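The schema above keeps both columns as String. To assign a different data type, declare it in the corresponding StructField. Below is a minimal sketch, assuming the user counts are stored as integers in the rows rather than as strings.
// A sketch with a typed schema; assumes the counts are Int values, not Strings
import org.apache.spark.sql.types.IntegerType
val typedSchema = StructType(Array(
StructField("language", StringType, true),
StructField("users_count", IntegerType, true)
))
val typedRowRDD = spark.sparkContext.parallelize(Seq(
Row("Java", 20000), Row("Python", 100000), Row("Scala", 3000)))
val dfTyped = spark.createDataFrame(typedRowRDD, typedSchema)
dfTyped.printSchema()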
2. Create Spark DataFrame from List and Seq Collection
In this section, we will see several approaches to creating a Spark DataFrame from a collection Seq[T] or List[T]. These examples are similar to what we saw in the section above with an RDD, but here we use the "data" object instead of the "rdd" object.
2.1 Using toDF() on List or Seq collection
Calling toDF() on a collection (Seq, List) object creates a DataFrame. Make sure you import spark.implicits._ to use toDF().
import spark.implicits._
val dfFromData1 = data.toDF()
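As with the RDD example, you can also pass column names directly to toDF() instead of keeping the default _1 and _2:
// Assign column names while converting the collection
val dfFromData1Named = data.toDF("language", "users_count")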
2.2 Using createDataFrame() from SparkSession
Calling createDataFrame() from SparkSession is another way to create a DataFrame; it takes a collection object (Seq or List) as an argument. Chain it with toDF() to specify names for the columns.
// From Data (USING createDataFrame)
val dfFromData2 = spark.createDataFrame(data).toDF(columns:_*)
2.3 Using createDataFrame() with the Row type
createDataFrame() has another signature in Spark which takes a util.List of Row type and a schema for the column names as arguments. To use this, first we need to import scala.collection.JavaConversions._
import scala.collection.JavaConversions._
// From Data (USING createDataFrame and Adding schema using StructType)
val rowData= Seq(Row("Java", "20000"),
Row("Python", "100000"),
Row("Scala", "3000"))
val dfFromData3 = spark.createDataFrame(rowData, schema)
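Note that scala.collection.JavaConversions is deprecated in recent Scala versions; a sketch of the same call using scala.collection.JavaConverters instead:
// JavaConverters converts the Scala Seq[Row] to the java.util.List[Row] expected by createDataFrame()
import scala.collection.JavaConverters._
val dfFromData4 = spark.createDataFrame(rowData.asJava, schema)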
3. Create Spark DataFrame from CSV
In all the above examples, you have learned how to create a DataFrame from RDDs and data collection objects. In real-world applications these are less used; in this and the following sections, you will learn how to create a DataFrame from data sources like CSV, text, JSON, Avro, etc.
By default, Spark provides an API to read delimited files, such as comma, pipe, and tab separated files, and it also provides several options for handling headers, quotes, data types, etc.
For a detailed example, refer to creating a DataFrame from a CSV file.
// Create Spark DataFrame from CSV
val df2 = spark.read.csv("/src/resources/file.csv")
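A sketch of the read options mentioned above; whether you enable header and inferSchema depends on your file.
// Read a CSV file, treating the first line as a header and inferring column types
val df3 = spark.read
.option("header", "true")
.option("inferSchema", "true")
.option("delimiter", ",")
.csv("/src/resources/file.csv")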
4. Creating from text (TXT) file
Here, we will see how to create a DataFrame from a TXT file.
// Creating from text (TXT) file
val df2 = spark.read
.text("/src/resources/file.txt")
5. Creating from JSON file
Here, we will see how to create a DataFrame from a JSON file.
val df2 = spark.read
.json("/src/resources/file.json")
6. Creating from an XML file
To create a DataFrame by parsing XML, we should use the DataSource "com.databricks.spark.xml" (the spark-xml API from Databricks).
// Creating from an XML file
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-xml_2.11</artifactId>
<version>0.6.0</version>
</dependency>
val df = spark.read
.format("com.databricks.spark.xml")
.option("rowTag", "person")
.xml("src/main/resources/persons.xml")
7. Creating from Hive
// Creating from Hive
val hiveContext = new org.apache.spark.sql.hive.HiveContext(spark.sparkContext)
val hiveDF = hiveContext.sql("select * from emp")
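HiveContext is deprecated since Spark 2.0; the same query can be run through a SparkSession built with Hive support. A sketch, assuming an emp table exists and the Hive configuration is on the classpath:
// Build a SparkSession with Hive support and query the table directly
val sparkHive = SparkSession.builder()
.master("local[1]").appName("SparkByExamples.com")
.enableHiveSupport()
.getOrCreate()
val hiveDF2 = sparkHive.sql("select * from emp")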
8. Spark Create DataFrame from RDBMS Database
8.1 From MySQL table
Make sure you have the MySQL library as a dependency in your pom.xml file or the MySQL jars on your classpath.
// From Mysql table
val df_mysql = spark.read.format("jdbc")
.option("url", "jdbc:mysql://localhost:port/db")
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", "tablename")
.option("user", "user")
.option("password", "password")
.load()
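For large tables, the JDBC source can also read in parallel when you supply partitioning options; a sketch where the partition column and bounds are illustrative assumptions about your table:
// Parallel JDBC read; partitionColumn must be a numeric, date, or timestamp column
val df_mysql_parallel = spark.read.format("jdbc")
.option("url", "jdbc:mysql://localhost:port/db")
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", "tablename")
.option("user", "user")
.option("password", "password")
.option("partitionColumn", "id")
.option("lowerBound", "1")
.option("upperBound", "100000")
.option("numPartitions", "4")
.load()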
8.2 From DB2 table
Make sure you have the DB2 library as a dependency in your pom.xml file or the DB2 jars on your classpath.
// From DB2 table
val df_db2 = spark.read.format("jdbc")
.option("url", "jdbc:db2://localhost:50000/dbname")
.option("driver", "com.ibm.db2.jcc.DB2Driver")
.option("dbtable", "tablename")
.option("user", "user")
.option("password", "password")
.load()
Similarly, we can create a DataFrame in Spark from most relational databases; I have not covered them here and will leave this for you to explore.
9. Create DataFrame from HBase table
To create a Spark DataFrame from an HBase table, we should use a DataSource defined in the Spark HBase connectors: for example, use the DataSource "org.apache.spark.sql.execution.datasources.hbase" from Hortonworks, or "org.apache.hadoop.hbase.spark" from the Apache HBase-Spark connector.
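Both connectors require a catalog that maps HBase column families and columns to DataFrame columns. A minimal sketch of such a catalog for the Hortonworks connector, where the table and column names are illustrative assumptions:
// Illustrative catalog mapping an HBase table to DataFrame columns
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog
val catalog =
"""{
|"table":{"namespace":"default", "name":"employee"},
|"rowkey":"key",
|"columns":{
|"key":{"cf":"rowkey", "col":"key", "type":"string"},
|"name":{"cf":"person", "col":"name", "type":"string"}
|}
|}""".stripMargin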
// Create DataFrame from HBase table
val hbaseDF = spark.read
.options(Map(HBaseTableCatalog.tableCatalog -> catalog))
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
A detailed example is explained at Generating DataFrame from HBase table.
10. Other sources (Avro, Parquet, Kafka)
We can also create a DataFrame from Avro, Parquet, and HBase sources, and by reading data from Kafka, which I've explained in the articles below. I would recommend reading these when you have time.
- Creating DataFrame from Parquet source
- Creating DataFrame from Avro source
- Creating DataFrame by Streaming data from Kafka
The complete code can be downloaded from GitHub
Happy Learning !!