Spark Read from & Write to HBase table | Example

This tutorial explains how to read from and write Spark (2.4.x) DataFrame rows to an HBase table using the hbase-spark connector and the DataSource "org.apache.spark.sql.execution.datasources.hbase", with a Scala example.

Hello everyone,

Lately, one of the HBase libraries used in this article has changed in the Maven repository, and many readers are experiencing a "data source not found" error. Until I find the right library version, I recommend using the articles below to read and write HBase tables with Spark.

Related: Libraries and DataSource APIs to connect Spark with HBase

Spark HBase library dependencies

The following HBase libraries are required to connect Spark with the HBase database and to read rows from and write rows to a table.

  • hbase-client is provided by HBase itself and is used natively to interact with HBase.
  • hbase-spark is the connector that provides HBaseContext to integrate Spark with HBase. HBaseContext pushes the HBase configuration to the Spark executors and allows each executor to have its own HBase connection (see the short sketch below this list).
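
As a rough sketch of what the connector sets up under the hood, you can also create an HBaseContext yourself before reading or writing. This is only an illustration: it assumes an hbase-site.xml is available on the classpath and that a SparkSession named spark has already been created (as in the examples further below).


import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.spark.HBaseContext

// Build an HBase client configuration; this loads hbase-site.xml from the classpath
val hbaseConf = HBaseConfiguration.create()

// Creating an HBaseContext broadcasts the configuration to the Spark executors,
// so each executor can open its own HBase connection
val hbaseContext = new HBaseContext(spark.sparkContext, hbaseConf)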

Below are the complete Maven dependencies to run the examples in your environment. Note that “hbase-client” is not declared as a dependency, because the Spark HBase connector pulls it in as a transitive dependency; this is the recommended way, otherwise you may come across incompatibility issues between the two.

  <dependencies>

    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.11.12</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.4.0</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.4.0</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.4.0</version>
    </dependency>

   <dependency>
     <groupId>org.apache.hbase.connectors.spark</groupId>
     <artifactId>hbase-spark</artifactId>
     <version>1.0.0</version>
   </dependency>

  </dependencies>
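
Note that the connector typically picks up the HBase connection details (ZooKeeper quorum, client port, and so on) from an hbase-site.xml found on the classpath. As a minimal sketch, assuming a local standalone HBase, an hbase-site.xml placed under src/main/resources could look like the following; adjust the values to match your cluster:


  <configuration>
    <!-- ZooKeeper quorum of the HBase cluster; "localhost" assumes a local standalone HBase -->
    <property>
      <name>hbase.zookeeper.quorum</name>
      <value>localhost</value>
    </property>
    <!-- ZooKeeper client port; 2181 is the default -->
    <property>
      <name>hbase.zookeeper.property.clientPort</name>
      <value>2181</value>
    </property>
  </configuration>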

Writing Spark DataFrame to HBase table using “hbase-spark” connector

First, let’s create a DataFrame that we will store in HBase using the “hbase-spark” connector. In this snippet, we create an employee DataFrame with 3 rows.


case class Employee(key: String, fName: String, lName: String,
                      mName:String, addressLine:String, city:String,
                      state:String, zipCode:String)

 val data = Seq(Employee("1","Abby","Smith","K","3456 main", "Orlando","FL","45235"),
      Employee("2","Amaya","Williams","L","123 Orange","Newark", "NJ", "27656"),
      Employee("3","Alchemy","Davis","P","Warners","Sanjose","CA", "34789"))

    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExamples.com")
      .getOrCreate()

    import spark.implicits._
    val df = spark.sparkContext.parallelize(data).toDF

Now, let’s define a catalog that bridges the gap between the HBase key-value store and the DataFrame table structure. Using this catalog, we also map the column names (and the row key) between the two structures; please refer to the snippet below. Within the catalog, we also specify the HBase table we are going to use and its namespace. Here, we use the “employee” table in the “default” namespace.


def catalog =
      s"""{
         |"table":{"namespace":"default", "name":"employee"},
         |"rowkey":"key",
         |"columns":{
         |"key":{"cf":"rowkey", "col":"key", "type":"string"},
         |"fName":{"cf":"person", "col":"firstName", "type":"string"},
         |"lName":{"cf":"person", "col":"lastName", "type":"string"},
         |"mName":{"cf":"person", "col":"middleName", "type":"string"},
         |"addressLine":{"cf":"address", "col":"addressLine", "type":"string"},
         |"city":{"cf":"address", "col":"city", "type":"string"},
         |"state":{"cf":"address", "col":"state", "type":"string"},
         |"zipCode":{"cf":"address", "col":"zipCode", "type":"string"}
         |}
         |}""".stripMargin

Finally, let’s store the DataFrame to the HBase table using the save() function on the DataFrame. In the example below, format() takes the "org.apache.spark.sql.execution.datasources.hbase" DataSource defined in the “hbase-spark” API, which enables us to use DataFrames with HBase tables. df.write.options() takes the catalog and, through HBaseTableCatalog.newTable, tells the connector to create the table with 4 regions if it does not already exist. Finally, save() writes the rows to the HBase table.


df.write.options(
      Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "4"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()

Below is the complete example, which is also available at GitHub.


package com.sparkbyexamples.spark.hbase.dataframe

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
import org.apache.spark.sql.SparkSession

object HBaseSparkInsert {

  case class Employee(key: String, fName: String, lName: String,
                      mName:String, addressLine:String, city:String,
                      state:String, zipCode:String)

  def main(args: Array[String]): Unit = {

    def catalog =
      s"""{
         |"table":{"namespace":"default", "name":"employee"},
         |"rowkey":"key",
         |"columns":{
         |"key":{"cf":"rowkey", "col":"key", "type":"string"},
         |"fName":{"cf":"person", "col":"firstName", "type":"string"},
         |"lName":{"cf":"person", "col":"lastName", "type":"string"},
         |"mName":{"cf":"person", "col":"middleName", "type":"string"},
         |"addressLine":{"cf":"address", "col":"addressLine", "type":"string"},
         |"city":{"cf":"address", "col":"city", "type":"string"},
         |"state":{"cf":"address", "col":"state", "type":"string"},
         |"zipCode":{"cf":"address", "col":"zipCode", "type":"string"}
         |}
         |}""".stripMargin


    val data = Seq(Employee("1","Abby","Smith","K","3456 main", "Orlando","FL","45235"),
      Employee("2","Amaya","Williams","L","123 Orange","Newark", "NJ", "27656"),
      Employee("3","Alchemy","Davis","P","Warners","Sanjose","CA", "34789"))

    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExamples.com")
      .getOrCreate()

    import spark.implicits._
    val df = spark.sparkContext.parallelize(data).toDF

    df.write.options(
      Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "4"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()
  }
}

If you run scan 'employee' in the HBase shell, you will get the 3 rows below as output.

Spark HBase "hbase-spark" Connector
Spark HBase Connector
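
If you want to verify the write from the HBase shell, a minimal sketch of the commands is shown below. The create statement is only needed if you prefer to pre-create the table yourself instead of letting the connector create it through the newTable option; the two column families must match the catalog.


  # optional: pre-create the table with the column families used in the catalog
  create 'employee', 'person', 'address'

  # list the rows written by the Spark job
  scan 'employee'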

Reading the table to DataFrame using “hbase-spark”

In this example, I will explain how to read data from the HBase table, create a DataFrame, and finally run some filters using the DataFrame DSL and SQL.

Below is a complete example and it is also available at GitHub.


package com.sparkbyexamples.spark.hbase.dataframe

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
import org.apache.spark.sql.SparkSession

object HBaseSparkRead {

  def main(args: Array[String]): Unit = {

    def catalog =
      s"""{
         |"table":{"namespace":"default", "name":"employee"},
         |"rowkey":"key",
         |"columns":{
         |"key":{"cf":"rowkey", "col":"key", "type":"string"},
         |"fName":{"cf":"person", "col":"firstName", "type":"string"},
         |"lName":{"cf":"person", "col":"lastName", "type":"string"},
         |"mName":{"cf":"person", "col":"middleName", "type":"string"},
         |"addressLine":{"cf":"address", "col":"addressLine", "type":"string"},
         |"city":{"cf":"address", "col":"city", "type":"string"},
         |"state":{"cf":"address", "col":"state", "type":"string"},
         |"zipCode":{"cf":"address", "col":"zipCode", "type":"string"}
         |}
         |}""".stripMargin

    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExamples.com")
      .getOrCreate()

    import spark.implicits._

    // Reading from HBase to DataFrame
    val hbaseDF = spark.read
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .load()

    //Display Schema from DataFrame
    hbaseDF.printSchema()

    //Collect and show Data from DataFrame
    hbaseDF.show(false)

    //Applying Filters
    hbaseDF.filter($"key" === "1" && $"state" === "FL")
      .select("key", "fName", "lName")
      .show()

    //Create Temporary Table on DataFrame
    hbaseDF.createOrReplaceTempView("employeeTable")

    //Run SQL
    spark.sql("select * from employeeTable where fName = 'Amaya' ").show

  }
}

hbaseDF.printSchema() displays the below schema.


root
 |-- key: string (nullable = true)
 |-- fName: string (nullable = true)
 |-- lName: string (nullable = true)
 |-- mName: string (nullable = true)
 |-- addressLine: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zipCode: string (nullable = true)

hbaseDF.show(false) returns the data below. Note the differences between the DataFrame field names and the table column names.


+---+-------+--------+-----+-----------+-------+-----+-------+
|key|fName  |lName   |mName|addressLine|city   |state|zipCode|
+---+-------+--------+-----+-----------+-------+-----+-------+
|1  |Abby   |Smith   |K    |3456 main  |Orlando|FL   |45235  |
|2  |Amaya  |Williams|L    |123 Orange |Newark |NJ   |27656  |
|3  |Alchemy|Davis   |P    |Warners    |Sanjose|CA   |34789  |
+---+-------+--------+-----+-----------+-------+-----+-------+

Conclusion

In this tutorial, you have learned how to read from and write DataFrame rows to an HBase table using the Spark HBase connector and the DataSource "org.apache.spark.sql.execution.datasources.hbase", with a Scala example.

The complete project with Maven dependencies, along with many more HBase examples, is available in the GitHub “spark-hbase-connector-examples” project.

If you like this article, please leave me a message below.

Happy Learning !!

Naveen Nelamali

Naveen Nelamali (NNK) is a Data Engineer with 20+ years of experience in transforming data into actionable insights. Over the years, he has honed his expertise in designing, implementing, and maintaining data pipelines with frameworks like Apache Spark, PySpark, Pandas, R, Hive and Machine Learning. Naveen's journey in the field of data engineering has been one of continuous learning, innovation, and a strong commitment to data integrity. In this blog, he shares his experiences with data as he comes across them. Follow Naveen @ LinkedIn and Medium

This Post Has 24 Comments

  1. sabaree

    Exception in thread "main" java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.avro.AvroFileFormat could not be instantiated
    at java.util.ServiceLoader.fail(ServiceLoader.java:232)
    at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:40)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:932)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:104)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:644)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:728)
    at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:832)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:252)
    at org.example.hbase_Spark$.main(hbase_Spark.scala:42)
    at org.example.hbase_Spark.main(hbase_Spark.scala)
    Caused by: java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/FileFormat$class
    at org.apache.spark.sql.avro.AvroFileFormat.<init>(AvroFileFormat.scala:44)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.lang.Class.newInstance(Class.java:442)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
    … 13 more
    Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.datasources.FileFormat$class
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    … 20 more

    I am new to spark. getting the above error can u pls help. Do we need to install hbase in our local for working with this example. I am using windows 10.

    1. NNK

      Hi Sabaree, in order to run this example successfully you need to have HBase set up.

      1. Anonymous

        Will be able to install Hbase in Windows? and run this through windows spark with hbase combo?

        1. NNK

          You can do that but not straight forward. maybe you need to use Cygwin.

  2. Carlos

    Hi NNK again, I was able to check out your project and tried to run it with the following command:

    spark-submit --class com.sparkbyexamples.spark.hbase.dataframe.MyHBaseSparkRead \
    --master yarn --deploy-mode cluster \
    --executor-memory 4g --num-executors 4 --driver-memory 2g \
    --name PipelineRunner --files /etc/hbase/conf/hbase-site.xml $DIR/target/spark-hbase-connector-examples-1.0-SNAPSHOT.jar

    but i’m getting the following error:

    org.apache.spark.SparkException: Exception thrown in awaitResult:
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
    at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:468)
    at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:305)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply$mcV$sp(ApplicationMaster.scala:245)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:779)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
    at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:778)
    at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:244)
    at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:803)
    at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
    Caused by: java.util.concurrent.ExecutionException: Boxed Error
    at scala.concurrent.impl.Promise$.resolver(Promise.scala:59)
    at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:51)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
    at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:157)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:703)
    Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/spark/datasources/HBaseTableCatalog$
    at com.sparkbyexamples.spark.hbase.dataframe.MyHBaseSparkRead$.main(MyHBaseSparkRead.scala:32)
    at com.sparkbyexamples.spark.hbase.dataframe.MyHBaseSparkRead.main(MyHBaseSparkRead.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
    Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog$
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)

    please let me know if you have any insights.

    Thanks!

    Carlos

    1. NNK

      Hi, you are getting the error “Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/spark/datasources/HBaseTableCatalog” because the HBase jars are not being shipped with your Spark submit. Please add all the dependency jars to the spark-submit command.
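
      For example, a submit command along the lines below would put the connector and client jars on both the driver and executor classpaths (the jar paths and versions are illustrative; use the ones from your own build):

      spark-submit --class com.sparkbyexamples.spark.hbase.dataframe.MyHBaseSparkRead \
        --master yarn --deploy-mode cluster \
        --jars /path/to/hbase-spark-1.0.0.jar,/path/to/hbase-client-2.1.0.jar \
        --files /etc/hbase/conf/hbase-site.xml \
        target/spark-hbase-connector-examples-1.0-SNAPSHOT.jar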

  3. carlos

    Hello NNK,

    I’m also getting the same error (Failed to find data source: org.apache.spark.sql.execution.datasources.hbase) in my project. Any ideas? I’m going to check out your project and try to run it. But if you have any insights, that would be great. Thanks!
    Carlos

      1. carlos

        Hello NNK, thanks for replying. That’s great. I did try to use the SHC libraries. I used the version 1.1.1-2.1-s_2.11, but I’m getting the following exception:

        java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse

        This seems like an incompatibility issue, because my environment is using Spark 2.4.4 and Scala 2.11.12, and this library seems to be targeted at Spark 2.1. Anyway, I know this is not related to this thread, but if you have run into this issue and solved it, please let me know.

        Thank you!

        Carlos

  4. Saurabh Santhosh

    Getting the following error:
    Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.execution.datasources.hbase. Please find packages at http://spark.apache.org/third-party-projects.html
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:244)
    at com.sparkbyexamples.spark.hbase.dataframe.HBaseSparkInsert$.main(HBaseSparkInsert.scala:46)
    at com.sparkbyexamples.spark.hbase.dataframe.HBaseSparkInsert.main(HBaseSparkInsert.scala)

    1. NNK

      Hi Santhosh, have you added the HBase dependencies to the classpath? Also, please specify whether you are running the job from an IDE or using spark-submit.

      <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.1.0</version>
      </dependency>

      <dependency>
        <groupId>org.apache.hbase.connectors.spark</groupId>
        <artifactId>hbase-spark</artifactId>
        <version>1.0.0</version>
      </dependency>

      1. Sasmit Bhitiria

        Hi,

        I am able to create the catalog and also create a DataFrame, but when I do a show it does not return any records, even though the table in HBase has data.

        1. NNK

          Hi Sasmit, Have you tried cloning the GitHub project and have executed the example given?

  5. Anonymous

    Hi NNK, the above code does not work for me as it says
    Failed to find data source: org.apache.spark.sql.execution.datasources.hbase
    Am I missing anything here?

    1. NNK

      Please share your pom.xml file.

  6. Indranil

    Hi, I am new to Spark. Please help me with the queries below:
    1. Where should I put the dependencies? I mean, in which folder and which XML file?
    2. "hbase-spark" – where does this library reside?
    3. Which Spark and HBase versions support this integration?

    1. NNK

      Hi Indranil, thanks for reading. Please check out the GitHub project https://github.com/sparkbyexamples/spark-hbase-connector-examples, where I have the complete code with Maven dependencies. After checking it out, import it into your favorite IDE and execute the program. If you still have an issue, please let me know.