Spark RDD filter() with examples

Spark RDD filter is an operation that creates a new RDD by selecting the elements from the input RDD that satisfy a given predicate (or condition). The filter operation does not modify the original RDD but creates a new RDD with the filtered elements. In this article, we shall discuss the syntax of Spark RDD Filter and different patterns to apply it.

1. Syntax of Spark RDD Filter

The syntax for the RDD filter in Spark using Scala is:


// Syntax of RDD filter()
val filteredRDD = inputRDD.filter(predicate)

Here, inputRDD is the RDD to be filtered and predicate is a function that takes an element from the RDD and returns a boolean value indicating whether the element satisfies the filtering condition. The filteredRDD is the resulting RDD containing only the elements that satisfy the predicate.

For example, suppose we have an RDD of integers and we want to keep only the odd numbers (that is, filter out the even ones). We can use the filter operation as follows:


// Import
import org.apache.spark.sql.SparkSession

// Create SparkSession
val spark = SparkSession.builder()
        .appName("RDD filter example")
        .master("local[*]")
        .getOrCreate()

// Create RDD
val inputRDD = spark.sparkContext
        .parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))

// RDD filter() usage
val filteredRDD = inputRDD.filter(x => x % 2 != 0)

In this example, we create an input RDD with ten integers, and then we apply the filter operation with the predicate x % 2 != 0 to select only the odd numbers. The resulting filteredRDD will contain the elements 1, 3, 5, 7, 9.
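
The snippet above only defines the filtered RDD; to actually see its contents you need an action. The following is a minimal sketch, assuming a local session; the names numbers, oddNumbers, and oddValues are illustrative and not part of the original example. It collects the result to the driver and checks that the input RDD is left untouched.

```scala
import org.apache.spark.sql.SparkSession

// Reuse a running session if there is one, otherwise start a local one
val spark = SparkSession.builder()
        .appName("RDD filter example")
        .master("local[*]")
        .getOrCreate()

// Illustrative input: the integers 1 to 10
val numbers = spark.sparkContext.parallelize(1 to 10)

// filter() is a transformation, so this line only records what to do
val oddNumbers = numbers.filter(x => x % 2 != 0)

// collect() is an action: it runs the job and returns the elements to the driver
val oddValues = oddNumbers.collect()
println(oddValues.mkString(", "))   // 1, 3, 5, 7, 9

// The input RDD still holds all ten elements
println(numbers.count())            // 10
```

Note that collect() brings every element back to the driver, so it is best reserved for small results such as this one.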

2. Spark RDD Filter Examples

Following are some more examples of using RDD filter().

2.1 Filter based on a condition using a lambda function

First, let’s see how to filter an RDD using a lambda function.


val rdd = spark.sparkContext
        .parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
val filteredRDD = rdd.filter(x => x % 2 == 0)

Here, the Spark RDD filter will create an RDD containing only the even numbers (2, 4, 6, 8, 10).
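
The predicate does not have to be written inline. As a small sketch (the names isEven, values, evensByName, and evensByPlaceholder are hypothetical, chosen only for illustration), the same filter can be expressed with a reusable function value or with Scala's placeholder syntax:

```scala
import org.apache.spark.sql.SparkSession

// Reuse a running session if there is one, otherwise start a local one
val spark = SparkSession.builder()
        .appName("RDD filter example")
        .master("local[*]")
        .getOrCreate()

// Hypothetical reusable predicate, stored as a function value
val isEven: Int => Boolean = n => n % 2 == 0

val values = spark.sparkContext
        .parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))

// Pass the function value directly
val evensByName = values.filter(isEven)

// Equivalent shorthand using the underscore placeholder
val evensByPlaceholder = values.filter(_ % 2 == 0)

println(evensByName.collect().mkString(", "))          // 2, 4, 6, 8, 10
println(evensByPlaceholder.collect().mkString(", "))   // 2, 4, 6, 8, 10
```

A named predicate is convenient when the same condition is reused in several places or needs its own unit test.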

2.2 Filter based on multiple conditions

Below is an example of filtering Spark RDD on multiple conditions.


val rdd = spark.sparkContext
      .parallelize(List("apple", "banana", "pear", "orange"))
val filteredRDD = rdd.filter(x => x.startsWith("a") || x.startsWith("o"))

This will create an RDD containing only the strings starting with “a” or “o” (“apple” and “orange”).
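
Conditions can also be combined with && so that an element must satisfy all of them. The sketch below is illustrative only (fruits, shortWithA, and chained are made-up names); it also shows that chaining two filter() calls gives the same result as a single predicate joined with &&.

```scala
import org.apache.spark.sql.SparkSession

// Reuse a running session if there is one, otherwise start a local one
val spark = SparkSession.builder()
        .appName("RDD filter example")
        .master("local[*]")
        .getOrCreate()

val fruits = spark.sparkContext
        .parallelize(List("apple", "banana", "pear", "orange"))

// Keep strings that contain "a" AND have at most 5 characters
val shortWithA = fruits.filter(x => x.contains("a") && x.length <= 5)

// The same selection written as two chained filters
val chained = fruits.filter(_.contains("a")).filter(_.length <= 5)

println(shortWithA.collect().mkString(", "))   // apple, pear
println(chained.collect().mkString(", "))      // apple, pear
```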

2.3 Filter based on the presence of a certain substring


val rdd = spark.sparkContext
        .parallelize(List("apple", "banana", "pear", "orange"))
val filteredRDD = rdd.filter(x => x.contains("p"))

This will create an RDD containing only the strings containing the letter “p” (“apple” and “pear”).
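
Keep in mind that contains() is case-sensitive. If the data may have mixed casing, one option is to normalize each string before testing it. This is a sketch with made-up, mixed-case input; mixedCase, caseSensitive, and caseInsensitive are illustrative names.

```scala
import org.apache.spark.sql.SparkSession

// Reuse a running session if there is one, otherwise start a local one
val spark = SparkSession.builder()
        .appName("RDD filter example")
        .master("local[*]")
        .getOrCreate()

// Illustrative input with mixed casing
val mixedCase = spark.sparkContext
        .parallelize(List("Apple", "banana", "Pear", "orange"))

// Case-sensitive: "Pear" is skipped because it only has an uppercase "P"
val caseSensitive = mixedCase.filter(x => x.contains("p"))

// Case-insensitive: lowercase each string before checking
val caseInsensitive = mixedCase.filter(x => x.toLowerCase.contains("p"))

println(caseSensitive.collect().mkString(", "))     // Apple
println(caseInsensitive.collect().mkString(", "))   // Apple, Pear
```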

2.4 Filter based on the length of a string


val rdd = spark.sparkContext
        .parallelize(List("apple", "banana", "pear", "orange"))
val filteredRDD = rdd.filter(x => x.length > 5)

This will create an RDD containing only the strings with lengths greater than 5 (“banana” and “orange”).

2.5 Filter based on a regular expression


val rdd = spark.sparkContext
        .parallelize(List("apple", "banana", "pear", "orange"))
val filteredRDD = rdd.filter(x => x.matches(".*a.*"))

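This will create an RDD containing only the strings that contain the letter “a”. Because every string in this list contains an “a”, the result holds all four elements (“apple”, “banana”, “pear”, and “orange”). Note that String.matches must match the entire string, which is why the pattern is wrapped in .* on both sides.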
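
For a more selective pattern, the sketch below keeps only the strings that start with a vowel. It is an illustration rather than the only way to do this; words, startsWithVowel, viaRegex, and viaMatches are hypothetical names. It uses a precompiled scala.util.matching.Regex with findFirstIn, which looks for a match anywhere in the string, and compares it with String.matches, which succeeds only when the whole string matches.

```scala
import org.apache.spark.sql.SparkSession
import scala.util.matching.Regex

// Reuse a running session if there is one, otherwise start a local one
val spark = SparkSession.builder()
        .appName("RDD filter example")
        .master("local[*]")
        .getOrCreate()

val words = spark.sparkContext
        .parallelize(List("apple", "banana", "pear", "orange"))

// Compile the pattern once; ^ anchors it to the start of the string
val startsWithVowel: Regex = "^[aeiou]".r

// findFirstIn returns Some(...) when the pattern is found
val viaRegex = words.filter(w => startsWithVowel.findFirstIn(w).isDefined)

// Equivalent with String.matches, which needs .* to cover the rest of the string
val viaMatches = words.filter(w => w.matches("[aeiou].*"))

println(viaRegex.collect().mkString(", "))     // apple, orange
println(viaMatches.collect().mkString(", "))   // apple, orange
```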

3. Conclusion

In conclusion, the Spark RDD filter is a transformation that creates a new RDD by selecting only the elements of an existing RDD that meet a specific condition. Because filter() is a narrow transformation, each partition is filtered independently and in parallel, with no shuffle of data between nodes. Like all transformations, it is lazy: nothing is computed until an action such as collect() or count() is called.

rimmalapudi

Data Engineer. I write about BigData Architecture, tools and techniques that are used to build Bigdata pipelines and other generic blogs.