Spark foreach() Usage With Examples

In Spark, foreach() is an action operation available on RDD, DataFrame, and Dataset that iterates/loops over each element in the dataset. It is similar to a for loop, but it runs on the distributed data across executors. It differs from other actions in that foreach() doesn't return a value; instead, it executes the input function on each element of the RDD, DataFrame, or Dataset.

1. Spark DataFrame foreach() Usage

When foreach() is applied on a Spark DataFrame, it executes the specified function for each element of the DataFrame/Dataset. This operation is mainly used when you want to manipulate accumulators (https://sparkbyexamples.com/spark/spark-accumulators/), save the DataFrame results to RDBMS tables, Kafka topics, and other external sources.

1.1 Syntax


foreach(f : scala.Function1[T, scala.Unit]) : scala.Unit

1.2 DataFrame foreach() Example

In this example, to keep it simple, we just print each row of the DataFrame to the console.


import org.apache.spark.sql.SparkSession

object ForEachExample extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  val data = Seq(("Banana",1000,"USA"), ("Carrots",1500,"USA"), ("Beans",1600,"USA"),
    ("Orange",2000,"USA"),("Orange",2000,"USA"),("Banana",400,"China"),
    ("Carrots",1200,"China"),("Beans",1500,"China"))

  // Create a DataFrame from the sequence of tuples
  val df = spark.createDataFrame(data).toDF("Product","Amount","Country")

  // Print every row to the console
  df.foreach(row => println(row))

}
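
With master set to local[1], the rows are processed in a single task, so the output is printed in order and looks like:


[Banana,1000,USA]
[Carrots,1500,USA]
[Beans,1600,USA]
[Orange,2000,USA]
[Orange,2000,USA]
[Banana,400,China]
[Carrots,1200,China]
[Beans,1500,China]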

Using foreach() to update an accumulator:


  // Sum the values of the Amount column using a long accumulator
  val longAcc = spark.sparkContext.longAccumulator("SumAccumulator")
  df.foreach(row => {
    longAcc.add(row.getInt(1))
  })
  println("Accumulator value: " + longAcc.value)

2. Spark RDD foreach() Usage

foreach() on an RDD behaves similarly to its DataFrame equivalent, hence the same syntax, and it is also used to manipulate accumulators from an RDD and to write to external data sources.

2.1 Syntax


foreach(f : scala.Function1[T, scala.Unit]) : scala.Unit

2.2 RDD foreach() Example


import org.apache.spark.sql.SparkSession

object ForEachExample extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  // RDD
  val rdd = spark.sparkContext.parallelize(Seq(1,2,3,4,5,6,7,8,9))

  // Sum all elements of the RDD using a long accumulator
  val longAcc = spark.sparkContext.longAccumulator("SumAccumulator")
  rdd.foreach(f => {
    longAcc.add(f)
  })
  println("Accumulator value: " + longAcc.value)
}
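
This prints Accumulator value: 45, the sum of the numbers 1 through 9.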

Conclusion

In conclusion, Spark foreach() is an action operation on RDD, DataFrame, and Dataset that doesn't return a value; it is used to manipulate accumulators and to write to external data sources.

Happy Learning !!
