package kafka

import (
	"sort"
)

// GroupMember describes a single participant in a consumer group.
type GroupMember struct {
	// ID is the unique ID for this member as taken from the JoinGroup response.
	ID string

	// Topics is a list of topics that this member is consuming.
	Topics []string

	// UserData contains any information that the GroupBalancer sent to the
	// consumer group coordinator.
	UserData []byte
}

// GroupMemberAssignments holds MemberID => topic => partitions.
type GroupMemberAssignments map[string]map[string][]int

// GroupBalancer encapsulates the client-side rebalancing logic.
type GroupBalancer interface {
	// ProtocolName of the GroupBalancer.
	ProtocolName() string

	// UserData provides the GroupBalancer an opportunity to embed custom
	// UserData into the metadata.
	//
	// Will be used by JoinGroup to begin the consumer group handshake.
	//
	// See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-JoinGroupRequest
	UserData() ([]byte, error)

	// AssignGroups returns which members will be consuming which topic
	// partitions.
	AssignGroups(members []GroupMember, partitions []Partition) GroupMemberAssignments
}
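
// stickyToFirstBalancer is a minimal, illustrative sketch of how a custom
// GroupBalancer could be implemented; it is not part of the package's API,
// and the type and protocol name are hypothetical. It routes every partition
// of a topic to the first member (by ID) subscribed to that topic, which is
// rarely useful in production but shows the shape of the interface by reusing
// the findMembersByTopic and findPartitions helpers defined below.
type stickyToFirstBalancer struct{}

func (stickyToFirstBalancer) ProtocolName() string { return "sticky-to-first" }

func (stickyToFirstBalancer) UserData() ([]byte, error) { return nil, nil }

func (stickyToFirstBalancer) AssignGroups(members []GroupMember, partitions []Partition) GroupMemberAssignments {
	assignments := GroupMemberAssignments{}
	for topic, topicMembers := range findMembersByTopic(members) {
		// Members are sorted by ID within each topic, so the choice of
		// "first" member is deterministic.
		first := topicMembers[0]
		byTopic, ok := assignments[first.ID]
		if !ok {
			byTopic = map[string][]int{}
			assignments[first.ID] = byTopic
		}
		byTopic[topic] = findPartitions(topic, partitions)
	}
	return assignments
}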

// RangeGroupBalancer groups consumers by partition
//
// Example: 5 partitions, 2 consumers
//	C0: [0, 1]
//	C1: [2, 3, 4]
//
// Example: 6 partitions, 3 consumers
//	C0: [0, 1]
//	C1: [2, 3]
//	C2: [4, 5]
type RangeGroupBalancer struct{}

func (r RangeGroupBalancer) ProtocolName() string {
	return "range"
}

func (r RangeGroupBalancer) UserData() ([]byte, error) {
	return nil, nil
}

func (r RangeGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments {
	groupAssignments := GroupMemberAssignments{}
	membersByTopic := findMembersByTopic(members)

	for topic, members := range membersByTopic {
		partitions := findPartitions(topic, topicPartitions)
		partitionCount := len(partitions)
		memberCount := len(members)

		for memberIndex, member := range members {
			assignmentsByTopic, ok := groupAssignments[member.ID]
			if !ok {
				assignmentsByTopic = map[string][]int{}
				groupAssignments[member.ID] = assignmentsByTopic
			}

			// Each member is assigned a contiguous slice of the topic's
			// partitions; when the count does not divide evenly, the later
			// members receive the extra partitions.
			minIndex := memberIndex * partitionCount / memberCount
			maxIndex := (memberIndex + 1) * partitionCount / memberCount

			for partitionIndex, partition := range partitions {
				if partitionIndex >= minIndex && partitionIndex < maxIndex {
					assignmentsByTopic[topic] = append(assignmentsByTopic[topic], partition)
				}
			}
		}
	}

	return groupAssignments
}
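
// exampleRangeAssignment is an illustrative sketch, not part of the package's
// API: it shows how RangeGroupBalancer.AssignGroups is driven with hand-built
// GroupMember and Partition values. The topic name and member IDs are
// hypothetical placeholders.
func exampleRangeAssignment() GroupMemberAssignments {
	members := []GroupMember{
		{ID: "member-a", Topics: []string{"example-topic"}},
		{ID: "member-b", Topics: []string{"example-topic"}},
	}
	partitions := make([]Partition, 5)
	for i := range partitions {
		partitions[i] = Partition{Topic: "example-topic", ID: i}
	}

	// With 5 partitions and 2 members, member-a is assigned partitions [0 1]
	// and member-b is assigned [2 3 4].
	return RangeGroupBalancer{}.AssignGroups(members, partitions)
}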

// RoundRobinGroupBalancer divides partitions evenly among consumers
//
// Example: 5 partitions, 2 consumers
//	C0: [0, 2, 4]
//	C1: [1, 3]
//
// Example: 6 partitions, 3 consumers
//	C0: [0, 3]
//	C1: [1, 4]
//	C2: [2, 5]
type RoundRobinGroupBalancer struct{}

func (r RoundRobinGroupBalancer) ProtocolName() string {
	return "roundrobin"
}

func (r RoundRobinGroupBalancer) UserData() ([]byte, error) {
	return nil, nil
}

func (r RoundRobinGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments {
	groupAssignments := GroupMemberAssignments{}
	membersByTopic := findMembersByTopic(members)

	for topic, members := range membersByTopic {
		partitionIDs := findPartitions(topic, topicPartitions)
		memberCount := len(members)

		for memberIndex, member := range members {
			assignmentsByTopic, ok := groupAssignments[member.ID]
			if !ok {
				assignmentsByTopic = map[string][]int{}
				groupAssignments[member.ID] = assignmentsByTopic
			}

			// Member i takes every memberCount-th partition, starting at
			// offset i.
			for partitionIndex, partition := range partitionIDs {
				if (partitionIndex % memberCount) == memberIndex {
					assignmentsByTopic[topic] = append(assignmentsByTopic[topic], partition)
				}
			}
		}
	}

	return groupAssignments
}
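
// exampleRoundRobinAssignment is another illustrative sketch (hypothetical
// topic and member names, not part of the package's API). Because members are
// sorted per topic before assignment, the same member lands on the same
// partition index across topics: member-a gets partition 0 of both topics and
// member-b gets partition 1 of both.
func exampleRoundRobinAssignment() GroupMemberAssignments {
	members := []GroupMember{
		{ID: "member-a", Topics: []string{"topic-a", "topic-b"}},
		{ID: "member-b", Topics: []string{"topic-a", "topic-b"}},
	}
	partitions := []Partition{
		{Topic: "topic-a", ID: 0}, {Topic: "topic-a", ID: 1},
		{Topic: "topic-b", ID: 0}, {Topic: "topic-b", ID: 1},
	}
	return RoundRobinGroupBalancer{}.AssignGroups(members, partitions)
}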

// RackAffinityGroupBalancer makes a best effort to pair up consumers with
// partitions whose leader is in the same rack. This strategy can have
// performance benefits by minimizing round trip latency between the consumer
// and the broker. In environments where network traffic across racks incurs
// charges (such as cross AZ data transfer in AWS), this strategy is also a
// cost optimization measure because it keeps network traffic within the local
// rack where possible.
//
// The primary objective is to spread partitions evenly across consumers with a
// secondary focus on maximizing the number of partitions where the leader and
// the consumer are in the same rack. For best affinity, it's recommended to
// have a balanced spread of consumers and partition leaders across racks.
//
// This balancer requires Kafka version 0.10.0.0 or later. Earlier versions do
// not return the brokers' racks in the metadata request.
type RackAffinityGroupBalancer struct {
	// Rack is the name of the rack where this consumer is running. It will be
	// communicated to the consumer group leader via the UserData so that
	// assignments can be made with affinity to the partition leader.
	Rack string
}
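
// exampleRackAffinityConfig sketches how a consumer might opt into rack
// affinity. It assumes a reader configuration that accepts a list of
// GroupBalancers (as this package's ReaderConfig does); the broker address,
// group ID, topic, and rack name are hypothetical placeholders.
func exampleRackAffinityConfig() *Reader {
	return NewReader(ReaderConfig{
		Brokers: []string{"localhost:9092"},
		GroupID: "example-group",
		Topic:   "example-topic",
		// Advertise this consumer's rack so the group leader can prefer
		// partitions whose leader lives in the same rack.
		GroupBalancers: []GroupBalancer{
			RackAffinityGroupBalancer{Rack: "us-east-1a"},
		},
	})
}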

func (r RackAffinityGroupBalancer) ProtocolName() string {
	return "rack-affinity"
}

func (r RackAffinityGroupBalancer) AssignGroups(members []GroupMember, partitions []Partition) GroupMemberAssignments {
	membersByTopic := make(map[string][]GroupMember)
	for _, m := range members {
		for _, t := range m.Topics {
			membersByTopic[t] = append(membersByTopic[t], m)
		}
	}

	partitionsByTopic := make(map[string][]Partition)
	for _, p := range partitions {
		partitionsByTopic[p.Topic] = append(partitionsByTopic[p.Topic], p)
	}

	assignments := GroupMemberAssignments{}
	for topic := range membersByTopic {
		topicAssignments := r.assignTopic(membersByTopic[topic], partitionsByTopic[topic])
		for member, parts := range topicAssignments {
			memberAssignments, ok := assignments[member]
			if !ok {
				memberAssignments = make(map[string][]int)
				assignments[member] = memberAssignments
			}
			memberAssignments[topic] = parts
		}
	}
	return assignments
}

func (r RackAffinityGroupBalancer) UserData() ([]byte, error) {
	return []byte(r.Rack), nil
}
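
// exampleRackAffinityAssignment is an illustrative sketch with hypothetical
// member, topic, and rack names; it is not part of the package's API. In a
// live group each member's rack arrives through the UserData relayed by the
// coordinator, so the example fills in UserData by hand. With one partition
// leader per rack and one consumer per rack, each consumer is paired with the
// partition led from its own rack: member-1 gets partition 0 and member-2
// gets partition 1. Only the members' UserData matters on the assignment
// side, so the balancer's own Rack field is left empty here.
func exampleRackAffinityAssignment() GroupMemberAssignments {
	members := []GroupMember{
		{ID: "member-1", Topics: []string{"example-topic"}, UserData: []byte("rack-1")},
		{ID: "member-2", Topics: []string{"example-topic"}, UserData: []byte("rack-2")},
	}
	partitions := []Partition{
		{Topic: "example-topic", ID: 0, Leader: Broker{Rack: "rack-1"}},
		{Topic: "example-topic", ID: 1, Leader: Broker{Rack: "rack-2"}},
	}
	return RackAffinityGroupBalancer{}.AssignGroups(members, partitions)
}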

func (r *RackAffinityGroupBalancer) assignTopic(members []GroupMember, partitions []Partition) map[string][]int {
	zonedPartitions := make(map[string][]int)
	for _, part := range partitions {
		zone := part.Leader.Rack
		zonedPartitions[zone] = append(zonedPartitions[zone], part.ID)
	}

	zonedConsumers := make(map[string][]string)
	for _, member := range members {
		zone := string(member.UserData)
		zonedConsumers[zone] = append(zonedConsumers[zone], member.ID)
	}

	targetPerMember := len(partitions) / len(members)
	remainder := len(partitions) % len(members)
	assignments := make(map[string][]int)

	// assign as many as possible in zone. this will assign up to partsPerMember
	// to each consumer. it will also prefer to allocate remainder partitions
	// in zone if possible.
	for zone, parts := range zonedPartitions {
		consumers := zonedConsumers[zone]
		if len(consumers) == 0 {
			continue
		}

		// don't over-allocate. cap partition assignments at the calculated
		// target.
		partsPerMember := len(parts) / len(consumers)
		if partsPerMember > targetPerMember {
			partsPerMember = targetPerMember
		}

		for _, consumer := range consumers {
			assignments[consumer] = append(assignments[consumer], parts[:partsPerMember]...)
			parts = parts[partsPerMember:]
		}

		// if we had enough partitions for each consumer in this zone to hit its
		// target, attempt to use any leftover partitions to satisfy the total
		// remainder by adding at most 1 partition per consumer.
		leftover := len(parts)
		if partsPerMember == targetPerMember {
			if leftover > remainder {
				leftover = remainder
			}
			if leftover > len(consumers) {
				leftover = len(consumers)
			}
			remainder -= leftover
		}

		// this loop covers the case where we're assigning extra partitions or
		// if there weren't enough to satisfy the targetPerMember and the zoned
		// partitions didn't divide evenly.
		for i := 0; i < leftover; i++ {
			assignments[consumers[i]] = append(assignments[consumers[i]], parts[i])
		}

		parts = parts[leftover:]
		if len(parts) == 0 {
			delete(zonedPartitions, zone)
		} else {
			zonedPartitions[zone] = parts
		}
	}

	// assign out remainders regardless of zone.
	var remaining []int
	for _, partitions := range zonedPartitions {
		remaining = append(remaining, partitions...)
	}

	for _, member := range members {
		assigned := assignments[member.ID]

		delta := targetPerMember - len(assigned)
		// if it were possible to assign the remainder in zone, it's been taken
		// care of already. now we will portion out any remainder to a member
		// that can take it.
		if delta >= 0 && remainder > 0 {
			delta++
			remainder--
		}
		if delta > 0 {
			assignments[member.ID] = append(assigned, remaining[:delta]...)
			remaining = remaining[delta:]
		}
	}

	return assignments
}

// findPartitions extracts the partition IDs associated with the topic from the
// list of Partitions provided.
func findPartitions(topic string, partitions []Partition) []int {
	var ids []int
	for _, partition := range partitions {
		if partition.Topic == topic {
			ids = append(ids, partition.ID)
		}
	}
	return ids
}

// findMembersByTopic groups the list of GroupMembers by topic.
func findMembersByTopic(members []GroupMember) map[string][]GroupMember {
	membersByTopic := map[string][]GroupMember{}
	for _, member := range members {
		for _, topic := range member.Topics {
			membersByTopic[topic] = append(membersByTopic[topic], member)
		}
	}

	// normalize the ordering of members to enable grouping across topics by
	// partitions
	//
	// Want:
	//	C0 [T0/P0, T1/P0]
	//	C1 [T0/P1, T1/P1]
	//
	// Not:
	//	C0 [T0/P0, T1/P1]
	//	C1 [T0/P1, T1/P0]
	//
	// Even though the latter is still round robin, the partitions are crossed.
	for _, members := range membersByTopic {
		sort.Slice(members, func(i, j int) bool {
			return members[i].ID < members[j].ID
		})
	}

	return membersByTopic
}

// findGroupBalancer returns the GroupBalancer with the specified protocolName
// from the slice provided.
func findGroupBalancer(protocolName string, balancers []GroupBalancer) (GroupBalancer, bool) {
	for _, balancer := range balancers {
		if balancer.ProtocolName() == protocolName {
			return balancer, true
		}
	}
	return nil, false
}
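
// exampleFindGroupBalancer is a small hypothetical sketch of how the group
// leader could look up the balancer matching the protocol chosen during the
// JoinGroup handshake. The candidate list is illustrative; "roundrobin" is
// the protocol name defined by RoundRobinGroupBalancer above.
func exampleFindGroupBalancer() (GroupBalancer, bool) {
	supported := []GroupBalancer{
		RangeGroupBalancer{},
		RoundRobinGroupBalancer{},
	}
	// Returns RoundRobinGroupBalancer{}, true because its ProtocolName()
	// matches the requested protocol.
	return findGroupBalancer("roundrobin", supported)
}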