In Spark, foreachPartition() is used when you have heavy initialization (like a database connection) that you want to perform once per partition, whereas foreach() is used to apply a function to every element of an RDD/DataFrame/Dataset.
In this Spark DataFrame article, you will learn what foreachPartition() is used for and how it differs from its sibling foreach() (foreachPartition vs foreach).
Spark foreachPartition() is an action operation available on RDD, DataFrame, and Dataset. It differs from other actions in that it doesn't return a value; instead, it executes the input function on each partition.
- DataFrame foreachPartition() Usage
- DataFrame foreach() Usage
- RDD foreachPartition() Usage
- RDD foreach() Usage
1. DataFrame foreachPartition() Usage
On a Spark DataFrame, foreachPartition() is similar to the foreach() action, which is used to manipulate accumulators and write to database tables or other external data sources; the difference is that foreachPartition() gives you the option to do heavy initializations once per partition, which makes it the more efficient choice for such workloads.
1.1 Syntax
foreachPartition(f : scala.Function1[scala.Iterator[T], scala.Unit]) : scala.Unit
When foreachPartition() is applied on a Spark DataFrame, it executes the function you pass in once for each partition of the DataFrame. This operation is mainly used when you want to save the DataFrame result to RDBMS tables, produce it to Kafka topics, etc.
Example
In this example, to keep it simple, we just outline the pattern; the per-element function would normally write each row out.

// foreachPartition() on DataFrame
import org.apache.spark.sql.{Row, SparkSession}

val spark = SparkSession.builder().master("local[2]").appName("foreachPartitionExample").getOrCreate()
val data = Seq(("Banana", 1000, "USA"), ("Carrots", 1500, "USA"), ("Beans", 1600, "USA")) // sample data
val df = spark.createDataFrame(data).toDF("Product", "Amount", "Country")

df.foreachPartition((partition: Iterator[Row]) => {
  // Initialize a database connection or Kafka producer here (once per partition)
  partition.foreach(row => {
    // Insert the row into the database or produce it to a Kafka topic
  })
  // If you batch inserts, execute the batch here before closing the connection
})
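The pattern above can be sketched without a cluster. The snippet below is a minimal, hypothetical simulation of per-partition batching using plain Scala iterators; `FakeConnection` and the partition data are illustrative assumptions, not Spark APIs.

```scala
// Hypothetical connection type, illustrating one-init-per-partition batching
class FakeConnection {
  private val buffer = scala.collection.mutable.ArrayBuffer[String]()
  def addBatch(row: String): Unit = buffer += row
  def executeBatch(): Int = { val n = buffer.size; buffer.clear(); n }
}

// Simulate a DataFrame split into partitions: each partition is an Iterator of rows
val partitions: Seq[Iterator[String]] =
  Seq(Iterator("a", "b"), Iterator("c", "d", "e"))

// Same shape as df.foreachPartition: one connection per partition, batch at the end
val inserted = partitions.map { partition =>
  val conn = new FakeConnection()              // heavy init, once per partition
  partition.foreach(row => conn.addBatch(row)) // per-element work reuses the connection
  conn.executeBatch()                          // flush the batch once per partition
}

println(inserted) // List(2, 3): rows inserted per partition
```

The key point is that the expensive object (`conn`) is created outside the per-element loop but inside the per-partition one.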
2. DataFrame foreach() Usage
When foreach() is applied on a Spark DataFrame, it executes the function you pass in for each element of the DataFrame/Dataset. This operation is mainly used when you want to [manipulate accumulators](https://sparkbyexamples.com/spark/spark-accumulators/) or perform other operations that don't need heavy initializations.
// DataFrame foreach() usage
val longAcc = spark.sparkContext.longAccumulator("SumAccumulator")
df.foreach(row => {
  longAcc.add(row.getInt(1)) // sum the Amount column
})
println("Accumulator value: " + longAcc.value)
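To see why per-partition initialization matters, here is a small, hypothetical cost comparison in plain Scala (no Spark required): it counts how many times an expensive setup would run under a foreach-style loop versus a foreachPartition-style loop. The counters and data are illustrative assumptions.

```scala
// Count how often an expensive init would run under each style (illustrative only)
var initPerElement = 0
var initPerPartition = 0

// Pretend we have 3 partitions of 4 elements each
val parts: Seq[Seq[Int]] = Seq(Seq(1, 2, 3, 4), Seq(5, 6, 7, 8), Seq(9, 10, 11, 12))

// foreach-style: the init sits inside the per-element function
parts.foreach(p => p.foreach { _ => initPerElement += 1 /* open connection */ })

// foreachPartition-style: init once, then iterate the partition's elements
parts.foreach { p =>
  initPerPartition += 1 // open connection once per partition
  p.foreach { _ => () } // per-element work reuses the connection
}

println(s"per-element inits: $initPerElement, per-partition inits: $initPerPartition")
// per-element inits: 12, per-partition inits: 3
```

With 3 partitions of 4 elements, the per-element style pays the setup cost 12 times; the per-partition style pays it only 3 times.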
3. RDD foreachPartition() Usage
foreachPartition() on an RDD behaves similarly to its DataFrame equivalent; hence, it has the same syntax.
Syntax
foreachPartition(f : scala.Function1[scala.Iterator[T], scala.Unit]) : scala.Unit
Example
// RDD foreachPartition() usage
val rdd = spark.sparkContext.parallelize(Seq(1,2,3,4,5,6,7,8,9))
rdd.foreachPartition(partition => {
  // Initialize any database connection here (once per partition)
  partition.foreach(elem => {
    // Apply the per-element function
  })
})
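One gotcha worth noting: the argument passed to foreachPartition() is a single-pass Iterator. If you consume it once (for example with size or toList), a second traversal sees nothing. A quick plain-Scala illustration:

```scala
// A Scala Iterator can only be traversed once, like the partition argument
val partition: Iterator[Int] = Iterator(1, 2, 3)

val first = partition.toList  // consumes the iterator
val second = partition.toList // already exhausted

println(first)  // List(1, 2, 3)
println(second) // List()
```

So inside foreachPartition(), avoid calling methods like size or toList on the partition before the loop that does the real work.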
4. RDD foreach() Usage
RDD foreach() is equivalent to the DataFrame foreach() action.
// RDD foreach() with an accumulator
val rdd2 = spark.sparkContext.parallelize(Seq(1,2,3,4,5,6,7,8,9))
val longAcc2 = spark.sparkContext.longAccumulator("SumAccumulator2")
rdd2.foreach(elem => {
  longAcc2.add(elem)
})
println("Accumulator value: " + longAcc2.value)
Conclusion
Use the foreachPartition() action when you have heavy initialization, such as database connections or a Kafka producer, so that the initialization happens once per partition rather than once per element (as with foreach()). The foreach() action is mostly used to update accumulator variables.
Related Articles
- Spark foreach() Usage With Examples
- Spark Word Count Explained with Example
- Spark Get DataType & Column Names of DataFrame
- Apache Spark Interview Questions
- Spark Partitioning & Partition Understanding
- Spark Get Current Number of Partitions of DataFrame
- Spark map() vs mapPartitions() with Examples
- Spark SQL Shuffle Partitions
Happy Learning !!