Kafka allows us to create our own serializer and deserializer so that we can produce and consume different data types like JSON, POJO, Avro, etc. In this post we will see how to produce and consume a “User” POJO object. To stream POJO objects, one needs to create custom serializer and deserializer classes.
Maven Dependencies:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-mapper-asl</artifactId>
    <version>1.9.13</version>
</dependency>
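If you build with sbt instead of Maven, the equivalent dependencies would look like the following (a minimal sketch assuming the same versions as above):

// build.sbt
libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients" % "2.1.0",
  "org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.13"
)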
1. First, we will create the User POJO class.
package com.sparkbyexamples.kafka.beans

class User() {
  private var name: String = ""
  private var age: Int = 0

  def this(name: String, age: Int) {
    this()
    this.name = name
    this.age = age
  }

  def getName: String = this.name
  def getAge: Int = this.age

  // Setters are needed so that Jackson can populate the fields during deserialization
  def setName(name: String): Unit = this.name = name
  def setAge(age: Int): Unit = this.age = age

  override def toString: String = "User(" + name + ", " + age + ")"
}
2. Create a User serializer class by extending the Kafka Serializer interface.
package com.sparkbyexamples.kafka.jackson

import java.util
import com.sparkbyexamples.kafka.beans.User
import org.apache.kafka.common.serialization.Serializer
import org.codehaus.jackson.map.ObjectMapper

class UserSerializer extends Serializer[User] {

  override def configure(map: util.Map[String, _], b: Boolean): Unit = {
  }

  override def serialize(s: String, t: User): Array[Byte] = {
    if (t == null)
      null
    else {
      val objectMapper = new ObjectMapper()
      objectMapper.writeValueAsString(t).getBytes
    }
  }

  override def close(): Unit = {
  }
}
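To sanity-check the serializer outside of Kafka, you can call it directly and print the resulting JSON. Below is a minimal sketch; the object name and the sample user are made up for illustration, and the topic argument is ignored by our serializer:

object UserSerializerTest extends App {
  val serializer = new UserSerializer()
  // Serialize a sample User; the topic name passed here is not used by UserSerializer
  val bytes = serializer.serialize("user_user", new User("Scott", 25))
  // Should print JSON along the lines of {"name":"Scott","age":25}
  println(new String(bytes))
}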
3. Create a User deserializer class by extending the Kafka Deserializer interface.
package com.sparkbyexamples.kafka.jackson

import java.util
import com.sparkbyexamples.kafka.beans.User
import org.apache.kafka.common.serialization.Deserializer
import org.codehaus.jackson.map.ObjectMapper

class UserDeserializer extends Deserializer[User] {

  override def configure(map: util.Map[String, _], b: Boolean): Unit = {
  }

  override def deserialize(s: String, bytes: Array[Byte]): User = {
    // Kafka can pass null for tombstone records, so guard against it
    if (bytes == null)
      null
    else {
      val mapper = new ObjectMapper()
      mapper.readValue(bytes, classOf[User])
    }
  }

  override def close(): Unit = {
  }
}
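The serializer and deserializer can also be verified together with a quick round trip before wiring them into Kafka. A small sketch (the object name and sample values are just for this example):

object UserSerdeRoundTrip extends App {
  val serializer = new UserSerializer()
  val deserializer = new UserDeserializer()
  val original = new User("Scott", 25)
  // Serialize to JSON bytes, then read the bytes back into a new User instance
  val copy = deserializer.deserialize("user_user", serializer.serialize("user_user", original))
  // Should print User(Scott, 25) if both classes agree on the JSON shape
  println(copy)
}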
4. Create a Kafka consumer and use UserDeserializer for the “value.deserializer” property.
package com.sparkbyexamples.kafka.jackson
import java.util.Properties
import com.sparkbyexamples.kafka.beans.User
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._
object KafkaConsumerWithUserObject extends App {

  val prop: Properties = new Properties()
  prop.put("group.id", "test")
  prop.put("bootstrap.servers", "192.168.1.100:9092")
  prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  prop.put("value.deserializer", "com.sparkbyexamples.kafka.jackson.UserDeserializer")
  prop.put("enable.auto.commit", "true")
  prop.put("auto.commit.interval.ms", "1000")

  val consumer = new KafkaConsumer[String, User](prop)
  val topics = List("user_user")

  try {
    consumer.subscribe(topics.asJava)
    while (true) {
      val records = consumer.poll(10)
      for (record <- records.asScala) {
        println("Key: " + record.key() + ", Value: " + record.value())
      }
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    consumer.close()
  }
}
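As an alternative to setting “key.deserializer” and “value.deserializer” as fully qualified class-name strings, kafka-clients also lets you pass deserializer instances directly to the KafkaConsumer constructor, which avoids typos in the class name:

val consumer = new KafkaConsumer[String, User](prop,
  new org.apache.kafka.common.serialization.StringDeserializer(),
  new UserDeserializer())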
5. Create a Kafka producer and use UserSerializer for the “value.serializer” property.
package com.sparkbyexamples.kafka.jackson
import java.util.Properties
import com.sparkbyexamples.kafka.beans.User
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
object KafkaProducerWithUserObject extends App {

  val props: Properties = new Properties()
  props.put("bootstrap.servers", "192.168.1.100:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "com.sparkbyexamples.kafka.jackson.UserSerializer")
  props.put("acks", "all")

  val producer = new KafkaProducer[String, User](props)

  try {
    for (i <- 0 to 5) {
      // Send a few sample User objects to the "user_user" topic
      val record = new ProducerRecord[String, User]("user_user", i.toString, new User("User " + i, 20 + i))
      producer.send(record)
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    producer.close()
  }
}
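Similarly, the producer can take serializer instances in its constructor instead of the class-name properties:

val producer = new KafkaProducer[String, User](props,
  new org.apache.kafka.common.serialization.StringSerializer(),
  new UserSerializer())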
6. Run KafkaConsumerWithUserObject
7. Run KafkaProducerWithUserObject
The complete code can be downloaded from GitHub.