Apache Kafka Producer and Consumer in Scala


In this Scala and Kafka tutorial, you will learn how to write messages to a Kafka topic (producer) and read messages from a topic (consumer) using Scala examples. A producer sends messages to Kafka topics in the form of records; a record is a key-value pair along with a topic name. A consumer receives messages from a topic.

Prerequisites: If you don’t have a Kafka cluster set up, follow the link to set up a single-broker cluster.


Start ZooKeeper with the default configuration.

ZooKeeper is a high-performance coordination service for distributed applications, and Kafka uses it to store the cluster's metadata. The Kafka distribution ships with ZooKeeper, so all we need to do is start the service with the default configuration.

bin/zookeeper-server-start.sh config/zookeeper.properties

Start Kafka broker with the default configuration.

A Kafka cluster consists of one or more brokers (Kafka servers). A broker organizes messages into their respective topics and persists them in topic log files, by default for 7 days (log.retention.hours=168). Depending on the topic's replication factor, the messages are replicated to multiple brokers.


bin/kafka-server-start.sh config/server.properties

Create a Kafka topic “text_topic”

All Kafka messages are organized into topics, and topics are partitioned and replicated across multiple brokers in a cluster. The producer sends messages to a topic and the consumer reads messages from it. The replication factor defines how many copies of each message are stored, and partitions let you parallelize a topic by splitting its data across multiple brokers.
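
To make the role of partitions more concrete, here is a minimal sketch of how a keyed message could be mapped to a partition. It assumes a simple hash-modulo scheme; Kafka's default partitioner applies a murmur2 hash to the serialized key, so the real partition numbers will differ. The names PartitionSketch and partitionFor are hypothetical and not part of the Kafka API.

```scala
import java.nio.charset.StandardCharsets

object PartitionSketch {
  // Hypothetical helper: maps a key to one of numPartitions partitions.
  // java.util.Arrays.hashCode is only a stand-in for Kafka's murmur2 hash.
  def partitionFor(key: String, numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    val hash = java.util.Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8))
    // Mask the sign bit so the result is never negative.
    (hash & 0x7fffffff) % numPartitions
  }

  def main(args: Array[String]): Unit = {
    val keys = Seq("0", "1", "2", "3")
    for (key <- keys)
      println(s"key=$key -> partition ${partitionFor(key, 3)}")
  }
}
```

The property that matters is that the same key always lands in the same partition, which is what preserves per-key ordering; with a single partition, as in this tutorial, every key maps to partition 0.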

Execute this command to create a topic with a replication factor of 1 and a single partition (our cluster has just one broker).


bin/kafka-topics.sh --create --zookeeper localhost:2181 \
                    --replication-factor 1 --partitions 1 \
                    --topic text_topic

Kafka Maven Dependency

To work with Kafka, we use the following Kafka client Maven dependency. Add this dependency to your Scala project.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>
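
If your Scala project is built with sbt rather than Maven, the equivalent declaration would look roughly like this. This is a sketch that assumes a standard build.sbt; the version simply mirrors the Maven snippet above.

```scala
// build.sbt (assumed file name for an sbt project)
// A single % is used because kafka-clients is a plain Java artifact
// without a Scala version suffix.
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.1.0"
```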

Kafka Producer Scala example

This Kafka producer Scala example publishes messages to a topic as records. A record is a key-value pair in which the key is optional and the value is mandatory. In this example both the key and the value are strings, so we use StringSerializer for both. If your key is a long value, use LongSerializer instead; the same applies to the value.


import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KafkaProducerApp extends App {

  // Producer configuration: broker address and String serializers for key and value
  val props: Properties = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer")
  // Wait for all in-sync replicas to acknowledge each record
  props.put("acks", "all")

  val producer = new KafkaProducer[String, String](props)
  val topic = "text_topic"
  try {
    for (i <- 0 to 15) {
      val record = new ProducerRecord[String, String](topic, i.toString, "My Site is sparkbyexamples.com " + i)
      // send() is asynchronous; get() blocks until the broker acknowledges the record
      val metadata = producer.send(record).get()
      println(s"sent record(key=${record.key()} value=${record.value()}) " +
        s"meta(partition=${metadata.partition()}, offset=${metadata.offset()})")
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    // Flush any pending records and release resources
    producer.close()
  }
}

The producer's send method returns a Future of RecordMetadata, from which we can find the partition the message was written to and its offset.
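
Calling get() on the returned Future blocks until the broker acknowledges each record. If you would rather not block, send also accepts a Callback that is invoked when the send completes. The following is a minimal sketch, assuming the same local broker and topic as above; the object name KafkaProducerCallbackApp, the key and the value are made up for illustration.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

object KafkaProducerCallbackApp extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092") // assumes a local broker
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  try {
    val record = new ProducerRecord[String, String]("text_topic", "key-1", "hello")
    // The callback runs on the producer's I/O thread once the send completes.
    producer.send(record, new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
        if (exception != null) exception.printStackTrace()
        else println(s"partition=${metadata.partition()}, offset=${metadata.offset()}")
    })
  } finally {
    // close() waits for in-flight sends to finish before returning.
    producer.close()
  }
}
```

Because the callback runs on the producer's I/O thread, it is best kept short; heavy work there would delay other sends.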

Kafka Consumer Scala example

This Kafka consumer Scala example subscribes to a topic and receives each message (record) that arrives in it. A record carries its key, value, partition, and offset. All messages in Kafka are serialized, so a consumer must use a deserializer to convert them to the appropriate data type. Here we use StringDeserializer for both the key and the value.


import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

object KafkaConsumerSubscribeApp extends App {

  // Consumer configuration: consumer group, broker address and String deserializers
  val props: Properties = new Properties()
  props.put("group.id", "test")
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer")
  // Commit consumed offsets automatically every second
  props.put("enable.auto.commit", "true")
  props.put("auto.commit.interval.ms", "1000")

  // The type parameters must match the configured deserializers
  val consumer = new KafkaConsumer[String, String](props)
  // Must be the same topic the producer writes to
  val topics = List("text_topic")
  try {
    consumer.subscribe(topics.asJava)
    while (true) {
      // Wait up to 100 ms for new records
      val records = consumer.poll(Duration.ofMillis(100))
      for (record <- records.asScala) {
        println("Topic: " + record.topic() +
          ", Key: " + record.key() +
          ", Value: " + record.value() +
          ", Offset: " + record.offset() +
          ", Partition: " + record.partition())
      }
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    consumer.close()
  }
}
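
To see what serialization and deserialization actually do to the data, the following sketch converts a String and a Long to bytes and back, using the same representations as StringSerializer (UTF-8 by default) and LongSerializer (8 bytes, big-endian). It does not need a broker. The object and method names are hypothetical and are not part of the Kafka API.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object SerializerSketch {
  // Same representation as StringSerializer's default: UTF-8 encoded bytes.
  def serializeString(value: String): Array[Byte] =
    value.getBytes(StandardCharsets.UTF_8)

  // The consumer side reverses the conversion.
  def deserializeString(bytes: Array[Byte]): String =
    new String(bytes, StandardCharsets.UTF_8)

  // Same representation as LongSerializer: fixed 8 bytes, big-endian.
  def serializeLong(value: Long): Array[Byte] =
    ByteBuffer.allocate(java.lang.Long.BYTES).putLong(value).array()

  def deserializeLong(bytes: Array[Byte]): Long =
    ByteBuffer.wrap(bytes).getLong()

  def main(args: Array[String]): Unit = {
    val keyBytes = serializeLong(42L)
    val valueBytes = serializeString("hello")
    println(s"key is ${keyBytes.length} bytes, value is ${valueBytes.length} bytes")
    println(s"round trip: ${deserializeLong(keyBytes)} -> ${deserializeString(valueBytes)}")
  }
}
```

In a real application you would not write these conversions yourself. For a long key you would set key.serializer to org.apache.kafka.common.serialization.LongSerializer and declare the producer as KafkaProducer[java.lang.Long, String], with the matching LongDeserializer on the consumer side.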

How to Run Kafka Producer and Consumer?

1. Run the KafkaConsumerSubscribeApp.scala program

When you run this program, it waits for messages to arrive in the “text_topic” topic.

2. Run the KafkaProducerApp.scala program

Run the KafkaProducerApp.scala program, which produces messages into “text_topic”. You should now see the produced messages in the producer's console and, in the other console, the messages being consumed.

The complete code can be downloaded from GitHub

Happy Learning !!


NNK



This Post Has 7 Comments

  1. mahesh

    when implementing kafka acks =all.. do we need to write the response on the same queue of producer or different queue?

  2. J

    Thanks for this, and keep rocking!

  3. Anonymous

    should this be lower case p?

    Props.put(“value.deserializer”,
    “org.apache.kafka.common.serialization.StringDeserializer”)

    1. NNK

      Thanks for reading the article and suggesting a correction. Yes, you are right, it should be lowercase. It was a typo and has been corrected.

  4. Anonymous

    nice work

    1. NNK

      Thank you !!
