Kafka consumer and producer example with a custom serializer

Kafka allows us to create our own serializer and deserializer so that we can produce and consume different data types like JSON, POJO, Avro, etc. In this post we will see how to produce and consume a “User” POJO object. To stream POJO objects, one needs to create a custom serializer and deserializer.


Maven Dependencies:

<dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka-clients</artifactId>
     <version>2.1.0</version>
</dependency>
<dependency>
     <groupId>org.codehaus.jackson</groupId>
     <artifactId>jackson-mapper-asl</artifactId>
     <version>1.9.13</version>
</dependency>
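
If your project uses sbt instead of Maven, the equivalent dependencies (same versions as above) would be:

libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients" % "2.1.0",
  "org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.13"
)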

1. First, we will create the User POJO class.


package com.sparkbyexamples.kafka.beans

// Simple bean with a no-arg constructor and Java-style getters/setters,
// which Jackson needs to serialize and deserialize the object.
class User() {
  private var name: String = ""
  private var age: Int = 0

  def this(name: String, age: Int) {
    this()
    this.name = name
    this.age = age
  }

  def getName: String = this.name
  def getAge: Int = this.age

  def setName(name: String): Unit = this.name = name
  def setAge(age: Int): Unit = this.age = age

  override def toString: String = "User(" + name + ", " + age + ")"
}
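
As a quick sanity check (not part of the original example), you can verify that Jackson can round-trip the bean before wiring it into Kafka; UserJsonCheck below is just an illustrative name:

package com.sparkbyexamples.kafka.beans

import org.codehaus.jackson.map.ObjectMapper

// Serialize a User to JSON and read it back, printing both representations.
object UserJsonCheck extends App {
  val mapper = new ObjectMapper()
  val json = mapper.writeValueAsString(new User("Scott", 25))
  println(json)                                  // e.g. {"name":"Scott","age":25}
  println(mapper.readValue(json, classOf[User])) // User(Scott, 25)
}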

2. Create a UserSerializer class by extending the Kafka Serializer interface.


package com.sparkbyexamples.kafka.jackson
import java.util
import com.sparkbyexamples.kafka.beans.User
import org.apache.kafka.common.serialization.Serializer
import org.codehaus.jackson.map.ObjectMapper
class UserSerializer extends Serializer[User] {

  override def configure(map: util.Map[String, _], isKey: Boolean): Unit = {
    // nothing to configure
  }

  // Convert the User object to JSON and return it as a byte array.
  override def serialize(topic: String, data: User): Array[Byte] = {
    if (data == null)
      null
    else {
      val objectMapper = new ObjectMapper()
      objectMapper.writeValueAsString(data).getBytes
    }
  }

  override def close(): Unit = {
  }
}

3. Create a UserDeserializer class by extending the Kafka Deserializer interface.


package com.sparkbyexamples.kafka.jackson
import java.util
import com.sparkbyexamples.kafka.beans.User
import org.apache.kafka.common.serialization.Deserializer
import org.codehaus.jackson.map.ObjectMapper
class UserDeserializer extends Deserializer[User] {

  override def configure(map: util.Map[String, _], isKey: Boolean): Unit = {
    // nothing to configure
  }

  // Parse the JSON byte array back into a User object.
  override def deserialize(topic: String, bytes: Array[Byte]): User = {
    if (bytes == null)
      null
    else {
      val mapper = new ObjectMapper()
      mapper.readValue(bytes, classOf[User])
    }
  }

  override def close(): Unit = {
  }
}

4. Create a Kafka consumer and use UserDeserializer for the “value.deserializer” property.


package com.sparkbyexamples.kafka.jackson
import java.util.Properties
import com.sparkbyexamples.kafka.beans.User
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._
object KafkaConsumerWithUserObject extends App {
  val prop:Properties = new Properties()
  prop.put("group.id", "test")
  prop.put("bootstrap.servers","192.168.1.100:9092") prop.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
prop.put("value.deserializer","com.nelamalli.kafka.jackson.UserDeserializer")
  prop.put("enable.auto.commit", "true")
  prop.put("auto.commit.interval.ms", "1000")
  val consumer = new KafkaConsumer[String,User](prop)
  val topics = List("user_user")
  try{
    consumer.subscribe(topics.asJava)
    while(true){
      val records = consumer.poll(10)
      for(record e.printStackTrace()
  }finally {
    consumer.close()
  }
}

5. Create a Kafka producer and use UserSerializer for the “value.serializer” property.


package com.sparkbyexamples.kafka.jackson
import java.util.Properties
import com.sparkbyexamples.kafka.beans.User
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
object KafkaProducerWithUserObject extends App {
  val props:Properties = new Properties()
  props.put("bootstrap.servers","192.168.1.100:9092")
  props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer","com.sparkbyexamples.kafka.jackson.UserSerializer")
  props.put("acks","all")
  val producer = new KafkaProducer[String, User](props)
  try {
    // Produce a few User objects; UserSerializer converts them to JSON bytes.
    for (i <- 0 to 5) {
      val record = new ProducerRecord[String, User]("user_user", i.toString, new User("Name " + i, i))
      producer.send(record)
    }
  } catch {
    case e: Exception => e.printStackTrace()
  }finally {
    producer.close()
  }
}
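
As a side note, instead of passing the serializer/deserializer class names as string properties, the Kafka client constructors also accept instances directly; a minimal sketch, reusing the props/prop objects from the examples above:

// Alternative (sketch): pass (de)serializer instances to the constructors
// instead of setting the "key/value.serializer" string properties.
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

val producer = new KafkaProducer[String, User](props, new StringSerializer(), new UserSerializer())
val consumer = new KafkaConsumer[String, User](prop, new StringDeserializer(), new UserDeserializer())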

6. Run KafkaConsumerWithUserObject

7. Run KafkaProducerWithUserObject
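
Both programs assume the “user_user” topic already exists. If it does not, one way to create it from code is with Kafka’s AdminClient; the helper object below (CreateUserTopic) is only an illustration, not part of the original example:

package com.sparkbyexamples.kafka.jackson

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, NewTopic}

// Creates the "user_user" topic with 1 partition and replication factor 1.
object CreateUserTopic extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "192.168.1.100:9092")

  val admin = AdminClient.create(props)
  admin.createTopics(Collections.singletonList(new NewTopic("user_user", 1, 1.toShort))).all().get()
  admin.close()
}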

The complete code can be downloaded from GitHub

