Spring Cloud - Streams with Apache Kafka

Introduction

In a distributed environment, services need to communicate with each other. The communication can happen either synchronously or asynchronously. In this section, we will look at how services can communicate asynchronously using message brokers.

Two major benefits of asynchronous communication are −

  • Producer and consumer speeds can differ − Whether the consumer of the data is slow or fast, it does not affect the producer's processing, and vice versa. Both can work at their own individual speeds without affecting each other.
  • Producer does not need to handle requests from various consumers − There may be multiple consumers who want to read the same set of data from the producer. With a message broker in between, the producer does not need to care about the load these consumers generate. Plus, any outage at the producer level would not block consumers from reading older producer data, as that data remains available in the message broker.

Apache Kafka and RabbitMQ are two well-known message brokers used for asynchronous communication. In this tutorial, we will use Apache Kafka.

Kafka – Dependency Setting

Let us continue with the Restaurant case that we have been using earlier. Say we have our Customer Service and Restaurant Service communicating asynchronously. To do that, we will use Apache Kafka, and we will need it in both services, i.e., Customer Service and Restaurant Service.

To use Apache Kafka, we will update the POM of both services and add the following dependency.


<dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

We also need to have a Kafka instance running. There are multiple ways through which this can be done, but we will prefer starting Kafka using a Docker container. A few publicly available images we can consider using are, for example, confluentinc/cp-kafka, bitnami/kafka, and wurstmeister/kafka.

Whichever image we use, the important thing to note is that once the container is up and running, please ensure that the Kafka cluster is accessible at localhost:9092.
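
If the Kafka command-line tools are available (they ship with the Kafka distribution and are bundled in most of the above images), a quick way to verify connectivity is to list the topics on the broker; depending on the image, the script may be called kafka-topics instead of kafka-topics.sh −

kafka-topics.sh --bootstrap-server localhost:9092 --list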

Now that we have the Kafka cluster running, let us move to the core example.

Binders & Bindings

There are three important concepts when it comes to Spring Cloud Streams −

  • External Messaging System − This is the component which is managed externally and is responsible for storing the events/messages produced by the application so that they can be read by their subscribers/consumers. Note that this is not managed within the app/Spring. A few examples are Apache Kafka and RabbitMQ.
  • Binders − This is the component which provides the integration with the messaging system, for example, the connection details of the messaging system (its address, authentication, etc.).
  • Bindings − This component uses the Binders to produce messages to the messaging system or to consume messages from a specific topic/queue.

The binder and binding properties are defined in the application properties file, as shown in the skeleton below.
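
As a rough skeleton (the names in angle brackets are placeholders; the concrete values we will use follow in the example below), the structure looks like this −

spring:
   cloud:
      stream:
         kafka:
            binder:
               brokers: <broker-address>   # binder: how to reach the messaging system
         bindings:
            <binding-name>:
               destination: <topic-name>   # binding: which topic to produce to or consume from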

Example

Let us use the Restaurant case that we have been using earlier. Suppose that whenever a new customer is added via the Customer Service, we want to notify the nearby restaurants with that customer's info.

For this purpose, let us update our Customer Service first to include and use Kafka. Note that we will use the Customer Service as a producer of the data. That is, whenever we add a Customer via the API, it will also be written to Kafka.

spring:
   application:
      name: customer-service
   cloud:
      stream:
         source: customerBinding-out-0
         kafka:
            binder:
               brokers: localhost:9092
               replicationFactor: 1
         bindings:
            customerBinding-out-0:
               destination: customer
               producer:
                  partitionCount: 3

server:
   port: ${app_port}
eureka:
   client:
      serviceURL:
         defaultZone: http://localhost:8900/eureka

Points to note −

  • We have defined a binder with the address of our local Kafka instance.
  • We have also defined the binding ‘customerBinding-out-0’ which uses the ‘customer’ topic to write its messages to.
  • We have also mentioned our binding in stream.source so that we can use it imperatively (via StreamBridge) in our code.

Once this is done, let us now update our controller by adding a new method ‘addCustomer’ which is responsible for serving the POST request. From the POST request handler, we then send the data to the Kafka broker.

package com.javahubpoint;
import java.util.HashMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
 
@RestController
class RestaurantCustomerInstancesController {
   @Autowired
   private StreamBridge streamBridge;
   static HashMap<Long, Customer> mockCustomerData = new HashMap<>();

   static{
      mockCustomerData.put(1L, new Customer(1, "Jane", "DC"));
      mockCustomerData.put(2L, new Customer(2, "John", "SFO"));
      mockCustomerData.put(3L, new Customer(3, "Kate", "NY"));
   }

   @RequestMapping("/customer/{id}")
   public Customer getCustomerInfo(@PathVariable("id") Long id) {
      System.out.println("Querying customer for id with: " + id);
      return mockCustomerData.get(id);
   }

   @RequestMapping(path = "/customer/{id}", method = RequestMethod.POST)
   public Customer addCustomer(@PathVariable("id") Long id) {
      // add default name
      Customer defaultCustomer = new Customer(id, "Dwayne", "NY");
      streamBridge.send("customerBinding-out-0", defaultCustomer);
      return defaultCustomer;
   }
}

Points to note −

  • We are autowiring StreamBridge, which is what we will use to send the messages.
  • The first argument we pass to the ‘send’ method specifies the binding we want to send the data to.
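
For reference, both services assume a simple Customer POJO. It is not listed in this chapter, so here is a minimal sketch; the constructor and getters are inferred from how the class is used above, and the rest is an assumption −

// Minimal sketch of the Customer model assumed by the examples
// (fields and accessors inferred from the controller code and the JSON output below).
public class Customer {
   private long id;
   private String name;
   private String city;

   public Customer() {}

   public Customer(long id, String name, String city) {
      this.id = id;
      this.name = name;
      this.city = city;
   }

   public long getId() { return id; }
   public String getName() { return name; }
   public String getCity() { return city; }
}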

Now let us update our Restaurant Service to include and subscribe to the ‘customer’ topic. Note that we will use the Restaurant Service as a consumer of the data. That is, whenever we add a Customer via the API, the Restaurant Service will come to know about it via Kafka.

First, let us update the application properties file.


spring:
   application:
      name: restaurant-service
   cloud:
      function:
         definition: customerBinding
      stream:
         kafka:
            binder:
               brokers: localhost:9092
               replicationFactor: 1
         bindings:
            customerBinding-in-0:
               destination: customer

server:
   port: ${app_port}
eureka:
   client:
      serviceURL:
         defaultZone: http://localhost:8900/eureka

Once this is done, let us now update our controller by adding a new bean ‘customerBinding’ which returns a function that will be called for every incoming message and print it along with its metadata details.

package com.javahubpoint;
import java.util.HashMap;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
 
@RestController
class RestaurantController {
   @Autowired
   CustomerService customerService;
   @Autowired
   private StreamBridge streamBridge;
   static HashMap<Long, Restaurant> mockRestaurantData = new HashMap<>();

   static{
      mockRestaurantData.put(1L, new Restaurant(1, "Pandas", "DC"));
      mockRestaurantData.put(2L, new Restaurant(2, "Indies", "SFO"));
      mockRestaurantData.put(3L, new Restaurant(3, "Little Italy", "DC"));
      mockRestaurantData.put(4L, new Restaurant(4, "Pizeeria", "NY"));
   }

   @RequestMapping("/restaurant/customer/{id}")
   public List<Restaurant> getRestaurantForCustomer(@PathVariable("id") Long id) {
      System.out.println("Got request for customer with id: " + id);
      String customerCity = customerService.getCustomerById(id).getCity();
      return mockRestaurantData.entrySet().stream()
         .filter(entry -> entry.getValue().getCity().equals(customerCity))
         .map(entry -> entry.getValue())
         .collect(Collectors.toList());
   }

   @RequestMapping("/restaurant/cust/{id}")
   public void getRestaurantForCust(@PathVariable("id") Long id) {
      streamBridge.send("ordersBinding-out-0", id);
   }

   @Bean
   public Consumer<Message<Customer>> customerBinding() {
      return msg -> {
         System.out.println(msg);
      };
   }
}

Points to note −

  • We are defining a bean ‘customerBinding’ which returns the function that will be called when a message arrives on this binding.
  • The name that we use for this function/bean also needs to be used in the YAML file while creating the binding and specifying the topic; input binding names follow the ‘<beanName>-in-<index>’ convention, which is why the binding is called ‘customerBinding-in-0’.

Now, let us execute the above code. As always, start the Eureka Server first. Note that this is not a hard requirement and is present here for the sake of completeness.

Then, let us compile and start the updated Customer Service using the following command −

mvn clean install ; java -Dapp_port=8083 -jar .\target\spring-cloud-eurekaclient-
1.0.jar --spring.config.location=classpath:application-kafka.yml


Then, let us compile and start the updated Restaurant Service using the following command −

mvn clean install; java -Dapp_port=8082 -jar .\target\spring-cloud-feign-client-
1.0.jar --spring.config.location=classpath:application-kafka.yml


And we are all set. Let us now test our code by hitting the API −

curl -X POST http://localhost:8083/customer/1

Here is the output that we will get for this API −

{
   "id": 1,
   "name": "Dwayne",
   "city": "NY"
}
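
Optionally, if the Kafka command-line tools are available, we can also verify that the message reached the ‘customer’ topic by reading it directly from the broker (the script may be called kafka-console-consumer without the .sh suffix, depending on the image) −

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic customer --from-beginning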

And now, let us check the logs for the Restaurant Service −

GenericMessage [payload=Customer [id=1, name=Dwayne, city=NY],
headers={kafka_offset=1,...

So, effectively, you can see that, using the Kafka broker, the Restaurant Service was notified about the newly added Customer.

Partitions & Consumer Groups

Partitions and Consumer Groups are two important concepts that you should be aware of while using Spring Cloud Streams.

Partitions − They are used to partition the data so that we can divide the work between multiple consumers.

Let us see how to partition the data in Spring Cloud. Say, we want to partition the data based on the Customer ID. So, let us update our Customer Service for the same. 

Let us update our Customer Service application property to specify the key for our data.


spring:
   application:
      name: customer-service
   cloud:
      function:
         definition: ordersBinding
      stream:
         source: customerBinding-out-0
         kafka:
            binder:
               brokers: localhost:9092
               replicationFactor: 1
         bindings:
            customerBinding-out-0:
               destination: customer
               producer:
                  partitionKeyExpression: 'getPayload().getId()'
                  partitionCount: 3
server:
   port: ${app_port}
eureka:
   client:
      serviceURL:
         defaultZone: http://localhost:8900/eureka

For specifying the key, i.e., “partitionKeyExpression”, we provide a Spring Expression Language (SpEL) expression. The expression is evaluated against a GenericMessage<Customer>, since we are sending the Customer data in the message. Note that GenericMessage is the Spring Framework class used for wrapping the payload and the headers in a single object. So, we get the payload from this message, which is of type Customer, and then we call the getId() method on the customer.

Now, let us also update our consumer, i.e., the Restaurant Service, to log more info while consuming the messages, as sketched below.
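
The updated consumer code is not listed in this chapter, but a minimal sketch that produces log lines like the ones further below could read the Kafka metadata from the message headers. The header constants come from spring-kafka’s KafkaHeaders class (already imported in our controller); exact constant names may vary slightly across versions, so treat this as an assumption rather than the exact code −

   @Bean
   public Consumer<Message<Customer>> customerBinding() {
      return msg -> {
         // Kafka metadata is exposed as headers on the consumed message.
         System.out.println("Consumer: " + msg.getHeaders().get(KafkaHeaders.CONSUMER));
         System.out.println("Consumer Group: " + msg.getHeaders().get(KafkaHeaders.GROUP_ID));
         System.out.println("Partition Id: " + msg.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID));
         System.out.println("Customer: " + msg.getPayload());
      };
   }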

Now, let us execute the above code. As always, start the Eureka Server first. Note that this is not a hard requirement and is present here for the sake of completeness.

Then, let us compile and start the updated Customer Service using the following command −

mvn clean install ; java -Dapp_port=8083 -jar .\target\spring-cloud-eurekaclient-
1.0.jar --spring.config.location=classpath:application-kafka.yml


Then, let us compile and start the updated Restaurant Service using the following command −

mvn clean install; java -Dapp_port=8082 -jar .\target\spring-cloud-feign-client-
1.0.jar --spring.config.location=classpath:application-kafka.yml

And we are all set. Let us now test our code. As part of testing, here is what we will do −

  • Insert a customer with Id 1: curl -X POST http://localhost:8083/customer/1
  • Insert a customer with Id 1: curl -X POST http://localhost:8083/customer/1
  • Insert a customer with Id 5: curl -X POST http://localhost:8083/customer/5
  • Insert a customer with Id 3: curl -X POST http://localhost:8083/customer/3
  • Insert a customer with Id 1: curl -X POST http://localhost:8083/customer/1

We do not care much about the output of the API. Rather, we care about the partitions to which the data is sent. Since we are using the customer ID as the key, we expect customers with the same ID to end up in the same partition every time.
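
To build an intuition for why that happens, key-based partition selection in Spring Cloud Stream defaults to deriving the partition from the key’s hash. The sketch below is a simplified illustration of that idea under the default hashCode-based selector, not the library’s exact code −

// Simplified illustration of key-based partition selection
// (assumption: default hashCode-based selector; the real logic may differ in detail).
public class PartitionSelectionSketch {
   static int partitionFor(Object key, int partitionCount) {
      return Math.abs(key.hashCode()) % partitionCount;
   }

   public static void main(String[] args) {
      // With partitionCount = 3, the same customer id always maps to the same partition.
      System.out.println(partitionFor(1L, 3)); // same partition on every call for id 1
      System.out.println(partitionFor(5L, 3));
      System.out.println(partitionFor(3L, 3));
   }
}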

And now, let us check the logs for the Restaurant Service −

Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400

Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323

Partition Id: 1

Customer: Customer [id=1, name=Dwayne, city=NY]

Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400

Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323

Partition Id: 1

Customer: Customer [id=1, name=Dwayne, city=NY]

Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400

Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323

Partition Id: 2

Customer: Customer [id=5, name=Dwayne, city=NY]

Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400

Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323

Partition Id: 0

Customer: Customer [id=3, name=Dwayne, city=NY]

Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323

Partition Id: 1

Customer: Customer [id=1, name=Dwayne, city=NY]

So, as we see, Customer with Id 1 ended up in the same partition every time, i.e., partition 1.


Consumer Group − A consumer group is the logical grouping of consumers reading the same topic for the same purpose. The partitions of a topic are divided among the consumers in a consumer group, so that a given partition is read by only one consumer from that group.

To define a consumer group, all we need to do is set a group on the binding that uses the Kafka topic. For example, let us define the consumer group name in our application file for our controller.


spring:
   application:
      name: restaurant-service
   cloud:
      function:
         definition: customerBinding
      stream:
         kafka:
            binder:
               brokers: localhost:9092
               replicationFactor: 1
         bindings:
            customerBinding-in-0:
               destination: customer
               group: restController
server:
   port: ${app_port}
eureka:
   client:
      serviceURL:
         defaultZone: http://localhost:8900/eureka

Let us recompile and start the Restaurant Service. Now, let us generate the event by hitting the POST API on the Customer Service −

Insert a customer with Id 1: curl -X POST http://localhost:8083/customer/1

Now, if we check the logs of our Restaurant Service, we will see the following −

Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400

Consumer Group: restController

Partition Id: 1

Customer: Customer [id=1, name=Dwayne, city=NY]

So, as we see from the output, we have a consumer group called ‘restController’ created, whose consumers are responsible for reading the topic. In the above case, we just had a single instance of the service running, so all the partitions of the ‘customer’ topic were assigned to that one instance. But if we run multiple instances of the service with the same group, the partitions will be distributed among them.
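
For example, to see the partitions being split, we could start a second instance of the Restaurant Service on another port (8084 here is an arbitrary choice, assuming the same jar and configuration as above) and then hit the POST API a few times; each instance in the ‘restController’ group would then receive messages only from its share of the partitions −

java -Dapp_port=8084 -jar .\target\spring-cloud-feign-client-1.0.jar --spring.config.location=classpath:application-kafka.yml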