Spark withColumn()
is a DataFrame transformation function used to add a new column, change the value of an existing column, convert the datatype of a column, or derive a new column from an existing one. In this post, I will walk you through commonly used DataFrame column operations with Scala examples.
- Spark withColumn() Syntax and Usage
- Add a New Column to DataFrame
- Change Value of an Existing Column
- Derive New Column From an Existing Column
- Change Column DataType
- Add, Replace or Update Multiple Columns
- Rename Column Name
- Drop a Column From DataFrame
- Split Column into Multiple Columns
Spark withColumn() Syntax and Usage
Spark withColumn()
is a transformation function of DataFrame that is used to manipulate the column values of all rows, or of selected rows, on a DataFrame.
The withColumn() function returns a new Spark DataFrame after performing operations like adding a new column, updating the value of an existing column, deriving a new column from an existing one, and many more.
Below is the syntax of the withColumn() function.
withColumn(colName : String, col : Column) : DataFrame
colName : String
– the name of the column you want to create; pass an existing column name to update its value.
col : Column
– a Column expression providing the value.
Since withColumn() is a transformation function, it doesn't execute until an action is called.
The Spark withColumn() method introduces a projection internally. Therefore, calling it multiple times, for instance via loops in order to add multiple columns, can generate big plans which can cause performance issues and even a StackOverflowException (as noted in the Spark documentation). To avoid this, use select()
with multiple columns at once.
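For example, rather than adding columns one at a time in a loop, you can project them all in a single select(). This is a sketch assuming a DataFrame named df and two illustrative columns to add:

```scala
import org.apache.spark.sql.functions.{col, lit}

// Columns to add, built up once instead of via repeated withColumn() calls
val newCols = Seq(lit("USA").as("Country"), lit("anotherValue").as("anotherColumn"))

// A single select() keeps the plan shallow: all existing columns plus the new ones
val df2 = df.select(col("*") +: newCols: _*)
```

Because select() produces one projection, the query plan stays flat no matter how many columns are appended.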
First, let’s create a DataFrame to work with.
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructType}
val data = Seq(Row(Row("James","","Smith"),"36636","M","3000"),
Row(Row("Michael","Rose",""),"40288","M","4000"),
Row(Row("Robert","","Williams"),"42114","M","4000"),
Row(Row("Maria","Anne","Jones"),"39192","F","4000"),
Row(Row("Jen","Mary","Brown"),"","F","-1")
)
val schema = new StructType()
.add("name",new StructType()
.add("firstname",StringType)
.add("middlename",StringType)
.add("lastname",StringType))
.add("dob",StringType)
.add("gender",StringType)
.add("salary",StringType)
val df = spark.createDataFrame(spark.sparkContext.parallelize(data),schema)
1. Add a New Column to DataFrame
To create a new column, pass your desired column name as the first argument of the withColumn() transformation function. Make sure this column is not already present on the DataFrame; if it is, withColumn() updates its value instead. In the snippet below, the lit() function is used to add a constant value to a DataFrame column. Calls can also be chained in order to add multiple columns.
// Add a New Column to DataFrame
import org.apache.spark.sql.functions.lit
df.withColumn("Country", lit("USA"))
// Chaining to operate on multiple columns
df.withColumn("Country", lit("USA"))
.withColumn("anotherColumn",lit("anotherValue"))
The above approach is fine if you are manipulating a few columns, but when you want to add or update multiple columns, do not chain withColumn() calls, as this leads to performance issues; use select() to update multiple columns instead.
2. Change Value of an Existing Column
Spark withColumn()
function of DataFrame can also be used to update the value of an existing column. To change the value, pass the existing column name as the first argument and the value to be assigned as the second. Note that the second argument must be of Column
type.
// Change Value of an Existing Column
import org.apache.spark.sql.functions.col
df.withColumn("salary",col("salary")*100)
This snippet multiplies the value of the "salary" column by 100 and writes the result back to the "salary" column.
3. Derive New Column From an Existing Column
To derive a new column, pass the name you want for the new column as the first argument and, as the second argument, a value obtained by applying an operation to an existing column.
// Derive New Column From an Existing Column
df.withColumn("CopiedColumn",col("salary")* -1)
This snippet creates a new column "CopiedColumn" by multiplying the "salary" column by -1.
4. Change Column Data Type
By using Spark withColumn() together with the cast() function on a column, we can change the datatype of a DataFrame column. The statement below changes the datatype of the "salary" column from String to Integer.
// Change Column Data Type
df.withColumn("salary",col("salary").cast("Integer"))
5. Add, Replace, or Update multiple Columns
When you want to add, replace, or update multiple columns in a Spark DataFrame, it is not advisable to chain withColumn()
calls, as this leads to performance issues; instead, create a temporary view on the DataFrame and use select() via Spark SQL.
// Add, Replace, or Update multiple Columns
df.createOrReplaceTempView("PERSON")
spark.sql("SELECT salary*100 as salary, salary*-1 as CopiedColumn, 'USA' as country FROM PERSON").show()
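The same projection can also be written with the DataFrame API, without creating a temporary view; a sketch using select() on the df from above:

```scala
import org.apache.spark.sql.functions.{col, lit}

// One projection that updates, derives, and adds columns in a single pass
df.select(
  (col("salary") * 100).as("salary"),     // update an existing column
  (col("salary") * -1).as("CopiedColumn"),// derive a new column
  lit("USA").as("country")                // add a constant column
).show()
```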
6. Rename Column Name
Though the examples in sections 6, 7, and 8 don't use the withColumn()
function, I still feel it's worth explaining how to rename, drop, and split columns, as these may be useful to you.
To rename an existing column, use the withColumnRenamed() function on the DataFrame.
// Rename Column Name
df.withColumnRenamed("gender","sex")
7. Drop a Column
Use the drop() function to drop a specific column from the DataFrame.
// Drop a Column
df.drop("CopiedColumn")
8. Split Column into Multiple Columns
Though this example doesn't use the withColumn()
function, I still feel it's worth explaining how to split one DataFrame column into multiple columns using the Spark map()
transformation.
// Split Column into Multiple Columns
import spark.implicits._
val columns = Seq("name","address")
val data = Seq(("Robert, Smith", "1 Main st, Newark, NJ, 92537"),
("Maria, Garcia","3456 Walnut st, Newark, NJ, 94732"))
val dfFromData = spark.createDataFrame(data).toDF(columns:_*)
dfFromData.printSchema()
val newDF = dfFromData.map(f=>{
val nameSplit = f.getAs[String](0).split(",")
val addSplit = f.getAs[String](1).split(",")
(nameSplit(0),nameSplit(1),addSplit(0),addSplit(1),addSplit(2),addSplit(3))
})
val finalDF = newDF.toDF("First Name","Last Name",
"Address Line1","City","State","zipCode")
finalDF.printSchema()
finalDF.show(false)
This snippet splits the name column into "First Name" and "Last Name", and the address column into "Address Line1", "City", "State", and "zipCode". It yields the output below:
// Output
root
|-- First Name: string (nullable = true)
|-- Last Name: string (nullable = true)
|-- Address Line1: string (nullable = true)
|-- City: string (nullable = true)
|-- State: string (nullable = true)
|-- zipCode: string (nullable = true)
+----------+---------+--------------+-------+-----+-------+
|First Name|Last Name|Address Line1 |City |State|zipCode|
+----------+---------+--------------+-------+-----+-------+
|Robert | Smith |1 Main st | Newark| NJ | 92537 |
|Maria | Garcia |3456 Walnut st| Newark| NJ | 94732 |
+----------+---------+--------------+-------+-----+-------+
Note that all of these functions return a new DataFrame after applying the operations, instead of updating the existing DataFrame.
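Because these functions never modify a DataFrame in place, remember to capture the returned DataFrame; a small sketch using the df defined earlier:

```scala
import org.apache.spark.sql.functions.lit

// withColumn() returns a new DataFrame; the original df is unchanged
val dfWithCountry = df.withColumn("Country", lit("USA"))
println(df.columns.contains("Country"))            // the original has no "Country"
println(dfWithCountry.columns.contains("Country")) // the new DataFrame does
```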
9. Spark withColumn() Complete Example
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.functions._
object WithColumn {
def main(args:Array[String]):Unit= {
val spark: SparkSession = SparkSession.builder()
.master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()
val dataRows = Seq(Row(Row("James","","Smith"),"36636","M","3000"),
Row(Row("Michael","Rose",""),"40288","M","4000"),
Row(Row("Robert","","Williams"),"42114","M","4000"),
Row(Row("Maria","Anne","Jones"),"39192","F","4000"),
Row(Row("Jen","Mary","Brown"),"","F","-1")
)
val schema = new StructType()
.add("name",new StructType()
.add("firstname",StringType)
.add("middlename",StringType)
.add("lastname",StringType))
.add("dob",StringType)
.add("gender",StringType)
.add("salary",StringType)
val df2 = spark.createDataFrame(spark.sparkContext.parallelize(dataRows),schema)
// Change the column data type
df2.withColumn("salary",df2("salary").cast("Integer"))
// Derive a new column from existing
val df4=df2.withColumn("CopiedColumn",df2("salary")* -1)
// Transforming existing column
val df5 = df2.withColumn("salary",df2("salary")*100)
// You can also chain withColumn to change multiple columns
// Renaming a column.
val df3=df2.withColumnRenamed("gender","sex")
df3.printSchema()
// Dropping a column
val df6=df4.drop("CopiedColumn")
println(df6.columns.contains("CopiedColumn"))
// Adding a literal value
df2.withColumn("Country", lit("USA")).printSchema()
// Retrieving
df2.show(false)
df2.select("name").show(false)
df2.select("name.firstname").show(false)
df2.select("name.*").show(false)
import spark.implicits._
val columns = Seq("name","address")
val data = Seq(("Robert, Smith", "1 Main st, Newark, NJ, 92537"), ("Maria, Garcia","3456 Walnut st, Newark, NJ, 94732"))
val dfFromData = spark.createDataFrame(data).toDF(columns:_*)
dfFromData.printSchema()
val newDF = dfFromData.map(f=>{
val nameSplit = f.getAs[String](0).split(",")
val addSplit = f.getAs[String](1).split(",")
(nameSplit(0),nameSplit(1),addSplit(0),addSplit(1),addSplit(2),addSplit(3))
})
val finalDF = newDF.toDF("First Name","Last Name","Address Line1","City","State","zipCode")
finalDF.printSchema()
finalDF.show(false)
df2.createOrReplaceTempView("PERSON")
spark.sql("SELECT salary*100 as salary, salary*-1 as CopiedColumn, 'USA' as country FROM PERSON").show()
}
}
The complete code can be downloaded from GitHub
Happy Learning !!
thank you for adding `map`
Hi John, I am glad you liked it.
Hi, I really like the way you explained it.
Thank you so much.
I have a question:
How can we update a row in the DataFrame?
And how can we add a row to the table (not by creating another DataFrame and performing a union on the two DataFrames)?
Thanks,
Hi Rach, DataFrames are immutable, hence you can't add or update a row in place. However, using withColumn() we can update a row, but it results in a new DataFrame; in fact, any operation on a DataFrame results in a new DataFrame.
And finally, you can't add a row to the DataFrame without a union.
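To illustrate the union approach: build a one-row DataFrame with the same schema and union it onto the original. A sketch assuming the df and spark session from the article (the row values here are made up for illustration):

```scala
import org.apache.spark.sql.Row

// Build a one-row DataFrame that reuses df's schema, then union the two
val newRow = Seq(Row(Row("Anna","","Lee"),"50010","F","5000"))
val dfNewRow = spark.createDataFrame(spark.sparkContext.parallelize(newRow), df.schema)
val dfAppended = df.union(dfNewRow)
```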
So if I want to add a row to a DataFrame, do I need to create another DataFrame with the same structure, add that row into the new DataFrame, and perform the union operation?
And can you explain the real-time issues we face when performing union and join operations, or any issues we face in real time and how we can solve them?
Unions and joins are slow in nature as they perform wider transformations (data shuffling over the network), so you need to use them wisely.
Thank you for sharing, happy learning!
In case we have added multiple withColumn() calls to the DataFrame, for example df.withColumn(…).withColumn(…), how would this work? I just want to know in what sequence the data gets processed.
Can you give an example of joining a table to the DataFrame and changing one of its columns using a column from the joined table?
Thanks a lot, keep making this content.