Duplicate rows can be removed from a Spark SQL DataFrame using the distinct() and dropDuplicates() functions. distinct() removes rows that have the same values on all columns, whereas dropDuplicates() removes rows that have the same values on selected columns.
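For intuition, the same distinction exists on plain Scala collections (this is an analogy, not Spark): full-row deduplication is like Seq.distinct, and deduplicating on selected fields is like distinctBy (Scala 2.13+).

```scala
// Plain Scala analogy (not Spark): full-row dedup vs. dedup on selected fields
val rows = Seq(
  ("James", "Sales", 3000),
  ("James", "Sales", 3000),
  ("Saif", "Sales", 4100)
)
val fullDedup = rows.distinct                         // drops the exact duplicate row
val byDeptSalary = rows.distinctBy(r => (r._2, r._3)) // keeps first row per (department, salary)
println(fullDedup.size)     // 2
println(byDeptSalary.size)  // 2
```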
Before we start, first let’s create a DataFrame with some duplicate rows and duplicate values on a few columns.
import spark.implicits._
val simpleData = Seq(
  ("James", "Sales", 3000),
  ("Michael", "Sales", 4600),
  ("Robert", "Sales", 4100),
  ("Maria", "Finance", 3000),
  ("James", "Sales", 3000),
  ("Scott", "Finance", 3300),
  ("Jen", "Finance", 3900),
  ("Jeff", "Marketing", 3000),
  ("Kumar", "Marketing", 2000),
  ("Saif", "Sales", 4100)
)
val df = simpleData.toDF("employee_name", "department", "salary")
df.show()
Yields the below output.
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
| James| Sales| 3000|
| Michael| Sales| 4600|
| Robert| Sales| 4100|
| Maria| Finance| 3000|
| James| Sales| 3000|
| Scott| Finance| 3300|
| Jen| Finance| 3900|
| Jeff| Marketing| 3000|
| Kumar| Marketing| 2000|
| Saif| Sales| 4100|
+-------------+----------+------+
In the above table, notice that we have 1 row with duplicate values on all columns (employee_name James) and we have 4 rows that have duplicate values on the “department” and “salary” columns.
1. Use distinct() – Remove Duplicate Rows on DataFrame
In the above dataset, we have a total of 10 rows, one of which is an exact duplicate of another, so performing distinct() on this DataFrame should return 9 rows.
//Distinct all columns
val distinctDF = df.distinct()
println("Distinct count: "+distinctDF.count())
distinctDF.show(false)
The distinct() function on a DataFrame returns a new DataFrame after removing the duplicate records. This example yields the below output.
Distinct count: 9
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James |Sales |3000 |
|Michael |Sales |4600 |
|Maria |Finance |3000 |
|Robert |Sales |4100 |
|Saif |Sales |4100 |
|Scott |Finance |3300 |
|Jeff |Marketing |3000 |
|Jen |Finance |3900 |
|Kumar |Marketing |2000 |
+-------------+----------+------+
Alternatively, you can also run the dropDuplicates() function, which returns a new DataFrame with duplicate rows removed.
val df2 = df.dropDuplicates()
println("Distinct count: "+df2.count())
df2.show(false)
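If you prefer SQL syntax, the same full-row deduplication can be expressed with SELECT DISTINCT against a temporary view. A minimal sketch, assuming the spark session and df created above:

```scala
// Same result via Spark SQL: register a temp view and SELECT DISTINCT
df.createOrReplaceTempView("EMP")
val sqlDistinctDF = spark.sql("SELECT DISTINCT * FROM EMP")
println("Distinct count: " + sqlDistinctDF.count())
```

This produces the same 9 distinct rows as df.distinct().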
2. Use dropDuplicates() – Remove Duplicate Rows on DataFrame
Spark doesn’t have a distinct method that takes the columns on which to run distinct; however, Spark provides another signature of the dropDuplicates() function that takes multiple columns to eliminate duplicates.
Note that calling dropDuplicates() on DataFrame returns a new DataFrame with duplicate rows removed.
//Distinct using dropDuplicates
val dropDisDF = df.dropDuplicates("department","salary")
println("Distinct count of department & salary : "+dropDisDF.count())
dropDisDF.show(false)
Yields the below output. Notice that it dropped the 2 records that are duplicates on these columns.
Distinct count of department & salary : 8
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|Jen |Finance |3900 |
|Maria |Finance |3000 |
|Scott |Finance |3300 |
|Michael |Sales |4600 |
|Kumar |Marketing |2000 |
|Robert |Sales |4100 |
|James |Sales |3000 |
|Jeff |Marketing |3000 |
+-------------+----------+------+
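dropDuplicates() also has an overload that accepts the column names as a Seq[String], which is convenient when the column list is built dynamically. A short sketch, assuming the df created above:

```scala
// Equivalent to df.dropDuplicates("department", "salary"),
// but the columns are passed as a Seq, handy when the list is computed at runtime
val cols = Seq("department", "salary")
val dropSeqDF = df.dropDuplicates(cols)
println("Distinct count of department & salary : " + dropSeqDF.count())
```

This yields the same 8 rows as the varargs version shown above.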
3. Source code – Remove Duplicate Rows
package com.sparkbyexamples.spark.dataframe.functions.aggregate

import org.apache.spark.sql.SparkSession

object SQLDistinct extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()
  spark.sparkContext.setLogLevel("ERROR")

  import spark.implicits._

  val simpleData = Seq(
    ("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)
  )
  val df = simpleData.toDF("employee_name", "department", "salary")
  df.show()

  //Distinct all columns
  val distinctDF = df.distinct()
  println("Distinct count: " + distinctDF.count())
  distinctDF.show(false)

  val df2 = df.dropDuplicates()
  println("Distinct count: " + df2.count())
  df2.show(false)

  //Distinct using dropDuplicates
  val dropDisDF = df.dropDuplicates("department", "salary")
  println("Distinct count of department & salary : " + dropDisDF.count())
  dropDisDF.show(false)
}
The complete example is available at GitHub for reference.
4. Conclusion
In this Spark article, you have learned how to remove DataFrame rows that are exact duplicates using distinct(), and how to remove duplicate rows based on selected columns using the dropDuplicates() function, with a Scala example.