The Magical Rebalance Protocol of Apache Kafka

  • Very useful (and dense) talk that goes into the details of Kafka consumer groups.
  • Production/consumption always occurs against the leader replica (for a partition); all other replicas are simply standbys.
  • The ~__consumer_offsets~ topic stores the committed offsets of each consumer group, which provides fault tolerance: whichever consumer owns a partition after a failure resumes from the group’s last committed offset.
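    • A minimal sketch of that fault tolerance in practice, assuming the standard Java ~kafka-clients~ consumer (the topic and group names are made up): offsets committed below land in ~__consumer_offsets~, so a consumer that later takes over the partition resumes from the committed position.
#+begin_src java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class OffsetCommitExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");        // hypothetical group
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");      // commit manually below
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("example-topic"));                  // hypothetical topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("%s-%d@%d: %s%n",
                            record.topic(), record.partition(), record.offset(), record.value());
                }
                // Writes this group's offsets to __consumer_offsets; whichever consumer
                // owns these partitions after the next rebalance starts from here.
                consumer.commitSync();
            }
        }
    }
}
#+end_src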
  • Consumers in a group figure out what partitions to consume using a homegrown protocol.
    • This first used ZooKeeper, which couldn’t scale (expected load is 30-50k consumer groups, with 100+ consumers per group)
    • v2 was a custom protocol that had a broker make all the assignments, which was unsuitable
    • v3 (current) moves this responsibility over to a consumer in each group
  • Protocol steps:
    • FindCoordinator: hash the group id, modulo the number of partitions in ~__consumer_offsets~, to get a partition; the leader of that partition is the coordinator (guaranteed to be live). See the sketch at the end of this list.
    • JoinGroup: agree on an assignment algorithm + receive a member id + choose one consumer as the leader (linked list of consumers in the order they joined; head of the list is the leader, remove when a consumer dies)
    • SyncGroup: the leader decides the partition assignment and sends it to the coordinator, which distributes the assignment to all the other consumers
    • Heartbeat: consumers tell the coordinator they’re still alive. Response can be just OK, or a command to rebalance, which is triggered when a consumer enters/leaves the group, topic metadata changes, etc.
    • LeaveGroup: consumer deliberately leaves the group
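    • A rough sketch of the FindCoordinator hash from the first step, assuming the default of 50 partitions for ~__consumer_offsets~ (~offsets.topic.num.partitions~). This mirrors the idea described in the talk, not the broker’s exact code:
#+begin_src java
public class CoordinatorPartition {
    // Hash of the group id, modulo the __consumer_offsets partition count.
    static int coordinatorPartition(String groupId, int offsetsTopicPartitions) {
        // Mask the sign bit so the result is non-negative before taking the modulo.
        return (groupId.hashCode() & 0x7fffffff) % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        // The leader of __consumer_offsets partition N is this group's coordinator.
        System.out.println(coordinatorPartition("example-group", 50));
    }
}
#+end_src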
  • Can specify multiple assignment strategies when joining a group, in priority order. This is useful for upgrades, when each consumer may support a different set of assignment strategies. The coordinator settles on the highest-priority strategy that all consumers can support.
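    • For instance, the Java consumer takes a priority-ordered list in ~partition.assignment.strategy~; the assignor classes below are just one plausible upgrade pairing:
#+begin_src java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.List;
import java.util.Properties;

public class MultipleAssignorsExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Priority order: preferred strategy first, fallback second. During a rolling
        // upgrade the group keeps using the fallback until every member supports the
        // preferred one.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                CooperativeStickyAssignor.class.getName() + "," + RangeAssignor.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("example-topic"));   // hypothetical topic
            // ... normal poll loop ...
        }
    }
}
#+end_src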
  • As far as I can tell, consumers don’t talk directly to each other
  • Extend ~AbstractCoordinator~ to implement a custom coordination strategy at the client level (not the broker): essentially, customize the consumer’s behavior for each of the five steps above
  • Create a new ~ConsumerPartitionAssignor~ to implement just a custom partition assignment strategy
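    • A rough sketch of what such an assignor can look like, assuming every member subscribes to the same topics (a naive round-robin, purely illustrative; Kafka already ships range, round-robin, and sticky assignors). ~name()~ is what gets advertised during JoinGroup, and ~assign()~ runs only on the group leader; overriding ~subscriptionUserData~ / ~onAssignment~ is where the user-metadata trick mentioned below for Kafka Streams comes in. To use it, list the class name in ~partition.assignment.strategy~ as above.
#+begin_src java
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class SimpleRoundRobinAssignor implements ConsumerPartitionAssignor {

    // The strategy name advertised to the coordinator during JoinGroup.
    @Override
    public String name() {
        return "simple-round-robin";
    }

    // Runs on the group leader only: it sees every member's subscription and produces
    // one assignment per member, which SyncGroup then hands back to the coordinator.
    @Override
    public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription) {
        Map<String, Subscription> subscriptions = groupSubscription.groupSubscription();
        List<String> members = new ArrayList<>(subscriptions.keySet());
        Collections.sort(members); // deterministic ordering of member ids

        // Every partition of every subscribed topic, in a stable order.
        Set<String> topics = new TreeSet<>();
        subscriptions.values().forEach(s -> topics.addAll(s.topics()));
        List<TopicPartition> allPartitions = new ArrayList<>();
        for (String topic : topics) {
            Integer count = metadata.partitionCountForTopic(topic);
            if (count == null) continue; // topic missing from this metadata snapshot
            for (int p = 0; p < count; p++) {
                allPartitions.add(new TopicPartition(topic, p));
            }
        }

        // Deal partitions out to members one at a time.
        Map<String, List<TopicPartition>> assigned = new HashMap<>();
        members.forEach(m -> assigned.put(m, new ArrayList<>()));
        for (int i = 0; i < allPartitions.size(); i++) {
            assigned.get(members.get(i % members.size())).add(allPartitions.get(i));
        }

        Map<String, Assignment> result = new HashMap<>();
        assigned.forEach((m, parts) -> result.put(m, new Assignment(parts)));
        return new GroupAssignment(result);
    }
}
#+end_src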
  • There was some explanation towards the end of ways this protocol is used /outside/ the core use case, but it went by too fast for me to fully grasp:
    • The schema registry overrides ~AbstractCoordinator~ to provide a leader election system (does this depend on a Kafka broker or is it entirely standalone?)
    • Kafka Connect uses this to assign generic tasks (copy table X from a given source database, for example) to workers, so there’s some scope to use this more generically, not just with partitions.
    • Kafka Streams uses a custom assignor to have a consumer join data from two source topics. This is possible because the protocol allows for user metadata, which this strategy (ab)uses.
      • RocksDB for local state, but I didn’t really understand the context/motivation
  • I’m curious about fault-tolerance strategies for when the coordinator goes down, which this talk didn’t cover.