Create Spark DataFrame from HBase using Hortonworks

This tutorial explains, with a Scala example, how to create a Spark DataFrame from an HBase table using the Hortonworks DataSource "org.apache.spark.sql.execution.datasources.hbase" from the shc-core library.

I would recommend reading Inserting Spark DataFrame to HBase table before you proceed with the rest of this article, as it explains the Maven dependencies and their usage.

Related: Libraries and DataSource APIs to connect Spark with HBase

In summary, to use Spark DataFrames with HBase, you need the hbase-client, hbase-spark, and shc-core APIs.


  <dependencies>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>2.0.2.3.1.0.6-1</version> <!-- Hortonworks Latest -->
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-spark</artifactId>
      <version>2.0.2.3.1.0.6-1</version> <!-- Hortonworks Latest -->
    </dependency>
    <dependency>
      <groupId>com.hortonworks</groupId>
      <artifactId>shc-core</artifactId>
      <version>1.1.1-2.1-s_2.11</version> <!-- Hortonworks Latest -->
    </dependency>
  </dependencies>

Below is the complete example for your reference; the same example is also available at GitHub.


package com.sparkbyexamples.spark.dataframe.hbase.hortonworks

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

object HBaseSparkRead {

  def main(args: Array[String]): Unit = {

    // Define the catalog that maps the HBase 'employee' table to the DataFrame schema
    def catalog =
      s"""{
         |"table":{"namespace":"default", "name":"employee"},
         |"rowkey":"key",
         |"columns":{
         |"key":{"cf":"rowkey", "col":"key", "type":"string"},
         |"fName":{"cf":"person", "col":"firstName", "type":"string"},
         |"lName":{"cf":"person", "col":"lastName", "type":"string"},
         |"mName":{"cf":"person", "col":"middleName", "type":"string"},
         |"addressLine":{"cf":"address", "col":"addressLine", "type":"string"},
         |"city":{"cf":"address", "col":"city", "type":"string"},
         |"state":{"cf":"address", "col":"state", "type":"string"},
         |"zipCode":{"cf":"address", "col":"zipCode", "type":"string"}
         |}
         |}""".stripMargin

    val sparkSession: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExamples.com")
      .getOrCreate()

    import sparkSession.implicits._

    // Read the HBase table using the shc-core DataSource and the catalog defined above
    val hbaseDF = sparkSession.read
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .load()

    hbaseDF.printSchema()

    hbaseDF.show(false)

    hbaseDF.filter($"key" === "1" && $"state" === "FL")
      .select("key", "fName", "lName")
      .show()

    //Create Temporary Table on DataFrame
    hbaseDF.createOrReplaceTempView("employeeTable")

    //Run SQL
    sparkSession.sql("select * from employeeTable where fName = 'Amaya' ").show

  }
}

Let me explain what's happening in a few of the statements in this example.

First, we need to define a catalog that bridges the gap between the HBase key-value store and the Spark DataFrame table structure. Using this catalog, we map the DataFrame column names to the HBase column families and column names, and identify the row key, as shown below.
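
Here is the catalog from the example above. The "rowkey" entry identifies the HBase row key, and each remaining entry maps a DataFrame field to a column family (cf), a column (col), and a type in the employee table.


    def catalog =
      s"""{
         |"table":{"namespace":"default", "name":"employee"},
         |"rowkey":"key",
         |"columns":{
         |"key":{"cf":"rowkey", "col":"key", "type":"string"},
         |"fName":{"cf":"person", "col":"firstName", "type":"string"},
         |"lName":{"cf":"person", "col":"lastName", "type":"string"},
         |"mName":{"cf":"person", "col":"middleName", "type":"string"},
         |"addressLine":{"cf":"address", "col":"addressLine", "type":"string"},
         |"city":{"cf":"address", "col":"city", "type":"string"},
         |"state":{"cf":"address", "col":"state", "type":"string"},
         |"zipCode":{"cf":"address", "col":"zipCode", "type":"string"}
         |}
         |}""".stripMargin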

A couple of things are happening in the snippet below: format() takes "org.apache.spark.sql.execution.datasources.hbase", the DataSource defined in the shc-core API, which enables us to use DataFrames with HBase tables; options() takes the catalog we defined earlier; and finally, load() reads the HBase table into a DataFrame.


    val hbaseDF = sparkSession.read
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .load()

hbaseDF.printSchema() displays the schema below.


root
 |-- key: string (nullable = true)
 |-- fName: string (nullable = true)
 |-- lName: string (nullable = true)
 |-- mName: string (nullable = true)
 |-- addressLine: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zipCode: string (nullable = true)

hbaseDF.show(false) displays the data below. Note that the DataFrame field names differ from the HBase table column names, as defined in the catalog mapping.


+---+-------+--------+-----+-----------+-------+-----+-------+
|key|fName  |lName   |mName|addressLine|city   |state|zipCode|
+---+-------+--------+-----+-----------+-------+-----+-------+
|1  |Abby   |Smith   |K    |3456 main  |Orlando|FL   |45235  |
|2  |Amaya  |Williams|L    |123 Orange |Newark |NJ   |27656  |
|3  |Alchemy|Davis   |P    |Warners    |Sanjose|CA   |34789  |
+---+-------+--------+-----+-----------+-------+-----+-------+

hbaseDF.filter() filters the data using DataFrame DSL functions.
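
The snippet below, repeated from the full example, selects the row with key "1" and state "FL" and projects only the key, fName, and lName columns.


    hbaseDF.filter($"key" === "1" && $"state" === "FL")
      .select("key", "fName", "lName")
      .show()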


+---+-----+-----+
|key|fName|lName|
+---+-----+-----+
|  1| Abby|Smith|
+---+-----+-----+

Finally, we can create a temporary view on the DataFrame and run SQL queries on it.
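
The two statements below, from the example above, register the DataFrame as a temporary view and query it with Spark SQL; this query returns the employee whose fName is 'Amaya'.


    // Create Temporary Table on DataFrame
    hbaseDF.createOrReplaceTempView("employeeTable")

    // Run SQL
    sparkSession.sql("select * from employeeTable where fName = 'Amaya' ").show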


+---+-----+--------+-----+-----------+------+-----+-------+
|key|fName|   lName|mName|addressLine|  city|state|zipCode|
+---+-----+--------+-----+-----------+------+-----+-------+
|  2|Amaya|Williams|    L| 123 Orange|Newark|   NJ|  27656|
+---+-----+--------+-----+-----------+------+-----+-------+

Conclusion:

In this tutorial, you have learned how to create a Spark DataFrame from an HBase table using the Hortonworks DataSource API, and you have also seen how to run DSL and SQL queries on the HBase DataFrame.

Happy Learning !!
