PySpark Save DataFrame to Hive Table

To save a PySpark DataFrame to a Hive table, use the saveAsTable() function or run a SQL CREATE TABLE statement on top of a temporary view. Either way, you first need to create a SparkSession with enableHiveSupport().

The pyspark.sql.SparkSession.builder.enableHiveSupport() method enables Hive support, including connectivity to a persistent Hive metastore, support for Hive SerDes, and Hive user-defined functions.

Following are the steps to save a PySpark DataFrame to a Hive table.

  • Step 1 – Create SparkSession with hive enabled
  • Step 2 – Create PySpark DataFrame
  • Step 3 – Save PySpark DataFrame to Hive table
  • Step 4 – Confirm Hive table is created

1. Create SparkSession with Hive Enabled

The first step to save a PySpark DataFrame to a Hive table is to create a SparkSession with Hive support enabled.


from os.path import abspath
from pyspark.sql import SparkSession

# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')

# Create spark session with hive enabled
spark = SparkSession \
    .builder \
    .appName("SparkByExamples.com") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()
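
To check that the session really has Hive support before writing any tables, you can read back its configuration. The following is a minimal sketch, not part of the original example: the helper names describe_session and require_hive are hypothetical, and it assumes the session exposes the standard spark.sql.catalogImplementation and spark.sql.warehouse.dir settings through spark.conf.

```python
def describe_session(spark):
    """Return the catalog implementation and warehouse directory of a session."""
    return {
        # "hive" when Hive support is active; we fall back to "in-memory" if unset
        "catalog": spark.conf.get("spark.sql.catalogImplementation", "in-memory"),
        # Directory where the data of managed tables is stored
        "warehouse": spark.conf.get("spark.sql.warehouse.dir"),
    }


def require_hive(spark):
    """Raise early if the session was built without enableHiveSupport()."""
    settings = describe_session(spark)
    if settings["catalog"] != "hive":
        raise RuntimeError("SparkSession was created without Hive support")
    return settings
```

Calling require_hive(spark) right after getOrCreate() fails fast when Hive support is missing, instead of surfacing later as a confusing error during the write.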

2. PySpark Save DataFrame to Hive Table

By using saveAsTable() from DataFrameWriter, you can save or write a PySpark DataFrame to a Hive table. Pass the name of the table you want to save to as the argument, preferably in the form database.tablename. If the database doesn’t exist, you will get an error. To start with, you can also try just the table name without a database.

You can use this to write a PySpark DataFrame to a new Hive table or to overwrite an existing table. On a Hive cluster, PySpark writes the data to the default Hive warehouse location, which is /user/hive/warehouse. When running locally, it writes to the current directory instead. You can change this behavior with the spark.sql.warehouse.dir configuration while creating the SparkSession.

Since we are running it locally from IntelliJ, it creates a metadata database metastore_db and a spark-warehouse directory under the current directory.

Related: What is Hive Metastore and Data Warehouse Location?

2.1 Save DataFrame as Internal Table from PySpark

By default, the saveAsTable() method saves a PySpark DataFrame as a managed Hive table. Managed tables, also known as internal tables, are owned and managed by Hive: Hive owns both the table structure and the data files. When you drop an internal table, both the data and the table metadata are deleted.

Let’s create a PySpark DataFrame and then use it to create a table from Spark.


columns = ["id", "name","age","gender"]

# Create DataFrame 
data = [(1, "James",30,"M"), (2, "Ann",40,"F"),
    (3, "Jeff",41,"M"),(4, "Jennifer",20,"F")]
sampleDF = spark.sparkContext.parallelize(data).toDF(columns)

# Create Hive Internal table
sampleDF.write.mode('overwrite') \
         .saveAsTable("employee")

# Read Hive table
df = spark.read.table("employee")
df.show()

As described above, it creates the Hive metastore metastore_db and Hive warehouse location spark-warehouse in the current directory (you can see this in IntelliJ). The employee table is created inside the warehouse directory.
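
You can also confirm from code that the table was registered in the metastore. The sketch below is one possible way to do it, using spark.catalog.listTables(); the helper name find_table is hypothetical and not part of the original example.

```python
def find_table(spark, table, database="default"):
    """Return the table type (for example "MANAGED" or "EXTERNAL"),
    or None when the table is not registered in the given database."""
    for entry in spark.catalog.listTables(database):
        # Table names are stored in lower case, so compare case-insensitively
        if entry.name.lower() == table.lower():
            return entry.tableType
    return None
```

After the write above, find_table(spark, "employee") would be expected to report a managed table. Note that listTables() also lists temporary views, which are reported with their own table type.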


Also, note that by default it creates files in parquet format with snappy compression.
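
If you prefer to state the file format and compression explicitly rather than rely on the defaults, DataFrameWriter accepts format() and a "compression" option. This is a hedged sketch: the helper save_table_as and the table name in the usage note are made up for illustration, and which codecs are accepted depends on the chosen format.

```python
def save_table_as(df, table, file_format="parquet", compression="snappy"):
    """Overwrite `table` with the contents of `df`, using an explicit
    file format and compression codec instead of the session defaults."""
    (df.write
       .mode("overwrite")
       .format(file_format)                 # e.g. "parquet" or "orc"
       .option("compression", compression)  # codec must be valid for the format
       .saveAsTable(table))
```

For example, save_table_as(sampleDF, "employee_orc", "orc", "zlib") would write the same data as ORC files compressed with zlib.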

Related: PySpark Read & Write Parquet Files

If you want to create a table within a specific database, prefix the table name with the database name. If the database doesn’t exist yet, you can create it first.


# Create database 
spark.sql("CREATE DATABASE IF NOT EXISTS emp")

# Create Hive Internal table
sampleDF.write.mode('overwrite') \
    .saveAsTable("emp.employee")

2.2 Save as External Table

To create an external table, pass a path of your choice using option(). The data in external tables is not owned or managed by Hive. Dropping an external table drops only the metadata, not the actual data, which remains accessible outside of Hive.


# Create Hive External table
sampleDF.write.mode('overwrite') \
        .option("path", "/path/to/external/table") \
        .saveAsTable("emp.employee")

Both the internal and the external example write the sampleDF DataFrame to the emp.employee table. Because they use overwrite mode, an existing table is overwritten.
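
Overwrite is only one of the save modes that DataFrameWriter.mode() accepts; the others are append, ignore, and error (also spelled errorifexists, the default). The following sketch wraps the choice in a hypothetical helper, write_with_mode, that rejects unknown mode names before touching the table.

```python
# Save modes accepted by DataFrameWriter.mode()
VALID_MODES = {"append", "overwrite", "ignore", "error", "errorifexists"}


def write_with_mode(df, table, mode="errorifexists"):
    """Save `df` as `table` using one of the supported save modes.

    append        - add rows to an existing table
    overwrite     - replace the existing table
    ignore        - do nothing if the table already exists
    error /
    errorifexists - fail if the table already exists
    """
    if mode not in VALID_MODES:
        raise ValueError(f"Unsupported save mode: {mode!r}")
    df.write.mode(mode).saveAsTable(table)
```

For instance, write_with_mode(sampleDF, "emp.employee", "append") would add the rows to the existing table instead of replacing it.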

3. Using PySpark SQL Temporary View to Save Hive Table

Use the SparkSession.sql() method with a CREATE TABLE statement to create a Hive table from a PySpark temporary view. In the example that follows, we register the DataFrame as a temporary view named "sampleView", create a database and a table in the Hive metastore using SQL, and then insert the data from the view into the Hive table.

Use createOrReplaceTempView() to create a temporary view.


# Create temporary view
sampleDF.createOrReplaceTempView("sampleView")

# Create the database ct
spark.sql("CREATE DATABASE IF NOT EXISTS ct")

# Create a table named sampleTable under the ct database
spark.sql("CREATE TABLE ct.sampleTable (id Int, name String, age Int, gender String)")

# Insert into sampleTable using the sampleView
spark.sql("INSERT INTO TABLE ct.sampleTable SELECT * FROM sampleView")

# Let's view the data in the table
spark.sql("SELECT * FROM ct.sampleTable").show()

If you run this from IntelliJ, you will see the database ct and the table sampletable created in the warehouse directory.
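
As an alternative to a separate CREATE TABLE and INSERT, the table can be created and filled in one statement with CREATE TABLE ... AS SELECT. The sketch below assumes this form is acceptable for your table (the column types are then taken from the view instead of being declared by hand). The helper names are hypothetical.

```python
def ctas_statement(table, view):
    """Build a CREATE TABLE ... AS SELECT statement that copies a view into a table."""
    return f"CREATE TABLE IF NOT EXISTS {table} AS SELECT * FROM {view}"


def create_table_from_view(spark, df, table, view="sampleView"):
    """Register `df` as a temporary view and materialize it as a table."""
    df.createOrReplaceTempView(view)
    spark.sql(ctas_statement(table, view))
```

Because of IF NOT EXISTS, running it a second time leaves an existing table untouched rather than inserting the rows again.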

For more information on types of tables see: Spark Types of Tables and Views

5. Complete Example

Following is a complete example of how to write a PySpark DataFrame to a Hive table.


#!/usr/bin/env python3

from os.path import abspath
from pyspark.sql import SparkSession

# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')

# Create spark session with hive enabled
spark = SparkSession \
    .builder \
    .appName("SparkByExamples.com") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .config("spark.sql.catalogImplementation", "hive") \
    .enableHiveSupport() \
    .getOrCreate()

columns = ["id", "name","age","gender"]

# Create DataFrame 
data = [(1, "James",30,"M"), (2, "Ann",40,"F"),
    (3, "Jeff",41,"M"),(4, "Jennifer",20,"F")]
sampleDF = spark.sparkContext.parallelize(data).toDF(columns)

# Create database 
spark.sql("CREATE DATABASE IF NOT EXISTS emp")

# Create Hive Internal table
sampleDF.write.mode('overwrite') \
          .saveAsTable("emp.employee")

# Spark read Hive table
df = spark.read.table("emp.employee")
df.show()

6. Conclusion

In this article, you have learned how to save or write a PySpark DataFrame to a Hive table. When saving, you can also choose whether Hive manages both the data and the metadata (an internal table) or only the metadata (an external table).

You can find the complete working example at GitHub PySpark Hive Example
