Fonctions filter where en PySpark | Conditions Multiples

La fonction PySpark filter() est utilisée pour filtrer les lignes du RDD/DataFrame basées sur une condition ou une expression SQL. Si vous avez l’habitude de travailler avec SQL, vous pouvez également utiliser la clause where() à la place de filter().
Les deux fonctions fonctionnent exactement de la même manière.

Dans cet article PySpark, vous allez apprendre comment appliquer la fonction filter sur des colonnes de types string/arrays/struct de DataFrame, le tout en utilisant des conditions simples et multiples. Vous saurez également appliquer un filtre en utilisant isin() via des examples PySpark (Python Spark).

Articles Connexes (disponibles en anglais) :

Note: Les fonctions Columns de PySpark fournissent plusieurs options qui peuvent être utilisées avec filter().

1. Syntaxe de la fonction filter() de Dataframe en PySpark

Ci-dessous se trouve la syntaxe de la fonction filter. condition correspond à l’expression utilisée pour appliquer le filtre.

filter(condition)

Avant de voir les examples, commençons par créer un DataFrame.
Ici, j’utilise un Dataframe avec des colonnes StructType et ArrayType. Ainsi vous y trouverez également des examples avec des types StructType et ArrayType.

from pyspark.sql.types import StructType,StructField 
from pyspark.sql.types import StringType, IntegerType, ArrayType

data = [
    (("James","","Smith"),["Java","Scala","C++"],"OH","M"),
    (("Anna","Rose",""),["Spark","Java","C++"],"NY","F"),
    (("Julia","","Williams"),["CSharp","VB"],"OH","F"),
    (("Maria","Anne","Jones"),["CSharp","VB"],"NY","M"),
    (("Jen","Mary","Brown"),["CSharp","VB"],"NY","M"),
    (("Mike","Mary","Williams"),["Python","VB"],"OH","M")
 ]
        
schema = StructType([
     StructField('name', StructType([
        StructField('firstname', StringType(), True),
        StructField('middlename', StringType(), True),
         StructField('lastname', StringType(), True)
     ])),
     StructField('languages', ArrayType(StringType()), True),
     StructField('state', StringType(), True),
     StructField('gender', StringType(), True)
 ])

df = spark.createDataFrame(data = data, schema = schema)
df.printSchema()
df.show(truncate=False)

Le résultat de l’affichage du schéma et le résultat du DataFrame sont les suivants :

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- languages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)

+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|[James, , Smith]      |[Java, Scala, C++]|OH   |M     |
|[Anna, Rose, ]        |[Spark, Java, C++]|NY   |F     |
|[Julia, , Williams]   |[CSharp, VB]      |OH   |F     |
|[Maria, Anne, Jones]  |[CSharp, VB]      |NY   |M     |
|[Jen, Mary, Brown]    |[CSharp, VB]      |NY   |M     |
|[Mike, Mary, Williams]|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+

2. Filter() avec une condition de Column

Utilisez la classe Column avec une condition pour filtrer les lignes voulues d’un DataFrame. Cela vous permet d’exprimer des conditions complexes en faisant référence aux noms de colonnes via dfObject.colname.

# En utilisant la condition "est égal à" (==)
df.filter(df.state == "OH").show(truncate=False)

+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|[James, , Smith]      |[Java, Scala, C++]|OH   |M     |
|[Julia, , Williams]   |[CSharp, VB]      |OH   |F     |
|[Mike, Mary, Williams]|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+

# En utilisant la condition "est différent de" (!=)
df.filter(df.state != "OH") \
    .show(truncate=False) 
df.filter(~(df.state == "OH")) \
    .show(truncate=False)

Le même example peut également être réécrit comme suit.
Afin de l’utiliser, il est est nécessaire d’importer la fonction col : from pyspark.sql.functions import col.

#Using SQL col() function
from pyspark.sql.functions import col
df.filter(col("state") == "OH") \
    .show(truncate=False) 

3. Effectuer un filter() de DataFrame avec des expressions SQL

Pour les habitués de SQL, vous pouvez réutiliser vos connaissances afin de filtrer les lignes d’un DataFrame en PySpark avec des expressions SQL.

# En utilisant les expressions SQL
df.filter("gender == 'M'").show()
#For not equal
df.filter("gender != 'M'").show()
df.filter("gender <> 'M'").show()

4. Effectuer un filter() avec plusieurs conditions

Pour filtrer avec filter() sur les lignes de DataFrame avec plusieurs conditions, vous pouvez soit utiliser Column avec une condition ou bien en utilisant une expression SQL. Ci-dessous est un example basique en utilisant la condition ET (&). Libre à vous de l’étendre avec les conditions OU (|), et NON (!) si besoin.

// Filter avec conditions multiples
df.filter( (df.state  == "OH") & (df.gender  == "M") ) \
    .show(truncate=False)  

Le code a pour sortie le résultat ci-dessous.

+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|[James, , Smith]      |[Java, Scala, C++]|OH   |M     |
|[Mike, Mary, Williams]|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+

5. Filtrer sur une liste de valeurs

Si vous avez une liste d’éléments et que vous souhaitez filtrer ce qui n’est ou n’est pas dans la liste, vous pouvez utiliser la fonction isin() de la classe Column. Vous pouvez en effet réutiliser la même fonction sans passer par l’autre fonction isnotin() si vous utiliser l’opérateur non (~).

#Filter avec IS IN et une liste de valeurs
li=["OH","CA","DE"]
df.filter(df.state.isin(li)).show()
+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    [James, , Smith]|[Java, Scala, C++]|   OH|     M|
| [Julia, , Williams]|      [CSharp, VB]|   OH|     F|
|[Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+

# Filter avec NOT IS IN et une liste de valeurs
#These show all records with NY (NY is not part of the list)
df.filter(~df.state.isin(li)).show()
df.filter(df.state.isin(li)==False).show()

6. Filtrer en utilisant Starts With, Ends With, Contains

Une autre manière de filtrer les lignes du DataFrame est d’utiliser les méthodes startswith()endswith() and contains() de la classe Column. Elles permettent respectivement de filtrer par “commencer par”, “terminer par”, ou “contenant”. Pour plus d’example sur la classe Column, se référer à PySpark Column Functions.

# Avec startswith
df.filter(df.state.startswith("N")).show()
+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|      [Anna, Rose, ]|[Spark, Java, C++]|   NY|     F|
|[Maria, Anne, Jones]|      [CSharp, VB]|   NY|     M|
|  [Jen, Mary, Brown]|      [CSharp, VB]|   NY|     M|
+--------------------+------------------+-----+------+

# Avec endswith
df.filter(df.state.endswith("H")).show()

# Avec contains
df.filter(df.state.contains("H")).show()

7. Filtrer avec like and rlike

Si vous êtes habitués à SQL, vous devez sûrement être familiés avec like and rlike (regex like). Par chance, PySpark fournit également des méthodes similaires dans la classe Column class pour filtrer des valeurs similaires en utilisant des caractères génériques. Vous pouvez utiliser rlike() pour filtrer en vérifiant les valeurs insensibles à la casse.

data2 = [(2,"Michael Rose"),(3,"Robert Williams"),
     (4,"Rames Rose"),(5,"Rames rose")
  ]
df2 = spark.createDataFrame(data = data2, schema = ["id","name"])

# like - Pattern SQL LIKE
df2.filter(df2.name.like("%rose%")).show()
+---+----------+
| id|      name|
+---+----------+
|  5|Rames rose|
+---+----------+

# rlike - Pattern SQL RLIKE (LIKE avec Regex)
#This check case insensitive
df2.filter(df2.name.rlike("(?i)^*rose$")).show()
+---+------------+
| id|        name|
+---+------------+
|  2|Michael Rose|
|  4|  Rames Rose|
|  5|  Rames rose|

8. Filtrer sur une colonne de Array

Lorsque vous voulez filtrer les lignes d’un DataFrame en fonction des valeurs présentes dans une colonne de tableau, vous pouvez utiliser la première syntaxe. L’example ci-dessous utilises array_contains() des fonctions fonctions Pyspark SQL qui vérifie si une valeur contenue dans un tableau est présente. Retourne true si présent, sinon false.

from pyspark.sql.functions import array_contains
df.filter(array_contains(df.languages,"Java")) \
    .show(truncate=False)     

Le code produit le résultat suivant.

+----------------+------------------+-----+------+
|name            |languages         |state|gender|
+----------------+------------------+-----+------+
|[James, , Smith]|[Java, Scala, C++]|OH   |M     |
|[Anna, Rose, ]  |[Spark, Java, C++]|NY   |F     |
+----------------+------------------+-----+------+

9. Filtrer sur les “nested struct columns”

Si votre DataFrame est composé d’une structure de colonnes imbriquées (nested struct columns), vous pouvez utilisez n’importe laquelle des syntaxes ci-dessus pour filtrer les lignes en fonction des colonnes imbriquées.

  // Condition struct
df.filter(df.name.lastname == "Williams") \
    .show(truncate=False) 

Le résultat en sortie est le suivant.

+----------------------+------------+-----+------+
|name                  |languages   |state|gender|
+----------------------+------------+-----+------+
|[Julia, , Williams]   |[CSharp, VB]|OH   |F     |
|[Mike, Mary, Williams]|[Python, VB]|OH   |M     |
+----------------------+------------+-----+------+

10. Code source PySpark de filter / where

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, ArrayType
from pyspark.sql.functions import col,array_contains

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

arrayStructureData = [
        (("James","","Smith"),["Java","Scala","C++"],"OH","M"),
        (("Anna","Rose",""),["Spark","Java","C++"],"NY","F"),
        (("Julia","","Williams"),["CSharp","VB"],"OH","F"),
        (("Maria","Anne","Jones"),["CSharp","VB"],"NY","M"),
        (("Jen","Mary","Brown"),["CSharp","VB"],"NY","M"),
        (("Mike","Mary","Williams"),["Python","VB"],"OH","M")
        ]
        
arrayStructureSchema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('languages', ArrayType(StringType()), True),
         StructField('state', StringType(), True),
         StructField('gender', StringType(), True)
         ])


df = spark.createDataFrame(data = arrayStructureData, schema = arrayStructureSchema)
df.printSchema()
df.show(truncate=False)

df.filter(df.state == "OH") \
    .show(truncate=False)

df.filter(col("state") == "OH") \
    .show(truncate=False)    
    
df.filter("gender  == 'M'") \
    .show(truncate=False)    

df.filter( (df.state  == "OH") & (df.gender  == "M") ) \
    .show(truncate=False)        

df.filter(array_contains(df.languages,"Java")) \
    .show(truncate=False)        

df.filter(df.name.lastname == "Williams") \
    .show(truncate=False) 

Les examples présents sont également disponibles sur le projet PySpark examples GitHub en référence.

11. Conclusion

Dans ce tutoriel, nous avons vu :

  • Comment filtrer les lignes d’un DataFrame PySpark en fonction de conditions simples / multiples et d’expressions SQL
  • Comment filtrer les lignes en appliquant des conditions sur le tableau et les colonnes avec Spark en Python
  • Le tout avec des examples

En alternative, vous pouvez aussi utiliser la fonction where() pour filter les lignes de DataFrame PySpark.

Bon apprentissage !!

Leave a Reply