Spark DataFrame withColumn


Spark withColumn() is a DataFrame function used to add a new column to a DataFrame, change the value of an existing column, convert the datatype of a column, or derive a new column from an existing column. In this post, I will walk you through commonly used DataFrame column operations with Scala examples.

Spark withColumn() Syntax and Usage

Spark withColumn() is a transformation function of DataFrame that is used to manipulate the column values of all rows or selected rows of a DataFrame.

The withColumn() function returns a new Spark DataFrame after performing operations such as adding a new column, updating the value of an existing column, or deriving a new column from an existing column.

Below is the syntax of the withColumn() function.

 
withColumn(colName : String, col : Column) : DataFrame

colName: String – the name of the new column to create. Pass the name of an existing column to update its value.

col: Column – the column expression that produces the values.

Since withColumn() is a transformation function, it doesn’t execute until an action is called.

Spark withColumn() method introduces a projection internally. Therefore, calling it multiple times, for instance, via loops in order to add multiple columns can generate big plans which can cause performance issues and even StackOverflowException. To avoid this, use select with the multiple columns at once.

Spark Documentation
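
The advice quoted above can be sketched roughly as follows. This is a minimal illustration rather than code from the Spark documentation: the `people` DataFrame and the column names `c1`, `c2`, `c3` are made up for the example, and the local SparkSession is only there so the snippet can be pasted into a Scala REPL or worksheet with Spark on the classpath.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{col, lit}

// Local session for illustration; in spark-shell a `spark` session already exists
val spark = SparkSession.builder().master("local[1]").appName("SelectSketch").getOrCreate()
import spark.implicits._

// Small illustrative DataFrame (not the one used in the rest of this post)
val people = Seq(("James", "3000"), ("Maria", "4000")).toDF("firstname", "salary")

// Hypothetical names of the columns to add
val newCols = Seq("c1", "c2", "c3")

// Looping: every withColumn call adds one more projection to the plan
val looped = newCols.foldLeft(people)((acc, name) => acc.withColumn(name, lit(0)))

// Single select: the existing columns plus all new ones in one projection
val projection: Seq[Column] = col("*") +: newCols.map(name => lit(0).as(name))
val selected = people.select(projection: _*)

selected.printSchema()
```

Both DataFrames end up with the same columns. With three columns the difference is negligible; the single select should matter mainly when the loop runs over a large number of columns.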

First, let’s create a DataFrame to work with.


import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructType} 
val data = Seq(Row(Row("James;","","Smith"),"36636","M","3000"),
      Row(Row("Michael","Rose",""),"40288","M","4000"),
      Row(Row("Robert","","Williams"),"42114","M","4000"),
      Row(Row("Maria","Anne","Jones"),"39192","F","4000"),
      Row(Row("Jen","Mary","Brown"),"","F","-1")
)

val schema = new StructType()
      .add("name",new StructType()
      .add("firstname",StringType)
      .add("middlename",StringType)
      .add("lastname",StringType))
      .add("dob",StringType)
      .add("gender",StringType)
      .add("salary",StringType)

val df = spark.createDataFrame(spark.sparkContext.parallelize(data),schema)

1. Add a New Column to DataFrame

To create a new column, pass your desired column name as the first argument of the withColumn() transformation function. Make sure this new column is not already present on the DataFrame; if it is, withColumn() updates the value of that column instead. In the snippet below, the lit() function is used to add a constant value to a DataFrame column. withColumn() calls can also be chained in order to add multiple columns.

 
import org.apache.spark.sql.functions.lit
df.withColumn("Country", lit("USA"))

//chaining to operate on multiple columns
df.withColumn("Country", lit("USA"))
   .withColumn("anotherColumn",lit("anotherValue"))

The above approach is fine if you are manipulating a few columns, but when you want to add or update many columns, do not chain withColumn() calls as it leads to performance issues; use select() to update multiple columns instead.
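
Besides select(), one alternative that this post does not cover is withColumns(), which takes a Map of column names to expressions and adds them in a single call. The sketch below assumes Spark 3.3 or later, where this method is part of the public Scala Dataset API; the `people` DataFrame is made up for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

// Local session for illustration; in spark-shell a `spark` session already exists
val spark = SparkSession.builder().master("local[1]").appName("WithColumnsSketch").getOrCreate()
import spark.implicits._

// Small illustrative DataFrame (not the one used in the rest of this post)
val people = Seq(("James", "3000"), ("Maria", "4000")).toDF("firstname", "salary")

// Assumes Spark 3.3+: add both columns in one call instead of chaining withColumn
val result = people.withColumns(Map(
  "Country" -> lit("USA"),
  "anotherColumn" -> lit("anotherValue")
))

result.show(false)
```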

2. Change Value of an Existing Column

The Spark withColumn() function of DataFrame can also be used to update the value of an existing column. In order to change the value, pass an existing column name as the first argument and the value to be assigned as the second argument. Note that the second argument must be of type Column.


import org.apache.spark.sql.functions.col
df.withColumn("salary",col("salary")*100)

This snippet multiplies the value of “salary” by 100 and writes the result back to the “salary” column.
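
The syntax section mentions that withColumn() can manipulate selected rows as well as all rows. One common way to do that is to combine it with when() and otherwise(). The following is a minimal sketch under assumptions: the `people` DataFrame and the "only positive salaries" rule are invented for illustration, and salary is an integer column here rather than a string.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, when}

// Local session for illustration; in spark-shell a `spark` session already exists
val spark = SparkSession.builder().master("local[1]").appName("WhenSketch").getOrCreate()
import spark.implicits._

// Small illustrative DataFrame with an integer salary column
val people = Seq(("James", "M", 3000), ("Maria", "F", 4000), ("Jen", "F", -1))
  .toDF("firstname", "gender", "salary")

// Multiply salary by 100 only where it is positive; other rows keep their value
val updated = people.withColumn(
  "salary",
  when(col("salary") > 0, col("salary") * 100).otherwise(col("salary"))
)

updated.show(false)
```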

3. Derive New Column From an Existing Column

To create a new column, specify the first argument with a name you want your new column to be and use the second argument to assign a value by applying an operation on an existing column.


df.withColumn("CopiedColumn",col("salary")* -1)

This snippet creates a new column “CopiedColumn” by multiplying the “salary” column by -1.

4. Change Column Data Type

By using Spark withColumn() on a DataFrame together with the cast() function on a column, we can change the datatype of a DataFrame column. The statement below changes the datatype of the “salary” column from String to Integer.

 df.withColumn("salary",col("salary").cast("Integer"))
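
cast() also accepts a DataType object instead of a type name string, and the schema of the returned DataFrame can be inspected to confirm the change. The sketch below uses a made-up `people` DataFrame; it only illustrates the idea and is not taken from the original example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType}

// Local session for illustration; in spark-shell a `spark` session already exists
val spark = SparkSession.builder().master("local[1]").appName("CastSketch").getOrCreate()
import spark.implicits._

// Small illustrative DataFrame where salary is stored as a string
val people = Seq(("James", "3000"), ("Maria", "4000")).toDF("firstname", "salary")

// Cast using a DataType object rather than the string "Integer"
val casted = people.withColumn("salary", col("salary").cast(IntegerType))

// The original DataFrame is unchanged; only the new one has the Integer column
people.printSchema()
casted.printSchema()
```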

5. Add, Replace, or Update multiple Columns

When you want to add, replace, or update multiple columns in a Spark DataFrame, chaining withColumn() calls is not recommended because it leads to performance issues. Instead, use select(), or run a SQL SELECT after creating a temporary view on the DataFrame, as shown here:


df.createOrReplaceTempView("PERSON")
spark.sql("SELECT salary*100 as salary, salary*-1 as CopiedColumn, 'USA' as country FROM PERSON").show()
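
The same projection can also be written with select() directly, without registering a temporary view. This is a rough equivalent of the SQL statement above, using a made-up `people` DataFrame with an integer salary column so the snippet stands on its own.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}

// Local session for illustration; in spark-shell a `spark` session already exists
val spark = SparkSession.builder().master("local[1]").appName("SelectEquivalentSketch").getOrCreate()
import spark.implicits._

// Small illustrative DataFrame (not the one used in the rest of this post)
val people = Seq(("James", 3000), ("Maria", 4000)).toDF("firstname", "salary")

// One select producing all three columns in a single projection
val result = people.select(
  (col("salary") * 100).as("salary"),
  (col("salary") * -1).as("CopiedColumn"),
  lit("USA").as("country")
)

result.show()
```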

6. Rename Column Name

Though the examples in sections 6, 7, and 8 don’t use the withColumn() function, I still feel it is worth explaining how to rename, drop, and split columns, as these would be useful to you.

To rename an existing column use “withColumnRenamed” function on DataFrame.


df.withColumnRenamed("gender","sex")

7. Drop a Column

Use drop() function to drop a specific column from the DataFrame.


df.drop("CopiedColumn")

8. Split Column into Multiple Columns

Though this example doesn’t use the withColumn() function, it is still worth explaining how to split one DataFrame column into multiple columns using the map() transformation function.


import spark.implicits._

val columns = Seq("name","address")
val data = Seq(("Robert, Smith", "1 Main st, Newark, NJ, 92537"),
             ("Maria, Garcia","3456 Walnut st, Newark, NJ, 94732"))
val dfFromData = spark.createDataFrame(data).toDF(columns:_*)
dfFromData.printSchema()

// Split each string on "," and return the parts as a tuple
val newDF = dfFromData.map(f => {
  val nameSplit = f.getAs[String](0).split(",")
  val addSplit = f.getAs[String](1).split(",")
  (nameSplit(0),nameSplit(1),addSplit(0),addSplit(1),addSplit(2),addSplit(3))
})
val finalDF = newDF.toDF("First Name","Last Name",
             "Address Line1","City","State","zipCode")
finalDF.printSchema()
finalDF.show(false)

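This snippet splits the name column into “First Name” and “Last Name”, and the address column into “Address Line1”, “City”, “State”, and “zipCode”. Because the strings are split on "," only, the values keep the space that follows each comma. It yields the output below: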


root
 |-- First Name: string (nullable = true)
 |-- Last Name: string (nullable = true)
 |-- Address Line1: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- zipCode: string (nullable = true)

+----------+---------+--------------+-------+-----+-------+
|First Name|Last Name|Address Line1 |City   |State|zipCode|
+----------+---------+--------------+-------+-----+-------+
|Robert    | Smith   |1 Main st     | Newark| NJ  | 92537 |
|Maria     | Garcia  |3456 Walnut st| Newark| NJ  | 94732 |
+----------+---------+--------------+-------+-----+-------+
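
The same split can also be expressed with the built-in split() column function instead of map(), which keeps everything in the DataFrame API. The sketch below is one possible way to do it, not the method used in this post; unlike the map() version, it also applies trim() so the values do not keep the space after each comma.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, split, trim}

// Local session for illustration; in spark-shell a `spark` session already exists
val spark = SparkSession.builder().master("local[1]").appName("SplitSketch").getOrCreate()
import spark.implicits._

val people = Seq(
  ("Robert, Smith", "1 Main st, Newark, NJ, 92537"),
  ("Maria, Garcia", "3456 Walnut st, Newark, NJ, 94732")
).toDF("name", "address")

// split() returns an array column; getItem(i) picks one element of it
val nameParts = split(col("name"), ",")
val addrParts = split(col("address"), ",")

val result = people.select(
  trim(nameParts.getItem(0)).as("First Name"),
  trim(nameParts.getItem(1)).as("Last Name"),
  trim(addrParts.getItem(0)).as("Address Line1"),
  trim(addrParts.getItem(1)).as("City"),
  trim(addrParts.getItem(2)).as("State"),
  trim(addrParts.getItem(3)).as("zipCode")
)

result.show(false)
```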

Note: All of these functions return a new DataFrame after applying the transformation instead of updating the existing DataFrame.

Spark withColumn Complete Example


import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.functions._
object WithColumn {

  def main(args:Array[String]):Unit= {

    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExamples.com")
      .getOrCreate()

    val dataRows = Seq(Row(Row("James;","","Smith"),"36636","M","3000"),
      Row(Row("Michael","Rose",""),"40288","M","4000"),
      Row(Row("Robert","","Williams"),"42114","M","4000"),
      Row(Row("Maria","Anne","Jones"),"39192","F","4000"),
      Row(Row("Jen","Mary","Brown"),"","F","-1")
    )

    val schema = new StructType()
      .add("name",new StructType()
        .add("firstname",StringType)
        .add("middlename",StringType)
        .add("lastname",StringType))
      .add("dob",StringType)
      .add("gender",StringType)
      .add("salary",StringType)

    val df2 = spark.createDataFrame(spark.sparkContext.parallelize(dataRows),schema)

    //Change the column data type
    val dfCast = df2.withColumn("salary",df2("salary").cast("Integer"))

    //Derive a new column from existing
    val df4=df2.withColumn("CopiedColumn",df2("salary")* -1)

    //Transforming existing column
    val df5 = df2.withColumn("salary",df2("salary")*100)

    //You can also chain withColumn to change multiple columns

    //Renaming a column.
    val df3=df2.withColumnRenamed("gender","sex")
    df3.printSchema()

    //Dropping a column
    val df6=df4.drop("CopiedColumn")
    println(df6.columns.contains("CopiedColumn"))

    //Adding a literal value
    df2.withColumn("Country", lit("USA")).printSchema()

    //Retrieving
    df2.show(false)
    df2.select("name").show(false)
    df2.select("name.firstname").show(false)
    df2.select("name.*").show(false)

    import spark.implicits._

    val columns = Seq("name","address")
    val data = Seq(("Robert, Smith", "1 Main st, Newark, NJ, 92537"), ("Maria, Garcia","3456 Walnut st, Newark, NJ, 94732"))
    val dfFromData = spark.createDataFrame(data).toDF(columns:_*)
    dfFromData.printSchema()

    val newDF = dfFromData.map(f=>{
      val nameSplit = f.getAs[String](0).split(",")
      val addSplit = f.getAs[String](1).split(",")
      (nameSplit(0),nameSplit(1),addSplit(0),addSplit(1),addSplit(2),addSplit(3))
    })
    val finalDF = newDF.toDF("First Name","Last Name","Address Line1","City","State","zipCode")
    finalDF.printSchema()
    finalDF.show(false)

    df2.createOrReplaceTempView("PERSON")
    spark.sql("SELECT salary*100 as salary, salary*-1 as CopiedColumn, 'USA' as country FROM PERSON").show()
  }
}

The complete code can be downloaded from GitHub

Happy Learning !!

NNK


This Post Has 9 Comments

  1. NM

    Can you give an example while joining a table to the df, how to change its column with join table’s column

  2. Anonymous

    In case , we have added multiple withcolumn to the dataframe for example: df.withcolumn(…).withcolumn(……), something like this.How would this work.I just want to know in what sequence the data gets processed

  3. abdul sattar

    thank you for sharing , happy learning

  4. Rach

    so if I want to add a row to a dataframe, do I need to create another df with same structure and add that row into new df and need to perform the union operation?

    and can you explain the real time issues what we face when performing union and join operations.
    or any issues what we face in real time, how can we solve those.

    1. NNK

      Unions and Joins are slow in nature as they perform wider transformations (data shuffling over network). So you need to use them wisely.

  5. Rach

    Hi, I really like the way you explained.
    Thanks you so much.

    I have a qn:
    how can we update the row in data frame?
    and how can we add a row in the table(not like creating another df and performing union on 2 dfs)

    thanks,

    1. NNK

      Hi Rach, DataFrame’s are immutable hence, you can’t add or update the row. However, using withColumn() we can update the row but it results in a new DataFrame. Actually any operation on DataFrame results in new DataFrame.
      And finally, you can’t add a row the DataFrame without union.

  6. John J

    thank you for adding `map`

    1. NNK

      Hi John, I am glad you liked it.
