This article describes Spark SQL batch processing using the Apache Kafka data source on a DataFrame. Unlike Spark Structured Streaming, which runs continuously, a batch job consumes the messages currently available in an Apache Kafka topic, or produces messages to a topic, in a single bounded run. To do this, use read instead of readStream on the SparkSession and, similarly, write instead of writeStream on the DataFrame.
- Spark SQL Batch Processing – Producing Messages to Kafka Topic
- Spark SQL Batch Processing – Consuming Messages from Kafka Topic
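Maven Dependency for Apache Kafka:
The Kafka data source is not part of Spark core. Add the org.apache.spark artifact spark-sql-kafka-0-10 that matches your Scala and Spark versions.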
Spark SQL Batch Processing – Producing Messages to Kafka Topic
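package com.sparkbyexamples.spark.streaming.batch

import org.apache.spark.sql.SparkSession

object WriteDataFrameToKafka {
  def main(args: Array[String]): Unit = {
    // Local session for the example; master and appName are assumed values
    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("WriteDataFrameToKafka")
      .getOrCreate()

    val data = Seq(("iphone", "2007"), ("iphone 3G", "2008"),
      ("iphone 3GS", "2009"),
      ("iphone 4", "2010"),
      ("iphone 4S", "2011"),
      ("iphone 5", "2012"),
      ("iphone 8", "2014"),
      ("iphone 10", "2017"))
    val df = spark.createDataFrame(data).toDF("key", "value")

    /*
      Since the DataFrame columns are already strings, selectExpr is optional.
      If the bytes of the Kafka records represent UTF-8 strings,
      we can simply use a cast to convert the binary data
      into the correct type.
     */
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .write // batch mode: write instead of writeStream
      .format("kafka")
      // The broker list is not given in the source; set it to your host:port list
      .option("kafka.bootstrap.servers", "")
      .option("topic", "text_topic")
      .save()
  }
}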
Spark SQL Batch Processing – Consuming Messages from Kafka Topic
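package com.sparkbyexamples.spark.streaming.batch
import org.apache.spark.sql.SparkSession
object ReadDataFromKafka {
  def main(args: Array[String]): Unit = {
    // Local session for the example; master and appName are assumed values
    val spark: SparkSession = SparkSession.builder()
      .master("local[1]").appName("ReadDataFromKafka").getOrCreate()
    val df = spark
      .read // batch mode: read instead of readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "") // not given in the source; set to your host:port list
      .option("subscribe", "text_topic")
      .load()
    // Kafka delivers key and value as binary, so cast them to strings
    val df2 = df.selectExpr("CAST(key AS STRING)",
      "CAST(value AS STRING)", "topic")
    df2.show(false)
  }
}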
How many messages does it read from the Kafka topic in a single read? Is it one message at a time or many at a time?
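On this question: as the Spark Kafka integration guide describes it, a batch read is not message-by-message. A single load() covers every record between startingOffsets and endingOffsets, which default to earliest and latest for batch queries, so by default everything available in the topic is read in one job. The sketch below builds the options for a read limited to an explicit offset range. The object name KafkaBatchOptions and its helper names are hypothetical; only the option keys and the JSON shape come from the Spark documentation.

```scala
object KafkaBatchOptions {

  // Render per-partition offsets in the JSON shape the Kafka source expects,
  // for example {"text_topic":{"0":5,"1":10}}.
  def offsetsJson(offsets: Map[String, Map[Int, Long]]): String =
    offsets.toSeq.sortBy(_._1).map { case (topic, partitions) =>
      val body = partitions.toSeq.sortBy(_._1)
        .map { case (partition, offset) => "\"" + partition + "\":" + offset }
        .mkString(",")
      "\"" + topic + "\":{" + body + "}"
    }.mkString("{", ",", "}")

  // Options for a batch read limited to an explicit offset range.
  // bootstrapServers is supplied by the caller; no value is assumed here.
  def boundedRead(bootstrapServers: String,
                  topic: String,
                  start: Map[Int, Long],
                  end: Map[Int, Long]): Map[String, String] =
    Map(
      "kafka.bootstrap.servers" -> bootstrapServers,
      "subscribe" -> topic,
      "startingOffsets" -> offsetsJson(Map(topic -> start)),
      "endingOffsets" -> offsetsJson(Map(topic -> end))
    )
}
```

The resulting map can be passed to spark.read.format("kafka").options(...).load(). In the offsets JSON, -2 stands for the earliest offset and -1 for the latest.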