Spark Read from & Write to HBase table | Example

This tutorial explains how to read from and write Spark (2.4.x) DataFrame rows to an HBase table using the hbase-spark connector and the DataSource "org.apache.spark.sql.execution.datasources.hbase", along with a Scala example.

Hello everyone,

Lately, one of the HBase libraries used in this article has changed in the Maven repository, and many readers are experiencing issues with a "data source not found" error. Until I find the right library version, I recommend using the articles below to read/write HBase tables with Spark.

Related: Libraries and DataSource APIs to connect Spark with HBase

Spark HBase library dependencies

The following HBase libraries are required to connect Spark with the HBase database and to read rows from and write rows to a table.

  • hbase-client This library is provided by HBase itself and is used natively to interact with HBase.
  • hbase-spark connector, which provides HBaseContext to bridge Spark and HBase. HBaseContext pushes the HBase configuration to the Spark executors and allows each executor to maintain its own HBase connection (see the sketch after this list).
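
To make the second point concrete, below is a minimal sketch of how an HBaseContext is typically constructed with the hbase-spark connector. The app name and the commented-out ZooKeeper setting are placeholders, not values from this article; adjust them to your environment.


import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder()
  .master("local[1]")
  .appName("HBaseContextSketch")   // hypothetical app name
  .getOrCreate()

// Loads hbase-site.xml from the classpath; set properties explicitly if it is not there.
val hbaseConf = HBaseConfiguration.create()
// hbaseConf.set("hbase.zookeeper.quorum", "your-zookeeper-host")   // hypothetical value

// Constructing the HBaseContext pushes the configuration to the Spark executors,
// allowing each executor to maintain its own HBase connection.
new HBaseContext(spark.sparkContext, hbaseConf)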

Below are the complete Maven dependencies needed to run the examples in your environment. Note that “hbase-client” is not declared explicitly, since the Spark HBase connector pulls it in as a transitive dependency; this is the recommended way, as declaring both yourself may lead to incompatibility issues between the two.

  <dependencies>

    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.11.12</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.4.0</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.4.0</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.4.0</version>
    </dependency>

   <dependency>
     <groupId>org.apache.hbase.connectors.spark</groupId>
     <artifactId>hbase-spark</artifactId>
     <version>1.0.0</version>
   </dependency>

  </dependencies>
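
If you build with sbt instead of Maven, the equivalent dependencies would look roughly like the sketch below (same versions as in the pom above; adjust them to your cluster).


// build.sbt (sketch): same libraries as the Maven pom above
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "2.4.0",
  "org.apache.spark" %% "spark-sql"       % "2.4.0",
  "org.apache.spark" %% "spark-streaming" % "2.4.0",
  "org.apache.hbase.connectors.spark" % "hbase-spark" % "1.0.0"
)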

Writing Spark DataFrame to HBase table using “hbase-spark” connector

First, let’s create a DataFrame that we will store in HBase using the “hbase-spark” connector. In this snippet, we create an employee DataFrame with 3 rows.


case class Employee(key: String, fName: String, lName: String,
                      mName:String, addressLine:String, city:String,
                      state:String, zipCode:String)

 val data = Seq(Employee("1","Abby","Smith","K","3456 main", "Orlando","FL","45235"),
      Employee("2","Amaya","Williams","L","123 Orange","Newark", "NJ", "27656"),
      Employee("3","Alchemy","Davis","P","Warners","Sanjose","CA", "34789"))

    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExamples.com")
      .getOrCreate()

    import spark.implicits._
    val df = spark.sparkContext.parallelize(data).toDF

Now, let’s define a catalog that bridges the gap between the HBase key-value store and the DataFrame table structure. It maps the row key and the column names between the two structures; please refer to the example below. Within the catalog we also specify the HBase table we are going to use and its namespace; here, we use the “employee” table in the “default” namespace.


def catalog =
      s"""{
         |"table":{"namespace":"default", "name":"employee"},
         |"rowkey":"key",
         |"columns":{
         |"key":{"cf":"rowkey", "col":"key", "type":"string"},
         |"fName":{"cf":"person", "col":"firstName", "type":"string"},
         |"lName":{"cf":"person", "col":"lastName", "type":"string"},
         |"mName":{"cf":"person", "col":"middleName", "type":"string"},
         |"addressLine":{"cf":"address", "col":"addressLine", "type":"string"},
         |"city":{"cf":"address", "col":"city", "type":"string"},
         |"state":{"cf":"address", "col":"state", "type":"string"},
         |"zipCode":{"cf":"address", "col":"zipCode", "type":"string"}
         |}
         |}""".stripMargin

Finally, let’s store the DataFrame in the HBase table using the save() function on the DataFrame. In the example below, format() takes the "org.apache.spark.sql.execution.datasources.hbase" DataSource defined in the “hbase-spark” API, which enables us to use DataFrames with HBase tables. df.write.options() takes the catalog and specifies that the table should be created with 4 regions. Finally, save() writes the rows to the HBase table.


df.write.options(
      Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "4"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()

Below is the complete example, and the same is also available at GitHub.


package com.sparkbyexamples.spark.hbase.dataframe

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
import org.apache.spark.sql.SparkSession

object HBaseSparkInsert {

  case class Employee(key: String, fName: String, lName: String,
                      mName:String, addressLine:String, city:String,
                      state:String, zipCode:String)

  def main(args: Array[String]): Unit = {

    def catalog =
      s"""{
         |"table":{"namespace":"default", "name":"employee"},
         |"rowkey":"key",
         |"columns":{
         |"key":{"cf":"rowkey", "col":"key", "type":"string"},
         |"fName":{"cf":"person", "col":"firstName", "type":"string"},
         |"lName":{"cf":"person", "col":"lastName", "type":"string"},
         |"mName":{"cf":"person", "col":"middleName", "type":"string"},
         |"addressLine":{"cf":"address", "col":"addressLine", "type":"string"},
         |"city":{"cf":"address", "col":"city", "type":"string"},
         |"state":{"cf":"address", "col":"state", "type":"string"},
         |"zipCode":{"cf":"address", "col":"zipCode", "type":"string"}
         |}
         |}""".stripMargin


    val data = Seq(Employee("1","Abby","Smith","K","3456 main", "Orlando","FL","45235"),
      Employee("2","Amaya","Williams","L","123 Orange","Newark", "NJ", "27656"),
      Employee("3","Alchemy","Davis","P","Warners","Sanjose","CA", "34789"))

    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExamples.com")
      .getOrCreate()

    import spark.implicits._
    val df = spark.sparkContext.parallelize(data).toDF

    df.write.options(
      Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "4"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()
  }
}
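
If you run this example on a cluster with spark-submit rather than from an IDE, make sure the connector jars and your HBase client configuration reach the driver and executors; otherwise the job fails with errors such as "Failed to find data source" or NoClassDefFoundError. Below is a sketch of such a command, where the paths and the --packages coordinate are assumptions to adjust for your environment.


spark-submit \
  --class com.sparkbyexamples.spark.hbase.dataframe.HBaseSparkInsert \
  --master yarn --deploy-mode cluster \
  --packages org.apache.hbase.connectors.spark:hbase-spark:1.0.0 \
  --files /etc/hbase/conf/hbase-site.xml \
  target/spark-hbase-connector-examples-1.0-SNAPSHOT.jar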

If you run scan 'employee' in the HBase shell, you will see the 3 rows below as output.

Spark HBase "hbase-spark" Connector
Spark HBase Connector
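
For reference, the scan output looks roughly like the following. This is illustrative output based on the three rows inserted above; timestamps are placeholders, and rows 2 and 3 follow the same column layout as row 1.


hbase(main):001:0> scan 'employee'
ROW        COLUMN+CELL
 1         column=address:addressLine, timestamp=..., value=3456 main
 1         column=address:city, timestamp=..., value=Orlando
 1         column=address:state, timestamp=..., value=FL
 1         column=address:zipCode, timestamp=..., value=45235
 1         column=person:firstName, timestamp=..., value=Abby
 1         column=person:lastName, timestamp=..., value=Smith
 1         column=person:middleName, timestamp=..., value=K
 2         ...
 3         ...
3 row(s)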

Reading the table to DataFrame using “hbase-spark”

In this example, I will explain how to read data from the HBase table into a DataFrame and then run some filters using the DataFrame DSL and SQL.

Below is a complete example and it is also available at GitHub.


package com.sparkbyexamples.spark.hbase.dataframe

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
import org.apache.spark.sql.SparkSession

object HBaseSparkRead {

  def main(args: Array[String]): Unit = {

    def catalog =
      s"""{
         |"table":{"namespace":"default", "name":"employee"},
         |"rowkey":"key",
         |"columns":{
         |"key":{"cf":"rowkey", "col":"key", "type":"string"},
         |"fName":{"cf":"person", "col":"firstName", "type":"string"},
         |"lName":{"cf":"person", "col":"lastName", "type":"string"},
         |"mName":{"cf":"person", "col":"middleName", "type":"string"},
         |"addressLine":{"cf":"address", "col":"addressLine", "type":"string"},
         |"city":{"cf":"address", "col":"city", "type":"string"},
         |"state":{"cf":"address", "col":"state", "type":"string"},
         |"zipCode":{"cf":"address", "col":"zipCode", "type":"string"}
         |}
         |}""".stripMargin

    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExamples.com")
      .getOrCreate()

    import spark.implicits._

    // Reading from HBase to DataFrame
    val hbaseDF = spark.read
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .load()

    //Display Schema from DataFrame
    hbaseDF.printSchema()

    //Collect and show Data from DataFrame
    hbaseDF.show(false)

    //Applying Filters
    hbaseDF.filter($"key" === "1" && $"state" === "FL")
      .select("key", "fName", "lName")
      .show()

    //Create Temporary Table on DataFrame
    hbaseDF.createOrReplaceTempView("employeeTable")

    //Run SQL
    spark.sql("select * from employeeTable where fName = 'Amaya' ").show

  }
}

hbaseDF.printSchema() displays the below schema.


root
 |-- key: string (nullable = true)
 |-- fName: string (nullable = true)
 |-- lName: string (nullable = true)
 |-- mName: string (nullable = true)
 |-- addressLine: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zipCode: string (nullable = true)

hbaseDF.show(false) returns the data below. Please note the differences between the DataFrame field names and the HBase table column names.


+---+-------+--------+-----+-----------+-------+-----+-------+
|key|fName  |lName   |mName|addressLine|city   |state|zipCode|
+---+-------+--------+-----+-----------+-------+-----+-------+
|1  |Abby   |Smith   |K    |3456 main  |Orlando|FL   |45235  |
|2  |Amaya  |Williams|L    |123 Orange |Newark |NJ   |27656  |
|3  |Alchemy|Davis   |P    |Warners    |Sanjose|CA   |34789  |
+---+-------+--------+-----+-----------+-------+-----+-------+
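
For completeness, the DSL filter and the SQL query in the example above should each return a single row. The output below is illustrative, based on the data inserted earlier.


+---+-----+-----+
|key|fName|lName|
+---+-----+-----+
|  1| Abby|Smith|
+---+-----+-----+

+---+-----+--------+-----+-----------+------+-----+-------+
|key|fName|   lName|mName|addressLine|  city|state|zipCode|
+---+-----+--------+-----+-----------+------+-----+-------+
|  2|Amaya|Williams|    L| 123 Orange|Newark|   NJ|  27656|
+---+-----+--------+-----+-----------+------+-----+-------+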

Conclusion

In this tutorial, you have learned how to read DataFrame rows from and write them to an HBase table using the Spark HBase connector and the DataSource "org.apache.spark.sql.execution.datasources.hbase", with a Scala example.

This complete project, with Maven dependencies and many more HBase examples, is available at GitHub in the “spark-hbase-connector-examples” project.

If you like this article, please leave me a message below.

Happy Learning !!

Naveen Nelamali

Naveen Nelamali (NNK) is a Data Engineer with 20+ years of experience in transforming data into actionable insights. Over the years, he has honed his expertise in designing, implementing, and maintaining data pipelines with frameworks like Apache Spark, PySpark, Pandas, R, Hive, and Machine Learning. Naveen's journey in the field of data engineering has been one of continuous learning, innovation, and a strong commitment to data integrity. In this blog, he shares his experiences with data as he comes across them. Follow Naveen @ LinkedIn and Medium

This Post Has 16 Comments

  1. NNK

You can do that, but it's not straightforward. Maybe you need to use Cygwin.

  2. Anonymous

Will I be able to install HBase on Windows and run this through the Windows Spark-with-HBase combo?

  3. NNK

Hi Sabaree, in order to run this example successfully you need to have an HBase setup.

  4. carlos

    Hello NNK, thanks for replying. That’s great. I did try to use the SHC libraries. I used the version 1.1.1-2.1-s_2.11, but I’m getting the following exception:

    java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse

This seems like an incompatibility issue because my environment is using Spark 2.4.4 and Scala 2.11.12, and this library seems to be targeted at Spark 2.1. Anyway, I know this is not related to this thread, but if you have run into this issue and solved it, please let me know.

    Thank you!

    Carlos

  5. NNK

Hi, you are getting the error “Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/spark/datasources/HBaseTableCatalog” because you are not passing the HBase jars to spark-submit. Please add all the dependency jars to spark-submit.

  6. Carlos

Hi NNK again, I was able to check out your project and tried to run it with the following command:

    spark-submit --class com.sparkbyexamples.spark.hbase.dataframe.MyHBaseSparkRead \
    --master yarn --deploy-mode cluster \
    --executor-memory 4g --num-executors 4 --driver-memory 2g \
    --name PipelineRunner --files /etc/hbase/conf/hbase-site.xml $DIR/target/spark-hbase-connector-examples-1.0-SNAPSHOT.jar

but I’m getting the following error:

    org.apache.spark.SparkException: Exception thrown in awaitResult:
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
    at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:468)
    at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:305)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply$mcV$sp(ApplicationMaster.scala:245)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:779)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
    at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:778)
    at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:244)
    at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:803)
    at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
    Caused by: java.util.concurrent.ExecutionException: Boxed Error
    at scala.concurrent.impl.Promise$.resolver(Promise.scala:59)
    at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:51)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
    at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:157)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:703)
    Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/spark/datasources/HBaseTableCatalog$
    at com.sparkbyexamples.spark.hbase.dataframe.MyHBaseSparkRead$.main(MyHBaseSparkRead.scala:32)
    at com.sparkbyexamples.spark.hbase.dataframe.MyHBaseSparkRead.main(MyHBaseSparkRead.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
    Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog$
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)

    please let me know if you have any insights.

    Thanks!

    Carlos

  7. carlos

    Hello NNK,

I’m also getting the same error (Failed to find data source: org.apache.spark.sql.execution.datasources.hbase) in my project. Any ideas? I’m going to check out your project and try to run it. But if you have any insights, that would be great. Thanks!
    Carlos

  8. NNK

Hi Sasmit, have you tried cloning the GitHub project and executing the example given?

  9. Sasmit Bhitiria

    Hi,

I am able to create the catalog and also create a DataFrame, but when doing a show it's not returning any records. In HBase, the table has data.

  10. NNK

    Please share your pom.xml file.

  11. NNK

Hi Santhosh, have you added the HBase dependencies to the classpath? Also, please specify whether you are running the job from an IDE or using spark-submit.

    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>2.1.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hbase.connectors.spark</groupId>
      <artifactId>hbase-spark</artifactId>
      <version>1.0.0</version>
    </dependency>

  12. Saurabh Santhosh

    getting the following error :
    Exception in thread “main” java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.execution.datasources.hbase. Please find packages at http://spark.apache.org/third-party-projects.html
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:244)
    at com.sparkbyexamples.spark.hbase.dataframe.HBaseSparkInsert$.main(HBaseSparkInsert.scala:46)
    at com.sparkbyexamples.spark.hbase.dataframe.HBaseSparkInsert.main(HBaseSparkInsert.scala)

  13. Anonymous

    Hi NNK, the above code does not work for me as it says
    Failed to find data source: org.apache.spark.sql.execution.datasources.hbase
Am I missing anything here?

  14. NNK

Hi Indranil, thanks for reading. Please check out the GitHub project https://github.com/sparkbyexamples/spark-hbase-connector-examples where I have the complete code with Maven dependencies. After checkout, import it into your favorite IDE and execute the program. If you still have an issue, please let me know.

  15. Indranil

    Hi, I am new to Spark. Please help me with the queries below:
    1. Where should I put the dependencies? I mean, in which folder and which XML file?
    2. "hbase-spark" – where does this library reside?
    3. What are the Spark and HBase versions that support this integration?
