Spark DataFrame withColumn


The Spark withColumn() function is used to change the value of an existing DataFrame column, convert its data type, or create a new column; a related function, withColumnRenamed(), renames columns. In this post, I will walk you through commonly used DataFrame column operations with Scala examples.

First, let’s create a DataFrame to work with.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructType}

val data = Seq(
  Row(Row("James ","","Smith"),"36636","M","3000"),
  Row(Row("Michael ","Rose",""),"40288","M","4000"),
  Row(Row("Robert ","","Williams"),"42114","M","4000"),
  Row(Row("Maria ","Anne","Jones"),"39192","F","4000"),
  Row(Row("Jen","Mary","Brown"),"","F","-1")
)

val schema = new StructType()
  .add("name", new StructType()
    .add("firstname", StringType)
    .add("middlename", StringType)
    .add("lastname", StringType))
  .add("dob", StringType)
  .add("gender", StringType)
  .add("salary", StringType)

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

1. Spark withColumn – To change column DataType

By using withColumn() on a DataFrame together with the cast() function on a column, we can change the data type of a DataFrame column. The statement below changes the data type of the “salary” column from String to Integer.

import org.apache.spark.sql.functions.col

df.withColumn("salary", col("salary").cast("Integer"))
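
To keep the converted column, capture the returned DataFrame and check the schema; a minimal verification using the df defined above:

// Capture the result; the original df still has salary as a String
val dfCasted = df.withColumn("salary", col("salary").cast("Integer"))
dfCasted.printSchema()  // salary now shows as integer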

2. Change the value of an existing column

The withColumn() function of DataFrame can also be used to update the value of an existing column. To change the value, pass an existing column name as the first argument and the value to be assigned as the second argument. Note that the second argument must be of Column type.


df.withColumn("salary",col("salary")*100)

This snippet multiplies the value of the “salary” column by 100 and writes the result back to the “salary” column.
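
Because the second argument must be a Column, any column expression works; for instance, expr() from org.apache.spark.sql.functions produces an equivalent update (a sketch against the same df):

import org.apache.spark.sql.functions.expr

// Equivalent update built from a SQL expression
df.withColumn("salary", expr("salary * 100"))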

3. Derive a new column from an existing column

To derive a new column, pass the name you want the new column to have as the first argument, and as the second argument assign a value by applying an operation on an existing column.


df.withColumn("CopiedColumn",col("salary")* -1)

This snippet creates a new column “CopiedColumn” by multiplying the “salary” column by -1.
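
The new column can be derived from any expression over existing columns. As a sketch, the nested name fields could be combined into a single column with concat_ws() (the column name fullName is just an illustration):

import org.apache.spark.sql.functions.concat_ws

// Derive a single full-name column from the nested struct fields
df.withColumn("fullName",
  concat_ws(" ", col("name.firstname"), col("name.middlename"), col("name.lastname")))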

4. Add a new column

To create a new column, pass your desired column name to the first argument of the withColumn() transformation function. Make sure this column is not already present on the DataFrame; if it is, withColumn() updates its value instead. In the snippet below, the lit() function is used to add a constant value as a new column. We can also chain withColumn() calls to add multiple columns, as shown in the second example.

 
df.withColumn("Country", lit("USA"))

//chaining to operate on multiple columns
df.withColumn("Country", lit("USA"))
   .withColumn("anotherColumn",lit("anotherValue"))
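
For constants that are not simple literals, typedLit() (also in org.apache.spark.sql.functions) accepts Scala collection types; a small sketch with hypothetical column names:

import org.apache.spark.sql.functions.typedLit

// Add constant array and map columns
df.withColumn("scores", typedLit(Seq(1, 2, 3)))
  .withColumn("attributes", typedLit(Map("key" -> "value")))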

5. Rename a DataFrame column

To rename an existing column, use the withColumnRenamed() function on the DataFrame.


df.withColumnRenamed("gender","sex")
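
Like withColumn(), withColumnRenamed() returns a new DataFrame, so calls can be chained to rename several columns at once:

// Each call returns a new DataFrame, so renames can be chained
df.withColumnRenamed("gender", "sex")
  .withColumnRenamed("dob", "dateOfBirth")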

6. Drop a column from Spark DataFrame

Use the drop() function to remove a specific column from the DataFrame.


df.drop("CopiedColumn")
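
drop() also accepts multiple column names, so several columns can be removed in one call (assuming the Country column added earlier):

// Drop several columns at once
df.drop("CopiedColumn", "Country")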

7. Split a column into multiple columns

Though this example doesn’t use the withColumn() function, I still feel it’s worth explaining how to split one DataFrame column into multiple columns using the map() transformation function.


import spark.implicits._

val columns = Seq("name", "address")
val data = Seq(("Robert, Smith", "1 Main st, Newark, NJ, 92537"),
               ("Maria, Garcia", "3456 Walnut st, Newark, NJ, 94732"))
val dfFromData = spark.createDataFrame(data).toDF(columns: _*)
dfFromData.printSchema()

// Split each row's name and address strings into separate tuple fields
val newDF = dfFromData.map(f => {
  val nameSplit = f.getAs[String](0).split(",")
  val addSplit = f.getAs[String](1).split(",")
  (nameSplit(0), nameSplit(1), addSplit(0), addSplit(1), addSplit(2), addSplit(3))
})
val finalDF = newDF.toDF("First Name", "Last Name",
                         "Address Line1", "City", "State", "zipCode")
finalDF.printSchema()
finalDF.show(false)

This snippet splits the name column into “First Name” and “Last Name”, and the address column into “Address Line1”, “City”, “State” and “zipCode”. It yields the output below:


root
 |-- First Name: string (nullable = true)
 |-- Last Name: string (nullable = true)
 |-- Address Line1: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- zipCode: string (nullable = true)

+----------+---------+--------------+-------+-----+-------+
|First Name|Last Name|Address Line1 |City   |State|zipCode|
+----------+---------+--------------+-------+-----+-------+
|Robert    | Smith   |1 Main st     | Newark| NJ  | 92537 |
|Maria     | Garcia  |3456 Walnut st| Newark| NJ  | 94732 |
+----------+---------+--------------+-------+-----+-------+
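
As an alternative to map(), the built-in split() function from org.apache.spark.sql.functions achieves the same with withColumn(); a sketch against the dfFromData defined above (the split values keep their leading spaces unless trimmed):

import org.apache.spark.sql.functions.split

// Split with the built-in split() function instead of map()
val splitDF = dfFromData
  .withColumn("First Name", split(col("name"), ",").getItem(0))
  .withColumn("Last Name", split(col("name"), ",").getItem(1))
  .withColumn("City", split(col("address"), ",").getItem(1))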

Note: all of these functions return a new DataFrame after applying the changes instead of updating the existing DataFrame in place.
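
In practice this means the returned DataFrame must be assigned if you want to keep the change:

// The result of withColumn() must be captured; df itself never changes
df.withColumn("Country", lit("USA"))                      // result is discarded
val dfWithCountry = df.withColumn("Country", lit("USA"))  // result is kept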

Spark withColumn complete example


package com.sparkbyexamples.spark.dataframe

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, StringType, StructType}
import org.apache.spark.sql.functions._
object WithColumn {

  def main(args:Array[String]):Unit= {

    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExamples.com")
      .getOrCreate()

    val arrayStructureData = Seq(
      Row(Row("James ","","Smith"),"1","M",3100,List("Cricket","Movies"),Map("hair"->"black","eye"->"brown")),
      Row(Row("Michael ","Rose",""),"2","M",3100,List("Tennis"),Map("hair"->"brown","eye"->"black")),
      Row(Row("Robert ","","Williams"),"3","M",3100,List("Cooking","Football"),Map("hair"->"red","eye"->"gray")),
      Row(Row("Maria ","Anne","Jones"),"4","M",3100,null,Map("hair"->"blond","eye"->"red")),
      Row(Row("Jen","Mary","Brown"),"5","M",3100,List("Blogging"),Map("white"->"black","eye"->"black"))
    )

    val arrayStructureSchema = new StructType()
      .add("name",new StructType()
        .add("firstname",StringType)
        .add("middlename",StringType)
        .add("lastname",StringType))
      .add("id",StringType)
      .add("gender",StringType)
      .add("salary",IntegerType)
      .add("Hobbies", ArrayType(StringType))
      .add("properties", MapType(StringType,StringType))

    val df2 = spark.createDataFrame(
      spark.sparkContext.parallelize(arrayStructureData),arrayStructureSchema)

    //Change the column data type
    df2.withColumn("salary",df2("salary").cast("Integer"))

    //Derive a new column from existing
    val df4=df2.withColumn("CopiedColumn",df2("salary")* -1)

    //Transforming existing column
    val df5 = df2.withColumn("salary",df2("salary")*100)

    //You can also chain withColumn to change multiple columns

    //Renaming a column.
    val df3=df2.withColumnRenamed("gender","sex")
    df3.printSchema()

    //Dropping a column
    val df6=df4.drop("CopiedColumn")
    println(df6.columns.contains("CopiedColumn"))

    //Adding a literal value
    df2.withColumn("Country", lit("USA")).printSchema()

    //Retrieving nested struct columns
    df2.show(false)
    df2.select("name").show(false)
    df2.select("name.firstname").show(false)
    df2.select("name.*").show(false)

    val df8 = df2.select(col("*"),explode(col("hobbies")))
    df8.show(false)
    
    //Splitting one column to multiple columns
    import spark.implicits._

    val columns = Seq("name","address")
    val data = Seq(("Robert, Smith", "1 Main st, Newark, NJ, 92537"),
        ("Maria, Garcia","3456 Walnut st, Newark, NJ, 94732"))
    val dfFromData = spark.createDataFrame(data).toDF(columns:_*)
    dfFromData.printSchema()

    val newDF = dfFromData.map(f=>{
      val nameSplit = f.getAs[String](0).split(",")
      val addSplit = f.getAs[String](1).split(",")
      (nameSplit(0),nameSplit(1),addSplit(0),addSplit(1),addSplit(2),addSplit(3))
    })
    val finalDF = newDF.toDF("First Name","Last Name",
           "Address Line1","City","State","zipCode")
    finalDF.printSchema()
    finalDF.show(false)
  }
}

The complete code can be downloaded from GitHub

Happy Learning !!

NNK

SparkByExamples.com is a Big Data and Spark examples community page; all examples are simple, easy to understand, and well tested in our development environment using Scala and Maven.

This Post Has 8 Comments

  1. Anonymous

    In case we have added multiple withColumn calls to the DataFrame, for example df.withColumn(…).withColumn(…), how would this work? I just want to know in what sequence the data gets processed.

  2. abdul sattar

    thank you for sharing , happy learning

  3. Rach

    So if I want to add a row to a DataFrame, do I need to create another df with the same structure, add that row into the new df, and then perform a union operation?

    And can you explain the real-time issues we face when performing union and join operations, or any other issues we face in real time and how we can solve them?

    1. NNK

      Unions and joins are slow in nature as they perform wider transformations (data shuffling over the network), so you need to use them wisely.

  4. Rach

    Hi, I really like the way you explained.
    Thank you so much.

    I have a question:
    how can we update a row in a DataFrame?
    And how can we add a row to the table (other than creating another df and performing a union on the two dfs)?

    thanks,

    1. NNK

      Hi Rach, DataFrames are immutable, hence you can’t add or update a row in place. Using withColumn() we can update a row, but it results in a new DataFrame; in fact, any operation on a DataFrame results in a new DataFrame.
      And finally, you can’t add a row to a DataFrame without a union.

  5. John J

    thank you for adding `map`

    1. NNK

      Hi John, I am glad you liked it.
