We will write two applications that exchange messages through Apache Kafka. The first application will be called kafka-server and will contain both the producer and the consumer. The second will be called kafka-tester; it exists so that we end up with a microservice architecture.

kafka-server

For our projects, created through the Spring Initializr, we need the Kafka module. I also added Lombok and Web.

The Kafka client consists of two components – the producer (it sends messages to the Kafka server) and the consumer (it listens to the Kafka server and takes new messages from there on the topics it is subscribed to). Our task is to write both components and make them work.

Consumer:

@Configuration
public class KafkaConsumerConfig {

    @Value("${kafka.server}")
    private String kafkaServer;

    @Value("${kafka.group.id}")
    private String kafkaGroupId;

    @Bean
    public KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Long, AbstractDto> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.setMessageConverter(new BatchMessagingMessageConverter(converter()));
        return factory;
    }

    @Bean
    public KafkaListenerContainerFactory<?> singleFactory() {
        ConcurrentKafkaListenerContainerFactory<Long, AbstractDto> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(false);
        factory.setMessageConverter(new StringJsonMessageConverter());
        return factory;
    }

    @Bean
    public ConsumerFactory<Long, AbstractDto> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
        return new ConcurrentKafkaListenerContainerFactory<>();
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        return props;
    }

    @Bean
    public StringJsonMessageConverter converter() {
        return new StringJsonMessageConverter();
    }
}

We need 2 fields initialized with static data from kafka.properties.

    kafka.server=localhost:9092
    kafka.group.id=server.broadcast

kafka.server is the address of our Kafka server, in this case a local one. By default, Kafka listens on port 9092.

kafka.group.id identifies a group of consumers, within which each message is delivered exactly once. For example, suppose three consumers in one group all listen to the same topic. As soon as a new message appears on the server under this topic, it is delivered to one member of the group; the other two consumers do not receive it.
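
To make that concrete, here is a sketch (the listener ids and the class itself are illustrative, not from the project): two listeners sharing one group id, so any given message on the topic reaches only one of them.

@Service
public class BroadcastListeners {

    // Both listeners are in group "server.broadcast"; Kafka delivers
    // each message on the topic to only one of the two.
    @KafkaListener(id = "courier-1", topics = "server.starship",
            groupId = "server.broadcast", containerFactory = "singleFactory")
    public void consumeFirst(StarshipDto dto) {
        // handles the message only if courier-2 did not get it
    }

    @KafkaListener(id = "courier-2", topics = "server.starship",
            groupId = "server.broadcast", containerFactory = "singleFactory")
    public void consumeSecond(StarshipDto dto) {
        // handles the message only if courier-1 did not get it
    }
}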

Next, we create a factory for consumers: the ConsumerFactory.

    @Bean
    public ConsumerFactory<Long, AbstractDto> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

It is initialized with the properties we need and will serve as the standard factory for our consumers from now on.

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        return props;
    }

consumerConfigs is simply a Map of configuration properties: we provide the server address, the group ID, and the deserializers.

Next, one of the most important points for the consumer. A consumer can receive both single objects and collections: for example, both a StarshipDto and a List&lt;StarshipDto&gt;. If a single StarshipDto arrives as JSON, then a List&lt;StarshipDto&gt; arrives as, roughly speaking, a JSON array. Therefore, we need at least two message factories: one for single messages and one for arrays.

    @Bean

    public KafkaListenerContainerFactory<?> singleFactory() {

        ConcurrentKafkaListenerContainerFactory<Long, AbstractDto> factory =

                new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());

        factory.setBatchListener(false);

        factory.setMessageConverter(new StringJsonMessageConverter());

        return factory;

    }

We create an instance of ConcurrentKafkaListenerContainerFactory, typed with Long (the message key) and AbstractDto (the abstract message value), and initialize its fields. We initialize the factory with our standard consumer factory (which already contains the Map of configs), then specify that it does not listen to batches (that is, to arrays) and set a plain JSON converter as the message converter.
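
The article never shows AbstractDto or StarshipDto themselves, so here is a minimal sketch under the assumption that AbstractDto is simply the common base class every message DTO extends; the single name field of StarshipDto is inferred from the createDto method shown later.

public abstract class AbstractDto {
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public class StarshipDto extends AbstractDto {
    // Jackson needs the no-args constructor for deserialization;
    // the one-arg constructor is used when building test messages.
    private String name;
}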

When we create the factory for batches/arrays, the main difference (apart from marking that we listen to batches) is that we specify a special batch converter, which will convert batches consisting of JSON strings.

    @Bean
    public KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Long, AbstractDto> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.setMessageConverter(new BatchMessagingMessageConverter(converter()));
        return factory;
    }

    @Bean
    public StringJsonMessageConverter converter() {
        return new StringJsonMessageConverter();
    }

One more thing. During Spring bean initialization, Spring may complain that it cannot find a bean named kafkaListenerContainerFactory and refuse to start the application. Surely there are more elegant ways to solve this problem; write about them in the comments. For now, I simply created a bean with that name, not burdened with any functionality:

    @Bean
    public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
        return new ConcurrentKafkaListenerContainerFactory<>();
    }

The consumer is set up. Now we move on to the producer.

@Configuration
public class KafkaProducerConfig {

    @Value("${kafka.server}")
    private String kafkaServer;

    @Value("${kafka.producer.id}")
    private String kafkaProducerId;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProducerId);
        return props;
    }

    @Bean
    public ProducerFactory<Long, StarshipDto> producerStarshipFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<Long, StarshipDto> kafkaTemplate() {
        KafkaTemplate<Long, StarshipDto> template = new KafkaTemplate<>(producerStarshipFactory());
        template.setMessageConverter(new StringJsonMessageConverter());
        return template;
    }
}

Of the static variables, we need the Kafka server address and the producer ID. The latter can be anything.

In the configs, as we can see, there is nothing special; they are almost the same as before. But there is a significant difference regarding the factories: we must register a template for each class whose objects we will send to the server, along with a factory for it. We have one such pair here, but there can be dozens of them.
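
For example, if we also wanted to send some hypothetical PlanetDto, we would register a second pair next to the first one (PlanetDto and the bean names below are illustrative, not from the project):

    // A second factory/template pair: one per DTO class we send.
    @Bean
    public ProducerFactory<Long, PlanetDto> producerPlanetFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<Long, PlanetDto> planetKafkaTemplate() {
        KafkaTemplate<Long, PlanetDto> template = new KafkaTemplate<>(producerPlanetFactory());
        template.setMessageConverter(new StringJsonMessageConverter());
        return template;
    }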

In the template, we specify that objects will be serialized as JSON, and that, perhaps, is enough.

Now that we have a consumer and a producer, it remains to write a service that will send messages and receive them.

@Service
@Slf4j
public class StarshipServiceImpl implements StarshipService {

    private final KafkaTemplate<Long, StarshipDto> kafkaStarshipTemplate;
    private final ObjectMapper objectMapper;

    @Autowired
    public StarshipServiceImpl(KafkaTemplate<Long, StarshipDto> kafkaStarshipTemplate,
                               ObjectMapper objectMapper) {
        this.kafkaStarshipTemplate = kafkaStarshipTemplate;
        this.objectMapper = objectMapper;
    }

    @Override
    public void send(StarshipDto dto) {
        kafkaStarshipTemplate.send("server.starship", dto);
    }

    @Override
    @KafkaListener(id = "Starship", topics = {"server.starship"}, containerFactory = "singleFactory")
    public void consume(StarshipDto dto) {
        log.info("=> consumed {}", writeValueAsString(dto));
    }

    private String writeValueAsString(StarshipDto dto) {
        try {
            return objectMapper.writeValueAsString(dto);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            throw new RuntimeException("Writing value to JSON failed: " + dto.toString());
        }
    }
}

There are only two methods in our service; they are enough to explain how the client works. We autowire the templates we need:

    private final KafkaTemplate<Long, StarshipDto> kafkaStarshipTemplate;

The producer method:

    @Override
    public void send(StarshipDto dto) {
        kafkaStarshipTemplate.send("server.starship", dto);
    }

All it takes to send a message to the server is to call the send method on the template, passing it the topic and our object. The object will be serialized to JSON and fly off to the server under the specified topic.
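
Since the Web starter is on the classpath, a quick way to try this out is a tiny REST controller that forwards a request body to the service (this controller and its mapping are my sketch, not part of the article):

@RestController
public class StarshipController {

    private final StarshipService starshipService;

    @Autowired
    public StarshipController(StarshipService starshipService) {
        this.starshipService = starshipService;
    }

    // POST a StarshipDto as JSON and it is forwarded to Kafka.
    @PostMapping("/starships")
    public void send(@RequestBody StarshipDto dto) {
        starshipService.send(dto);
    }
}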

The listening method looks like this:

    @Override
    @KafkaListener(id = "Starship", topics = {"server.starship"}, containerFactory = "singleFactory")
    public void consume(StarshipDto dto) {
        log.info("=> consumed {}", writeValueAsString(dto));
    }

We mark this method with the @KafkaListener annotation, where we specify any ID we like, the topics to listen to, and the factory that will convert the received message into what we need. In this case, since we accept a single object, we need singleFactory. For a List&lt;?&gt;, specify batchFactory. As a result, we send the object to the Kafka server with the send method and receive it in the consume method.
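
Such a batch listener could look like this sketch (the id and the method itself are my assumptions; the topic and factory names are the ones defined above):

    @KafkaListener(id = "StarshipBatch", topics = {"server.starship"},
            containerFactory = "batchFactory")
    public void consumeAll(List<StarshipDto> dtos) {
        // The whole batch arrives at once as a list.
        dtos.forEach(dto -> log.info("=> consumed in batch {}", writeValueAsString(dto)));
    }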

You could write a test in five minutes that demonstrates the full power of Kafka, but we will go further: spend ten minutes and write another application that sends messages to the server for our first application to listen to.
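
(For the impatient, such a test might look roughly like this sketch, assuming the spring-kafka-test dependency is added; the sleep is a deliberately crude way to wait for asynchronous delivery.)

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = "server.starship",
        bootstrapServersProperty = "kafka.server")
class StarshipServiceImplTest {

    @Autowired
    private StarshipService starshipService;

    @Test
    void sentMessageIsConsumed() throws InterruptedException {
        starshipService.send(new StarshipDto("Enterprise"));
        // Watch the log for "=> consumed" while the listener catches up.
        Thread.sleep(2000);
    }
}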

kafka-tester

With the experience of the first application behind us, we can easily write the second, especially if we copy-paste the properties and the dto package, register only the producer (we will only be sending messages), and add a single send method to the service.

    @Scheduled(initialDelay = 10000, fixedDelay = 5000)
    @Override
    public void produce() {
        StarshipDto dto = createDto();
        log.info("<= sending {}", writeValueAsString(dto));
        kafkaStarshipTemplate.send("server.starship", dto);
    }

    private StarshipDto createDto() {
        return new StarshipDto("Starship" + (LocalTime.now().toNanoOfDay() / 1000000));
    }

After an initial delay of 10 seconds, kafka-tester starts sending messages with generated starship names to the Kafka server every 5 seconds.

On the other end, kafka-server listens for them and receives them.

I hope that those who dream of starting to write microservices with Kafka will succeed as easily as I did.