groupbalancer.go

package kafka

import (
	"sort"
)

// GroupMember describes a single participant in a consumer group.
type GroupMember struct {
	// ID is the unique ID for this member as taken from the JoinGroup response.
	ID string

	// Topics is a list of topics that this member is consuming.
	Topics []string

	// UserData contains any information that the GroupBalancer sent to the
	// consumer group coordinator.
	UserData []byte
}

// GroupMemberAssignments holds the assignments, keyed by member ID, then by
// topic, to the list of assigned partition IDs.
type GroupMemberAssignments map[string]map[string][]int

// GroupBalancer encapsulates the client-side rebalancing logic.
type GroupBalancer interface {
	// ProtocolName returns the name of the assignment protocol that this
	// GroupBalancer implements.
	ProtocolName() string

	// UserData provides the GroupBalancer an opportunity to embed custom
	// UserData into the metadata.
	//
	// Will be used by JoinGroup to begin the consumer group handshake.
	//
	// See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-JoinGroupRequest
	UserData() ([]byte, error)

	// AssignGroups returns which members will be consuming which topic
	// partitions.
	AssignGroups(members []GroupMember, partitions []Partition) GroupMemberAssignments
}
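// firstMemberBalancer is an illustrative sketch, not part of the original
// file: it shows the minimal surface a custom GroupBalancer must implement.
// The type name and protocol name are hypothetical, and the strategy is
// deliberately trivial (everything goes to the member with the smallest ID).
// It also assumes all members subscribe to the same topics; it exists only to
// demonstrate the interface, not as a useful assignment strategy.
type firstMemberBalancer struct{}

func (firstMemberBalancer) ProtocolName() string { return "first-member" }

// UserData is unused by this sketch, mirroring the built-in balancers that
// return nil.
func (firstMemberBalancer) UserData() ([]byte, error) { return nil, nil }

func (firstMemberBalancer) AssignGroups(members []GroupMember, partitions []Partition) GroupMemberAssignments {
	assignments := GroupMemberAssignments{}
	if len(members) == 0 {
		return assignments
	}

	// pick the member with the smallest ID so the result is deterministic.
	first := members[0]
	for _, m := range members[1:] {
		if m.ID < first.ID {
			first = m
		}
	}

	byTopic := map[string][]int{}
	for _, p := range partitions {
		byTopic[p.Topic] = append(byTopic[p.Topic], p.ID)
	}
	assignments[first.ID] = byTopic
	return assignments
}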
// RangeGroupBalancer groups consumers by partition, giving each consumer a
// contiguous range of partition indexes. Because of the integer division in
// the index math below, any extra partitions go to the later consumers.
//
// Example: 5 partitions, 2 consumers
//	C0: [0, 1]
//	C1: [2, 3, 4]
//
// Example: 6 partitions, 3 consumers
//	C0: [0, 1]
//	C1: [2, 3]
//	C2: [4, 5]
type RangeGroupBalancer struct{}
func (r RangeGroupBalancer) ProtocolName() string {
	return "range"
}

func (r RangeGroupBalancer) UserData() ([]byte, error) {
	return nil, nil
}

func (r RangeGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments {
	groupAssignments := GroupMemberAssignments{}
	membersByTopic := findMembersByTopic(members)

	for topic, members := range membersByTopic {
		partitions := findPartitions(topic, topicPartitions)
		partitionCount := len(partitions)
		memberCount := len(members)

		for memberIndex, member := range members {
			assignmentsByTopic, ok := groupAssignments[member.ID]
			if !ok {
				assignmentsByTopic = map[string][]int{}
				groupAssignments[member.ID] = assignmentsByTopic
			}

			// each member owns the contiguous index range
			// [minIndex, maxIndex); integer division hands any remainder
			// to the members with higher indexes.
			minIndex := memberIndex * partitionCount / memberCount
			maxIndex := (memberIndex + 1) * partitionCount / memberCount

			for partitionIndex, partition := range partitions {
				if partitionIndex >= minIndex && partitionIndex < maxIndex {
					assignmentsByTopic[topic] = append(assignmentsByTopic[topic], partition)
				}
			}
		}
	}

	return groupAssignments
}
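// exampleRangeAssignment is an illustrative sketch, not part of the original
// file: it walks RangeGroupBalancer through the 5-partitions, 2-consumers
// case from the doc comment above. The topic name and member IDs are made up.
func exampleRangeAssignment() GroupMemberAssignments {
	members := []GroupMember{
		{ID: "consumer-0", Topics: []string{"events"}},
		{ID: "consumer-1", Topics: []string{"events"}},
	}
	partitions := make([]Partition, 5)
	for i := range partitions {
		partitions[i] = Partition{Topic: "events", ID: i}
	}

	// With partitionCount=5 and memberCount=2, consumer-0 covers indexes
	// [0, 2) and consumer-1 covers [2, 5), so the result is:
	//	{"consumer-0": {"events": [0 1]}, "consumer-1": {"events": [2 3 4]}}
	return RangeGroupBalancer{}.AssignGroups(members, partitions)
}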
// RoundRobinGroupBalancer divides partitions evenly among consumers by
// assigning every memberCount-th partition to the same member.
//
// Example: 5 partitions, 2 consumers
//	C0: [0, 2, 4]
//	C1: [1, 3]
//
// Example: 6 partitions, 3 consumers
//	C0: [0, 3]
//	C1: [1, 4]
//	C2: [2, 5]
type RoundRobinGroupBalancer struct{}
func (r RoundRobinGroupBalancer) ProtocolName() string {
	return "roundrobin"
}

func (r RoundRobinGroupBalancer) UserData() ([]byte, error) {
	return nil, nil
}

func (r RoundRobinGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments {
	groupAssignments := GroupMemberAssignments{}
	membersByTopic := findMembersByTopic(members)

	for topic, members := range membersByTopic {
		partitionIDs := findPartitions(topic, topicPartitions)
		memberCount := len(members)

		for memberIndex, member := range members {
			assignmentsByTopic, ok := groupAssignments[member.ID]
			if !ok {
				assignmentsByTopic = map[string][]int{}
				groupAssignments[member.ID] = assignmentsByTopic
			}

			for partitionIndex, partition := range partitionIDs {
				if (partitionIndex % memberCount) == memberIndex {
					assignmentsByTopic[topic] = append(assignmentsByTopic[topic], partition)
				}
			}
		}
	}

	return groupAssignments
}
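// exampleRoundRobinAssignment is an illustrative sketch, not part of the
// original file: 5 partitions of a made-up topic "events" spread over 2
// members by taking every memberCount-th partition, matching the doc comment
// above.
func exampleRoundRobinAssignment() GroupMemberAssignments {
	members := []GroupMember{
		{ID: "consumer-0", Topics: []string{"events"}},
		{ID: "consumer-1", Topics: []string{"events"}},
	}
	partitions := make([]Partition, 5)
	for i := range partitions {
		partitions[i] = Partition{Topic: "events", ID: i}
	}

	// consumer-0 takes partition indexes where index%2 == 0, consumer-1 the
	// rest, so the result is:
	//	{"consumer-0": {"events": [0 2 4]}, "consumer-1": {"events": [1 3]}}
	return RoundRobinGroupBalancer{}.AssignGroups(members, partitions)
}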
// RackAffinityGroupBalancer makes a best effort to pair up consumers with
// partitions whose leader is in the same rack. This strategy can have
// performance benefits by minimizing round trip latency between the consumer
// and the broker. In environments where network traffic across racks incurs
// charges (such as cross-AZ data transfer in AWS), this strategy is also a
// cost optimization measure because it keeps network traffic within the local
// rack where possible.
//
// The primary objective is to spread partitions evenly across consumers, with
// a secondary focus on maximizing the number of partitions where the leader
// and the consumer are in the same rack. For best affinity, it's recommended
// to have a balanced spread of consumers and partition leaders across racks.
//
// This balancer requires Kafka version 0.10.0.0 or later. Earlier versions do
// not return the brokers' racks in the metadata request.
type RackAffinityGroupBalancer struct {
	// Rack is the name of the rack where this consumer is running. It will be
	// communicated to the consumer group leader via the UserData so that
	// assignments can be made with affinity to the partition leader.
	Rack string
}
func (r RackAffinityGroupBalancer) ProtocolName() string {
	return "rack-affinity"
}

func (r RackAffinityGroupBalancer) AssignGroups(members []GroupMember, partitions []Partition) GroupMemberAssignments {
	membersByTopic := make(map[string][]GroupMember)
	for _, m := range members {
		for _, t := range m.Topics {
			membersByTopic[t] = append(membersByTopic[t], m)
		}
	}

	partitionsByTopic := make(map[string][]Partition)
	for _, p := range partitions {
		partitionsByTopic[p.Topic] = append(partitionsByTopic[p.Topic], p)
	}

	assignments := GroupMemberAssignments{}
	for topic := range membersByTopic {
		topicAssignments := r.assignTopic(membersByTopic[topic], partitionsByTopic[topic])
		for member, parts := range topicAssignments {
			memberAssignments, ok := assignments[member]
			if !ok {
				memberAssignments = make(map[string][]int)
				assignments[member] = memberAssignments
			}
			memberAssignments[topic] = parts
		}
	}
	return assignments
}

func (r RackAffinityGroupBalancer) UserData() ([]byte, error) {
	return []byte(r.Rack), nil
}
func (r *RackAffinityGroupBalancer) assignTopic(members []GroupMember, partitions []Partition) map[string][]int {
	zonedPartitions := make(map[string][]int)
	for _, part := range partitions {
		zone := part.Leader.Rack
		zonedPartitions[zone] = append(zonedPartitions[zone], part.ID)
	}

	zonedConsumers := make(map[string][]string)
	for _, member := range members {
		zone := string(member.UserData)
		zonedConsumers[zone] = append(zonedConsumers[zone], member.ID)
	}

	targetPerMember := len(partitions) / len(members)
	remainder := len(partitions) % len(members)
	assignments := make(map[string][]int)

	// assign as many as possible in zone. this will assign up to partsPerMember
	// to each consumer. it will also prefer to allocate remainder partitions
	// in zone if possible.
	for zone, parts := range zonedPartitions {
		consumers := zonedConsumers[zone]
		if len(consumers) == 0 {
			continue
		}

		// don't over-allocate. cap partition assignments at the calculated
		// target.
		partsPerMember := len(parts) / len(consumers)
		if partsPerMember > targetPerMember {
			partsPerMember = targetPerMember
		}

		for _, consumer := range consumers {
			assignments[consumer] = append(assignments[consumer], parts[:partsPerMember]...)
			parts = parts[partsPerMember:]
		}

		// if we had enough partitions for each consumer in this zone to hit its
		// target, attempt to use any leftover partitions to satisfy the total
		// remainder by adding at most 1 partition per consumer.
		leftover := len(parts)
		if partsPerMember == targetPerMember {
			if leftover > remainder {
				leftover = remainder
			}
			if leftover > len(consumers) {
				leftover = len(consumers)
			}
			remainder -= leftover
		}

		// this loop covers the case where we're assigning extra partitions or
		// if there weren't enough to satisfy the targetPerMember and the zoned
		// partitions didn't divide evenly.
		for i := 0; i < leftover; i++ {
			assignments[consumers[i]] = append(assignments[consumers[i]], parts[i])
		}
		parts = parts[leftover:]

		if len(parts) == 0 {
			delete(zonedPartitions, zone)
		} else {
			zonedPartitions[zone] = parts
		}
	}

	// assign out remainders regardless of zone.
	var remaining []int
	for _, partitions := range zonedPartitions {
		remaining = append(remaining, partitions...)
	}

	for _, member := range members {
		assigned := assignments[member.ID]

		delta := targetPerMember - len(assigned)
		// if it were possible to assign the remainder in zone, it's been taken
		// care of already. now we will portion out any remainder to a member
		// that can take it.
		if delta >= 0 && remainder > 0 {
			delta++
			remainder--
		}
		if delta > 0 {
			assignments[member.ID] = append(assigned, remaining[:delta]...)
			remaining = remaining[delta:]
		}
	}

	return assignments
}
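// exampleRackAffinityAssignment is an illustrative sketch, not part of the
// original file: two consumers in different racks each end up with the
// partitions whose leader sits in their rack. The topic, rack, and member
// names are made up, each member's UserData carries its rack as produced by
// RackAffinityGroupBalancer.UserData, and it assumes Partition.Leader is the
// package's Broker struct with the Rack field used by assignTopic above.
func exampleRackAffinityAssignment() GroupMemberAssignments {
	members := []GroupMember{
		{ID: "c-east", Topics: []string{"logs"}, UserData: []byte("us-east-1a")},
		{ID: "c-west", Topics: []string{"logs"}, UserData: []byte("us-west-2a")},
	}
	partitions := []Partition{
		{Topic: "logs", ID: 0, Leader: Broker{Rack: "us-east-1a"}},
		{Topic: "logs", ID: 1, Leader: Broker{Rack: "us-west-2a"}},
		{Topic: "logs", ID: 2, Leader: Broker{Rack: "us-east-1a"}},
		{Topic: "logs", ID: 3, Leader: Broker{Rack: "us-west-2a"}},
	}

	// targetPerMember is 2 with no remainder, and each zone has exactly one
	// consumer and two partitions, so everything is assigned in zone:
	//	{"c-east": {"logs": [0 2]}, "c-west": {"logs": [1 3]}}
	return RackAffinityGroupBalancer{}.AssignGroups(members, partitions)
}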
// findPartitions extracts the partition IDs associated with the topic from the
// list of Partitions provided.
func findPartitions(topic string, partitions []Partition) []int {
	var ids []int
	for _, partition := range partitions {
		if partition.Topic == topic {
			ids = append(ids, partition.ID)
		}
	}
	return ids
}
// findMembersByTopic groups the members by the topics they subscribe to.
func findMembersByTopic(members []GroupMember) map[string][]GroupMember {
	membersByTopic := map[string][]GroupMember{}
	for _, member := range members {
		for _, topic := range member.Topics {
			membersByTopic[topic] = append(membersByTopic[topic], member)
		}
	}

	// normalize ordering of members to enable grouping across topics by partitions
	//
	// Want:
	//	C0 [T0/P0, T1/P0]
	//	C1 [T0/P1, T1/P1]
	//
	// Not:
	//	C0 [T0/P0, T1/P1]
	//	C1 [T0/P1, T1/P0]
	//
	// Even though the latter is still round robin, the partitions are crossed.
	for _, members := range membersByTopic {
		sort.Slice(members, func(i, j int) bool {
			return members[i].ID < members[j].ID
		})
	}

	return membersByTopic
}
// findGroupBalancer returns the GroupBalancer with the specified protocolName
// from the slice provided.
func findGroupBalancer(protocolName string, balancers []GroupBalancer) (GroupBalancer, bool) {
	for _, balancer := range balancers {
		if balancer.ProtocolName() == protocolName {
			return balancer, true
		}
	}
	return nil, false
}
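// exampleFindGroupBalancer is an illustrative sketch, not part of the original
// file: given the protocol name negotiated during the JoinGroup handshake,
// pick the matching balancer from the list the client offered.
func exampleFindGroupBalancer() GroupBalancer {
	balancers := []GroupBalancer{RangeGroupBalancer{}, RoundRobinGroupBalancer{}}

	// Returns RoundRobinGroupBalancer{} and true, since its ProtocolName
	// matches the negotiated name.
	balancer, _ := findGroupBalancer("roundrobin", balancers)
	return balancer
}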