Kafka consumer and producer example with a custom serializer

Kafka allows us to plug in our own serializer and deserializer so that we can produce and consume different data types like JSON, POJO, Avro, etc. In this post we will see how to produce and consume a "User" POJO object. To stream POJO objects, one needs to create a custom serializer and deserializer.

Maven Dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-mapper-asl</artifactId>
    <version>1.9.13</version>
</dependency>
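If your build uses sbt instead of Maven, the equivalent dependencies (same coordinates and versions as above) would be:

```scala
libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients" % "2.1.0",
  "org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.13"
)
```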

1. First, we will create the User POJO class.


package com.sparkbyexamples.kafka.beans

// A plain bean with a no-arg constructor, getters, and setters.
// Jackson needs the no-arg constructor and setters to deserialize,
// and the getters to serialize.
class User() {
  private var name: String = ""
  private var age: Int = 0

  def this(name: String, age: Int) {
    this()
    this.name = name
    this.age = age
  }

  def getName: String = this.name
  def getAge: Int = this.age
  def setName(name: String): Unit = this.name = name
  def setAge(age: Int): Unit = this.age = age

  override def toString: String = "User(" + name + ", " + age + ")"
}

2. Create the User serializer class by extending Kafka's Serializer interface.


package com.sparkbyexamples.kafka.jackson
import java.util
import com.sparkbyexamples.kafka.beans.User
import org.apache.kafka.common.serialization.Serializer
import org.codehaus.jackson.map.ObjectMapper
class UserSerializer extends Serializer[User] {
  // ObjectMapper is thread-safe and relatively expensive to create, so reuse one instance.
  private val objectMapper = new ObjectMapper()

  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  // Convert the User object to a JSON byte array; return null for a null record.
  override def serialize(topic: String, data: User): Array[Byte] = {
    if (data == null) null
    else objectMapper.writeValueAsString(data).getBytes("UTF-8")
  }

  override def close(): Unit = {}
}

3. Create the User deserializer class by extending Kafka's Deserializer interface.


package com.sparkbyexamples.kafka.jackson
import java.util
import com.sparkbyexamples.kafka.beans.User
import org.apache.kafka.common.serialization.Deserializer
import org.codehaus.jackson.map.ObjectMapper
class UserDeserializer extends Deserializer[User] {
  private val objectMapper = new ObjectMapper()

  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  // Convert the JSON byte array back into a User object; return null for a null record.
  override def deserialize(topic: String, bytes: Array[Byte]): User = {
    if (bytes == null) null
    else objectMapper.readValue(bytes, classOf[User])
  }

  override def close(): Unit = {}
}
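The serializer and deserializer must be mutual inverses: deserializing the bytes produced by the serializer should reconstruct an equal User. The sketch below illustrates that round-trip contract with a hand-rolled "name,age" encoding instead of Jackson, purely so it runs with no Kafka or Jackson dependency; the names `SimpleUser` and `SimpleUserCodec` are illustrative, not part of the example above.

```scala
// Illustration only: the round-trip contract a serializer/deserializer
// pair must satisfy, using a simple "name,age" byte encoding
// (assumes the name contains no comma).
case class SimpleUser(name: String, age: Int)

object SimpleUserCodec {
  // Serialize: SimpleUser -> bytes (mirrors UserSerializer's role)
  def serialize(u: SimpleUser): Array[Byte] =
    s"${u.name},${u.age}".getBytes("UTF-8")

  // Deserialize: bytes -> SimpleUser (mirrors UserDeserializer's role)
  def deserialize(bytes: Array[Byte]): SimpleUser = {
    val parts = new String(bytes, "UTF-8").split(",")
    SimpleUser(parts(0), parts(1).toInt)
  }
}
```

Whatever encoding you choose, a quick check that `deserialize(serialize(u)) == u` for representative values catches most mismatches before they surface as garbled records on the topic.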

4. Create a Kafka consumer and use UserDeserializer for “value.deserializer” property.


package com.sparkbyexamples.kafka.jackson
import java.time.Duration
import java.util.Properties
import com.sparkbyexamples.kafka.beans.User
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

object KafkaConsumerWithUserObject extends App {
  val prop: Properties = new Properties()
  prop.put("group.id", "test")
  prop.put("bootstrap.servers", "192.168.1.100:9092")
  prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  prop.put("value.deserializer", "com.sparkbyexamples.kafka.jackson.UserDeserializer")
  prop.put("enable.auto.commit", "true")
  prop.put("auto.commit.interval.ms", "1000")
  val consumer = new KafkaConsumer[String, User](prop)
  val topics = List("user_topic")
  try {
    consumer.subscribe(topics.asJava)
    while (true) {
      val records = consumer.poll(Duration.ofMillis(100))
      for(record<-records.asScala){
        println("Topic: "+record.topic()+", Key: "+record.key() +", Value: "+record.value().getName +
          ", Offset: "+record.offset() +", Partition: "+record.partition())
      }
    }
  }catch{
    case e:Exception => e.printStackTrace()
  }finally {
    consumer.close()
  }
}

5. Create a Kafka producer and use UserSerializer for “value.serializer” property.


package com.sparkbyexamples.kafka.jackson
import java.util.Properties
import com.sparkbyexamples.kafka.beans.User
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
object KafkaProducerWithUserObject extends App {
  val props:Properties = new Properties()
  props.put("bootstrap.servers","192.168.1.100:9092")
  props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer","com.sparkbyexamples.kafka.jackson.UserSerializer")
  props.put("acks","all")
  val producer = new KafkaProducer[String, User](props)
  try{
    for(i <- 0 to 100) {
      val user = new User("My Name - "+i,i)
      val record = new ProducerRecord[String, User]("user_topic",i.toString,user)
      val metadata = producer.send(record)
      printf("sent record(key=%s value=%s) meta(partition=%d, offset=%d)\n",
        record.key(), record.value(), metadata.get().partition(), metadata.get().offset())
    }
  }catch{
    case e:Exception => e.printStackTrace()
  }finally {
    producer.close()
  }
}
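Before running either application, make sure the "user_topic" topic exists on the broker (or that topic auto-creation is enabled). A typical creation command with the Kafka 2.1-era CLI scripts, assuming a local ZooKeeper at the default port, looks like this (newer Kafka versions use `--bootstrap-server <broker>` instead of `--zookeeper`):

```shell
# Create the topic used by this example; adjust host/port to your setup
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic user_topic
```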

6. Run KafkaConsumerWithUserObject so the consumer is subscribed and waiting.

7. Run KafkaProducerWithUserObject to publish the User messages.

The complete code can be downloaded from GitHub: kafka-scala-serializer
