La fonction PySpark filter()
est utilisée pour filtrer les lignes du RDD/DataFrame basées sur une condition ou une expression SQL. Si vous avez l’habitude de travailler avec SQL, vous pouvez également utiliser la clause where()
à la place de filter().
Les deux fonctions fonctionnent exactement de la même manière.
Dans cet article PySpark, vous allez apprendre comment appliquer la fonction filter sur des colonnes de types string/arrays/struct de DataFrame, le tout en utilisant des conditions simples et multiples. Vous saurez également appliquer un filtre en utilisant isin()
via des examples PySpark (Python Spark).
Articles Connexes (disponibles en anglais) :
- How to Filter Rows with NULL/NONE (IS NULL & IS NOT NULL) in PySpark
- Spark Filter – startsWith(), endsWith() Examples
- Spark Filter – contains(), like(), rlike() Examples
Note: Les fonctions Columns de PySpark fournissent plusieurs options qui peuvent être utilisées avec filter().
1. Syntaxe de la fonction filter() de Dataframe en PySpark
Ci-dessous se trouve la syntaxe de la fonction filter. condition correspond à l’expression utilisée pour appliquer le filtre.
filter(condition)
Avant de voir les examples, commençons par créer un DataFrame.
Ici, j’utilise un Dataframe avec des colonnes StructType et ArrayType. Ainsi vous y trouverez également des examples avec des types StructType et ArrayType.
from pyspark.sql.types import StructType,StructField from pyspark.sql.types import StringType, IntegerType, ArrayType data = [ (("James","","Smith"),["Java","Scala","C++"],"OH","M"), (("Anna","Rose",""),["Spark","Java","C++"],"NY","F"), (("Julia","","Williams"),["CSharp","VB"],"OH","F"), (("Maria","Anne","Jones"),["CSharp","VB"],"NY","M"), (("Jen","Mary","Brown"),["CSharp","VB"],"NY","M"), (("Mike","Mary","Williams"),["Python","VB"],"OH","M") ] schema = StructType([ StructField('name', StructType([ StructField('firstname', StringType(), True), StructField('middlename', StringType(), True), StructField('lastname', StringType(), True) ])), StructField('languages', ArrayType(StringType()), True), StructField('state', StringType(), True), StructField('gender', StringType(), True) ]) df = spark.createDataFrame(data = data, schema = schema) df.printSchema() df.show(truncate=False)
Le résultat de l’affichage du schéma et le résultat du DataFrame sont les suivants :
root |-- name: struct (nullable = true) | |-- firstname: string (nullable = true) | |-- middlename: string (nullable = true) | |-- lastname: string (nullable = true) |-- languages: array (nullable = true) | |-- element: string (containsNull = true) |-- state: string (nullable = true) |-- gender: string (nullable = true) +----------------------+------------------+-----+------+ |name |languages |state|gender| +----------------------+------------------+-----+------+ |[James, , Smith] |[Java, Scala, C++]|OH |M | |[Anna, Rose, ] |[Spark, Java, C++]|NY |F | |[Julia, , Williams] |[CSharp, VB] |OH |F | |[Maria, Anne, Jones] |[CSharp, VB] |NY |M | |[Jen, Mary, Brown] |[CSharp, VB] |NY |M | |[Mike, Mary, Williams]|[Python, VB] |OH |M | +----------------------+------------------+-----+------+
2. Filter() avec une condition de Column
Utilisez la classe Column avec une condition pour filtrer les lignes voulues d’un DataFrame. Cela vous permet d’exprimer des conditions complexes en faisant référence aux noms de colonnes via dfObject.colname
.
# En utilisant la condition "est égal à" (==) df.filter(df.state == "OH").show(truncate=False) +----------------------+------------------+-----+------+ |name |languages |state|gender| +----------------------+------------------+-----+------+ |[James, , Smith] |[Java, Scala, C++]|OH |M | |[Julia, , Williams] |[CSharp, VB] |OH |F | |[Mike, Mary, Williams]|[Python, VB] |OH |M | +----------------------+------------------+-----+------+ # En utilisant la condition "est différent de" (!=) df.filter(df.state != "OH") \ .show(truncate=False) df.filter(~(df.state == "OH")) \ .show(truncate=False)
Le même example peut également être réécrit comme suit.
Afin de l’utiliser, il est est nécessaire d’importer la fonction col : from pyspark.sql.functions import col
.
#Using SQL col() function from pyspark.sql.functions import col df.filter(col("state") == "OH") \ .show(truncate=False)
3. Effectuer un filter() de DataFrame avec des expressions SQL
Pour les habitués de SQL, vous pouvez réutiliser vos connaissances afin de filtrer les lignes d’un DataFrame en PySpark avec des expressions SQL.
# En utilisant les expressions SQL df.filter("gender == 'M'").show() #For not equal df.filter("gender != 'M'").show() df.filter("gender <> 'M'").show()
4. Effectuer un filter() avec plusieurs conditions
Pour filtrer avec filter()
sur les lignes de DataFrame avec plusieurs conditions, vous pouvez soit utiliser Column
avec une condition ou bien en utilisant une expression SQL. Ci-dessous est un example basique en utilisant la condition ET (&). Libre à vous de l’étendre avec les conditions OU (|), et NON (!) si besoin.
// Filter avec conditions multiples df.filter( (df.state == "OH") & (df.gender == "M") ) \ .show(truncate=False)
Le code a pour sortie le résultat ci-dessous.
+----------------------+------------------+-----+------+ |name |languages |state|gender| +----------------------+------------------+-----+------+ |[James, , Smith] |[Java, Scala, C++]|OH |M | |[Mike, Mary, Williams]|[Python, VB] |OH |M | +----------------------+------------------+-----+------+
5. Filtrer sur une liste de valeurs
Si vous avez une liste d’éléments et que vous souhaitez filtrer ce qui n’est ou n’est pas dans la liste, vous pouvez utiliser la fonction isin()
de la classe Column. Vous pouvez en effet réutiliser la même fonction sans passer par l’autre fonction isnotin() si vous utiliser l’opérateur non (~).
#Filter avec IS IN et une liste de valeurs li=["OH","CA","DE"] df.filter(df.state.isin(li)).show() +--------------------+------------------+-----+------+ | name| languages|state|gender| +--------------------+------------------+-----+------+ | [James, , Smith]|[Java, Scala, C++]| OH| M| | [Julia, , Williams]| [CSharp, VB]| OH| F| |[Mike, Mary, Will...| [Python, VB]| OH| M| +--------------------+------------------+-----+------+ # Filter avec NOT IS IN et une liste de valeurs #These show all records with NY (NY is not part of the list) df.filter(~df.state.isin(li)).show() df.filter(df.state.isin(li)==False).show()
6. Filtrer en utilisant Starts With, Ends With, Contains
Une autre manière de filtrer les lignes du DataFrame est d’utiliser les méthodes startswith()
, endswith()
and contains()
de la classe Column. Elles permettent respectivement de filtrer par “commencer par”, “terminer par”, ou “contenant”. Pour plus d’example sur la classe Column, se référer à PySpark Column Functions.
# Avec startswith df.filter(df.state.startswith("N")).show() +--------------------+------------------+-----+------+ | name| languages|state|gender| +--------------------+------------------+-----+------+ | [Anna, Rose, ]|[Spark, Java, C++]| NY| F| |[Maria, Anne, Jones]| [CSharp, VB]| NY| M| | [Jen, Mary, Brown]| [CSharp, VB]| NY| M| +--------------------+------------------+-----+------+ # Avec endswith df.filter(df.state.endswith("H")).show() # Avec contains df.filter(df.state.contains("H")).show()
7. Filtrer avec like and rlike
Si vous êtes habitués à SQL, vous devez sûrement être familiés avec like
and rlike
(regex like). Par chance, PySpark fournit également des méthodes similaires dans la classe Column class pour filtrer des valeurs similaires en utilisant des caractères génériques. Vous pouvez utiliser rlike() pour filtrer en vérifiant les valeurs insensibles à la casse.
data2 = [(2,"Michael Rose"),(3,"Robert Williams"), (4,"Rames Rose"),(5,"Rames rose") ] df2 = spark.createDataFrame(data = data2, schema = ["id","name"]) # like - Pattern SQL LIKE df2.filter(df2.name.like("%rose%")).show() +---+----------+ | id| name| +---+----------+ | 5|Rames rose| +---+----------+ # rlike - Pattern SQL RLIKE (LIKE avec Regex) #This check case insensitive df2.filter(df2.name.rlike("(?i)^*rose$")).show() +---+------------+ | id| name| +---+------------+ | 2|Michael Rose| | 4| Rames Rose| | 5| Rames rose|
8. Filtrer sur une colonne de Array
Lorsque vous voulez filtrer les lignes d’un DataFrame en fonction des valeurs présentes dans une colonne de tableau, vous pouvez utiliser la première syntaxe. L’example ci-dessous utilises array_contains()
des fonctions fonctions Pyspark SQL qui vérifie si une valeur contenue dans un tableau est présente. Retourne true si présent, sinon false.
from pyspark.sql.functions import array_contains df.filter(array_contains(df.languages,"Java")) \ .show(truncate=False)
Le code produit le résultat suivant.
+----------------+------------------+-----+------+ |name |languages |state|gender| +----------------+------------------+-----+------+ |[James, , Smith]|[Java, Scala, C++]|OH |M | |[Anna, Rose, ] |[Spark, Java, C++]|NY |F | +----------------+------------------+-----+------+
9. Filtrer sur les “nested struct columns”
Si votre DataFrame est composé d’une structure de colonnes imbriquées (nested struct columns), vous pouvez utilisez n’importe laquelle des syntaxes ci-dessus pour filtrer les lignes en fonction des colonnes imbriquées.
// Condition struct df.filter(df.name.lastname == "Williams") \ .show(truncate=False)
Le résultat en sortie est le suivant.
+----------------------+------------+-----+------+ |name |languages |state|gender| +----------------------+------------+-----+------+ |[Julia, , Williams] |[CSharp, VB]|OH |F | |[Mike, Mary, Williams]|[Python, VB]|OH |M | +----------------------+------------+-----+------+
10. Code source PySpark de filter / where
import pyspark from pyspark.sql import SparkSession from pyspark.sql.types import StructType,StructField, StringType, IntegerType, ArrayType from pyspark.sql.functions import col,array_contains spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate() arrayStructureData = [ (("James","","Smith"),["Java","Scala","C++"],"OH","M"), (("Anna","Rose",""),["Spark","Java","C++"],"NY","F"), (("Julia","","Williams"),["CSharp","VB"],"OH","F"), (("Maria","Anne","Jones"),["CSharp","VB"],"NY","M"), (("Jen","Mary","Brown"),["CSharp","VB"],"NY","M"), (("Mike","Mary","Williams"),["Python","VB"],"OH","M") ] arrayStructureSchema = StructType([ StructField('name', StructType([ StructField('firstname', StringType(), True), StructField('middlename', StringType(), True), StructField('lastname', StringType(), True) ])), StructField('languages', ArrayType(StringType()), True), StructField('state', StringType(), True), StructField('gender', StringType(), True) ]) df = spark.createDataFrame(data = arrayStructureData, schema = arrayStructureSchema) df.printSchema() df.show(truncate=False) df.filter(df.state == "OH") \ .show(truncate=False) df.filter(col("state") == "OH") \ .show(truncate=False) df.filter("gender == 'M'") \ .show(truncate=False) df.filter( (df.state == "OH") & (df.gender == "M") ) \ .show(truncate=False) df.filter(array_contains(df.languages,"Java")) \ .show(truncate=False) df.filter(df.name.lastname == "Williams") \ .show(truncate=False)
Les examples présents sont également disponibles sur le projet PySpark examples GitHub en référence.
11. Conclusion
Dans ce tutoriel, nous avons vu :
- Comment filtrer les lignes d’un DataFrame PySpark en fonction de conditions simples / multiples et d’expressions SQL
- Comment filtrer les lignes en appliquant des conditions sur le tableau et les colonnes avec Spark en Python
- Le tout avec des examples
En alternative, vous pouvez aussi utiliser la fonction where()
pour filter les lignes de DataFrame PySpark.
Bon apprentissage !!
Related Articles
- Pyspark: Exception: Java gateway process exited before sending the driver its port number
- PySpark Drop Rows with NULL or None Values
- PySpark split() Column into Multiple Columns
- PySpark – Drop One or Multiple Columns From DataFrame
- PySpark JSON Functions with Examples
- PySpark SQL expr() (Expression ) Function
- PySpark Window Functions
- PySpark Aggregate Functions with Examples