Consumer
- Receiving the messages
- @KafkaListener
- 8 types Interfaces that are used to read consumer instances(messges) – based on parameters passed to metthod
- MessageLister (only ConsumerRecord<K, V>> data parameter)
- ConsumerAwareMessageLister (1 additional parameter : Acknowledgement acknowledgement, Consumer<?, ?> consumer)
- AcknowledgingMessageLister (1 additional parameter : Acknowledgement acknowledgement)
- ConsumerAwareAcknowledgingMessageLister (2 additional parameter : Acknowledgement acknowledgement & Consumer<?, ?> consumer )
- other 4 are batch listener corresponding to above

Descriptions:
- Using one of AckListener Interface[Acknowledgement in parameter ]- means manual ack to be sent[using code] – neither managed by container nor auto-commit
- Using one of ConsumerListener Interface [ Consumer in parameter ]- means- Access to the
Consumerobject is provided. - Without Batch listeners means – messages read individually
Statement:(chatgpt for manual ack) “When a Kafka message is fetched by a consumer, its offset is not automatically committed to Kafka. Instead, the consumer is responsible for committing the offset after successfully processing the message“
acknowledgment.acknowledge()
//This method signals to Kafka that the message has been //successfully processed and can be marked as "committed."
Message Listener Containers
Two MessageListenerContainer implementations are provided:
KafkaMessageListenerContainerConcurrentMessageListenerContainer
The KafkaMessageListenerContainer receives all message from all topics or partitions on a single thread. The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.
Starting with versions 2.3.8, 2.4.6, the ConcurrentMessageListenerContainer now supports Static Membership when the concurrency is greater than one. The group.instance.id is suffixed with -n with n starting at 1. This, together with an increased session.timeout.ms, can be used to reduce rebalance events, for example, when application instances are restarted.
Note: properties of container:
It also has a concurrency property. For example, container.setConcurrency(3) creates three KafkaMessageListenerContainer instances.
points:
- assignment of consumer to partition – based on constructor used
- consition for consurrency- if topic is 3 and partiton is 5 for each – you might tend to use 15 as concurrency but it’s 5 you should be – see the explanation in doc since other 10 are idle for specific congdition
- Not to use -default Kafka
PartitionAssignoris theRangeAssignor– assign 5 instead of 15- .property that controls consumer assignment to partition/topic- (
ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)
- .property that controls consumer assignment to partition/topic- (
- Not to use -default Kafka
Starting with version 2.3, the ContainerProperties provides an idleBetweenPolls option to let the main loop in the listener container to sleep between KafkaConsumer.poll() calls. An actual sleep interval is selected as the minimum from the provided option and difference between the max.poll.interval.ms consumer config and the current records batch processing time.
Committing Offsets
The consumer poll() method returns one or more ConsumerRecords. The MessageListener is called for each record. The following lists describes the action taken by the container for each AckMode (when transactions are not being used):
RECORD: Commit the offset when the listener returns after processing the record.BATCH: Commit the offset when all the records returned by thepoll()have been processed.TIME: Commit the offset when all the records returned by thepoll()have been processed, as long as theackTimesince the last commit has been exceeded.COUNT: Commit the offset when all the records returned by thepoll()have been processed, as long asackCountrecords have been received since the last commit.COUNT_TIME: Similar toTIMEandCOUNT, but the commit is performed if either condition istrue.MANUAL: The message listener is responsible toacknowledge()theAcknowledgment. After that, the same semantics asBATCHare applied.MANUAL_IMMEDIATE: Commit the offset immediately when theAcknowledgment.acknowledge()method is called by the listener.