consumergroup.go

package kafka

import (
	"bufio"
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"strings"
	"sync"
	"time"
)

// ErrGroupClosed is returned by ConsumerGroup.Next when the group has already
// been closed.
var ErrGroupClosed = errors.New("consumer group is closed")

// ErrGenerationEnded is returned by the context.Context issued by the
// Generation's Start function when the context has been closed.
var ErrGenerationEnded = errors.New("consumer group generation has ended")

const (
	// defaultProtocolType holds the default protocol type documented in the
	// kafka protocol.
	//
	// See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-GroupMembershipAPI
	defaultProtocolType = "consumer"

	// defaultHeartbeatInterval contains the default time between heartbeats. If
	// the coordinator does not receive a heartbeat within the session timeout interval,
	// the consumer will be considered dead and the coordinator will rebalance the
	// group.
	//
	// As a rule, the heartbeat interval should be no greater than 1/3 the session timeout.
	defaultHeartbeatInterval = 3 * time.Second

	// defaultSessionTimeout contains the default interval the coordinator will wait
	// for a heartbeat before marking a consumer as dead.
	defaultSessionTimeout = 30 * time.Second

	// defaultRebalanceTimeout contains the amount of time the coordinator will wait
	// for consumers to issue a join group once a rebalance has been requested.
	defaultRebalanceTimeout = 30 * time.Second

	// defaultJoinGroupBackoff is the amount of time to wait after a failed
	// consumer group generation before attempting to re-join.
	defaultJoinGroupBackoff = 5 * time.Second

	// defaultRetentionTime holds the length of time the consumer group will be
	// saved by kafka. This value tells the broker to use its configured value.
	defaultRetentionTime = -1 * time.Millisecond

	// defaultPartitionWatchTime contains the amount of time kafka-go will wait to
	// query the brokers looking for partition changes.
	defaultPartitionWatchTime = 5 * time.Second

	// defaultTimeout is the deadline to set when interacting with the
	// consumer group coordinator.
	defaultTimeout = 5 * time.Second
)

// ConsumerGroupConfig is a configuration object used to create new instances of
// ConsumerGroup.
type ConsumerGroupConfig struct {
	// ID is the consumer group ID. It must not be empty.
	ID string

	// Brokers is the list of broker addresses used to connect to the kafka
	// cluster. It must not be empty.
	Brokers []string

	// Dialer is a dialer used to open connections to the kafka server. This
	// field is optional; if nil, the default dialer is used instead.
	Dialer *Dialer

	// Topics is the list of topics that will be consumed by this group. It
	// will usually have a single value, but it is permitted to have multiple
	// for more complex use cases.
	Topics []string

	// GroupBalancers is the priority-ordered list of client-side consumer group
	// balancing strategies that will be offered to the coordinator. The first
	// strategy that all group members support will be chosen by the leader.
	//
	// Default: [Range, RoundRobin]
	GroupBalancers []GroupBalancer

	// HeartbeatInterval sets the optional frequency at which the reader sends the consumer
	// group heartbeat update.
	//
	// Default: 3s
	HeartbeatInterval time.Duration

	// PartitionWatchInterval indicates how often a reader checks for partition changes.
	// If a reader sees a partition change (such as a partition add), it will rebalance the
	// group, picking up new partitions.
	//
	// Default: 5s
	PartitionWatchInterval time.Duration

	// WatchPartitionChanges is used to inform kafka-go that a consumer group should be
	// polling the brokers and rebalancing if any partition changes happen to the topic.
	WatchPartitionChanges bool

	// SessionTimeout optionally sets the length of time that may pass without a heartbeat
	// before the coordinator considers the consumer dead and initiates a rebalance.
	//
	// Default: 30s
	SessionTimeout time.Duration

	// RebalanceTimeout optionally sets the length of time the coordinator will wait
	// for members to join as part of a rebalance. For kafka servers under higher
	// load, it may be useful to set this value higher.
	//
	// Default: 30s
	RebalanceTimeout time.Duration

	// JoinGroupBackoff optionally sets the length of time to wait before re-joining
	// the consumer group after an error.
	//
	// Default: 5s
	JoinGroupBackoff time.Duration

	// RetentionTime optionally sets the length of time the consumer group will
	// be saved by the broker. -1 will disable the setting and leave the
	// retention up to the broker's offsets.retention.minutes property. By
	// default, that setting is 1 day for kafka < 2.0 and 7 days for kafka >=
	// 2.0.
	//
	// Default: -1
	RetentionTime time.Duration

	// StartOffset determines from whence the consumer group should begin
	// consuming when it finds a partition without a committed offset. If
	// non-zero, it must be set to one of FirstOffset or LastOffset.
	//
	// Default: FirstOffset
	StartOffset int64

	// If not nil, specifies a logger used to report internal changes within the
	// reader.
	Logger Logger

	// ErrorLogger is the logger used to report errors. If nil, the reader falls
	// back to using Logger instead.
	ErrorLogger Logger

	// Timeout is the network timeout used when communicating with the consumer
	// group coordinator. This value should not be too small since errors
	// communicating with the broker will generally cause a consumer group
	// rebalance, and it's undesirable that a transient network error introduce
	// that overhead. Similarly, it should not be too large or the consumer
	// group may be slow to respond to the coordinator failing over to another
	// broker.
	//
	// Default: 5s
	Timeout time.Duration

	// connect is a function for dialing the coordinator. This is provided for
	// unit testing to mock broker connections.
	connect func(dialer *Dialer, brokers ...string) (coordinator, error)
}

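// A minimal configuration sketch for reference. The ID, broker address, and
// topic below are illustrative placeholders, not values from this package;
// NewConsumerGroup calls Validate to apply the documented defaults.
//
//	cfg := kafka.ConsumerGroupConfig{
//		ID:      "example-group",
//		Brokers: []string{"localhost:9092"},
//		Topics:  []string{"example-topic"},
//	}
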
// Validate method validates ConsumerGroupConfig properties and sets relevant
// defaults.
func (config *ConsumerGroupConfig) Validate() error {
	if len(config.Brokers) == 0 {
		return errors.New("cannot create a consumer group with an empty list of broker addresses")
	}

	if len(config.Topics) == 0 {
		return errors.New("cannot create a consumer group without a topic")
	}

	if config.ID == "" {
		return errors.New("cannot create a consumer group without an ID")
	}

	if config.Dialer == nil {
		config.Dialer = DefaultDialer
	}

	if len(config.GroupBalancers) == 0 {
		config.GroupBalancers = []GroupBalancer{
			RangeGroupBalancer{},
			RoundRobinGroupBalancer{},
		}
	}

	if config.HeartbeatInterval == 0 {
		config.HeartbeatInterval = defaultHeartbeatInterval
	}

	if config.SessionTimeout == 0 {
		config.SessionTimeout = defaultSessionTimeout
	}

	if config.PartitionWatchInterval == 0 {
		config.PartitionWatchInterval = defaultPartitionWatchTime
	}

	if config.RebalanceTimeout == 0 {
		config.RebalanceTimeout = defaultRebalanceTimeout
	}

	if config.JoinGroupBackoff == 0 {
		config.JoinGroupBackoff = defaultJoinGroupBackoff
	}

	if config.RetentionTime == 0 {
		config.RetentionTime = defaultRetentionTime
	}

	if config.HeartbeatInterval < 0 || (config.HeartbeatInterval/time.Millisecond) >= math.MaxInt32 {
		return fmt.Errorf("HeartbeatInterval out of bounds: %d", config.HeartbeatInterval)
	}

	if config.SessionTimeout < 0 || (config.SessionTimeout/time.Millisecond) >= math.MaxInt32 {
		return fmt.Errorf("SessionTimeout out of bounds: %d", config.SessionTimeout)
	}

	if config.RebalanceTimeout < 0 || (config.RebalanceTimeout/time.Millisecond) >= math.MaxInt32 {
		return fmt.Errorf("RebalanceTimeout out of bounds: %d", config.RebalanceTimeout)
	}

	if config.JoinGroupBackoff < 0 || (config.JoinGroupBackoff/time.Millisecond) >= math.MaxInt32 {
		return fmt.Errorf("JoinGroupBackoff out of bounds: %d", config.JoinGroupBackoff)
	}

	if config.RetentionTime < 0 && config.RetentionTime != defaultRetentionTime {
		return fmt.Errorf("RetentionTime out of bounds: %d", config.RetentionTime)
	}

	if config.PartitionWatchInterval < 0 || (config.PartitionWatchInterval/time.Millisecond) >= math.MaxInt32 {
		return fmt.Errorf("PartitionWatchInterval out of bounds: %d", config.PartitionWatchInterval)
	}

	if config.StartOffset == 0 {
		config.StartOffset = FirstOffset
	}

	if config.StartOffset != FirstOffset && config.StartOffset != LastOffset {
		return fmt.Errorf("StartOffset is not valid: %d", config.StartOffset)
	}

	if config.Timeout == 0 {
		config.Timeout = defaultTimeout
	}

	if config.connect == nil {
		config.connect = makeConnect(*config)
	}

	return nil
}

// PartitionAssignment represents the starting state of a partition that has
// been assigned to a consumer.
type PartitionAssignment struct {
	// ID is the partition ID.
	ID int

	// Offset is the initial offset at which this assignment begins. It will
	// either be an absolute offset if one has previously been committed for
	// the consumer group or a relative offset such as FirstOffset when this
	// is the first time the partition has been assigned to a member of the
	// group.
	Offset int64
}

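// For example, an assignment of {ID: 3, Offset: 1234} resumes partition 3 at
// a previously committed offset of 1234, whereas {ID: 3, Offset: FirstOffset}
// means no offset has been committed yet and consumption starts according to
// the configured StartOffset policy. (Illustrative values only.)
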
// genCtx adapts the done channel of the generation to a context.Context. This
// is used by Generation.Start so that we can pass a context to goroutines
// instead of passing around channels.
type genCtx struct {
	gen *Generation
}

func (c genCtx) Done() <-chan struct{} {
	return c.gen.done
}

func (c genCtx) Err() error {
	select {
	case <-c.gen.done:
		return ErrGenerationEnded
	default:
		return nil
	}
}

func (c genCtx) Deadline() (time.Time, bool) {
	return time.Time{}, false
}

func (c genCtx) Value(interface{}) interface{} {
	return nil
}

// Generation represents a single consumer group generation. The generation
// carries the topic+partition assignments for the given member. It also
// provides facilities for committing offsets and for running functions whose
// lifecycles are bound to the generation.
type Generation struct {
	// ID is the generation ID as assigned by the consumer group coordinator.
	ID int32

	// GroupID is the name of the consumer group.
	GroupID string

	// MemberID is the ID assigned to this consumer by the consumer group
	// coordinator.
	MemberID string

	// Assignments is the initial state of this Generation. The partition
	// assignments are grouped by topic.
	Assignments map[string][]PartitionAssignment

	conn coordinator

	once sync.Once
	done chan struct{}
	wg   sync.WaitGroup

	retentionMillis int64

	log      func(func(Logger))
	logError func(func(Logger))
}

// close stops the generation and waits for all functions launched via Start to
// terminate.
func (g *Generation) close() {
	g.once.Do(func() {
		close(g.done)
	})
	g.wg.Wait()
}

// Start launches the provided function in a goroutine and adds accounting such
// that when the function exits, it stops the current generation (if not
// already in the process of doing so).
//
// The provided function MUST support cancellation via the ctx argument and exit
// in a timely manner once the ctx is complete. When the context is closed, the
// context's Err() method will return ErrGenerationEnded.
//
// When closing out a generation, the consumer group will wait for all functions
// launched by Start to exit before the group can move on and join the next
// generation. If the function does not exit promptly, it will stop forward
// progress for this consumer and potentially cause consumer group membership
// churn.
func (g *Generation) Start(fn func(ctx context.Context)) {
	g.wg.Add(1)
	go func() {
		fn(genCtx{g})
		// shut down the generation as soon as one function exits. this is
		// different from close() in that it doesn't wait on the wg.
		g.once.Do(func() {
			close(g.done)
		})
		g.wg.Done()
	}()
}

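// As an illustration, given a group obtained from NewConsumerGroup, a caller
// typically consumes each assigned partition in a function launched via Start
// and exits when the generation's context ends. A sketch, where "my-topic" and
// consumePartition are hypothetical placeholders:
//
//	gen, err := group.Next(context.TODO())
//	if err != nil {
//		return err
//	}
//	for _, assignment := range gen.Assignments["my-topic"] {
//		assignment := assignment // capture loop variable for the closure
//		gen.Start(func(ctx context.Context) {
//			// hypothetical helper: reads the partition from the given
//			// offset until ctx is done, committing via gen.CommitOffsets.
//			consumePartition(ctx, gen, assignment.ID, assignment.Offset)
//		})
//	}
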
// CommitOffsets commits the provided topic+partition+offset combos to the
// consumer group coordinator. This can be used to reset the consumer to
// explicit offsets.
func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error {
	if len(offsets) == 0 {
		return nil
	}

	topics := make([]offsetCommitRequestV2Topic, 0, len(offsets))
	for topic, partitions := range offsets {
		t := offsetCommitRequestV2Topic{Topic: topic}
		for partition, offset := range partitions {
			t.Partitions = append(t.Partitions, offsetCommitRequestV2Partition{
				Partition: int32(partition),
				Offset:    offset,
			})
		}
		topics = append(topics, t)
	}

	request := offsetCommitRequestV2{
		GroupID:       g.GroupID,
		GenerationID:  g.ID,
		MemberID:      g.MemberID,
		RetentionTime: g.retentionMillis,
		Topics:        topics,
	}

	_, err := g.conn.offsetCommit(request)
	if err == nil {
		// if logging is enabled, print out the partitions that were committed.
		g.log(func(l Logger) {
			var report []string
			for _, t := range request.Topics {
				report = append(report, fmt.Sprintf("\ttopic: %s", t.Topic))
				for _, p := range t.Partitions {
					report = append(report, fmt.Sprintf("\t\tpartition %d: %d", p.Partition, p.Offset))
				}
			}
			l.Printf("committed offsets for group %s: \n%s", g.GroupID, strings.Join(report, "\n"))
		})
	}

	return err
}

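// For example, after processing the message at offset 42 of partition 1 on a
// hypothetical "my-topic", a caller commits the next offset to be read:
//
//	err := gen.CommitOffsets(map[string]map[int]int64{
//		"my-topic": {1: 43},
//	})
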
// heartbeatLoop checks in with the consumer group coordinator at the provided
// interval. It exits if it ever encounters an error, which would signal the
// end of the generation.
func (g *Generation) heartbeatLoop(interval time.Duration) {
	g.Start(func(ctx context.Context) {
		g.log(func(l Logger) {
			l.Printf("started heartbeat for group, %v [%v]", g.GroupID, interval)
		})
		defer g.log(func(l Logger) {
			l.Printf("stopped heartbeat for group %s\n", g.GroupID)
		})

		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				_, err := g.conn.heartbeat(heartbeatRequestV0{
					GroupID:      g.GroupID,
					GenerationID: g.ID,
					MemberID:     g.MemberID,
				})
				if err != nil {
					return
				}
			}
		}
	})
}

// partitionWatcher queries kafka and watches for partition changes, triggering
// a rebalance if changes are found. Similar to the heartbeat, it's okay to
// return on error here: if you are unable to ask a broker for basic metadata,
// you're in a bad spot and should rebalance. Commonly you will see an error
// here if there is a problem with the connection to the coordinator, and a
// rebalance will establish a new connection to the coordinator.
func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
	g.Start(func(ctx context.Context) {
		g.log(func(l Logger) {
			l.Printf("started partition watcher for group, %v, topic %v [%v]", g.GroupID, topic, interval)
		})
		defer g.log(func(l Logger) {
			l.Printf("stopped partition watcher for group, %v, topic %v", g.GroupID, topic)
		})

		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		ops, err := g.conn.readPartitions(topic)
		if err != nil {
			g.logError(func(l Logger) {
				l.Printf("Problem getting partitions during startup, %v\n, Returning and setting up nextGeneration", err)
			})
			return
		}
		oParts := len(ops)
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				ops, err := g.conn.readPartitions(topic)
				switch err {
				case nil, UnknownTopicOrPartition:
					if len(ops) != oParts {
						g.log(func(l Logger) {
							l.Printf("Partition changes found, rebalancing group: %v.", g.GroupID)
						})
						return
					}
				default:
					g.logError(func(l Logger) {
						l.Printf("Problem getting partitions while checking for changes, %v", err)
					})
					if _, ok := err.(Error); ok {
						continue
					}
					// other errors imply that we lost the connection to the coordinator, so we
					// should abort and reconnect.
					return
				}
			}
		}
	})
}

// coordinator is a subset of the functionality in Conn in order to facilitate
// testing the consumer group...especially for error conditions that are
// difficult to instigate with a live broker running in docker.
type coordinator interface {
	io.Closer
	findCoordinator(findCoordinatorRequestV0) (findCoordinatorResponseV0, error)
	joinGroup(joinGroupRequestV1) (joinGroupResponseV1, error)
	syncGroup(syncGroupRequestV0) (syncGroupResponseV0, error)
	leaveGroup(leaveGroupRequestV0) (leaveGroupResponseV0, error)
	heartbeat(heartbeatRequestV0) (heartbeatResponseV0, error)
	offsetFetch(offsetFetchRequestV1) (offsetFetchResponseV1, error)
	offsetCommit(offsetCommitRequestV2) (offsetCommitResponseV2, error)
	readPartitions(...string) ([]Partition, error)
}

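// In tests, a stub can embed the interface and override only the method under
// exercise; calling any method that isn't overridden panics (the embedded
// interface is nil), which makes unexpected usage visible. A sketch, where
// mockCoordinator and heartbeatFunc are illustrative names, not part of this
// file:
//
//	type mockCoordinator struct {
//		coordinator
//		heartbeatFunc func(heartbeatRequestV0) (heartbeatResponseV0, error)
//	}
//
//	func (m mockCoordinator) heartbeat(req heartbeatRequestV0) (heartbeatResponseV0, error) {
//		return m.heartbeatFunc(req)
//	}
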
// timeoutCoordinator wraps the Conn to ensure that every operation has a
// deadline. Otherwise, it would be possible for requests to block indefinitely
// if the remote server never responds. There are many spots where the consumer
// group needs to interact with the broker, so it feels less error prone to
// factor all of the deadline management into this shared location as opposed to
// peppering it all through where the code actually interacts with the broker.
type timeoutCoordinator struct {
	timeout          time.Duration
	sessionTimeout   time.Duration
	rebalanceTimeout time.Duration
	conn             *Conn
}

func (t *timeoutCoordinator) Close() error {
	return t.conn.Close()
}

func (t *timeoutCoordinator) findCoordinator(req findCoordinatorRequestV0) (findCoordinatorResponseV0, error) {
	if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
		return findCoordinatorResponseV0{}, err
	}
	return t.conn.findCoordinator(req)
}

func (t *timeoutCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponseV1, error) {
	// in the case of join group, the consumer group coordinator may wait up
	// to the rebalance timeout in order to wait for all members to join.
	if err := t.conn.SetDeadline(time.Now().Add(t.timeout + t.rebalanceTimeout)); err != nil {
		return joinGroupResponseV1{}, err
	}
	return t.conn.joinGroup(req)
}

func (t *timeoutCoordinator) syncGroup(req syncGroupRequestV0) (syncGroupResponseV0, error) {
	// in the case of sync group, the consumer group leader is given up to
	// the session timeout to respond before the coordinator will give up.
	if err := t.conn.SetDeadline(time.Now().Add(t.timeout + t.sessionTimeout)); err != nil {
		return syncGroupResponseV0{}, err
	}
	return t.conn.syncGroup(req)
}

func (t *timeoutCoordinator) leaveGroup(req leaveGroupRequestV0) (leaveGroupResponseV0, error) {
	if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
		return leaveGroupResponseV0{}, err
	}
	return t.conn.leaveGroup(req)
}

func (t *timeoutCoordinator) heartbeat(req heartbeatRequestV0) (heartbeatResponseV0, error) {
	if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
		return heartbeatResponseV0{}, err
	}
	return t.conn.heartbeat(req)
}

func (t *timeoutCoordinator) offsetFetch(req offsetFetchRequestV1) (offsetFetchResponseV1, error) {
	if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
		return offsetFetchResponseV1{}, err
	}
	return t.conn.offsetFetch(req)
}

func (t *timeoutCoordinator) offsetCommit(req offsetCommitRequestV2) (offsetCommitResponseV2, error) {
	if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
		return offsetCommitResponseV2{}, err
	}
	return t.conn.offsetCommit(req)
}

func (t *timeoutCoordinator) readPartitions(topics ...string) ([]Partition, error) {
	if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
		return nil, err
	}
	return t.conn.ReadPartitions(topics...)
}

// NewConsumerGroup creates a new ConsumerGroup. It returns an error if the
// provided configuration is invalid. It does not attempt to connect to the
// Kafka cluster. That happens asynchronously, and any errors will be reported
// by Next.
func NewConsumerGroup(config ConsumerGroupConfig) (*ConsumerGroup, error) {
	if err := config.Validate(); err != nil {
		return nil, err
	}

	cg := &ConsumerGroup{
		config: config,
		next:   make(chan *Generation),
		errs:   make(chan error),
		done:   make(chan struct{}),
	}
	cg.wg.Add(1)
	go func() {
		cg.run()
		cg.wg.Done()
	}()
	return cg, nil
}

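// The intended call pattern is a loop over generations; Next blocks until a
// generation begins or an error occurs. A usage sketch with illustrative
// configuration values (per-partition consumption elided):
//
//	group, err := kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{
//		ID:      "example-group",
//		Brokers: []string{"localhost:9092"},
//		Topics:  []string{"example-topic"},
//	})
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer group.Close()
//	for {
//		gen, err := group.Next(context.TODO())
//		if err != nil {
//			break // e.g. ErrGroupClosed after Close is called
//		}
//		// launch per-partition consumers via gen.Start(...); Next will
//		// not return again until this generation has completed.
//	}
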
// ConsumerGroup models a Kafka consumer group. A caller doesn't interact with
// the group directly. Rather, they interact with a Generation. Every time a
// member enters or exits the group, it results in a new Generation. The
// Generation is where partition assignments and offset management occur.
// Callers will use Next to get a handle to the Generation.
type ConsumerGroup struct {
	config ConsumerGroupConfig
	next   chan *Generation
	errs   chan error

	closeOnce sync.Once
	wg        sync.WaitGroup
	done      chan struct{}
}

// Close terminates the current generation by causing this member to leave and
// releases all local resources used to participate in the consumer group.
// Close will also end the current generation if it is still active.
func (cg *ConsumerGroup) Close() error {
	cg.closeOnce.Do(func() {
		close(cg.done)
	})
	cg.wg.Wait()
	return nil
}

// Next waits for the next consumer group generation. There will never be two
// active generations. Next will never return a new generation until the
// previous one has completed.
//
// If there are errors setting up the next generation, they will be surfaced
// here.
//
// If the ConsumerGroup has been closed, then Next will return ErrGroupClosed.
func (cg *ConsumerGroup) Next(ctx context.Context) (*Generation, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-cg.done:
		return nil, ErrGroupClosed
	case err := <-cg.errs:
		return nil, err
	case next := <-cg.next:
		return next, nil
	}
}

func (cg *ConsumerGroup) run() {
	// the memberID is the only piece of information that is maintained across
	// generations. it starts empty and will be assigned on the first nextGeneration
	// when the joinGroup request is processed. it may change again later if
	// the CG coordinator fails over or if the member is evicted. otherwise, it
	// will be constant for the lifetime of this group.
	var memberID string
	var err error
	for {
		memberID, err = cg.nextGeneration(memberID)

		// backoff will be set if this goroutine should sleep before continuing
		// to the next generation. it will be non-nil in the case of an error
		// joining or syncing the group.
		var backoff <-chan time.Time

		switch err {
		case nil:
			// no error...the previous generation finished normally.
			continue

		case ErrGroupClosed:
			// the CG has been closed...leave the group and exit loop.
			_ = cg.leaveGroup(memberID)
			return

		case RebalanceInProgress:
			// in case of a RebalanceInProgress, don't leave the group or
			// change the member ID, but report the error. the next attempt
			// to join the group will then be subject to the rebalance
			// timeout, so the broker will be responsible for throttling
			// this loop.

		default:
			// leave the group and report the error if we had gotten far
			// enough so as to have a member ID. also clear the member id
			// so we don't attempt to use it again. in order to avoid
			// a tight error loop, backoff before the next attempt to join
			// the group.
			_ = cg.leaveGroup(memberID)
			memberID = ""
			backoff = time.After(cg.config.JoinGroupBackoff)
		}

		// ensure that we exit cleanly in case the CG is done and no one is
		// waiting to receive on the unbuffered error channel.
		select {
		case <-cg.done:
			return
		case cg.errs <- err:
		}

		// backoff if needed, being sure to exit cleanly if the CG is done.
		if backoff != nil {
			select {
			case <-cg.done:
				// exit cleanly if the group is closed.
				return
			case <-backoff:
			}
		}
	}
}

func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) {
	// get a new connection to the coordinator on each loop. the previous
	// generation could have exited due to losing the connection, so this
	// ensures that we always have a clean starting point. it means we will
	// re-connect in certain cases, but that shouldn't be an issue given that
	// rebalances are relatively infrequent under normal operating
	// conditions.
	conn, err := cg.coordinator()
	if err != nil {
		cg.withErrorLogger(func(log Logger) {
			log.Printf("Unable to establish connection to consumer group coordinator for group %s: %v", cg.config.ID, err)
		})
		return memberID, err // a prior memberID may still be valid, so don't return ""
	}
	defer conn.Close()

	var generationID int32
	var groupAssignments GroupMemberAssignments
	var assignments map[string][]int32

	// join group. this will join the group and prepare assignments if our
	// consumer is elected leader. it may also change or assign the member ID.
	memberID, generationID, groupAssignments, err = cg.joinGroup(conn, memberID)
	if err != nil {
		cg.withErrorLogger(func(log Logger) {
			log.Printf("Failed to join group %s: %v", cg.config.ID, err)
		})
		return memberID, err
	}
	cg.withLogger(func(log Logger) {
		log.Printf("Joined group %s as member %s in generation %d", cg.config.ID, memberID, generationID)
	})

	// sync group
	assignments, err = cg.syncGroup(conn, memberID, generationID, groupAssignments)
	if err != nil {
		cg.withErrorLogger(func(log Logger) {
			log.Printf("Failed to sync group %s: %v", cg.config.ID, err)
		})
		return memberID, err
	}

	// fetch initial offsets.
	var offsets map[string]map[int]int64
	offsets, err = cg.fetchOffsets(conn, assignments)
	if err != nil {
		cg.withErrorLogger(func(log Logger) {
			log.Printf("Failed to fetch offsets for group %s: %v", cg.config.ID, err)
		})
		return memberID, err
	}

	// create the generation.
	gen := Generation{
		ID:              generationID,
		GroupID:         cg.config.ID,
		MemberID:        memberID,
		Assignments:     cg.makeAssignments(assignments, offsets),
		conn:            conn,
		done:            make(chan struct{}),
		retentionMillis: int64(cg.config.RetentionTime / time.Millisecond),
		log:             cg.withLogger,
		logError:        cg.withErrorLogger,
	}

	// spawn all of the goroutines required to facilitate this generation. if
	// any of these functions exit, then the generation is determined to be
	// complete.
	gen.heartbeatLoop(cg.config.HeartbeatInterval)
	if cg.config.WatchPartitionChanges {
		for _, topic := range cg.config.Topics {
			gen.partitionWatcher(cg.config.PartitionWatchInterval, topic)
		}
	}

	// make this generation available for retrieval. if the CG is closed before
	// we can send it on the channel, exit. that case is required because the
	// next channel is unbuffered. if the caller to Next has already bailed
	// because its own teardown logic has been invoked, this would deadlock
	// otherwise.
	select {
	case <-cg.done:
		gen.close()
		return memberID, ErrGroupClosed // ErrGroupClosed will trigger leave logic.
	case cg.next <- &gen:
	}

	// wait for generation to complete. if the CG is closed before the
	// generation is finished, exit and leave the group.
	select {
	case <-cg.done:
		gen.close()
		return memberID, ErrGroupClosed // ErrGroupClosed will trigger leave logic.
	case <-gen.done:
		// time for next generation! make sure all the current goroutines exit
		// before continuing onward.
		gen.close()
		return memberID, nil
	}
}

// makeConnect returns a connect function that establishes a connection to any
// one of the given brokers.
func makeConnect(config ConsumerGroupConfig) func(dialer *Dialer, brokers ...string) (coordinator, error) {
	return func(dialer *Dialer, brokers ...string) (coordinator, error) {
		var err error
		for _, broker := range brokers {
			var conn *Conn
			if conn, err = dialer.Dial("tcp", broker); err == nil {
				return &timeoutCoordinator{
					conn:             conn,
					timeout:          config.Timeout,
					sessionTimeout:   config.SessionTimeout,
					rebalanceTimeout: config.RebalanceTimeout,
				}, nil
			}
		}
		return nil, err // err will be non-nil
	}
}

// coordinator establishes a connection to the coordinator for this consumer
// group.
func (cg *ConsumerGroup) coordinator() (coordinator, error) {
	// NOTE : could try to cache the coordinator to avoid the double connect
	// here. since consumer group balances happen infrequently and are
	// an expensive operation, we're not currently optimizing that case
	// in order to keep the code simpler.
	conn, err := cg.config.connect(cg.config.Dialer, cg.config.Brokers...)
	if err != nil {
		return nil, err
	}
	defer conn.Close()

	out, err := conn.findCoordinator(findCoordinatorRequestV0{
		CoordinatorKey: cg.config.ID,
	})
	if err == nil && out.ErrorCode != 0 {
		err = Error(out.ErrorCode)
	}
	if err != nil {
		return nil, err
	}

	address := fmt.Sprintf("%v:%v", out.Coordinator.Host, out.Coordinator.Port)
	return cg.config.connect(cg.config.Dialer, address)
}

// joinGroup attempts to join the reader to the consumer group.
// Returns GroupMemberAssignments if this Reader was selected as
// the leader. Otherwise, GroupMemberAssignments will be nil.
//
// Possible kafka error codes returned:
//  * GroupLoadInProgress
//  * GroupCoordinatorNotAvailable
//  * NotCoordinatorForGroup
//  * InconsistentGroupProtocol
//  * InvalidSessionTimeout
//  * GroupAuthorizationFailed
func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) {
	request, err := cg.makeJoinGroupRequestV1(memberID)
	if err != nil {
		return "", 0, nil, err
	}

	response, err := conn.joinGroup(request)
	if err == nil && response.ErrorCode != 0 {
		err = Error(response.ErrorCode)
	}
	if err != nil {
		return "", 0, nil, err
	}

	memberID = response.MemberID
	generationID := response.GenerationID

	cg.withLogger(func(l Logger) {
		l.Printf("joined group %s as member %s in generation %d", cg.config.ID, memberID, generationID)
	})

	var assignments GroupMemberAssignments
	if iAmLeader := response.MemberID == response.LeaderID; iAmLeader {
		v, err := cg.assignTopicPartitions(conn, response)
		if err != nil {
			return memberID, 0, nil, err
		}
		assignments = v

		cg.withLogger(func(l Logger) {
			for memberID, assignment := range assignments {
				for topic, partitions := range assignment {
					l.Printf("assigned member/topic/partitions %v/%v/%v", memberID, topic, partitions)
				}
			}
		})
	}

	cg.withLogger(func(l Logger) {
		l.Printf("joinGroup succeeded for response, %v. generationID=%v, memberID=%v", cg.config.ID, response.GenerationID, response.MemberID)
	})

	return memberID, generationID, assignments, nil
}

// makeJoinGroupRequestV1 handles the logic of constructing a joinGroup
// request.
func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupRequestV1, error) {
	request := joinGroupRequestV1{
		GroupID:          cg.config.ID,
		MemberID:         memberID,
		SessionTimeout:   int32(cg.config.SessionTimeout / time.Millisecond),
		RebalanceTimeout: int32(cg.config.RebalanceTimeout / time.Millisecond),
		ProtocolType:     defaultProtocolType,
	}

	for _, balancer := range cg.config.GroupBalancers {
		userData, err := balancer.UserData()
		if err != nil {
			return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %v", balancer.ProtocolName(), err)
		}
		request.GroupProtocols = append(request.GroupProtocols, joinGroupRequestGroupProtocolV1{
			ProtocolName: balancer.ProtocolName(),
			ProtocolMetadata: groupMetadata{
				Version:  1,
				Topics:   cg.config.Topics,
				UserData: userData,
			}.bytes(),
		})
	}

	return request, nil
}

// assignTopicPartitions uses the selected GroupBalancer to assign members to
// their various partitions.
func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroupResponseV1) (GroupMemberAssignments, error) {
	cg.withLogger(func(l Logger) {
		l.Printf("selected as leader for group, %s\n", cg.config.ID)
	})

	balancer, ok := findGroupBalancer(group.GroupProtocol, cg.config.GroupBalancers)
	if !ok {
		// NOTE : this shouldn't happen in practice...the broker should not
		// return successfully from joinGroup unless all members support
		// at least one common protocol.
		return nil, fmt.Errorf("unable to find selected balancer, %v, for group, %v", group.GroupProtocol, cg.config.ID)
	}

	members, err := cg.makeMemberProtocolMetadata(group.Members)
	if err != nil {
		return nil, err
	}

	topics := extractTopics(members)
	partitions, err := conn.readPartitions(topics...)

	// it's not a failure if the topic doesn't exist yet. it results in no
	// assignments for the topic. this matches the behavior of the official
	// clients: java, python, and librdkafka.
	// a topic watcher can trigger a rebalance when the topic comes into being.
	if err != nil && err != UnknownTopicOrPartition {
		return nil, err
	}

	cg.withLogger(func(l Logger) {
		l.Printf("using '%v' balancer to assign group, %v", group.GroupProtocol, cg.config.ID)
		for _, member := range members {
			l.Printf("found member: %v/%#v", member.ID, member.UserData)
		}
		for _, partition := range partitions {
			l.Printf("found topic/partition: %v/%v", partition.Topic, partition.ID)
		}
	})

	return balancer.AssignGroups(members, partitions), nil
}

// makeMemberProtocolMetadata maps encoded member metadata ([]byte) into []GroupMember.
func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMemberV1) ([]GroupMember, error) {
	members := make([]GroupMember, 0, len(in))
	for _, item := range in {
		metadata := groupMetadata{}
		reader := bufio.NewReader(bytes.NewReader(item.MemberMetadata))
		if remain, err := (&metadata).readFrom(reader, len(item.MemberMetadata)); err != nil || remain != 0 {
			return nil, fmt.Errorf("unable to read metadata for member, %v: %v", item.MemberID, err)
		}

		members = append(members, GroupMember{
			ID:       item.MemberID,
			Topics:   metadata.Topics,
			UserData: metadata.UserData,
		})
	}
	return members, nil
}

// syncGroup completes the consumer group nextGeneration by accepting the
// memberAssignments (if this Reader is the leader) and returning this
// Reader's subscription as topic => partitions.
//
// Possible kafka error codes returned:
//  * GroupCoordinatorNotAvailable
//  * NotCoordinatorForGroup
//  * IllegalGeneration
//  * RebalanceInProgress
//  * GroupAuthorizationFailed
func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments) (map[string][]int32, error) {
	request := cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments)
	response, err := conn.syncGroup(request)
	if err == nil && response.ErrorCode != 0 {
		err = Error(response.ErrorCode)
	}
	if err != nil {
		return nil, err
	}

	assignments := groupAssignment{}
	reader := bufio.NewReader(bytes.NewReader(response.MemberAssignments))
	if _, err := (&assignments).readFrom(reader, len(response.MemberAssignments)); err != nil {
		return nil, err
	}

	if len(assignments.Topics) == 0 {
		cg.withLogger(func(l Logger) {
			l.Printf("received empty assignments for group, %v as member %s for generation %d", cg.config.ID, memberID, generationID)
		})
	}

	cg.withLogger(func(l Logger) {
		l.Printf("sync group finished for group, %v", cg.config.ID)
	})

	return assignments.Topics, nil
}

func (cg *ConsumerGroup) makeSyncGroupRequestV0(memberID string, generationID int32, memberAssignments GroupMemberAssignments) syncGroupRequestV0 {
	request := syncGroupRequestV0{
		GroupID:      cg.config.ID,
		GenerationID: generationID,
		MemberID:     memberID,
	}

	if memberAssignments != nil {
		request.GroupAssignments = make([]syncGroupRequestGroupAssignmentV0, 0, 1)

		for memberID, topics := range memberAssignments {
			topics32 := make(map[string][]int32)
			for topic, partitions := range topics {
				partitions32 := make([]int32, len(partitions))
				for i := range partitions {
					partitions32[i] = int32(partitions[i])
				}
				topics32[topic] = partitions32
			}
			request.GroupAssignments = append(request.GroupAssignments, syncGroupRequestGroupAssignmentV0{
				MemberID: memberID,
				MemberAssignments: groupAssignment{
					Version: 1,
					Topics:  topics32,
				}.bytes(),
			})
		}

		cg.withLogger(func(logger Logger) {
			logger.Printf("Syncing %d assignments for generation %d as member %s", len(request.GroupAssignments), generationID, memberID)
		})
	}

	return request
}

func (cg *ConsumerGroup) fetchOffsets(conn coordinator, subs map[string][]int32) (map[string]map[int]int64, error) {
	req := offsetFetchRequestV1{
		GroupID: cg.config.ID,
		Topics:  make([]offsetFetchRequestV1Topic, 0, len(cg.config.Topics)),
	}
	for _, topic := range cg.config.Topics {
		req.Topics = append(req.Topics, offsetFetchRequestV1Topic{
			Topic:      topic,
			Partitions: subs[topic],
		})
	}
	offsets, err := conn.offsetFetch(req)
	if err != nil {
		return nil, err
	}

	offsetsByTopic := make(map[string]map[int]int64)
	for _, res := range offsets.Responses {
		offsetsByPartition := map[int]int64{}
		offsetsByTopic[res.Topic] = offsetsByPartition
		for _, pr := range res.PartitionResponses {
			for _, partition := range subs[res.Topic] {
				if partition == pr.Partition {
					offset := pr.Offset
					if offset < 0 {
						offset = cg.config.StartOffset
					}
					offsetsByPartition[int(partition)] = offset
				}
			}
		}
	}

	return offsetsByTopic, nil
}

func (cg *ConsumerGroup) makeAssignments(assignments map[string][]int32, offsets map[string]map[int]int64) map[string][]PartitionAssignment {
	topicAssignments := make(map[string][]PartitionAssignment)
	for _, topic := range cg.config.Topics {
		topicPartitions := assignments[topic]
		topicAssignments[topic] = make([]PartitionAssignment, 0, len(topicPartitions))
		for _, partition := range topicPartitions {
			var offset int64
			partitionOffsets, ok := offsets[topic]
			if ok {
				offset, ok = partitionOffsets[int(partition)]
			}
			if !ok {
				offset = cg.config.StartOffset
			}
			topicAssignments[topic] = append(topicAssignments[topic], PartitionAssignment{
				ID:     int(partition),
				Offset: offset,
			})
		}
	}
	return topicAssignments
}

func (cg *ConsumerGroup) leaveGroup(memberID string) error {
	// don't attempt to leave the group if no memberID was ever assigned.
	if memberID == "" {
		return nil
	}

	cg.withLogger(func(log Logger) {
		log.Printf("Leaving group %s, member %s", cg.config.ID, memberID)
	})

	// IMPORTANT : leaveGroup establishes its own connection to the coordinator
	//             because it is often called after some other operation failed.
	//             said failure could be the result of connection-level issues,
	//             so we want to re-establish the connection to ensure that we
	//             are able to process the cleanup step.
	coordinator, err := cg.coordinator()
	if err != nil {
		return err
	}

	_, err = coordinator.leaveGroup(leaveGroupRequestV0{
		GroupID:  cg.config.ID,
		MemberID: memberID,
	})
	if err != nil {
		cg.withErrorLogger(func(log Logger) {
			log.Printf("leave group failed for group, %v, and member, %v: %v", cg.config.ID, memberID, err)
		})
	}

	_ = coordinator.Close()

	return err
}

func (cg *ConsumerGroup) withLogger(do func(Logger)) {
	if cg.config.Logger != nil {
		do(cg.config.Logger)
	}
}

func (cg *ConsumerGroup) withErrorLogger(do func(Logger)) {
	if cg.config.ErrorLogger != nil {
		do(cg.config.ErrorLogger)
	} else {
		cg.withLogger(do)
	}
}