Spring Kafka

Consumer

  1. Receiving the messages
    • @KafkaListener
    • 8 types Interfaces that are used to read consumer instances(messges) – based on parameters passed to metthod
      1. MessageLister (only ConsumerRecord<K, V>> data parameter)
      2. ConsumerAwareMessageLister (1 additional parameter : Acknowledgement acknowledgement, Consumer<?, ?> consumer)
      3. AcknowledgingMessageLister (1 additional parameter : Acknowledgement acknowledgement)
      4. ConsumerAwareAcknowledgingMessageLister (2 additional parameter : Acknowledgement acknowledgement & Consumer<?, ?> consumer )
      5. other 4 are batch listener corresponding to above

Descriptions:

  1. Using one of AckListener Interface[Acknowledgement in parameter ]- means manual ack to be sent[using code] – neither managed by container nor auto-commit
  2. Using one of ConsumerListener Interface [ Consumer in parameter ]- means- Access to the Consumer object is provided.
  3. Without Batch listeners means – messages read individually

Statement:(chatgpt for manual ack) “When a Kafka message is fetched by a consumer, its offset is not automatically committed to Kafka. Instead, the consumer is responsible for committing the offset after successfully processing the message

acknowledgment.acknowledge()
//This method signals to Kafka that the message has been //successfully processed and can be marked as "committed."


Message Listener Containers

Two MessageListenerContainer implementations are provided:

  • KafkaMessageListenerContainer
  • ConcurrentMessageListenerContainer

The KafkaMessageListenerContainer receives all message from all topics or partitions on a single thread. The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.

Starting with versions 2.3.8, 2.4.6, the ConcurrentMessageListenerContainer now supports Static Membership when the concurrency is greater than one. The group.instance.id is suffixed with -n with n starting at 1. This, together with an increased session.timeout.ms, can be used to reduce rebalance events, for example, when application instances are restarted.

Note: properties of container:

It also has a concurrency property. For example, container.setConcurrency(3) creates three KafkaMessageListenerContainer instances.

points:

  1. assignment of consumer to partition – based on constructor used
  2. consition for consurrency- if topic is 3 and partiton is 5 for each – you might tend to use 15 as concurrency but it’s 5 you should be – see the explanation in doc since other 10 are idle for specific congdition
    • Not to use -default Kafka PartitionAssignor is the RangeAssignor – assign 5 instead of 15
      • .property that controls consumer assignment to partition/topic- (ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)

Starting with version 2.3, the ContainerProperties provides an idleBetweenPolls option to let the main loop in the listener container to sleep between KafkaConsumer.poll() calls. An actual sleep interval is selected as the minimum from the provided option and difference between the max.poll.interval.ms consumer config and the current records batch processing time.

Committing Offsets

The consumer poll() method returns one or more ConsumerRecords. The MessageListener is called for each record. The following lists describes the action taken by the container for each AckMode (when transactions are not being used):

  • RECORD: Commit the offset when the listener returns after processing the record.
  • BATCH: Commit the offset when all the records returned by the poll() have been processed.
  • TIME: Commit the offset when all the records returned by the poll() have been processed, as long as the ackTime since the last commit has been exceeded.
  • COUNT: Commit the offset when all the records returned by the poll() have been processed, as long as ackCount records have been received since the last commit.
  • COUNT_TIME: Similar to TIME and COUNT, but the commit is performed if either condition is true.
  • MANUAL: The message listener is responsible to acknowledge() the Acknowledgment. After that, the same semantics as BATCH are applied.
  • MANUAL_IMMEDIATE: Commit the offset immediately when the Acknowledgment.acknowledge() method is called by the listener.


Published by

Unknown's avatar

sevanand yadav

software engineer working as web developer having specialization in spring MVC with mysql,hibernate

Leave a comment