123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339 |
- package kafka
- import (
- "sort"
- )
// GroupMember describes one consumer taking part in a group rebalance, as it
// is presented to a GroupBalancer.
type GroupMember struct {
	// ID identifies the member. The balancers in this file use it as the
	// outer key of the GroupMemberAssignments they return.
	ID string

	// Topics lists the topics the member takes part in. Partitions of a topic
	// are only handed to members that list that topic here.
	Topics []string

	// UserData is an opaque, balancer-specific payload attached to the
	// member. RackAffinityGroupBalancer interprets it as the member's rack
	// name; the other balancers in this file ignore it.
	UserData []byte
}
// GroupMemberAssignments is the outcome of a balancing round. It is keyed
// first by member ID and then by topic name; the innermost slice holds the IDs
// of the partitions given to that member for that topic:
//
//	assignments[memberID][topic] = []int{partitionID, ...}
type GroupMemberAssignments map[string]map[string][]int
- type GroupBalancer interface {
-
- ProtocolName() string
-
-
-
-
-
-
- UserData() ([]byte, error)
-
-
- AssignGroups(members []GroupMember, partitions []Partition) GroupMemberAssignments
- }
// RangeGroupBalancer is a GroupBalancer that gives each member of a topic one
// contiguous run of that topic's partitions. It carries no configuration.
type RangeGroupBalancer struct{}

// ProtocolName returns "range", the name this balancer is looked up by.
func (RangeGroupBalancer) ProtocolName() string {
	return "range"
}

// UserData returns no payload and never fails; this balancer does not use
// per-member data.
func (RangeGroupBalancer) UserData() ([]byte, error) {
	return nil, nil
}
- func (r RangeGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments {
- groupAssignments := GroupMemberAssignments{}
- membersByTopic := findMembersByTopic(members)
- for topic, members := range membersByTopic {
- partitions := findPartitions(topic, topicPartitions)
- partitionCount := len(partitions)
- memberCount := len(members)
- for memberIndex, member := range members {
- assignmentsByTopic, ok := groupAssignments[member.ID]
- if !ok {
- assignmentsByTopic = map[string][]int{}
- groupAssignments[member.ID] = assignmentsByTopic
- }
- minIndex := memberIndex * partitionCount / memberCount
- maxIndex := (memberIndex + 1) * partitionCount / memberCount
- for partitionIndex, partition := range partitions {
- if partitionIndex >= minIndex && partitionIndex < maxIndex {
- assignmentsByTopic[topic] = append(assignmentsByTopic[topic], partition)
- }
- }
- }
- }
- return groupAssignments
- }
// RoundRobinGroupBalancer is a GroupBalancer that deals the partitions of each
// topic out to that topic's members one at a time, in turn. It carries no
// configuration.
type RoundRobinGroupBalancer struct{}

// ProtocolName returns "roundrobin", the name this balancer is looked up by.
func (RoundRobinGroupBalancer) ProtocolName() string {
	return "roundrobin"
}

// UserData returns no payload and never fails; this balancer does not use
// per-member data.
func (RoundRobinGroupBalancer) UserData() ([]byte, error) {
	return nil, nil
}
- func (r RoundRobinGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments {
- groupAssignments := GroupMemberAssignments{}
- membersByTopic := findMembersByTopic(members)
- for topic, members := range membersByTopic {
- partitionIDs := findPartitions(topic, topicPartitions)
- memberCount := len(members)
- for memberIndex, member := range members {
- assignmentsByTopic, ok := groupAssignments[member.ID]
- if !ok {
- assignmentsByTopic = map[string][]int{}
- groupAssignments[member.ID] = assignmentsByTopic
- }
- for partitionIndex, partition := range partitionIDs {
- if (partitionIndex % memberCount) == memberIndex {
- assignmentsByTopic[topic] = append(assignmentsByTopic[topic], partition)
- }
- }
- }
- }
- return groupAssignments
- }
// RackAffinityGroupBalancer is a GroupBalancer that prefers to give a member
// the partitions whose leader is in the same rack as the member, while still
// spreading partitions evenly over all members.
type RackAffinityGroupBalancer struct {
	// Rack is the rack name of the local consumer. UserData publishes it,
	// and during assignment each member's UserData is compared with the
	// rack of every partition leader.
	Rack string
}

// ProtocolName returns "rack-affinity", the name this balancer is looked up
// by.
func (RackAffinityGroupBalancer) ProtocolName() string {
	return "rack-affinity"
}
- func (r RackAffinityGroupBalancer) AssignGroups(members []GroupMember, partitions []Partition) GroupMemberAssignments {
- membersByTopic := make(map[string][]GroupMember)
- for _, m := range members {
- for _, t := range m.Topics {
- membersByTopic[t] = append(membersByTopic[t], m)
- }
- }
- partitionsByTopic := make(map[string][]Partition)
- for _, p := range partitions {
- partitionsByTopic[p.Topic] = append(partitionsByTopic[p.Topic], p)
- }
- assignments := GroupMemberAssignments{}
- for topic := range membersByTopic {
- topicAssignments := r.assignTopic(membersByTopic[topic], partitionsByTopic[topic])
- for member, parts := range topicAssignments {
- memberAssignments, ok := assignments[member]
- if !ok {
- memberAssignments = make(map[string][]int)
- assignments[member] = memberAssignments
- }
- memberAssignments[topic] = parts
- }
- }
- return assignments
- }
- func (r RackAffinityGroupBalancer) UserData() ([]byte, error) {
- return []byte(r.Rack), nil
- }
- func (r *RackAffinityGroupBalancer) assignTopic(members []GroupMember, partitions []Partition) map[string][]int {
- zonedPartitions := make(map[string][]int)
- for _, part := range partitions {
- zone := part.Leader.Rack
- zonedPartitions[zone] = append(zonedPartitions[zone], part.ID)
- }
- zonedConsumers := make(map[string][]string)
- for _, member := range members {
- zone := string(member.UserData)
- zonedConsumers[zone] = append(zonedConsumers[zone], member.ID)
- }
- targetPerMember := len(partitions) / len(members)
- remainder := len(partitions) % len(members)
- assignments := make(map[string][]int)
-
-
-
- for zone, parts := range zonedPartitions {
- consumers := zonedConsumers[zone]
- if len(consumers) == 0 {
- continue
- }
-
-
- partsPerMember := len(parts) / len(consumers)
- if partsPerMember > targetPerMember {
- partsPerMember = targetPerMember
- }
- for _, consumer := range consumers {
- assignments[consumer] = append(assignments[consumer], parts[:partsPerMember]...)
- parts = parts[partsPerMember:]
- }
-
-
-
- leftover := len(parts)
- if partsPerMember == targetPerMember {
- if leftover > remainder {
- leftover = remainder
- }
- if leftover > len(consumers) {
- leftover = len(consumers)
- }
- remainder -= leftover
- }
-
-
-
- for i := 0; i < leftover; i++ {
- assignments[consumers[i]] = append(assignments[consumers[i]], parts[i])
- }
- parts = parts[leftover:]
- if len(parts) == 0 {
- delete(zonedPartitions, zone)
- } else {
- zonedPartitions[zone] = parts
- }
- }
-
- var remaining []int
- for _, partitions := range zonedPartitions {
- remaining = append(remaining, partitions...)
- }
- for _, member := range members {
- assigned := assignments[member.ID]
- delta := targetPerMember - len(assigned)
-
-
-
- if delta >= 0 && remainder > 0 {
- delta++
- remainder--
- }
- if delta > 0 {
- assignments[member.ID] = append(assigned, remaining[:delta]...)
- remaining = remaining[delta:]
- }
- }
- return assignments
- }
- func findPartitions(topic string, partitions []Partition) []int {
- var ids []int
- for _, partition := range partitions {
- if partition.Topic == topic {
- ids = append(ids, partition.ID)
- }
- }
- return ids
- }
- func findMembersByTopic(members []GroupMember) map[string][]GroupMember {
- membersByTopic := map[string][]GroupMember{}
- for _, member := range members {
- for _, topic := range member.Topics {
- membersByTopic[topic] = append(membersByTopic[topic], member)
- }
- }
-
-
-
-
-
-
-
-
-
-
-
-
- for _, members := range membersByTopic {
- sort.Slice(members, func(i, j int) bool {
- return members[i].ID < members[j].ID
- })
- }
- return membersByTopic
- }
- func findGroupBalancer(protocolName string, balancers []GroupBalancer) (GroupBalancer, bool) {
- for _, balancer := range balancers {
- if balancer.ProtocolName() == protocolName {
- return balancer, true
- }
- }
- return nil, false
- }
|