Spark foreach() Usage With Examples

In Spark, foreach() is an action operation available on RDD, DataFrame, and Dataset that iterates over each element in the dataset; it is similar to a for loop, but executed in a distributed manner. Unlike most other actions, foreach() does not return a value; instead it executes the input function on each element of the RDD, DataFrame, or Dataset.

Spark DataFrame foreach() Usage

When foreach() is applied on a Spark DataFrame, it executes the specified function for each element of the DataFrame/Dataset. This operation is mainly used when you want to <a href="https://sparkbyexamples.com/spark/spark-accumulators/">manipulate accumulators</a> or save DataFrame results to RDBMS tables, Kafka topics, and other external sinks.

Syntax


foreach(f : scala.Function1[T, scala.Unit]) : scala.Unit

DataFrame foreach() Example

In this example, to keep things simple, we just print each row of the DataFrame to the console. Note that foreach() runs on the executors, so in cluster mode the println output appears in the executor logs rather than on the driver; it prints locally here only because the example uses local mode.


import org.apache.spark.sql.SparkSession

object ForEachExample extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  val data = Seq(("Banana",1000,"USA"), ("Carrots",1500,"USA"), ("Beans",1600,"USA"),
    ("Orange",2000,"USA"),("Orange",2000,"USA"),("Banana",400,"China"),
    ("Carrots",1200,"China"),("Beans",1500,"China"))

  //DataFrame
  val df = spark.createDataFrame(data).toDF("Product","Amount","Country")
  df.foreach(f=> println(f))

}

Using foreach() to update an accumulator. Since foreach() runs on the executors, a plain driver-side variable cannot be updated from inside it; an accumulator is the correct way to aggregate a value across the cluster.


  // Sum the "Amount" column (index 1) into a long accumulator
  val longAcc = spark.sparkContext.longAccumulator("SumAccumulator")
  df.foreach(f => {
    longAcc.add(f.getInt(1))
  })
  println("Accumulator value: " + longAcc.value)
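For the RDBMS and Kafka use cases mentioned earlier, a common pattern is foreachPartition() rather than foreach(), so that one connection is opened per partition instead of per row. The sketch below is a minimal illustration, not part of the original article; the JDBC URL, credentials, and table name are hypothetical placeholders.

```scala
import java.sql.DriverManager
import org.apache.spark.sql.Row

// Hedged sketch: insert each DataFrame row into an RDBMS table,
// sharing one JDBC connection per partition. Connection details
// and the "sales" table are assumptions for illustration only.
df.foreachPartition { rows: Iterator[Row] =>
  val conn = DriverManager.getConnection(
    "jdbc:postgresql://localhost/exampledb", "user", "pass")
  val stmt = conn.prepareStatement(
    "INSERT INTO sales(product, amount, country) VALUES (?, ?, ?)")
  try {
    rows.foreach { row =>
      stmt.setString(1, row.getString(0)) // Product
      stmt.setInt(2, row.getInt(1))       // Amount
      stmt.setString(3, row.getString(2)) // Country
      stmt.executeUpdate()
    }
  } finally {
    stmt.close()
    conn.close()
  }
}
```

Because the function runs on the executors, the connection must be created inside foreachPartition(); a connection created on the driver is not serializable and cannot be shipped to the workers.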

Spark RDD foreach() Usage

foreach() on an RDD behaves similarly to its DataFrame equivalent, hence the same syntax; it is likewise used to manipulate accumulators from an RDD and to write to external data sources.

Syntax


foreach(f : scala.Function1[T, scala.Unit]) : scala.Unit

RDD foreach() Example


import org.apache.spark.sql.SparkSession

object ForEachExample extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  //rdd
  val rdd = spark.sparkContext.parallelize(Seq(1,2,3,4,5,6,7,8,9))
  val longAcc = spark.sparkContext.longAccumulator("SumAccumulator")
  rdd.foreach(f => {
    longAcc.add(f)
  })
  println("Accumulator value:"+longAcc.value)
}

Conclusion

In conclusion, Spark foreach() is an action operation on RDD, DataFrame, and Dataset that has no return type and is used to manipulate accumulators and write to external data sources.

Happy Learning !!
