This tutorial explains how to read from and write Spark DataFrame rows to an HBase table using the hbase-spark connector and the Datasource "org.apache.spark.sql.execution.datasources.hbase", with a Scala example. The examples use Spark 2.4.x.
Hello everyone,
Recently, one of the HBase libraries used in this article changed in the Maven repository, and many readers are experiencing a "data source not found" error. Until I find the right library version, I recommend using the articles below to read/write HBase tables from Spark.
- https://sparkbyexamples.com/spark/create-spark-dataframe-from-hbase-using-hortonworks/
- https://sparkbyexamples.com/spark/writing-spark-dataframe-to-hbase-table-hortonworks/
Related: Libraries and DataSource APIs to connect Spark with HBase
Spark HBase library dependencies
The following HBase libraries are required to connect Spark with the HBase database and read and write rows to a table.
hbase-client
This library is provided by HBase and is used to interact with HBase natively.
hbase-spark
This connector provides HBaseContext to bridge Spark with HBase. HBaseContext pushes the configuration to the Spark executors and allows each executor to maintain its own HBase connection.
Below are the complete Maven dependencies needed to run the examples in your environment. Note that "hbase-client" is not declared as a direct dependency, since the Spark HBase connector provides it transitively; this is the recommended approach, as declaring it yourself may cause incompatibility issues between the two.
<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.12</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.0</version>
    <scope>compile</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.0</version>
    <scope>compile</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.4.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase.connectors.spark</groupId>
    <artifactId>hbase-spark</artifactId>
    <version>1.0.0</version>
  </dependency>
</dependencies>
Writing Spark DataFrame to HBase table using “hbase-spark” connector
First, let’s create a DataFrame that we will store in HBase using the “hbase-spark” connector. In this snippet, we create an employee DataFrame with 3 rows.
case class Employee(key: String, fName: String, lName: String,
                    mName: String, addressLine: String, city: String,
                    state: String, zipCode: String)

val data = Seq(Employee("1","Abby","Smith","K","3456 main", "Orlando","FL","45235"),
  Employee("2","Amaya","Williams","L","123 Orange","Newark", "NJ", "27656"),
  Employee("3","Alchemy","Davis","P","Warners","Sanjose","CA", "34789"))

val spark: SparkSession = SparkSession.builder()
  .master("local[1]")
  .appName("SparkByExamples.com")
  .getOrCreate()

import spark.implicits._
val df = spark.sparkContext.parallelize(data).toDF
Now, let’s define a catalog, which bridges the gap between the HBase key-value store and the DataFrame table structure. Using the catalog, we map the column names between the two structures and identify the row key. Please refer to the snippet below. Within the catalog, we also specify the HBase table and namespace to use; here, we use the “employee” table in the “default” namespace.
def catalog =
s"""{
|"table":{"namespace":"default", "name":"employee"},
|"rowkey":"key",
|"columns":{
|"key":{"cf":"rowkey", "col":"key", "type":"string"},
|"fName":{"cf":"person", "col":"firstName", "type":"string"},
|"lName":{"cf":"person", "col":"lastName", "type":"string"},
|"mName":{"cf":"person", "col":"middleName", "type":"string"},
|"addressLine":{"cf":"address", "col":"addressLine", "type":"string"},
|"city":{"cf":"address", "col":"city", "type":"string"},
|"state":{"cf":"address", "col":"state", "type":"string"},
|"zipCode":{"cf":"address", "col":"zipCode", "type":"string"}
|}
|}""".stripMargin
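If you define catalogs for many tables, the JSON can also be assembled programmatically instead of hand-written. The helper below is a hypothetical sketch, not part of the hbase-spark API; it only illustrates the catalog layout (table, rowkey, and per-column family/qualifier/type mappings) using plain Scala:

```scala
// Hypothetical helper (NOT part of the hbase-spark API): builds the same
// catalog JSON shown above from a list of column definitions.
object CatalogBuilder {
  // cf = column family, col = HBase column qualifier, dtype = catalog type
  case class Col(cf: String, col: String, dtype: String = "string")

  def build(namespace: String, table: String, rowkey: String,
            columns: Seq[(String, Col)]): String = {
    val cols = columns.map { case (name, c) =>
      s""""$name":{"cf":"${c.cf}", "col":"${c.col}", "type":"${c.dtype}"}"""
    }.mkString(",\n")
    s"""{
       |"table":{"namespace":"$namespace", "name":"$table"},
       |"rowkey":"$rowkey",
       |"columns":{
       |$cols
       |}
       |}""".stripMargin
  }
}

// Build a (shortened) version of the employee catalog above
val employeeCatalog = CatalogBuilder.build("default", "employee", "key", Seq(
  "key"   -> CatalogBuilder.Col("rowkey", "key"),
  "fName" -> CatalogBuilder.Col("person", "firstName"),
  "city"  -> CatalogBuilder.Col("address", "city")
))
```

The resulting string is interchangeable with the hand-written catalog, since the connector only cares about the JSON content, not how it was produced.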
Finally, let’s store the DataFrame to the HBase table using the save() function. In the example below, format() takes "org.apache.spark.sql.execution.datasources.hbase", the DataSource defined in the “hbase-spark” API, which enables us to use DataFrames with HBase tables. df.write.options() takes the catalog and specifies that the new table should use 4 regions in the cluster. Finally, save() writes the rows to the HBase table.
df.write.options(
Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "4"))
.format("org.apache.spark.sql.execution.datasources.hbase")
.save()
Below is the complete example, and the same is available at GitHub
package com.sparkbyexamples.spark.hbase.dataframe

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
import org.apache.spark.sql.SparkSession

object HBaseSparkInsert {

  case class Employee(key: String, fName: String, lName: String,
                      mName: String, addressLine: String, city: String,
                      state: String, zipCode: String)

  def main(args: Array[String]): Unit = {

    def catalog =
      s"""{
         |"table":{"namespace":"default", "name":"employee"},
         |"rowkey":"key",
         |"columns":{
         |"key":{"cf":"rowkey", "col":"key", "type":"string"},
         |"fName":{"cf":"person", "col":"firstName", "type":"string"},
         |"lName":{"cf":"person", "col":"lastName", "type":"string"},
         |"mName":{"cf":"person", "col":"middleName", "type":"string"},
         |"addressLine":{"cf":"address", "col":"addressLine", "type":"string"},
         |"city":{"cf":"address", "col":"city", "type":"string"},
         |"state":{"cf":"address", "col":"state", "type":"string"},
         |"zipCode":{"cf":"address", "col":"zipCode", "type":"string"}
         |}
         |}""".stripMargin

    val data = Seq(Employee("1","Abby","Smith","K","3456 main", "Orlando","FL","45235"),
      Employee("2","Amaya","Williams","L","123 Orange","Newark", "NJ", "27656"),
      Employee("3","Alchemy","Davis","P","Warners","Sanjose","CA", "34789"))

    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExamples.com")
      .getOrCreate()

    import spark.implicits._
    val df = spark.sparkContext.parallelize(data).toDF

    df.write.options(
      Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "4"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()
  }
}
If you run scan 'employee' in the HBase shell, you will see the 3 rows as output.
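For reference, the shell output should look roughly like the following (timestamps elided here, and only a few of the cells are shown; your actual output lists every column cell per row):

```
hbase(main):001:0> scan 'employee'
ROW   COLUMN+CELL
 1    column=address:city, value=Orlando
 1    column=person:firstName, value=Abby
 1    column=person:lastName, value=Smith
 2    column=person:firstName, value=Amaya
 3    column=person:firstName, value=Alchemy
3 row(s)
```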
Reading the table to DataFrame using “hbase-spark”
In this example, I will explain how to read data from the HBase table, create a DataFrame, and finally run some filters using the DataFrame DSL and SQL.
Below is a complete example and it is also available at GitHub.
package com.sparkbyexamples.spark.hbase.dataframe

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
import org.apache.spark.sql.SparkSession

object HBaseSparkRead {

  def main(args: Array[String]): Unit = {

    def catalog =
      s"""{
         |"table":{"namespace":"default", "name":"employee"},
         |"rowkey":"key",
         |"columns":{
         |"key":{"cf":"rowkey", "col":"key", "type":"string"},
         |"fName":{"cf":"person", "col":"firstName", "type":"string"},
         |"lName":{"cf":"person", "col":"lastName", "type":"string"},
         |"mName":{"cf":"person", "col":"middleName", "type":"string"},
         |"addressLine":{"cf":"address", "col":"addressLine", "type":"string"},
         |"city":{"cf":"address", "col":"city", "type":"string"},
         |"state":{"cf":"address", "col":"state", "type":"string"},
         |"zipCode":{"cf":"address", "col":"zipCode", "type":"string"}
         |}
         |}""".stripMargin

    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExamples.com")
      .getOrCreate()

    import spark.implicits._

    // Read from HBase into a DataFrame
    val hbaseDF = spark.read
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .load()

    // Display the DataFrame schema
    hbaseDF.printSchema()

    // Show the DataFrame contents
    hbaseDF.show(false)

    // Apply filters using the DataFrame DSL
    hbaseDF.filter($"key" === "1" && $"state" === "FL")
      .select("key", "fName", "lName")
      .show()

    // Create a temporary view on the DataFrame and run SQL
    hbaseDF.createOrReplaceTempView("employeeTable")
    spark.sql("select * from employeeTable where fName = 'Amaya'").show()
  }
}
hbaseDF.printSchema()
displays the below schema.
root
|-- key: string (nullable = true)
|-- fName: string (nullable = true)
|-- lName: string (nullable = true)
|-- mName: string (nullable = true)
|-- addressLine: string (nullable = true)
|-- city: string (nullable = true)
|-- state: string (nullable = true)
|-- zipCode: string (nullable = true)
hbaseDF.show(false)
outputs the data below. Note the differences between the DataFrame field names and the HBase table column names.
+---+-------+--------+-----+-----------+-------+-----+-------+
|key|fName |lName |mName|addressLine|city |state|zipCode|
+---+-------+--------+-----+-----------+-------+-----+-------+
|1 |Abby |Smith |K |3456 main |Orlando|FL |45235 |
|2 |Amaya |Williams|L |123 Orange |Newark |NJ |27656 |
|3 |Alchemy|Davis |P |Warners |Sanjose|CA |34789 |
+---+-------+--------+-----+-----------+-------+-----+-------+
Conclusion
In this tutorial, you have learned how to read from and write DataFrame rows to an HBase table using the Spark HBase connector and the Datasource "org.apache.spark.sql.execution.datasources.hbase",
with a Scala example.
The complete project, with Maven dependencies and many more HBase examples, is available at GitHub in the “spark-hbase-connector-examples” project.
If you like this article, please leave me a message below.
Happy Learning !!
Exception in thread “main” java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.avro.AvroFileFormat could not be instantiated
at java.util.ServiceLoader.fail(ServiceLoader.java:232)
at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:40)
at scala.collection.AbstractIterator.foreach(Iterator.scala:932)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:104)
at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:644)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:728)
at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:832)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:252)
at org.example.hbase_Spark$.main(hbase_Spark.scala:42)
at org.example.hbase_Spark.main(hbase_Spark.scala)
Caused by: java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/FileFormat$class
at org.apache.spark.sql.avro.AvroFileFormat.<init>(AvroFileFormat.scala:44)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
… 13 more
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.datasources.FileFormat$class
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
… 20 more
I am new to Spark and getting the above error, can you please help. Do we need to install HBase locally to work with this example? I am using Windows 10.
Hi Sabaree, in order to run this example successfully you need to have HBase set up.
Will I be able to install HBase on Windows, and run this Spark-with-HBase combo on Windows?
You can do that, but it is not straightforward; you may need to use Cygwin.
Hi NNK, again… I was able to check out your project and tried to run it with the following command:
spark-submit --class com.sparkbyexamples.spark.hbase.dataframe.MyHBaseSparkRead \
  --master yarn --deploy-mode cluster \
  --executor-memory 4g --num-executors 4 --driver-memory 2g \
  --name PipelineRunner --files /etc/hbase/conf/hbase-site.xml $DIR/target/spark-hbase-connector-examples-1.0-SNAPSHOT.jar
but i’m getting the following error:
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:468)
at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:305)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply$mcV$sp(ApplicationMaster.scala:245)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:779)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:778)
at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:244)
at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:803)
at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
Caused by: java.util.concurrent.ExecutionException: Boxed Error
at scala.concurrent.impl.Promise$.resolver(Promise.scala:59)
at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:51)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:157)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:703)
Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/spark/datasources/HBaseTableCatalog$
at com.sparkbyexamples.spark.hbase.dataframe.MyHBaseSparkRead$.main(MyHBaseSparkRead.scala:32)
at com.sparkbyexamples.spark.hbase.dataframe.MyHBaseSparkRead.main(MyHBaseSparkRead.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog$
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
please let me know if you have any insights.
Thanks!
Carlos
Hi, you are getting the error “Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/spark/datasources/HBaseTableCatalog” because you are not sending the HBase jars to spark-submit. Please add all dependency jars to the spark-submit command.
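As a rough sketch (the jar paths below are hypothetical; substitute the versions and locations in your environment), the dependency jars can be passed with the --jars option:

```
spark-submit \
  --class com.sparkbyexamples.spark.hbase.dataframe.MyHBaseSparkRead \
  --master yarn --deploy-mode cluster \
  --files /etc/hbase/conf/hbase-site.xml \
  --jars /path/to/hbase-spark-1.0.0.jar,/path/to/hbase-client.jar,/path/to/hbase-common.jar \
  target/spark-hbase-connector-examples-1.0-SNAPSHOT.jar
```

Alternatively, building an uber/shaded jar that bundles the connector dependencies avoids listing them on every submit.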
Hello Carlos, one of the HBase libraries has been changed in the Maven repository, and many readers are experiencing the “data source not found” error. Until I find the right library version, I recommend using the articles below to read/write HBase tables using Spark. Sorry for the issues.
https://sparkbyexamples.com/spark/create-spark-dataframe-from-hbase-using-hortonworks/
https://sparkbyexamples.com/spark/writing-spark-dataframe-to-hbase-table-hortonworks/
Hello NNK,
I’m also getting the same error (Failed to find data source: org.apache.spark.sql.execution.datasources.hbase). in my project. Any ideas? I’m going to checkout your project and try to run it. But if you have any insights, that would be great. Thanks!
Carlos
Hello Carlos, one of the HBase libraries has been changed in the Maven repository, and many readers are experiencing the “data source not found” error. Until I find the right library version, I recommend using the articles below to read/write HBase tables using Spark. Sorry for the issues.
https://sparkbyexamples.com/spark/create-spark-dataframe-from-hbase-using-hortonworks/
https://sparkbyexamples.com/spark/writing-spark-dataframe-to-hbase-table-hortonworks/
Hello NNK, thanks for replying. That’s great. I did try to use the SHC libraries. I used version 1.1.1-2.1-s_2.11, but I’m getting the following exception:
java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse
This seems like an incompatibility issue, because my environment is using Spark 2.4.4 and Scala 2.11.12, and this library seems to be targeted at Spark 2.1. Anyway, I know this is not related to this thread, but if you have run into this issue and solved it, please let me know.
Thank you!
Carlos
getting the following error :
Exception in thread “main” java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.execution.datasources.hbase. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:244)
at com.sparkbyexamples.spark.hbase.dataframe.HBaseSparkInsert$.main(HBaseSparkInsert.scala:46)
at com.sparkbyexamples.spark.hbase.dataframe.HBaseSparkInsert.main(HBaseSparkInsert.scala)
Hi Santhosh, have you added the HBase dependencies to the classpath? Also, please specify whether you are running the job from an IDE or using spark-submit.
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
  <version>2.1.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase.connectors.spark</groupId>
  <artifactId>hbase-spark</artifactId>
  <version>1.0.0</version>
</dependency>
Hi,
I am able to create the catalog and also create a DataFrame, but when I do a show() it does not return any records. In HBase, the table has data.
Hi Sasmit, have you tried cloning the GitHub project and executing the example given?
Hi NNK, the above code does not work for me, as it says
Failed to find data source: org.apache.spark.sql.execution.datasources.hbase
Am I missing anything here?
Please share your pom.xml file.
Hi, I am new to Spark. Please help me with the queries below:
1. Where should I put the dependencies? I mean, in which folder and which XML file?
2. “hbase-spark”: where does this library reside?
3. Which Spark and HBase versions support this integration?
Hi Indranil, thanks for reading. Please check out the GitHub project “https://github.com/sparkbyexamples/spark-hbase-connector-examples”, where I have the complete code with Maven dependencies. After checkout, import it into your favorite IDE and execute the program. If you still have an issue, please let me know.