The Spark filter() or where() function filters rows from a DataFrame or Dataset based on one or more conditions. If you come from a SQL background, you can use the where() operator instead of filter(); both functions behave exactly the same. Besides these DataFrame functions, you can also use a SQL expression to filter rows.
If you want to ignore rows with NULL values, please refer to the Spark Filter Rows with NULL Values article.
In this Spark article, you will learn how to apply a where/filter on primitive data types, arrays, and structs, using single and multiple conditions on a DataFrame, with Scala examples.
1. Spark filter() Syntaxes
Following are the multiple syntaxes of filter().
// Spark DataFrame filter() Syntaxes
1) filter(condition: Column): Dataset[T]
2) filter(conditionExpr: String): Dataset[T]
3) filter(func: T => Boolean): Dataset[T]
4) filter(func: FilterFunction[T]): Dataset[T]
Using the first signature, you can refer to column names with any of the following syntaxes: $"colname", col("colname"), 'colname, or df("colname"), combined with a condition expression.
The second signature is used to provide a SQL expression string to filter() rows.
The third signature is used with a Scala function that is applied to each row and returns a Boolean.
The fourth signature is used with the FilterFunction class.
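Signatures 3 and 4 operate on typed Datasets and are not demonstrated later in this article, so here is a minimal sketch. The Person case class and its sample data are assumptions for illustration only; spark is an existing SparkSession.

```scala
import org.apache.spark.api.java.function.FilterFunction
import spark.implicits._ // enables toDS()

// Hypothetical case class for a typed Dataset
case class Person(name: String, state: String)

val ds = Seq(Person("James", "OH"), Person("Anna", "NY")).toDS()

// 3) Scala function applied to each element of the Dataset
ds.filter((p: Person) => p.state == "OH").show()

// 4) FilterFunction[T] instance; mainly useful from Java or for reusable predicates
ds.filter(new FilterFunction[Person] {
  override def call(p: Person): Boolean = p.state == "OH"
}).show()
```

Note that both of these run the predicate as an opaque Scala/Java function, so unlike the Column and SQL-string variants, Catalyst cannot inspect them for optimizations such as predicate pushdown.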
Before we start with examples, let’s create a DataFrame.
// Import
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{ArrayType, StringType, StructType}
// Prepare Data
val arrayStructureData = Seq(
Row(Row("James","","Smith"),List("Java","Scala","C++"),"OH","M"),
Row(Row("Anna","Rose",""),List("Spark","Java","C++"),"NY","F"),
Row(Row("Julia","","Williams"),List("CSharp","VB"),"OH","F"),
Row(Row("Maria","Anne","Jones"),List("CSharp","VB"),"NY","M"),
Row(Row("Jen","Mary","Brown"),List("CSharp","VB"),"NY","M"),
Row(Row("Mike","Mary","Williams"),List("Python","VB"),"OH","M")
)
// Create StructType for schema
val arrayStructureSchema = new StructType()
.add("name",new StructType()
.add("firstname",StringType)
.add("middlename",StringType)
.add("lastname",StringType))
.add("languages", ArrayType(StringType))
.add("state", StringType)
.add("gender", StringType)
// Create DataFrame
val df = spark.createDataFrame(
spark.sparkContext.parallelize(arrayStructureData),arrayStructureSchema)
df.printSchema()
df.show()
This yields the below schema and DataFrame results.
// Output:
root
|-- name: struct (nullable = true)
| |-- firstname: string (nullable = true)
| |-- middlename: string (nullable = true)
| |-- lastname: string (nullable = true)
|-- languages: array (nullable = true)
| |-- element: string (containsNull = true)
|-- state: string (nullable = true)
|-- gender: string (nullable = true)
+--------------------+------------------+-----+------+
| name| languages|state|gender|
+--------------------+------------------+-----+------+
| [James, , Smith]|[Java, Scala, C++]| OH| M|
| [Anna, Rose, ]|[Spark, Java, C++]| NY| F|
| [Julia, , Williams]| [CSharp, VB]| OH| F|
|[Maria, Anne, Jones]| [CSharp, VB]| NY| M|
| [Jen, Mary, Brown]| [CSharp, VB]| NY| M|
|[Mike, Mary, Will...| [Python, VB]| OH| M|
+--------------------+------------------+-----+------+
2. filter() with Column condition
Use a Column with a condition to filter rows from a Spark DataFrame. With this approach, you can express complex conditions by referring to column names using col("colname"), $"colname", 'colname, or dfObject("colname"); it is the approach most commonly used when working with DataFrames. Use "===" for equality comparison; the regular == operator would compare the Column objects themselves rather than building a comparison expression.
// Filter example
df.filter(df("state") === "OH").show(false)
This yields the below DataFrame results.
// Output:
+----------------------+------------------+-----+------+
|name |languages |state|gender|
+----------------------+------------------+-----+------+
|[James, , Smith] |[Java, Scala, C++]|OH |M |
|[Julia, , Williams] |[CSharp, VB] |OH |F |
|[Mike, Mary, Williams]|[Python, VB] |OH |M |
+----------------------+------------------+-----+------+
Alternatively, you can write the same statement in any of the following ways. All of these variants return the same result with the same performance.
// Alternate ways
import org.apache.spark.sql.functions.col
import spark.implicits._ // enables the $"colname" and 'colname syntaxes
df.filter('state === "OH").show(false)
df.filter($"state" === "OH").show(false)
df.filter(col("state") === "OH").show(false)
df.where(df("state") === "OH").show(false)
df.where('state === "OH").show(false)
df.where($"state" === "OH").show(false)
df.where(col("state") === "OH").show(false)
3. filter() with SQL Expression
If you have an SQL background, you can use that knowledge in Spark to filter DataFrame rows with SQL expressions.
// SQL like expression
df.filter("gender == 'M'").show(false)
df.where("gender == 'M'").show(false)
This yields the below DataFrame results.
// Output:
+----------------------+------------------+-----+------+
|name |languages |state|gender|
+----------------------+------------------+-----+------+
|[James, , Smith] |[Java, Scala, C++]|OH |M |
|[Maria, Anne, Jones] |[CSharp, VB] |NY |M |
|[Jen, Mary, Brown] |[CSharp, VB] |NY |M |
|[Mike, Mary, Williams]|[Python, VB] |OH |M |
+----------------------+------------------+-----+------+
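As a sketch of another SQL-based alternative, you can also register the DataFrame as a temporary view and filter with a full SQL query; the view name people here is just an illustrative choice.

```scala
// Register the DataFrame as a temporary view and filter with plain SQL
df.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people WHERE gender = 'M'").show(false)
```

This returns the same rows as the filter()/where() expression versions above.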
4. Filter Values Case-Insensitively
To filter the DataFrame while ignoring case (case-insensitive), first convert the column values to lowercase using the lower() function, then compare them against lowercase values.
// Filter rows where 'state' column contains 'oh' (case-insensitive)
import org.apache.spark.sql.functions.expr
val filteredDF = df.filter(expr("lower(state)").contains("oh"))
filteredDF.show()
Here, expr("lower(state)") creates a Spark SQL expression using the expr() function; it takes the values from the “state” column and converts them to lowercase using the lower() function. After the conversion, .contains("oh") checks whether the resulting lowercase values contain the substring “oh”.
5. Filter Using a Regex Pattern
To filter using a regular expression, use the rlike() function. This function applies regular expression (regex) patterns to filter or search for specific patterns within DataFrame columns; it is a Column function that helps with pattern matching in string columns.
// Filter rows where 'state' column matches a regex pattern
import org.apache.spark.sql.functions.col
val filteredDF = df.filter(col("state").rlike("[AEIOU].*"))
filteredDF.show()
Here, .rlike("[AEIOU].*") applies a regular expression (regex) pattern to the values in the “state” column. The .rlike() function checks whether the values in the column match the specified regex pattern. "[AEIOU].*" matches strings starting with any uppercase vowel (A, E, I, O, or U) followed by any characters (including none, thanks to .*). Essentially, it looks for states whose codes start with a vowel.
6. Filter with Multiple Conditions
To filter rows in a Spark DataFrame based on multiple conditions, use the AND (&&), OR (||), and NOT (!) logical operators; you can use either a Column with a condition or a SQL expression, as explained above.
6.1 Using && (AND) to Filter on multiple conditions
Below is a simple example using the logical AND (&&) operator to check multiple conditions. The && operator requires all of its conditions to be true for the combined expression to be true; if any condition is false, the overall result is false. The following filters the DataFrame df on two conditions combined with the logical AND operator (&&).
// Filter with multiple conditions (using &&)
df.filter(df("state") === "OH" && df("gender") === "M")
.show(false)
This yields the below DataFrame results. It filters rows based on conditions applied to the “state” and “gender” columns simultaneously.
// Output:
+----------------------+------------------+-----+------+
|name |languages |state|gender|
+----------------------+------------------+-----+------+
|[James, , Smith] |[Java, Scala, C++]|OH |M |
|[Mike, Mary, Williams]|[Python, VB] |OH |M |
+----------------------+------------------+-----+------+
6.2 Using || (OR) to Filter on multiple conditions
Similarly, you can also use the OR (||) operator. It returns true if at least one of the expressions is true, and false only if both expressions are false. The following filters the DataFrame df on two conditions combined with the logical OR operator (||).
// Filter using ||
df.filter(df("state") === "OH" || df("gender") === "M")
.show(false)
I will leave this to you to run and explore the output.
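The NOT (!) operator mentioned earlier negates a condition; a small sketch, keeping only rows whose state is not "OH":

```scala
// Filter using ! (NOT) to negate a condition
df.filter(!(df("state") === "OH")).show(false)

// Equivalent using the =!= (not-equal) Column operator
df.filter(df("state") =!= "OH").show(false)
```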
7. Filter on an Array Column
When you want to filter rows from a DataFrame based on a value present in an array collection column, you can use the first syntax. The example below uses the array_contains() Spark SQL function, which checks whether a value is present in an array: it returns true if it is, and false otherwise.
// Filter on an Array Column
import org.apache.spark.sql.functions.array_contains
df.filter(array_contains(df("languages"),"Java"))
.show(false)
This yields the below DataFrame results.
// Output:
+----------------+------------------+-----+------+
|name |languages |state|gender|
+----------------+------------------+-----+------+
|[James, , Smith]|[Java, Scala, C++]|OH |M |
|[Anna, Rose, ] |[Spark, Java, C++]|NY |F |
+----------------+------------------+-----+------+
8. Filter Rows by the Size of an Array
It filters rows based on a condition related to the size of an array column named “languages”. Here, size(col("languages")) > 2
: This condition checks whether the size (number of elements) in the “languages” array column is greater than 2.
// Filter by array size
import org.apache.spark.sql.functions.{col, size}
df.filter(size(col("languages")) > 2).show()
This code snippet will produce a DataFrame that includes only the rows where the “languages” column contains more than two elements in its array structure.
9. Filter on Nested Struct columns
If your DataFrame consists of nested struct columns, you can use any of the above syntaxes to filter the rows based on the nested column.
// Struct condition
df.filter(df("name.lastname") === "Williams")
.show(false)
This yields the below DataFrame results.
// Output:
+----------------------+------------+-----+------+
|name |languages |state|gender|
+----------------------+------------+-----+------+
|[Julia, , Williams] |[CSharp, VB]|OH |F |
|[Mike, Mary, Williams]|[Python, VB]|OH |M |
+----------------------+------------+-----+------+
10. Source code of Spark DataFrame Where Filter
package com.sparkbyexamples.spark.dataframe
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{ArrayType, StringType, StructType}
import org.apache.spark.sql.functions.array_contains
object FilterExample extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()
  spark.sparkContext.setLogLevel("ERROR")

  val arrayStructureData = Seq(
    Row(Row("James","","Smith"),List("Java","Scala","C++"),"OH","M"),
    Row(Row("Anna","Rose",""),List("Spark","Java","C++"),"NY","F"),
    Row(Row("Julia","","Williams"),List("CSharp","VB"),"OH","F"),
    Row(Row("Maria","Anne","Jones"),List("CSharp","VB"),"NY","M"),
    Row(Row("Jen","Mary","Brown"),List("CSharp","VB"),"NY","M"),
    Row(Row("Mike","Mary","Williams"),List("Python","VB"),"OH","M")
  )

  val arrayStructureSchema = new StructType()
    .add("name", new StructType()
      .add("firstname", StringType)
      .add("middlename", StringType)
      .add("lastname", StringType))
    .add("languages", ArrayType(StringType))
    .add("state", StringType)
    .add("gender", StringType)

  val df = spark.createDataFrame(
    spark.sparkContext.parallelize(arrayStructureData), arrayStructureSchema)
  df.printSchema()
  df.show()

  // Column condition
  df.filter(df("state") === "OH")
    .show(false)

  // SQL expression
  df.filter("gender == 'M'")
    .show(false)

  // Multiple conditions
  df.filter(df("state") === "OH" && df("gender") === "M")
    .show(false)

  // Array condition
  df.filter(array_contains(df("languages"), "Java"))
    .show(false)

  // Struct condition
  df.filter(df("name.lastname") === "Williams")
    .show(false)
}
Examples explained here are also available at GitHub project for reference.
11. Conclusion
In this tutorial, you learned how to filter rows from a Spark DataFrame based on single or multiple conditions and SQL expressions, and how to filter rows by providing conditions on array and struct columns, with Scala examples. Alternatively, you can also use the where() function to filter rows in a DataFrame.
Thanks for reading. Any comments or suggestions are welcome in the comments sections!
Happy Learning !!
Related Articles
- Spark Filter Using contains() Examples
- Spark Filter startsWith(), endsWith() Examples
- Spark Data Frame Where () To Filter Rows
- Spark Word Count Explained with Example
- Spark Write DataFrame into Single CSV File (merge multiple part files)
- Spark Join Multiple DataFrames | Tables
- Spark – Add New Column & Multiple Columns to DataFrame
- Spark Read multiline (multiple line) CSV File
- Spark – How to update the DataFrame column?