Very useful (and dense) talk that goes into the details of Kafka consumer groups.
Production/consumption always occurs against the leader replica (for a partition); all other replicas are simply standbys.
The ~__consumer_offsets~ topic stores committed offsets for each consumer group, which provides fault tolerance.
Consumers in a group figure out what partitions to consume using a homegrown protocol.
The first version used ZooKeeper, which couldn’t scale (expected load is 30-50k consumer groups, with 100+ consumers per group).
v2 was a custom protocol that had a broker make all the assignments, which was unsuitable
v3 (current) moves this responsibility over to a consumer in each group
Protocol steps:
FindCoordinator: hash the group id, modulo the number of partitions in ~__consumer_offsets~, to get a partition. The leader of this partition is the coordinator (guaranteed to be live).
JoinGroup: agree on an assignment algorithm + receive a member id + choose one consumer as the leader (linked list of consumers in the order they joined; head of the list is the leader, remove when a consumer dies)
SyncGroup: the leader decides the partition assignment and tells the coordinator, which passes this assignment on to all other consumers
Heartbeat: consumers tell the coordinator they’re still alive. Response can be just OK, or a command to rebalance, which is triggered when a consumer enters/leaves the group, topic metadata changes, etc.
LeaveGroup: consumer deliberately leaves the group
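A toy, in-memory sketch of the five steps above, in Python. Nothing here is Kafka's real API: ~java_string_hash~, ~coordinator_partition~, and ~ToyCoordinator~ are hypothetical names. The Java-style string hash and the partition count of 50 are assumptions (the talk only said "hash group id"), and the generation counter is just one simple way to decide when a heartbeat should answer with a rebalance.

```python
from typing import Dict, List, Optional


def java_string_hash(s: str) -> int:
    """Java-style String.hashCode over UTF-16 code units, as a signed 32-bit int."""
    data = s.encode("utf-16-be")
    h = 0
    for i in range(0, len(data), 2):
        unit = (data[i] << 8) | data[i + 1]
        h = (31 * h + unit) & 0xFFFFFFFF
    return h - (1 << 32) if h >= (1 << 31) else h


def coordinator_partition(group_id: str, num_partitions: int = 50) -> int:
    """FindCoordinator: hash the group id, then take it modulo the partition count.

    The hash function and the default of 50 partitions are assumptions.
    """
    return (java_string_hash(group_id) & 0x7FFFFFFF) % num_partitions


class ToyCoordinator:
    """Hypothetical stand-in for the broker-side coordinator of one group."""

    def __init__(self) -> None:
        self.members: List[str] = []  # join order; the head of the list is the leader
        self.generation = 0           # assumption: bumped on every membership change
        self.assignments: Dict[str, List[int]] = {}

    def leader(self) -> Optional[str]:
        return self.members[0] if self.members else None

    def join_group(self, member_id: str) -> int:
        """JoinGroup: register the member and return the current generation."""
        if member_id not in self.members:
            self.members.append(member_id)
            self.generation += 1
            self.assignments = {}  # old assignments are stale after a change
        return self.generation

    def sync_group(self, member_id: str,
                   assignments: Optional[Dict[str, List[int]]] = None) -> List[int]:
        """SyncGroup: the leader uploads the plan; every member reads its own share."""
        if member_id == self.leader() and assignments is not None:
            self.assignments = dict(assignments)
        return self.assignments.get(member_id, [])

    def heartbeat(self, member_id: str, generation: int) -> str:
        """Heartbeat: OK if the member's view is current, else ask it to rebalance."""
        return "OK" if generation == self.generation else "REBALANCE"

    def leave_group(self, member_id: str) -> None:
        """LeaveGroup: drop the member; the next-oldest member becomes the leader."""
        if member_id in self.members:
            self.members.remove(member_id)
            self.generation += 1
            self.assignments = {}
```

The consumers only ever call methods on the coordinator, never on each other, which matches my impression that they don't talk to each other directly.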
Can specify multiple assignment strategies when joining a group, in priority order. This is useful for upgrades, when each consumer may support a different set of assignment strategies. The coordinator settles on the highest-priority strategy that all consumers can support.
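A minimal sketch of how the coordinator might settle on a strategy, in Python. ~select_strategy~ is a hypothetical name, and the voting step is an assumption about how differing priority orders could be resolved; the talk only said the result is the highest-priority strategy that every consumer supports.

```python
from typing import Dict, List


def select_strategy(preferences: Dict[str, List[str]]) -> str:
    """Pick one assignment strategy for the whole group.

    `preferences` maps a member id to its strategies in priority order.
    """
    if not preferences:
        raise ValueError("the group has no members")

    # Only strategies that every member supports are candidates.
    common = set.intersection(*(set(p) for p in preferences.values()))
    if not common:
        raise ValueError("no assignment strategy is supported by every member")

    # Assumption: each member votes for its highest-priority candidate and the
    # most-voted strategy wins; ties go to the strategy that was voted for first.
    votes: Dict[str, int] = {}
    for prefs in preferences.values():
        choice = next(s for s in prefs if s in common)
        votes[choice] = votes.get(choice, 0) + 1
    return max(votes, key=lambda s: votes[s])
```

During a rolling upgrade, an upgraded consumer can list the new strategy first and the old one second. The group keeps using the old strategy until every member also lists the new one.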
As far as I can tell, consumers don’t talk directly to each other
Override ~AbstractCoordinator~ to implement a custom coordination strategy at the client level, not the broker (essentially customizing consumer behavior for each of the 5 steps above)
Create a new ~ConsumerPartitionAssignor~ to implement just a custom partition assignment strategy
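To show the kind of logic a custom assignor holds, here is a round-robin assignment sketched in Python. This is not the Java ~ConsumerPartitionAssignor~ interface; ~assign_round_robin~ is a hypothetical function that only captures the core decision the group leader makes: given the members and the partitions, who gets what.

```python
from typing import Dict, List


def assign_round_robin(members: List[str], partitions: List[int]) -> Dict[str, List[int]]:
    """Deal partitions out to members one at a time, in sorted order.

    Sorting both inputs makes the result independent of the order in which
    members joined, so any member acting as leader computes the same plan.
    """
    if not members:
        return {}
    ordered = sorted(members)
    assignment: Dict[str, List[int]] = {m: [] for m in ordered}
    for i, partition in enumerate(sorted(partitions)):
        assignment[ordered[i % len(ordered)]].append(partition)
    return assignment
```

The same shape works for things that aren't partitions at all: swap the list of partition numbers for a list of generic task ids.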
There was some explanation of ways this protocol is used /outside/ the core use case towards the end, but it went by too fast for me to actually grasp:
The schema registry overrides ~AbstractCoordinator~ to provide a leader election system (does this depend on a Kafka broker or is it entirely standalone?)
Kafka Connect uses this to assign generic tasks (copy table X from a given source database, for example) to workers, so there’s some scope to use this more generically, not just with partitions.
Kafka Streams uses a custom assignor to have a consumer join data from two source topics. This is possible because the protocol allows for user metadata, which this strategy (ab)uses.
RocksDB was mentioned for local state, but I didn’t really understand the context/motivation
I’m curious about fault-tolerance strategies for when the coordinator goes down, which this talk didn’t cover.