When working with a very large dataset, a pandas DataFrame is often not enough for complex transformations, because pandas runs on a single machine. If you have a Spark cluster, it is better to convert the pandas DataFrame to a PySpark DataFrame, apply the complex transformations on the cluster, and convert the result back.
In this article, I will explain the steps for converting a pandas DataFrame to a PySpark DataFrame and how to optimize the conversion by enabling Apache Arrow.
Key Points –
- Ensure PySpark is installed on your system to utilize its DataFrame functionalities.
- Import necessary modules such as pandas and pyspark.sql to facilitate the conversion process.
- Utilize the createDataFrame() method to convert the Pandas DataFrame into a PySpark DataFrame.
- Create a SparkSession object to interact with Spark and handle DataFrame operations.
- Be mindful of potential differences in data handling and performance between Pandas and PySpark when converting, ensuring compatibility with your workflow and computational requirements.
Create Pandas DataFrame
To convert pandas to a PySpark DataFrame, first create a pandas DataFrame with some test data. To use pandas, import it with import pandas as pd.
import pandas as pd
data = [['Scott', 50], ['Jeff', 45], ['Thomas', 54],['Ann',34]]
# Create the pandas DataFrame
pandasDF = pd.DataFrame(data, columns = ['Name', 'Age'])
# print dataframe.
print(pandasDF)
# Prints below Pandas DataFrame
     Name  Age
0   Scott   50
1    Jeff   45
2  Thomas   54
3     Ann   34
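Before converting, it can help to look at the pandas column types, because Spark infers its schema from them. The following is a minimal sketch that builds the same test data from a dictionary instead of a list of lists; the variable name pandasDF_from_dict is only illustrative and does not appear elsewhere in this article.

```python
import pandas as pd

# Same test data as above, built column by column from a dict
pandasDF_from_dict = pd.DataFrame({
    "Name": ["Scott", "Jeff", "Thomas", "Ann"],
    "Age": [50, 45, 54, 34],
})

# Inspect the column types that Spark will use for schema inference
print(pandasDF_from_dict.dtypes)
# Number of (rows, columns)
print(pandasDF_from_dict.shape)
```

Checking the dtypes up front makes it easier to spot a numeric column that pandas has loaded as text before the data reaches Spark.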
Convert Pandas to PySpark (Spark) DataFrame
Spark provides the createDataFrame(pandas_dataframe) method to convert a pandas DataFrame to a Spark DataFrame. By default, Spark infers the schema by mapping the pandas data types to PySpark data types.
from pyspark.sql import SparkSession
# Create PySpark SparkSession
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("SparkByExamples.com") \
    .getOrCreate()
# Create PySpark DataFrame from Pandas
sparkDF = spark.createDataFrame(pandasDF)
sparkDF.printSchema()
sparkDF.show()
# Outputs below schema & DataFrame
root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)

+------+---+
|  Name|Age|
+------+---+
| Scott| 50|
|  Jeff| 45|
|Thomas| 54|
|   Ann| 34|
+------+---+
If you want all columns to be of string type, use spark.createDataFrame(pandasDF.astype(str)).
Change Column Names & DataTypes while Converting
If you want to change the schema (column names and data types) while converting pandas to a PySpark DataFrame, create a PySpark schema using StructType and pass it through the schema parameter of createDataFrame().
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
# Create a user-defined custom schema using StructType
mySchema = StructType([
    StructField("First Name", StringType(), True),
    StructField("Age", IntegerType(), True)
])
# Create DataFrame by changing schema
sparkDF2 = spark.createDataFrame(pandasDF, schema=mySchema)
sparkDF2.printSchema()
sparkDF2.show()
# Outputs below schema & DataFrame
root
 |-- First Name: string (nullable = true)
 |-- Age: integer (nullable = true)

+----------+---+
|First Name|Age|
+----------+---+
|     Scott| 50|
|      Jeff| 45|
|    Thomas| 54|
|       Ann| 34|
+----------+---+
Use Apache Arrow to Convert pandas to Spark DataFrame
Apache Arrow is an in-memory columnar format that Spark uses to transfer data efficiently between Python and the JVM. Arrow-based conversion is disabled by default, so you need to enable it, and PyArrow must be installed on all Spark cluster nodes, for example with pip install pyspark[sql] or by installing it directly from Apache Arrow for Python.
# In Spark 3.0 and later the setting is spark.sql.execution.arrow.pyspark.enabled;
# the older name spark.sql.execution.arrow.enabled is deprecated
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled","true")
sparkDF=spark.createDataFrame(pandasDF)
sparkDF.printSchema()
sparkDF.show()
You need a Spark-compatible version of Apache Arrow (PyArrow) installed to use the above statement. If PyArrow is not installed, you get the warning below.
\apps\Anaconda3\lib\site-packages\pyspark\sql\pandas\conversion.py:289: UserWarning: createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, failed by the reason below:
PyArrow >= 0.15.1 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
When such an error occurs, Spark automatically falls back to the non-Arrow implementation. This behavior is controlled by spark.sql.execution.arrow.pyspark.fallback.enabled.
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")
Note: Apache Arrow currently supports all Spark SQL data types except MapType, ArrayType of TimestampType, and nested StructType. This list has changed between Spark releases, so check the documentation for the version you run.
Complete Example of Convert Pandas to Spark DataFrame
import pandas as pd
data = [['Scott', 50], ['Jeff', 45], ['Thomas', 54],['Ann',34]]
# Create the pandas DataFrame
pandasDF = pd.DataFrame(data, columns = ['Name', 'Age'])
# print dataframe.
print(pandasDF)
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master("local[1]") \
.appName("SparkByExamples.com") \
.getOrCreate()
sparkDF=spark.createDataFrame(pandasDF)
sparkDF.printSchema()
sparkDF.show()
#sparkDF=spark.createDataFrame(pandasDF.astype(str))
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
mySchema = StructType([ StructField("First Name", StringType(), True)\
,StructField("Age", IntegerType(), True)])
sparkDF2 = spark.createDataFrame(pandasDF,schema=mySchema)
sparkDF2.printSchema()
sparkDF2.show()
# Enable Apache Arrow to convert Pandas to PySpark DataFrame
# (spark.sql.execution.arrow.enabled is the deprecated pre-3.0 name of this setting)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled","true")
sparkDF2=spark.createDataFrame(pandasDF)
sparkDF2.printSchema()
sparkDF2.show()
# Convert PySpark DataFrame to Pandas (toPandas is a method and must be called)
pandasDF2=sparkDF2.select("*").toPandas()
print(pandasDF2)
Frequently Asked Questions
Converting Pandas DataFrame to PySpark DataFrame allows you to leverage the distributed computing capabilities of PySpark, enabling the processing of large datasets that may not fit into memory on a single machine.
Pandas DataFrame operates on a single machine and is suitable for smaller datasets, while PySpark DataFrame distributes data across a cluster, making it efficient for handling large-scale datasets. Because PySpark processes partitions in parallel, it can be faster than pandas on large datasets, although for small data the overhead of distribution often makes pandas quicker.
You can use PySpark's createDataFrame() function, passing the pandas DataFrame as an argument, to create a PySpark DataFrame. Spark infers the schema from the pandas data types unless you supply one.
It’s essential to consider the performance implications, especially when dealing with large datasets. PySpark DataFrame operations are optimized for distributed computing, whereas Pandas operations are primarily designed for single-machine processing. Be mindful of the potential differences in performance and resource utilization.
Conclusion
In this article, you have learned how easy it is to convert a pandas DataFrame to a Spark DataFrame and how to optimize the conversion using Apache Arrow (an in-memory columnar format).
Happy Learning!!