Spark DataFrame Where Filter | Multiple Conditions

Spark's filter() and where() functions filter the rows of a DataFrame or Dataset based on one or more conditions or a SQL expression. where() is an alias for filter(), so if you are coming from a SQL background you can use where() instead. Both functions behave exactly the same.

If you want to ignore rows with NULL values, please refer to the Spark filter Rows with NULL values article.

In this Spark article, you will learn how to apply where() and filter() on primitive data types, arrays, and structs, using single and multiple conditions on a DataFrame, with Scala examples.

1. Spark DataFrame filter() Syntaxes


1) filter(condition: Column): Dataset[T]
2) filter(conditionExpr: String): Dataset[T] //using SQL expression
3) filter(func: T => Boolean): Dataset[T]
4) filter(func: FilterFunction[T]): Dataset[T]

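Using the first signature, you can refer to column names with any of the following syntaxes in a condition expression: $"colname", col("colname"), 'colname, and df("colname"). The $"colname" and 'colname forms need import spark.implicits._, and col() needs import org.apache.spark.sql.functions.col. Note that the 'colname symbol syntax is deprecated in Scala 2.13.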

The second signature will be used to provide SQL expressions to filter() rows.

The third signature takes a Scala function that is applied to each row (or typed object) and returns true for the rows to keep.

The fourth signature takes a FilterFunction, a functional interface intended mainly for the Java API.
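The third and fourth signatures are easiest to see on a typed Dataset. The following is a minimal sketch, not part of the original examples: the Person case class and its sample rows are hypothetical and are used only to illustrate the two typed forms. It is written in spark-shell style; in a compiled application, define the case class at the top level of a file or object.

```scala
import org.apache.spark.api.java.function.FilterFunction
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical case class, introduced only for this sketch
case class Person(firstname: String, state: String, gender: String)

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("FilterSignatures")
  .getOrCreate()
import spark.implicits._

val people: Dataset[Person] = Seq(
  Person("James", "OH", "M"),
  Person("Anna", "NY", "F"),
  Person("Julia", "OH", "F")
).toDS()

// Signature 3: a Scala function T => Boolean applied to each typed element
val inOhio = people.filter((p: Person) => p.state == "OH")

// Signature 4: the Java-friendly FilterFunction interface
val females = people.filter(new FilterFunction[Person] {
  override def call(p: Person): Boolean = p.gender == "F"
})

inOhio.show(false)
females.show(false)
```

Because these two forms run arbitrary code on each element, Spark cannot inspect the condition the way it can with a Column or SQL expression, so the Column-based forms are usually the better default for DataFrames.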

Before we start with examples, first let’s create a DataFrame.


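import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ArrayType, StringType, StructType}

val arrayStructureData = Seq(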
    Row(Row("James","","Smith"),List("Java","Scala","C++"),"OH","M"),
    Row(Row("Anna","Rose",""),List("Spark","Java","C++"),"NY","F"),
    Row(Row("Julia","","Williams"),List("CSharp","VB"),"OH","F"),
    Row(Row("Maria","Anne","Jones"),List("CSharp","VB"),"NY","M"),
    Row(Row("Jen","Mary","Brown"),List("CSharp","VB"),"NY","M"),
    Row(Row("Mike","Mary","Williams"),List("Python","VB"),"OH","M")
)

val arrayStructureSchema = new StructType()
    .add("name",new StructType()
        .add("firstname",StringType)
        .add("middlename",StringType)
        .add("lastname",StringType))
    .add("languages", ArrayType(StringType))
    .add("state", StringType)
    .add("gender", StringType)

val df = spark.createDataFrame(
    spark.sparkContext.parallelize(arrayStructureData),arrayStructureSchema)
df.printSchema()
df.show()

This yields the schema and DataFrame results below.


root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- languages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    [James, , Smith]|[Java, Scala, C++]|   OH|     M|
|      [Anna, Rose, ]|[Spark, Java, C++]|   NY|     F|
| [Julia, , Williams]|      [CSharp, VB]|   OH|     F|
|[Maria, Anne, Jones]|      [CSharp, VB]|   NY|     M|
|  [Jen, Mary, Brown]|      [CSharp, VB]|   NY|     M|
|[Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+

2. DataFrame filter() with Column condition

Use a Column with a condition to filter rows from a DataFrame. With this approach you can express complex conditions by referring to column names using col("colname"), $"colname", or dfObject("colname"); it is the approach most often used when working with DataFrames. Use “===” for equality comparison.


df.filter(df("state") === "OH").show(false)

This yields the DataFrame results below.


+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|[James, , Smith]      |[Java, Scala, C++]|OH   |M     |
|[Julia, , Williams]   |[CSharp, VB]      |OH   |F     |
|[Mike, Mary, Williams]|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+

Alternatively, you can write this statement in any of the following ways. All of these statements return the same result with the same performance.


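// $"colname" and 'colname need spark.implicits._; col() needs functions.col
import spark.implicits._
import org.apache.spark.sql.functions.col

df.filter('state === "OH").show(false)
df.filter($"state" === "OH").show(false)
df.filter(col("state") === "OH").show(false)
df.where(df("state") === "OH").show(false)
df.where('state === "OH").show(false)
df.where($"state" === "OH").show(false)
df.where(col("state") === "OH").show(false)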

3. DataFrame filter() with SQL Expression

If you are coming from a SQL background, you can use that knowledge in Spark to filter DataFrame rows with SQL expressions.


df.filter("gender == 'M'").show(false)
df.where("gender == 'M'").show(false)

This yields the DataFrame results below.


+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|[James, , Smith]      |[Java, Scala, C++]|OH   |M     |
|[Maria, Anne, Jones]  |[CSharp, VB]      |NY   |M     |
|[Jen, Mary, Brown]    |[CSharp, VB]      |NY   |M     |
|[Mike, Mary, Williams]|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+
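SQL expression strings accept the usual SQL predicates, not only equality. The sketch below assumes a simplified, flat version of the sample data (the people DataFrame and its column names are illustrative, not the article's df) and shows the standard = operator along with IN and LIKE.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("FilterSqlExpressions")
  .getOrCreate()
import spark.implicits._

// Simplified, flat version of the sample data (an assumption for brevity)
val people = Seq(
  ("James", "OH", "M"), ("Anna", "NY", "F"), ("Julia", "OH", "F"),
  ("Maria", "NY", "M"), ("Jen", "NY", "M"), ("Mike", "OH", "M")
).toDF("firstname", "state", "gender")

// Standard SQL equality; Spark SQL accepts both = and ==
val males = people.filter("gender = 'M'")

// IN list: keep rows whose state is one of the listed values
val ohioOrCalifornia = people.where("state IN ('OH', 'CA')")

// LIKE pattern: first names starting with J
val namesStartingWithJ = people.filter("firstname LIKE 'J%'")

males.show(false)
ohioOrCalifornia.show(false)
namesStartingWithJ.show(false)
```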

4. Filter with Multiple Conditions

To filter() rows of a Spark DataFrame on multiple conditions combined with AND (&&), OR (||), and NOT (!), you can use either a Column with a condition or a SQL expression, as explained above. Below is a simple example; you can extend it with AND (&&), OR (||), and NOT (!) conditional expressions as needed.


//multiple condition
df.filter(df("state") === "OH" && df("gender") === "M")
    .show(false)

This yields the DataFrame results below.


+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|[James, , Smith]      |[Java, Scala, C++]|OH   |M     |
|[Mike, Mary, Williams]|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+
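The AND example above extends naturally to OR and NOT. The following sketch assumes a simplified, flat version of the sample data (the people DataFrame is illustrative) and shows each operator in Column form, plus a SQL expression equivalent for a combined condition. In Scala, === and =!= bind more tightly than && and ||, so the comparisons do not need extra parentheses; the parentheses around the negated comparison are required.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("FilterMultipleConditions")
  .getOrCreate()
import spark.implicits._

// Simplified, flat version of the sample data (an assumption for brevity)
val people = Seq(
  ("James", "OH", "M"), ("Anna", "NY", "F"), ("Julia", "OH", "F"),
  ("Maria", "NY", "M"), ("Jen", "NY", "M"), ("Mike", "OH", "M")
).toDF("firstname", "state", "gender")

// OR: rows that are in OH or have gender F
val ohioOrFemale = people.filter(col("state") === "OH" || col("gender") === "F")

// NOT: negate a whole condition with !, or use the =!= inequality operator
val notOhio = people.filter(!(col("state") === "OH"))
val notOhioAlt = people.filter(col("state") =!= "OH")

// The same kind of logic as a SQL expression string
val ohioNotMale = people.filter("state = 'OH' AND NOT (gender = 'M')")

ohioOrFemale.show(false)
notOhio.show(false)
ohioNotMale.show(false)
```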

5. Filter on an Array Column

When you want to filter rows of a DataFrame based on a value present in an array column, you can use the first syntax. The example below uses the array_contains() Spark SQL function, which returns true if the array contains the given value and false otherwise.


import org.apache.spark.sql.functions.array_contains
df.filter(array_contains(df("languages"),"Java"))
    .show(false)

This yields the DataFrame results below.


+----------------+------------------+-----+------+
|name            |languages         |state|gender|
+----------------+------------------+-----+------+
|[James, , Smith]|[Java, Scala, C++]|OH   |M     |
|[Anna, Rose, ]  |[Spark, Java, C++]|NY   |F     |
+----------------+------------------+-----+------+
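A few variations on the array filter may be useful. This sketch assumes a small illustrative DataFrame (people, with firstname and languages columns, not the article's df) and shows array_contains() inside a SQL expression string, its negation, and a filter on array length using the size() function.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array_contains, col, size}

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("FilterArrayColumn")
  .getOrCreate()
import spark.implicits._

// Small illustrative data set with an array column (an assumption for brevity)
val people = Seq(
  ("James", Seq("Java", "Scala", "C++")),
  ("Anna", Seq("Spark", "Java", "C++")),
  ("Julia", Seq("CSharp", "VB")),
  ("Mike", Seq("Python", "VB"))
).toDF("firstname", "languages")

// array_contains used inside a SQL expression string
val javaViaSql = people.filter("array_contains(languages, 'Java')")

// Negation: rows whose array does not contain the value
val noJava = people.filter(!array_contains(col("languages"), "Java"))

// Filter on the number of elements in the array
val moreThanTwo = people.filter(size(col("languages")) > 2)

javaViaSql.show(false)
noJava.show(false)
moreThanTwo.show(false)
```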

6. Filter on Nested Struct columns

If your DataFrame consists of nested struct columns, you can use any of the above syntaxes to filter the rows based on the nested column.


//Struct condition
df.filter(df("name.lastname") === "Williams")
    .show(false)

This yields the DataFrame results below.


+----------------------+------------+-----+------+
|name                  |languages   |state|gender|
+----------------------+------------+-----+------+
|[Julia, , Williams]   |[CSharp, VB]|OH   |F     |
|[Mike, Mary, Williams]|[Python, VB]|OH   |M     |
+----------------------+------------+-----+------+
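The other syntaxes work on nested fields as well. The sketch below assumes an illustrative DataFrame whose name struct is built with the struct() function from flat columns (a shortcut chosen for brevity, not how the article builds df). It shows the dotted path in a SQL expression, the getField() accessor on a Column, and a condition that combines two nested fields.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("FilterNestedStruct")
  .getOrCreate()
import spark.implicits._

// Build a nested "name" struct from flat columns (illustrative data only)
val people = Seq(
  ("James", "", "Smith", "OH"),
  ("Julia", "", "Williams", "OH"),
  ("Maria", "Anne", "Jones", "NY"),
  ("Mike", "Mary", "Williams", "OH")
).toDF("firstname", "middlename", "lastname", "state")
  .select(
    struct(col("firstname"), col("middlename"), col("lastname")).as("name"),
    col("state"))

// Dotted path inside a SQL expression string
val bySqlExpr = people.filter("name.lastname = 'Williams'")

// Explicit field access on the struct column
val byGetField = people.filter(col("name").getField("lastname") === "Williams")

// Two nested fields combined in one condition
val withMiddleName = people.filter(
  col("name.lastname") === "Williams" && col("name.middlename") =!= "")

bySqlExpr.show(false)
byGetField.show(false)
withMiddleName.show(false)
```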

7. Source code of Spark DataFrame Where Filter


package com.sparkbyexamples.spark.dataframe

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{ArrayType, StringType, StructType}
import org.apache.spark.sql.functions.array_contains

object FilterExample extends App{
  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  val arrayStructureData = Seq(
    Row(Row("James","","Smith"),List("Java","Scala","C++"),"OH","M"),
    Row(Row("Anna","Rose",""),List("Spark","Java","C++"),"NY","F"),
    Row(Row("Julia","","Williams"),List("CSharp","VB"),"OH","F"),
    Row(Row("Maria","Anne","Jones"),List("CSharp","VB"),"NY","M"),
    Row(Row("Jen","Mary","Brown"),List("CSharp","VB"),"NY","M"),
    Row(Row("Mike","Mary","Williams"),List("Python","VB"),"OH","M")
  )

  val arrayStructureSchema = new StructType()
    .add("name",new StructType()
      .add("firstname",StringType)
      .add("middlename",StringType)
      .add("lastname",StringType))
    .add("languages", ArrayType(StringType))
    .add("state", StringType)
    .add("gender", StringType)

  val df = spark.createDataFrame(
   spark.sparkContext.parallelize(arrayStructureData),arrayStructureSchema)
  df.printSchema()
  df.show()

  //Condition
  df.filter(df("state") === "OH")
    .show(false)

  //SQL Expression
  df.filter("gender == 'M'")
    .show(false)

  //multiple condition
  df.filter(df("state") === "OH" && df("gender") === "M")
    .show(false)

  //Array condition
  df.filter(array_contains(df("languages"),"Java"))
    .show(false)

  //Struct condition
  df.filter(df("name.lastname") === "Williams")
    .show(false)
}

The examples explained here are also available in the GitHub project for reference.

8. Conclusion

In this tutorial, I’ve explained how to filter rows from a Spark DataFrame based on single or multiple conditions and SQL expressions. You have also learned how to filter rows by providing conditions on array and struct columns, with Scala examples.

Alternatively, you can also use where() function to filter the rows on DataFrame.


Happy Learning !!


NNK


This Post Has 3 Comments

  1. sam

    Thanks

  2. Andrea Bisello

    Hi
    using where it looks like it making a full scan on a huge dynamodb table, avoiding using the indexes to find and filter the rows. is this true? any suggestion? thanks

  3. Anonymous

    Is it possible to change the change column type from string to number and filter based on values greater than or lesser than?
