To save a PySpark DataFrame to a Hive table, use the saveAsTable() function or run a SQL CREATE TABLE statement on top of a temporary view. In order to save a DataFrame as a Hive table in PySpark, you need to create a SparkSession with enableHiveSupport(). This method is available on pyspark.sql.SparkSession.builder and enables Hive support, including connectivity to a persistent Hive metastore, support for Hive SerDes, and Hive user-defined functions.
Following are the steps to save a PySpark DataFrame to a Hive table.
- Step 1 – Create a SparkSession with Hive support enabled
- Step 2 – Create a PySpark DataFrame
- Step 3 – Save the PySpark DataFrame to a Hive table
- Step 4 – Confirm the Hive table is created
1. Create SparkSession with Hive Enabled
The first step to save a PySpark DataFrame to a Hive table is to create a SparkSession with Hive support enabled.

```python
from os.path import abspath
from pyspark.sql import SparkSession

# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')

# Create spark session with hive enabled
spark = SparkSession \
    .builder \
    .appName("SparkByExamples.com") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()
```
2. PySpark Save DataFrame to Hive Table
By using saveAsTable() from DataFrameWriter, you can save or write a PySpark DataFrame to a Hive table. Pass the table name you want to save as an argument to this function, ideally in the form database.tablename. If the database doesn't exist, you will get an error. To start with, you can also use just the table name without a database.
You can use this to write a PySpark DataFrame to a new Hive table or to overwrite an existing one. PySpark writes the data to the default Hive warehouse location, which is /user/hive/warehouse when you use a Hive cluster; when running locally, it creates the warehouse in the current directory. You can change this behavior using the spark.sql.warehouse.dir configuration while creating the SparkSession. Since we are running it locally from IntelliJ, it creates the spark-warehouse warehouse directory (along with a metastore_db metadata database) under the current directory.
Related: What is Hive Metastore and Data Warehouse Location?
2.1 Save DataFrame as Internal Table from PySpark
By default, the saveAsTable() method saves a PySpark DataFrame as a managed Hive table. Managed tables, also known as internal tables, are owned and managed by Hive: Hive owns both the table structure and the underlying files. When you drop an internal table, Hive deletes both the data and the table metadata.
Let’s create a PySpark DataFrame and then use it to create a table from Spark.
```python
columns = ["id", "name", "age", "gender"]

# Create DataFrame
data = [(1, "James", 30, "M"), (2, "Ann", 40, "F"),
        (3, "Jeff", 41, "M"), (4, "Jennifer", 20, "F")]
sampleDF = spark.sparkContext.parallelize(data).toDF(columns)

# Create Hive Internal table
sampleDF.write.mode('overwrite') \
    .saveAsTable("employee")

# Read Hive table
df = spark.read.table("employee")
df.show()
```
As described above, this creates the Hive metastore metastore_db and the Hive warehouse location spark-warehouse in the current directory (you can see this in IntelliJ). The employee table is created inside the warehouse directory.
Also, note that by default it creates files in parquet format with snappy compression.
Related: PySpark Read & Write Parquet Files
If you want to create a table within a database, prefix the table name with the database name. If the database doesn't exist, you can create one first.
```python
# Create database
spark.sql("CREATE DATABASE IF NOT EXISTS emp")

# Create Hive Internal table
sampleDF.write.mode('overwrite') \
    .saveAsTable("emp.employee")
```
2.2 Save as External Table
To create an external table, specify a path of your choice using option(). The data in external tables is not owned or managed by Hive: dropping an external table drops only the metadata, not the actual data, which remains accessible outside of Hive.
```python
# Create Hive External table
sampleDF.write.mode('overwrite') \
    .option("path", "/path/to/external/table") \
    .saveAsTable("emp.employee")
```
The above two examples create a DataFrame and the emp.employee table. If the table already exists, it is overwritten.
3. Using PySpark SQL Temporary View to Save Hive Table
Use the SparkSession.sql() method with a CREATE TABLE statement to create a table in Hive from a PySpark temporary view. In the example below, we first create a temporary view "sampleView" with createOrReplaceTempView(), then create a database and a table in the Hive metastore using SQL, and finally insert data into the Hive table from the view.
```python
# Create temporary view
sampleDF.createOrReplaceTempView("sampleView")

# Create a Database CT
spark.sql("CREATE DATABASE IF NOT EXISTS ct")

# Create a Table naming as sampleTable under CT database.
spark.sql("CREATE TABLE ct.sampleTable (id Int, name String, age Int, gender String)")

# Insert into sampleTable using the sampleView.
spark.sql("INSERT INTO TABLE ct.sampleTable SELECT * FROM sampleView")

# Lets view the data in the table
spark.sql("SELECT * FROM ct.sampleTable").show()
```
If you run this in IntelliJ, you will see the database ct and the table sampletable as shown below.
For more information on types of tables see: Spark Types of Tables and Views
4. Complete Example
Following is a complete example of how to write a PySpark DataFrame to a Hive table.

```python
#!/usr/bin/env python3
from os.path import abspath
from pyspark.sql import SparkSession

# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')

# Create spark session with hive enabled
spark = SparkSession \
    .builder \
    .appName("SparkByExamples.com") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .config("spark.sql.catalogImplementation", "hive") \
    .enableHiveSupport() \
    .getOrCreate()

columns = ["id", "name", "age", "gender"]

# Create DataFrame
data = [(1, "James", 30, "M"), (2, "Ann", 40, "F"),
        (3, "Jeff", 41, "M"), (4, "Jennifer", 20, "F")]
sampleDF = spark.sparkContext.parallelize(data).toDF(columns)

# Create database
spark.sql("CREATE DATABASE IF NOT EXISTS emp")

# Create Hive Internal table
sampleDF.write.mode('overwrite') \
    .saveAsTable("emp.employee")

# Spark read Hive table
df = spark.read.table("emp.employee")
df.show()
```
In this article, you have learned how to save or write a PySpark DataFrame to a Hive table. You can also choose, while saving, whether Hive should manage both the table metadata and the data (an internal table) or only the metadata (an external table).
You can find the complete working example at GitHub PySpark Hive Example
- PySpark SQL Read Hive Table
- PySpark Read and Write MySQL Database Table
- PySpark Read JDBC Table to DataFrame
- PySpark Read and Write SQL Server Table
- PySpark createOrReplaceTempView() Explained
- Pandas API on Spark | Explained With Examples
- PySpark Query Database Table using JDBC
- Read JDBC in Parallel using PySpark