Which Spark HBase Connector to use?

This tutorial explains different Spark connectors and libraries to interact with HBase Database and provides a Hortonworks connector example of how to create DataFrame from and Insert DataFrame to the table.

Spark HBase Connectors

On the internet, you will find several ways and APIs to connect Spark to HBase, and some of these are outdated or not maintained properly. Here, I will explain these libraries and what they are used for, and later we will look at some Spark SQL examples.

Let’s see these in detail.

Apache HBase Client

The Apache hbase-client API comes with the HBase distribution, and you can find this jar in the /lib folder of your installation directory. All Spark connectors use this library to interact with the database natively. Below is the Maven dependency to use.

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version> replace hbase version </version>
</dependency>

If you want to connect to HBase from Java or Scala, you can use this client API directly without any third-party libraries, but with Spark you need a connector. You can find the recent releases and their compatible dependencies in the Maven repository.
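For instance, a minimal sketch of talking to HBase directly with the hbase-client API (no Spark involved) could look like the following. It assumes HBase is reachable through an hbase-site.xml on the classpath and that an ’employee’ table with a ’person’ column family already exists (the same table we create later in this tutorial).

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Put}
import org.apache.hadoop.hbase.util.Bytes

// Assumes hbase-site.xml is on the classpath and the 'employee' table
// with a 'person' column family already exists.
val conf = HBaseConfiguration.create()
val connection = ConnectionFactory.createConnection(conf)
val table = connection.getTable(TableName.valueOf("employee"))

// Write one cell: row key "1", column person:firstName.
val put = new Put(Bytes.toBytes("1"))
put.addColumn(Bytes.toBytes("person"), Bytes.toBytes("firstName"), Bytes.toBytes("Abby"))
table.put(put)

// Read the same cell back.
val result = table.get(new Get(Bytes.toBytes("1")))
println(Bytes.toString(result.getValue(Bytes.toBytes("person"), Bytes.toBytes("firstName"))))

table.close()
connection.close()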

When you connect to HBase through one of the Spark connectors below, you don’t have to add this dependency yourself, as the connectors pull it in as a transitive dependency; adding it explicitly can result in incompatibility issues.

Spark HBase Connector ( hbase-spark )

The hbase-spark API enables us to integrate Spark with HBase: it bridges the gap between HBase’s key-value structure and Spark SQL’s table structure, and it enables users to perform complex data analytical work on top of HBase.

It also lets us leverage the benefits of both RDDs and DataFrames. HBaseContext is the root of all Spark integration with HBase: it reads the HBase configuration and pushes it to the executors, which gives us one HBase connection per Spark executor.

Using this, we can do parallel processing such as bulk inserts through bulkPut, bulk deletes through bulkDelete, and bulk retrieval through bulkGet; it also has more functions like mapPartitions etc. All of these functions operate on RDDs, and to use them we should import "org.apache.hadoop.hbase.spark.HBaseRDDFunctions._", as sketched below.
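Here is a minimal sketch of a bulk insert with hbaseBulkPut, assuming HBase is reachable through the hbase-site.xml on the classpath and that the ’employee’ table used later in this tutorial already exists; the SparkSession settings are only illustrative.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("SparkByExamples.com")
  .getOrCreate()

// HBaseContext reads the HBase configuration once and ships it to the executors.
val hbaseConf = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(spark.sparkContext, hbaseConf)

// Bulk-insert an RDD of (rowKey, firstName) pairs into the 'employee' table.
val rdd = spark.sparkContext.parallelize(Seq(("1", "Abby"), ("2", "Amaya")))
rdd.hbaseBulkPut(hbaseContext, TableName.valueOf("employee"), { case (key, name) =>
  new Put(Bytes.toBytes(key))
    .addColumn(Bytes.toBytes("person"), Bytes.toBytes("firstName"), Bytes.toBytes(name))
})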

When using this with a DataFrame, use “org.apache.hadoop.hbase.spark” as the DataSource.

   <dependency>
     <groupId>org.apache.hbase.connectors.spark</groupId>
     <artifactId>hbase-spark</artifactId>
     <version>replace version here</version>
   </dependency>

You can find the recent releases and their compatible dependencies in the Maven repository.
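Putting the pieces together, below is a minimal sketch of writing and then reading a DataFrame with the “org.apache.hadoop.hbase.spark” DataSource. The option names "hbase.columns.mapping" and "hbase.table" are the ones used by recent hbase-connectors releases; check the documentation of the version you pick, and note that the ’employee’ table and ’person’ column family are assumed to already exist.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("SparkByExamples.com")
  .getOrCreate()
import spark.implicits._

// The DataSource picks up the HBaseContext created here (hbase-site.xml on the classpath).
val hbaseContext = new HBaseContext(spark.sparkContext, HBaseConfiguration.create())

val df = Seq(("1", "Abby"), ("2", "Amaya")).toDF("key", "firstName")

// 'hbase.columns.mapping' maps DataFrame columns to ':key' and 'family:qualifier'.
df.write
  .format("org.apache.hadoop.hbase.spark")
  .option("hbase.columns.mapping", "key STRING :key, firstName STRING person:firstName")
  .option("hbase.table", "employee")
  .save()

// Reading back with the same mapping.
val employeeDF = spark.read
  .format("org.apache.hadoop.hbase.spark")
  .option("hbase.columns.mapping", "key STRING :key, firstName STRING person:firstName")
  .option("hbase.table", "employee")
  .load()
employeeDF.show()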

Note that this connector has been published under two Maven artifacts. The first, with groupId “org.apache.hbase”, has not had new changes on Maven Central as of writing this post, and all versions released there were beta, although Cloudera and Hortonworks ship their own later releases of it. In 2020, a second artifact with groupId “org.apache.hbase.connectors.spark” was created and released its first version with support for the latest Spark.

Spark Hortonworks Connector ( shc-core )

shc-core is from Hortonworks and provides the DataSource “org.apache.spark.sql.execution.datasources.hbase” to integrate DataFrames with HBase. It uses the Spark HBase connector as a dependency, hence we can use all the operations discussed in the previous section.

    <dependency>
      <groupId>com.hortonworks</groupId>
      <artifactId>shc-core</artifactId>
      <version>replace version here</version>
    </dependency>

You can find the recent releases and their compatible dependencies in the Maven repository.

DataFrame operations on HBase Table

Regardless of which connector implementation you use, the underlying usage does not change much; hence, in the rest of this tutorial, we will learn with examples how to use the Hortonworks connector (shc-core).

Prerequisites:

HBase installed and set up, at least in standalone mode

We should define a catalog for the schema mapping between the HBase table and the Spark DataFrame, prepare the data, populate the table, and then load it into a DataFrame. After that, users can run integrated queries and access records in the HBase table with SQL. The following illustrates the basic procedure.

Create an HBase table

Use the create command to create the table with at least one column family. The statement below creates an ’employee’ table with ‘person’ and ‘address’ column families.

Related: How to create an HBase table using shell


hbase> create 'employee', 'person', 'address'
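If you want to confirm the table and its column families, you can describe it from the same shell:

hbase> describe 'employee'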

Define Catalog Schema

Define a catalog schema to map the HBase table to the Spark DataFrame. The schema below maps the table ’employee’ in the ‘default’ namespace, with columns ‘key’; ‘firstName’, ‘lastName’, ‘middleName’ in the ‘person’ column family; and ‘addressLine’, ‘city’, ‘state’ and ‘zipCode’ in the ‘address’ column family. The column names do not have to be the same between the DataFrame and the table; here we have intentionally used different names for better understanding. For example, fName is the DataFrame name for firstName in the table.

Note that the rowkey also has to be defined in detail as a column (key), which has a specific column family, rowkey.


def catalog =
  s"""{
     |"table":{"namespace":"default","name":"employee"},
     |"rowkey":"key",
     |"columns":{
     |"key":{"cf":"rowkey","col":"key","type":"string"},
     |"fName":{"cf":"person","col":"firstName","type":"string"},
     |"lName":{"cf":"person","col":"lastName","type":"string"},
     |"mName":{"cf":"person","col":"middleName","type":"string"},
     |"addressLine":{"cf":"address","col":"addressLine","type":"string"},
     |"city":{"cf":"address","col":"city","type":"string"},
     |"state":{"cf":"address","col":"state","type":"string"},
     |"zipCode":{"cf":"address","col":"zipCode","type":"string"}
     |}
     |}""".stripMargin
Create DataFrame

Now, let’s create some data and a DataFrame from it to work with.


import org.apache.spark.sql.SparkSession

case class Employee(key: String, fName: String, lName: String, mName: String,
    addressLine: String, city: String, state: String, zipCode: String)

val spark: SparkSession = SparkSession.builder()
  .master("local[1]")
  .appName("SparkByExamples.com")
  .getOrCreate()

import spark.implicits._

val data = Seq(Employee("1", "Abby", "Smith", "K", "3456 main", "Orlando", "FL", "45235"),
  Employee("2", "Amaya", "Williams", "L", "123 Orange", "Newark", "NJ", "27656"),
  Employee("3", "Alchemy", "Davis", "P", "Warners", "Sanjose", "CA", "34789"))

val df = spark.sparkContext.parallelize(data).toDF

Insert DataFrame Rows to table

As described earlier, the Hortonworks connector defines the DataSource “org.apache.spark.sql.execution.datasources.hbase” to read from and write to the table. Once you have a DataFrame, let’s insert its rows into the table using this DataSource in the format function. write is a DataFrameWriter object used to insert the DataFrame rows into the HBase storage system. Given a DataFrame with the specified schema catalog, the save function creates a new HBase table with the number of regions given by HBaseTableCatalog.newTable (4 in the example below) and saves the DataFrame into it.


import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

df.write.options(
    Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "4"))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save()

Running the scan ’employee’ shell command now returns the following output with the 3 row keys 1, 2, and 3.

[Screenshot: HBase shell scan ’employee’ output for the Spark HBase Connector example]

Reading and applying filters

To read the table, we pass the catalog, which provides both the table name to scan and the DataFrame structure, as an option.


val hbaseDF = spark.read
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .load()

hbaseDF.printSchema()

hbaseDF.show(false)

printSchema() prints the following.


root
 |-- key: string (nullable = true)
 |-- fName: string (nullable = true)
 |-- lName: string (nullable = true)
 |-- mName: string (nullable = true)
 |-- addressLine: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zipCode: string (nullable = true)

show(false) returns the following; the false argument tells Spark not to truncate the values in the results.


+---+-------+--------+-----+-----------+-------+-----+-------+
|key|fName  |lName   |mName|addressLine|city   |state|zipCode|
+---+-------+--------+-----+-----------+-------+-----+-------+
|1  |Abby   |Smith   |K    |3456 main  |Orlando|FL   |45235  |
|2  |Amaya  |Williams|L    |123 Orange |Newark |NJ   |27656  |
|3  |Alchemy|Davis   |P    |Warners    |Sanjose|CA   |34789  |
+---+-------+--------+-----+-----------+-------+-----+-------+

Now we are able to apply DataFrame operations on this.


hbaseDF.filter($"key" === "1" && $"state" === "FL")
  .select("key", "fName", "lName")
  .show()
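With the sample data inserted above, this filter should return only the row with key 1, roughly as follows:

+---+-----+-----+
|key|fName|lName|
+---+-----+-----+
|  1| Abby|Smith|
+---+-----+-----+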

Running SQL Query on table

Finally, let’s create a temporary view on the DataFrame, which enables us to run SQL queries against the HBase table. The lifetime of this temporary view is tied to the SparkSession that was used to create it.


hbaseDF.createOrReplaceTempView("employeeTable")
spark.sql("select * from employeeTable where fName = 'Amaya'").show
Conclusion:

In this tutorial, you have learned about the different Spark HBase libraries available and how to use the Hortonworks connector DataSource API with an example. I hope you like it.

Happy Learning :)

Naveen Nelamali

Naveen Nelamali (NNK) is a Data Engineer with 20+ years of experience in transforming data into actionable insights. Over the years, he has honed his expertise in designing, implementing, and maintaining data pipelines with frameworks like Apache Spark, PySpark, Pandas, R, Hive and Machine Learning. Naveen's journey in the field of data engineering has been one of continuous learning, innovation, and a strong commitment to data integrity. In this blog, he shares his experiences with data as he comes across them. Follow Naveen @ LinkedIn and Medium

This Post Has 5 Comments

  1. NNK

    Hi Chris, I wrote this article in early 2020, and Spark 3.0 was not released by then. Will update this article with the latest version soon. Thanks for bringing this up.

  2. Chris

    Hi, I have a problem. It is now 2020. We have Scala 3.0 and Spark 3.0, which uses Scala 2.12.
    All of the mentioned connectors work only with Scala 2.11.
    So we have to work with Spark 2.3, which was released February 28, 2018.
    Where is 2020? ;)

  3. Hassaan

    Thanks for the article. If you’re running Spark on Azure Databricks, then the hbase-spark library is the one to use.

  4. NNK

    This seems to be a version mismatch of the Jackson library. Can you please send me all the dependency libraries and their versions you are using?

  5. Enzo B

    Thanks for this great article ! I have a question:
    You said: “[…] the HBaseContext reads HBase configurations and pushes them to the executors. This allows us to have an HBase Connection per Spark Executor” –> Does this mean that if we have only 1 executor but 16 cores we will only have a reading parallelism of 1 and not 16 because there is only 1 HBase connection per executor ?

    Thanks a lot !
