In Spark, foreach() is an action operation available on RDD, DataFrame, and Dataset; it iterates over each element in the dataset. It is similar to a for loop, except that it runs distributed on the executors. Unlike most other actions, foreach() doesn't return a value; instead, it executes the input function on each element of the RDD, DataFrame, or Dataset.
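For example, unlike map(), which is a transformation that returns a new dataset, foreach() returns Unit. A minimal sketch, assuming a SparkSession named spark already exists:

val rdd = spark.sparkContext.parallelize(Seq(1, 2, 3))

// foreach() is an action: it executes the function on each element and returns Unit
rdd.foreach(x => println(x))

// map() is a transformation: it returns a new RDD and computes nothing until an action runs
val doubled = rdd.map(_ * 2)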
Spark DataFrame foreach() Usage
When foreach() is applied on a Spark DataFrame, it executes the specified function for each element of the DataFrame/Dataset. This operation is mainly used when you want to [manipulate accumulators](https://sparkbyexamples.com/spark/spark-accumulators/), or to save the DataFrame results to RDBMS tables, Kafka topics, and other external sources.
Syntax
foreach(f : scala.Function1[T, scala.Unit]) : scala.Unit
DataFrame foreach() Example
In this example, to keep it simple, we just print each row of the DataFrame to the console.
import org.apache.spark.sql.SparkSession

object ForEachExample extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  val data = Seq(("Banana",1000,"USA"), ("Carrots",1500,"USA"), ("Beans",1600,"USA"),
    ("Orange",2000,"USA"), ("Orange",2000,"USA"), ("Banana",400,"China"),
    ("Carrots",1200,"China"), ("Beans",1500,"China"))

  // DataFrame with Product, Amount, and Country columns
  val df = spark.createDataFrame(data).toDF("Product","Amount","Country")

  // Prints each Row; note this runs on the executors, not the driver
  df.foreach(f => println(f))
}
Using foreach() to update an accumulator:
// Sum the Amount column (index 1) with a long accumulator
val longAcc = spark.sparkContext.longAccumulator("SumAccumulator")

df.foreach(f => {
  longAcc.add(f.getInt(1))
})

println("Accumulator value: " + longAcc.value)
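The same pattern can be used to save each row to an external source such as an RDBMS table, as mentioned above. Below is a hedged sketch, assuming a PostgreSQL JDBC driver is on the classpath; the connection URL, credentials, and table name are illustrative assumptions, and foreachPartition() is usually a better fit here (see the related articles below).

// A sketch of writing DataFrame rows to an RDBMS from foreach().
// The JDBC URL, credentials, and table name are illustrative assumptions.
import java.sql.DriverManager

df.foreach(row => {
  // Opening a connection per row is kept only for simplicity;
  // foreachPartition() is preferred so one connection serves a whole partition.
  val conn = DriverManager.getConnection(
    "jdbc:postgresql://localhost:5432/sales", "user", "password")
  try {
    val stmt = conn.prepareStatement(
      "INSERT INTO product_sales (product, amount, country) VALUES (?, ?, ?)")
    stmt.setString(1, row.getString(0))
    stmt.setInt(2, row.getInt(1))
    stmt.setString(3, row.getString(2))
    stmt.executeUpdate()
    stmt.close()
  } finally {
    conn.close()
  }
})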
Spark RDD foreach() Usage
foreach() on an RDD behaves similarly to its DataFrame equivalent, hence the syntax is the same; it is likewise used to manipulate accumulators and to write to external data sources.
Syntax
foreach(f : scala.Function1[T, scala.Unit]) : scala.Unit
RDD foreach() Example
import org.apache.spark.sql.SparkSession

object ForEachExample extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  // RDD of integers
  val rdd = spark.sparkContext.parallelize(Seq(1,2,3,4,5,6,7,8,9))
  val longAcc = spark.sparkContext.longAccumulator("SumAccumulator")

  // Add each element to the accumulator
  rdd.foreach(f => {
    longAcc.add(f)
  })

  println("Accumulator value: " + longAcc.value)
}
Conclusion
In conclusion, Spark foreach() is an action operation on RDD, DataFrame, and Dataset; it doesn't return a value, and it is used to manipulate accumulators and to write to external data sources.
Related Articles
- Spark foreachPartition vs foreach | what to use?
- Spark Accumulators Explained
- Apache Spark Interview Questions
- Spark map() Transformation
- Spark Shell Command Usage with Examples
- Usage of Spark flatMap() Transformation
Happy Learning !!