Spark Save DataFrame to Hive Table

How to save or write a Spark DataFrame to a Hive table? Spark SQL supports writing DataFrame to Hive tables, there are two ways to write a DataFrame as a Hive table in Spark: the saveAsTable() method of DataFrameWriter class and the SQL CREATE statement on top of the temporary view.

In order to save DataFrame as a Hive table, you need to create a SparkSession with enableHiveSupport(). This method is available spark.sql.SparkSession.builder.enableHiveSupport() which enables Hive support, including connectivity to a persistent Hive metastore, support for Hive SerDes, and Hive user-defined functions.

Following are the Steps to Save Spark DataFrame to Hive Table.

  • Step 1 – Use spark-hive dependency
  • Step 2 – Create SparkSession with hive enabled
  • Step 3 – Create Spark DataFrame
  • Step 4 – Save Spark DataFrame to Hive table
  • Step 5 – Confirm Hive table is created

1. Spark Hive Dependencies

To enable Hive support you would need the following dependencies in Maven pom.xml file. If you are using sbt use the following dependencies accordingly.


<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-core_2.13</artifactId>
   <version>3.2.1</version>
   <scope>compile</scope>
</dependency>

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-sql_2.13</artifactId>
   <version>3.2.1</version>
   <scope>compile</scope>
</dependency>

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-hive_2.13</artifactId>
   <version>3.2.1</version>
</dependency>

2. Create Spark Session with Hive Enabled

Create a SparkSession with Hive support enabled,


//enableHiveSupport() -> enables sparkSession to connect with Hive
val spark = SparkSession.builder()
    .master("local[*]")
    .appName("SparkCreateTableExample")
    .enableHiveSupport()
    .getOrCreate()

3. Spark Save DataFrame to Hive Table

By using saveAsTable() from DataFrameWriter you can save or write a Spark DataFrame to a Hive table. Pass the table name you wanted to save as an argument to this function and make sure the table name is in the form of database.tablename. If the database doesn’t exist, you will get an error. To start with you can also try just the table name without a database.

You can use this to write Spark DataFrame to a new Hive table or overwrite an existing table. Spark writes the data to the default Hive warehouse location which is /user/hive/warehouse when you use a Hive cluster. But on local it creates in the current directory. You can change this behavior, using the spark.sql.warehouse.dir configuration while creating a SparkSession .

Since we are running it locally from IntelliJ, it creates a metadata database metastore_db and spark-warehouse under the current directory.

Related: What is Hive Metastore and Data Warehouse Location?

3.1 Save DataFrame as Internal Table from Spark

By default saveAsTable() method saves DataFrame as a managed Hive table. Managed tables are also known as internal tables that are owned and managed by Hive. By default, Hive creates a table as an Internal table and owned the table structure and the files. When you drop an internal table, it drops the data and also drops the metadata of the table.

Let’s create a DataFrame and then use it to create a table from Spark.


import spark.implicits._

// Create DataFrame
val sampleDF = Seq(
    (1, "James",30,"M"),
    (2, "Ann",40,"F"),
    (3, "Jeff",41,"M"),
    (4, "Jennifer",20,"F")
  ).toDF("id", "name","age","gender")

// Create Hive Internal table
sampleDF.write.mode(SaveMode.Overwrite)
        .saveAsTable("emp.employee")

As described above, it creates the Hive metastore metastore_db and Hive warehouse location spark-warehouse in the current directory (you can see this in IntelliJ). The employee table is created inside the warehouse directory.

Also, note that by default it creates files in parquet format with snappy compression.

Related: Spark Read & Write Parquet File

spark save hive table

If you wanted to create a table within a Database, use the prefix database name. If you don’t have the database, you can create one.


// Create database 
spark.sql("CREATE DATABASE IF NOT EXISTS emp")

// Create Hive Internal table
sampleDF.write.mode(SaveMode.Overwrite)
    .saveAsTable("emp.employee")

To change the default warehouse directory and metastore either you can do it while creating the SparkSession or using the conf. For hive.metastore.uris make sure you have the database running on the specified port.


// Change default wareshouse location and metastore 
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark = SparkSession.builder().master("local[*]")
    .appName("SparkCreateTableExample")
    .config("spark.sql.warehouse.dir", "/hive/warehouse/dir")
    .config("hive.metastore.uris", "thrift://localhost:9083")
    .enableHiveSupport()
    .getOrCreate()

// or Use the below
// Change using conf
spark.sparkContext().conf().set("spark.sql.warehouse.dir", "/user/hive/warehouse");
spark.sparkContext().conf().set("hive.metastore.uris", "thrift://localhost:9083");

3.2 Save as External Table

To create an external table use the path of your choice using option(). The data in External tables are not owned or managed by Hive. Dropping an external table just drops the metadata but not the actual data. The actual data is still accessible outside of Hive.


// Create Hive External table
sampleDF.write.mode(SaveMode.Overwrite)
        .option("path", "/path/to/external/table")
        .saveAsTable("emp.employee")

The above two examples create a DataFrame and create the emp.employee table. If a table already exists, it overwrites the table.

3.3 List of Tables

To get the list of tables use the following method.


// List tables
spark.catalog.listTables()

4. Using Spark SQL Temporary View

Use spark.sql() method and CREATE TABLE statement to create a table in Hive from Spark temporary view. Above we have created a temporary view “sampleView“. Now we shall create a Database and Table using SQL in Hive Metastore and insert data into the Hive table using the view we created above.


import spark.implicits._

// Create DataFrame
val sampleDF = Seq(
    (1, "Tiger"),
    (2, "Lion"),
    (3, "Monkey")
  ).toDF("id", "animal")

// Create temporary view
sampleDF.createOrReplaceTempView("sampleView")

//Create a Database CT
spark.sql("CREATE DATABASE IF NOT EXISTS ct")

//Create a Table naming as sampleTable under CT database.
spark.sql("CREATE TABLE ct.sampleTable (number Int, word String)")

//Insert into sampleTable using the sampleView. 
spark.sql("INSERT INTO TABLE ct.sampleTable  SELECT * FROM sampleView")

//Lets view the data in the table
spark.sql("SELECT * FROM ct.sampleTable").show()

//Output
+---+------+
| id|animal|
+---+------+
|  1| Tiger|
|  2|  Lion|
|  3|Monkey|
+---+------+

If you run this on IntelliJ, you will see the database ct and table sampletable as shown below.

spark save hive table

For more information on types of tables see: Spark Types of Tables and Views

5. Conclusion

In this article, you have learned how to save or write a Spark DataFrame to a Hive table. We can also specify while saving a table whether to manage only the table or data and table combined (by creating an internal or external table).

You can find the complete working example at GitHub Spark Hive Example

Related Articles

References

NNK

SparkByExamples.com is a Big Data and Spark examples community page, all examples are simple and easy to understand and well tested in our development environment Read more ..

Leave a Reply