Working with Spark DataFrame Filter


The Spark filter() function filters rows from a DataFrame or Dataset based on a given condition or SQL expression. If you are coming from a SQL background, you can use the where() operator instead; the two functions behave exactly the same.

In this article, you will learn how to apply filter conditions on primitive data types, arrays, and structs, using single and multiple conditions on a DataFrame, with Scala examples.

Spark DataFrame filter() Syntaxes


1) filter(condition: Column): Dataset[T]
2) filter(conditionExpr: String): Dataset[T] //using SQL expression
3) filter(func: T => Boolean): Dataset[T]
4) filter(func: FilterFunction[T]): Dataset[T]

The first signature takes a Column condition. Refer to columns with $"colname", col("colname"), 'colname, or df("colname"), and build the condition expression from them.
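As a minimal sketch of the column reference styles, the following builds the same condition three ways and counts the matching rows. The object name ColumnReferenceSketch and the small two-column sample are assumptions made for this illustration; they are not the DataFrame used in the rest of the article.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Hypothetical object name, used only for this sketch
object ColumnReferenceSketch extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("ColumnReferenceSketch")
    .getOrCreate()
  import spark.implicits._ // brings $"colname" and toDF into scope

  // Assumed sample data, smaller than the article's DataFrame
  val people = Seq(("James", "OH"), ("Anna", "NY"), ("Julia", "OH"))
    .toDF("firstname", "state")

  // Three ways to build the same Column condition
  val viaApply: Long  = people.filter(people("state") === "OH").count()
  val viaCol: Long    = people.filter(col("state") === "OH").count()
  val viaDollar: Long = people.filter($"state" === "OH").count()

  // All three counts are equal because the conditions are equivalent
  println(s"$viaApply $viaCol $viaDollar")

  spark.stop()
}
```

The $"colname" form needs import spark.implicits._, while col() and df("colname") work without it.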

The second signature takes a SQL expression string and uses it to filter() rows.

The third signature takes a Scala function that is applied to each row; rows for which the function returns “true” are kept in the result.
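A minimal sketch of the function-based signature on a typed Dataset could look like the following. The Person case class, the object name, and the sample rows are assumptions made for this illustration. The lambda's parameter type is written out explicitly so it is clear which overload is intended.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical case class, used only for this sketch
case class Person(firstname: String, state: String, gender: String)

// Hypothetical object name
object TypedFilterSketch extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("TypedFilterSketch")
    .getOrCreate()
  import spark.implicits._ // provides the encoder for Person and toDS

  // Assumed sample data
  val people: Dataset[Person] = Seq(
    Person("James", "OH", "M"),
    Person("Anna", "NY", "F"),
    Person("Mike", "OH", "M")
  ).toDS()

  // filter(func: T => Boolean): the function runs on each Person object
  val ohioNames: Seq[String] = people
    .filter((p: Person) => p.state == "OH")
    .collect()
    .map(_.firstname)
    .toSeq

  println(ohioNames.mkString(", "))

  spark.stop()
}
```

Because the function is plain Scala code, Spark has to deserialize each row into a Person before calling it, so a Column condition is usually the better choice when it can express the same test.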

The fourth signature takes a FilterFunction, the functional interface from Spark's Java API (org.apache.spark.api.java.function.FilterFunction).
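This signature is mostly used from Java, but it can also be called from Scala. The sketch below assumes a hypothetical Employee case class and a hypothetical IsMale class that implements FilterFunction; neither comes from the article's example.

```scala
import org.apache.spark.api.java.function.FilterFunction
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical case class, used only for this sketch
case class Employee(firstname: String, gender: String)

// Hypothetical FilterFunction implementation: keeps rows where gender is "M"
class IsMale extends FilterFunction[Employee] {
  override def call(value: Employee): Boolean = value.gender == "M"
}

// Hypothetical object name
object FilterFunctionSketch extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("FilterFunctionSketch")
    .getOrCreate()
  import spark.implicits._

  // Assumed sample data
  val employees: Dataset[Employee] = Seq(
    Employee("James", "M"),
    Employee("Anna", "F"),
    Employee("Mike", "M")
  ).toDS()

  // filter(func: FilterFunction[T]): call() is invoked once per row
  val maleCount: Long = employees.filter(new IsMale).count()

  println(maleCount)

  spark.stop()
}
```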

Before we start with examples, first let’s create a DataFrame.


  val arrayStructureData = Seq(
    Row(Row("James","","Smith"),List("Java","Scala","C++"),"OH","M"),
    Row(Row("Anna","Rose",""),List("Spark","Java","C++"),"NY","F"),
    Row(Row("Julia","","Williams"),List("CSharp","VB"),"OH","F"),
    Row(Row("Maria","Anne","Jones"),List("CSharp","VB"),"NY","M"),
    Row(Row("Jen","Mary","Brown"),List("CSharp","VB"),"NY","M"),
    Row(Row("Mike","Mary","Williams"),List("Python","VB"),"OH","M")
  )

  val arrayStructureSchema = new StructType()
    .add("name",new StructType()
      .add("firstname",StringType)
      .add("middlename",StringType)
      .add("lastname",StringType))
    .add("languages", ArrayType(StringType))
    .add("state", StringType)
    .add("gender", StringType)

  val df = spark.createDataFrame(
    spark.sparkContext.parallelize(arrayStructureData),arrayStructureSchema)
  df.printSchema()
  df.show()

This yields the schema and DataFrame results below.


root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- languages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    [James, , Smith]|[Java, Scala, C++]|   OH|     M|
|      [Anna, Rose, ]|[Spark, Java, C++]|   NY|     F|
| [Julia, , Williams]|      [CSharp, VB]|   OH|     F|
|[Maria, Anne, Jones]|      [CSharp, VB]|   NY|     M|
|  [Jen, Mary, Brown]|      [CSharp, VB]|   NY|     M|
|[Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+

DataFrame filter() with Column condition

Use a Column condition to filter rows from a DataFrame. You can express complex conditions by referring to columns with col("colname"), $"colname", or dfObject("colname"); this approach is the most common when working with DataFrames. Use “===” for equality comparison.


  df.filter(df("state") === "OH")
    .show(false)

This yields the DataFrame results below.


+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|[James, , Smith]      |[Java, Scala, C++]|OH   |M     |
|[Julia, , Williams]   |[CSharp, VB]      |OH   |F     |
|[Mike, Mary, Williams]|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+

DataFrame filter() with SQL Expression

If you are coming from a SQL background, you can use that knowledge in Spark to filter DataFrame rows with SQL expressions.


  df.filter("gender == 'M'")
    .show(false)

This yields the DataFrame results below.


+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|[James, , Smith]      |[Java, Scala, C++]|OH   |M     |
|[Maria, Anne, Jones]  |[CSharp, VB]      |NY   |M     |
|[Jen, Mary, Brown]    |[CSharp, VB]      |NY   |M     |
|[Mike, Mary, Williams]|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+
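The expression string accepts the usual SQL operators, not only equality. The sketch below, built on an assumed three-column sample and a hypothetical object name, shows =, ==, LIKE, and IN, plus the expr() function, which turns the same kind of string into a Column.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

// Hypothetical object name, used only for this sketch
object SqlExpressionSketch extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SqlExpressionSketch")
    .getOrCreate()
  import spark.implicits._

  // Assumed sample data, flatter than the article's DataFrame
  val df = Seq(
    ("James", "OH", "M"),
    ("Anna", "NY", "F"),
    ("Julia", "OH", "F"),
    ("Maria", "NY", "M")
  ).toDF("firstname", "state", "gender")

  // Standard SQL equality; Spark SQL also accepts ==
  val singleEq: Long = df.filter("gender = 'M'").count()
  val doubleEq: Long = df.filter("gender == 'M'").count()

  // Pattern match: first names starting with J
  val jNames: Long = df.filter("firstname LIKE 'J%'").count()

  // Membership test against a list of values
  val inStates: Long = df.filter("state IN ('OH', 'NJ')").count()

  // expr() parses the string into a Column, so it fits the first signature
  val viaExpr: Long = df.filter(expr("gender = 'M'")).count()

  println(s"$singleEq $doubleEq $jNames $inStates $viaExpr")

  spark.stop()
}
```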

Filtering with multiple conditions

To filter() rows on a DataFrame based on multiple conditions, you can use either a Column condition or a SQL expression. Below is a simple example; you can extend it with AND (&&), OR (||), and NOT (!) conditional expressions as needed.


  //multiple condition
  df.filter(df("state") === "OH" && df("gender") === "M")
    .show(false)

This yields the DataFrame results below.


+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|[James, , Smith]      |[Java, Scala, C++]|OH   |M     |
|[Mike, Mary, Williams]|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+
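The AND example above extends naturally to the other operators. The following sketch, using an assumed sample and a hypothetical object name, shows &&, ||, !, the =!= inequality operator, and the equivalent SQL expression for the AND case.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical object name, used only for this sketch
object MultipleConditionsSketch extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("MultipleConditionsSketch")
    .getOrCreate()
  import spark.implicits._

  // Assumed sample data
  val df = Seq(
    ("James", "OH", "M"),
    ("Anna", "NY", "F"),
    ("Julia", "OH", "F"),
    ("Maria", "NY", "M")
  ).toDF("firstname", "state", "gender")

  // AND: both conditions must hold (James)
  val both: Long = df.filter($"state" === "OH" && $"gender" === "M").count()

  // OR: at least one condition must hold (James, Julia, Maria)
  val either: Long = df.filter($"state" === "OH" || $"gender" === "M").count()

  // NOT: negate a condition; parentheses keep the negation on the whole test
  val notOhio: Long = df.filter(!($"state" === "OH")).count()

  // =!= is the Column inequality operator, equivalent to the NOT above
  val notEqualOhio: Long = df.filter($"state" =!= "OH").count()

  // The AND case written as a SQL expression
  val sqlBoth: Long = df.filter("state = 'OH' AND gender = 'M'").count()

  println(s"$both $either $notOhio $notEqualOhio $sqlBoth")

  spark.stop()
}
```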

Filtering on an Array column

When you want to filter rows from a DataFrame based on a value present in an array column, you can use the first syntax. The example below uses the array_contains() SQL function, which returns true if the array contains the given value and false otherwise.


  df.filter(array_contains(df("languages"),"Java"))
    .show(false)

This yields the DataFrame results below.


+----------------+------------------+-----+------+
|name            |languages         |state|gender|
+----------------+------------------+-----+------+
|[James, , Smith]|[Java, Scala, C++]|OH   |M     |
|[Anna, Rose, ]  |[Spark, Java, C++]|NY   |F     |
+----------------+------------------+-----+------+
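A few variations on the array filter are sketched below: the same test as a SQL expression, its negation, and a filter on array length with size(). The sample data and the object name are assumptions made for this illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array_contains, col, size}

// Hypothetical object name, used only for this sketch
object ArrayFilterSketch extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("ArrayFilterSketch")
    .getOrCreate()
  import spark.implicits._

  // Assumed sample data with an array<string> column
  val df = Seq(
    ("James", Seq("Java", "Scala", "C++")),
    ("Anna", Seq("Spark", "Java", "C++")),
    ("Julia", Seq("CSharp", "VB"))
  ).toDF("firstname", "languages")

  // Column condition, as in the example above
  val javaViaColumn: Long =
    df.filter(array_contains(col("languages"), "Java")).count()

  // The same test written as a SQL expression
  val javaViaSql: Long =
    df.filter("array_contains(languages, 'Java')").count()

  // Negation: rows whose array does not contain "Java"
  val withoutJava: Long =
    df.filter(!array_contains(col("languages"), "Java")).count()

  // Filter on the number of elements in the array
  val threeOrMore: Long = df.filter(size(col("languages")) >= 3).count()

  println(s"$javaViaColumn $javaViaSql $withoutJava $threeOrMore")

  spark.stop()
}
```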

Filtering on Nested Struct columns

If your DataFrame contains nested struct columns, you can use any of the above syntaxes to filter rows based on a nested field.


  //Struct condition
  df.filter(df("name.lastname") === "Williams")
    .show(false)

This yields the DataFrame results below.


+----------------------+------------+-----+------+
|name                  |languages   |state|gender|
+----------------------+------------+-----+------+
|[Julia, , Williams]   |[CSharp, VB]|OH   |F     |
|[Mike, Mary, Williams]|[Python, VB]|OH   |M     |
+----------------------+------------+-----+------+
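To illustrate "any of the above syntaxes" on a nested field, the sketch below filters on name.lastname with col(), with a SQL expression, and with Column.getField(). The FullName and Resident case classes and the object name are assumptions; they only stand in for the article's struct schema.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Hypothetical case classes; a nested case class becomes a struct column
case class FullName(firstname: String, middlename: String, lastname: String)
case class Resident(name: FullName, state: String)

// Hypothetical object name, used only for this sketch
object StructFilterSketch extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("StructFilterSketch")
    .getOrCreate()
  import spark.implicits._

  // Assumed sample data
  val df = Seq(
    Resident(FullName("James", "", "Smith"), "OH"),
    Resident(FullName("Julia", "", "Williams"), "OH"),
    Resident(FullName("Mike", "Mary", "Williams"), "OH")
  ).toDF()

  // Dot notation inside col()
  val viaCol: Long = df.filter(col("name.lastname") === "Williams").count()

  // Dot notation inside a SQL expression
  val viaSql: Long = df.filter("name.lastname = 'Williams'").count()

  // Explicit field access on the struct column
  val viaGetField: Long =
    df.filter(col("name").getField("lastname") === "Williams").count()

  println(s"$viaCol $viaSql $viaGetField")

  spark.stop()
}
```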

Source code of Spark DataFrame Filter


package com.sparkbyexamples.spark.dataframe

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{ArrayType, StringType, StructType}
import org.apache.spark.sql.functions.array_contains

object FilterExample extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  val arrayStructureData = Seq(
    Row(Row("James","","Smith"),List("Java","Scala","C++"),"OH","M"),
    Row(Row("Anna","Rose",""),List("Spark","Java","C++"),"NY","F"),
    Row(Row("Julia","","Williams"),List("CSharp","VB"),"OH","F"),
    Row(Row("Maria","Anne","Jones"),List("CSharp","VB"),"NY","M"),
    Row(Row("Jen","Mary","Brown"),List("CSharp","VB"),"NY","M"),
    Row(Row("Mike","Mary","Williams"),List("Python","VB"),"OH","M")
  )

  val arrayStructureSchema = new StructType()
    .add("name",new StructType()
      .add("firstname",StringType)
      .add("middlename",StringType)
      .add("lastname",StringType))
    .add("languages", ArrayType(StringType))
    .add("state", StringType)
    .add("gender", StringType)

  val df = spark.createDataFrame(
    spark.sparkContext.parallelize(arrayStructureData),arrayStructureSchema)
  df.printSchema()
  df.show()

  //Condition
  df.filter(df("state") === "OH")
    .show(false)

  //SQL Expression
  df.filter("gender == 'M'")
    .show(false)

  //multiple condition
  df.filter(df("state") === "OH" && df("gender") === "M")
    .show(false)

  //Array condition
  df.filter(array_contains(df("languages"),"Java"))
    .show(false)

  //Struct condition
  df.filter(df("name.lastname") === "Williams")
    .show(false)

}

The examples explained here are also available in the GitHub project for reference.

Conclusion

In this tutorial, I’ve explained how to filter rows from a Spark DataFrame based on single or multiple conditions and SQL expressions. You also learned how to filter rows by providing conditions on array and struct columns, with Scala examples.

Alternatively, you can also use the where() function to filter the rows of a DataFrame.
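As a minimal sketch of that equivalence, the following runs the same condition through filter() and where() and compares the collected rows. The sample data and the object name are assumptions made for this illustration.

```scala
import org.apache.spark.sql.{Row, SparkSession}

// Hypothetical object name, used only for this sketch
object WhereSketch extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("WhereSketch")
    .getOrCreate()
  import spark.implicits._

  // Assumed sample data
  val df = Seq(("James", "OH"), ("Anna", "NY"), ("Julia", "OH"))
    .toDF("firstname", "state")

  // The same Column condition passed to filter() and to where()
  val viaFilter: Seq[Row] = df.filter($"state" === "OH").collect().toSeq
  val viaWhere: Seq[Row]  = df.where($"state" === "OH").collect().toSeq

  // where() also accepts a SQL expression string, like filter()
  val viaWhereSql: Seq[Row] = df.where("state = 'OH'").collect().toSeq

  // Both comparisons are expected to print true
  println(viaFilter == viaWhere)
  println(viaFilter == viaWhereSql)

  spark.stop()
}
```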


Happy Learning !!

NNK
