reader.go

package kafka

import (
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"sort"
	"strconv"
	"sync"
	"sync/atomic"
	"time"
)

const (
	LastOffset  int64 = -1 // The most recent offset available for a partition.
	FirstOffset int64 = -2 // The least recent offset available for a partition.
)

const (
	// defaultCommitRetries holds the number of commit attempts to make
	// before giving up.
	defaultCommitRetries = 3
)

const (
	// defaultFetchMinBytes of 1 byte means that fetch requests are answered as
	// soon as a single byte of data is available or the fetch request times out
	// waiting for data to arrive.
	defaultFetchMinBytes = 1
)

var (
	errOnlyAvailableWithGroup = errors.New("unavailable when GroupID is not set")
	errNotAvailableWithGroup  = errors.New("unavailable when GroupID is set")
)

const (
	// defaultReadBackoffMin/Max set the boundaries for how long the reader
	// waits before polling for new messages.
	defaultReadBackoffMin = 100 * time.Millisecond
	defaultReadBackoffMax = 1 * time.Second
)

// Reader provides a high-level API for consuming messages from kafka.
//
// A Reader automatically manages reconnections to a kafka server, and
// blocking methods have context support for asynchronous cancellations.
type Reader struct {
	// immutable fields of the reader
	config ReaderConfig

	// communication channels between the parent reader and its subreaders
	msgs chan readerMessage

	// mutable fields of the reader (synchronized on the mutex)
	mutex   sync.Mutex
	join    sync.WaitGroup
	cancel  context.CancelFunc
	stop    context.CancelFunc
	done    chan struct{}
	commits chan commitRequest
	version int64 // version holds the generation of the spawned readers
	offset  int64
	lag     int64
	closed  bool

	// Without a group subscription (when Reader.config.GroupID == ""),
	// when errors occur, the Reader gets a synthetic readerMessage with
	// a non-nil err set. With group subscriptions however, when an error
	// occurs in Reader.run, there's no reader running (sic, cf. reader vs.
	// Reader) and there's no way to let the high-level methods like
	// FetchMessage know that an error indeed occurred. If an error in run
	// occurs, it will be non-block-sent to this unbuffered channel, where
	// the high-level methods can select{} on it and notify the caller.
	runError chan error

	once  uint32
	stctx context.Context

	// reader stats are all made of atomic values, no need for synchronization.
	// Use a pointer to ensure 64-bit alignment of the values.
	stats *readerStats
}

// useConsumerGroup indicates whether the Reader is part of a consumer group.
func (r *Reader) useConsumerGroup() bool { return r.config.GroupID != "" }

// useSyncCommits indicates whether the Reader is configured to perform sync or
// async commits.
func (r *Reader) useSyncCommits() bool { return r.config.CommitInterval == 0 }

func (r *Reader) unsubscribe() {
	r.cancel()
	r.join.Wait()
	// it would be interesting to drain the r.msgs channel at this point since
	// it will contain buffered messages for partitions that may not be
	// re-assigned to this reader in the next consumer group generation.
	// however, draining the channel could race with the client calling
	// ReadMessage, which could result in messages delivered and/or committed
	// with gaps in the offset. for now, we will err on the side of caution and
	// potentially have those messages be reprocessed in the next generation by
	// another consumer to avoid such a race.
}

func (r *Reader) subscribe(assignments []PartitionAssignment) {
	offsetsByPartition := make(map[int]int64)
	for _, assignment := range assignments {
		offsetsByPartition[assignment.ID] = assignment.Offset
	}

	r.mutex.Lock()
	r.start(offsetsByPartition)
	r.mutex.Unlock()

	r.withLogger(func(l Logger) {
		l.Printf("subscribed to partitions: %+v", offsetsByPartition)
	})
}

func (r *Reader) waitThrottleTime(throttleTimeMS int32) {
	if throttleTimeMS == 0 {
		return
	}

	t := time.NewTimer(time.Duration(throttleTimeMS) * time.Millisecond)
	defer t.Stop()

	select {
	case <-r.stctx.Done():
		return
	case <-t.C:
	}
}

// commitOffsetsWithRetry attempts to commit the specified offsets and retries
// up to the specified number of times.
func (r *Reader) commitOffsetsWithRetry(gen *Generation, offsetStash offsetStash, retries int) (err error) {
	const (
		backoffDelayMin = 100 * time.Millisecond
		backoffDelayMax = 5 * time.Second
	)

	for attempt := 0; attempt < retries; attempt++ {
		if attempt != 0 {
			if !sleep(r.stctx, backoff(attempt, backoffDelayMin, backoffDelayMax)) {
				return
			}
		}

		if err = gen.CommitOffsets(offsetStash); err == nil {
			return
		}
	}

	return // err will not be nil
}

// offsetStash holds offsets by topic => partition => offset.
type offsetStash map[string]map[int]int64

// merge updates the offsetStash with the offsets from the provided commits,
// keeping only the highest offset seen for each topic/partition pair.
func (o offsetStash) merge(commits []commit) {
	for _, c := range commits {
		offsetsByPartition, ok := o[c.topic]
		if !ok {
			offsetsByPartition = map[int]int64{}
			o[c.topic] = offsetsByPartition
		}

		if offset, ok := offsetsByPartition[c.partition]; !ok || c.offset > offset {
			offsetsByPartition[c.partition] = c.offset
		}
	}
}

// reset clears the contents of the offsetStash.
func (o offsetStash) reset() {
	for key := range o {
		delete(o, key)
	}
}

// commitLoopImmediate handles each commit synchronously.
func (r *Reader) commitLoopImmediate(ctx context.Context, gen *Generation) {
	offsets := offsetStash{}

	for {
		select {
		case <-ctx.Done():
			return

		case req := <-r.commits:
			offsets.merge(req.commits)
			req.errch <- r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries)
			offsets.reset()
		}
	}
}

// commitLoopInterval handles each commit asynchronously with a period defined
// by ReaderConfig.CommitInterval.
func (r *Reader) commitLoopInterval(ctx context.Context, gen *Generation) {
	ticker := time.NewTicker(r.config.CommitInterval)
	defer ticker.Stop()

	// the offset stash should not survive rebalances b/c the consumer may
	// receive new assignments.
	offsets := offsetStash{}

	commit := func() {
		if err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries); err != nil {
			// Pass the error as an argument rather than as the format string
			// so that any '%' in the message is not misinterpreted.
			r.withErrorLogger(func(l Logger) { l.Printf("%v", err) })
		} else {
			offsets.reset()
		}
	}

	for {
		select {
		case <-ctx.Done():
			// drain the commit channel in order to prepare the final commit.
			for hasCommits := true; hasCommits; {
				select {
				case req := <-r.commits:
					offsets.merge(req.commits)
				default:
					hasCommits = false
				}
			}
			commit()
			return

		case <-ticker.C:
			commit()

		case req := <-r.commits:
			offsets.merge(req.commits)
		}
	}
}

// commitLoop processes commits off the commit chan.
func (r *Reader) commitLoop(ctx context.Context, gen *Generation) {
	r.withLogger(func(l Logger) {
		l.Printf("started commit for group %s\n", r.config.GroupID)
	})
	defer r.withLogger(func(l Logger) {
		l.Printf("stopped commit for group %s\n", r.config.GroupID)
	})

	if r.config.CommitInterval == 0 {
		r.commitLoopImmediate(ctx, gen)
	} else {
		r.commitLoopInterval(ctx, gen)
	}
}

// run provides the main consumer group management loop. Each iteration performs the
// handshake to join the Reader to the consumer group.
//
// This function is responsible for closing the consumer group upon exit.
func (r *Reader) run(cg *ConsumerGroup) {
	defer close(r.done)
	defer cg.Close()

	r.withLogger(func(l Logger) {
		l.Printf("entering loop for consumer group, %v\n", r.config.GroupID)
	})

	for {
		// Limit the number of attempts at waiting for the next
		// consumer generation.
		var err error
		var gen *Generation
		for attempt := 1; attempt <= r.config.MaxAttempts; attempt++ {
			gen, err = cg.Next(r.stctx)
			if err == nil {
				break
			}
			if err == r.stctx.Err() {
				return
			}
			r.stats.errors.observe(1)
			r.withErrorLogger(func(l Logger) {
				l.Printf("%v", err)
			})
			// Continue with next attempt...
		}
		if err != nil {
			// All attempts have failed.
			select {
			case r.runError <- err:
				// If somebody's receiving on the runError, let
				// them know the error occurred.
			default:
				// Otherwise, don't block to allow healing.
			}
			continue
		}

		r.stats.rebalances.observe(1)

		r.subscribe(gen.Assignments[r.config.Topic])

		gen.Start(func(ctx context.Context) {
			r.commitLoop(ctx, gen)
		})
		gen.Start(func(ctx context.Context) {
			// wait for the generation to end and then unsubscribe.
			select {
			case <-ctx.Done():
				// continue to next generation
			case <-r.stctx.Done():
				// this will be the last loop because the reader is closed.
			}
			r.unsubscribe()
		})
	}
}

// ReaderConfig is a configuration object used to create new instances of
// Reader.
type ReaderConfig struct {
	// The list of broker addresses used to connect to the kafka cluster.
	Brokers []string

	// GroupID holds the optional consumer group id. If GroupID is specified,
	// then Partition should NOT be specified (it must be left at its zero
	// value).
	GroupID string

	// The topic to read messages from.
	Topic string

	// Partition to read messages from. Either Partition or GroupID may
	// be assigned, but not both.
	Partition int

	// A dialer used to open connections to the kafka server. This field is
	// optional, if nil, the default dialer is used instead.
	Dialer *Dialer

	// The capacity of the internal message queue, defaults to 100 if none is
	// set.
	QueueCapacity int

	// Min and max number of bytes to fetch from kafka in each request.
	MinBytes int
	MaxBytes int

	// Maximum amount of time to wait for new data to come when fetching batches
	// of messages from kafka.
	MaxWait time.Duration

	// ReadLagInterval sets the frequency at which the reader lag is updated.
	// Setting this field to a negative value disables lag reporting.
	ReadLagInterval time.Duration

	// GroupBalancers is the priority-ordered list of client-side consumer group
	// balancing strategies that will be offered to the coordinator. The first
	// strategy that all group members support will be chosen by the leader.
	//
	// Default: [Range, RoundRobin]
	//
	// Only used when GroupID is set
	GroupBalancers []GroupBalancer

	// HeartbeatInterval sets the optional frequency at which the reader sends the consumer
	// group heartbeat update.
	//
	// Default: 3s
	//
	// Only used when GroupID is set
	HeartbeatInterval time.Duration

	// CommitInterval indicates the interval at which offsets are committed to
	// the broker. If 0, commits will be handled synchronously.
	//
	// Default: 0
	//
	// Only used when GroupID is set
	CommitInterval time.Duration

	// PartitionWatchInterval indicates how often a reader checks for partition changes.
	// If a reader sees a partition change (such as a partition add) it will rebalance the group
	// picking up new partitions.
	//
	// Default: 5s
	//
	// Only used when GroupID is set and WatchPartitionChanges is set.
	PartitionWatchInterval time.Duration

	// WatchPartitionChanges is used to inform kafka-go that a consumer group should be
	// polling the brokers and rebalancing if any partition changes happen to the topic.
	WatchPartitionChanges bool

	// SessionTimeout optionally sets the length of time that may pass without a heartbeat
	// before the coordinator considers the consumer dead and initiates a rebalance.
	//
	// Default: 30s
	//
	// Only used when GroupID is set
	SessionTimeout time.Duration

	// RebalanceTimeout optionally sets the length of time the coordinator will wait
	// for members to join as part of a rebalance. For kafka servers under higher
	// load, it may be useful to set this value higher.
	//
	// Default: 30s
	//
	// Only used when GroupID is set
	RebalanceTimeout time.Duration

	// JoinGroupBackoff optionally sets the length of time to wait between re-joining
	// the consumer group after an error.
	//
	// Default: 5s
	JoinGroupBackoff time.Duration

	// RetentionTime optionally sets the length of time the consumer group will be saved
	// by the broker.
	//
	// Default: 24h
	//
	// Only used when GroupID is set
	RetentionTime time.Duration

	// StartOffset determines from which offset the consumer group should begin
	// consuming when it finds a partition without a committed offset. If
	// non-zero, it must be set to one of FirstOffset or LastOffset.
	//
	// Default: FirstOffset
	//
	// Only used when GroupID is set
	StartOffset int64

	// ReadBackoffMin optionally sets the smallest amount of time the reader will wait before
	// polling for new messages.
	//
	// Default: 100ms
	ReadBackoffMin time.Duration

	// ReadBackoffMax optionally sets the maximum amount of time the reader will wait before
	// polling for new messages.
	//
	// Default: 1s
	ReadBackoffMax time.Duration

	// If not nil, specifies a logger used to report internal changes within the
	// reader.
	Logger Logger

	// ErrorLogger is the logger used to report errors. If nil, the reader falls
	// back to using Logger instead.
	ErrorLogger Logger

	// IsolationLevel controls the visibility of transactional records.
	// ReadUncommitted makes all records visible. With ReadCommitted only
	// non-transactional and committed records are visible.
	IsolationLevel IsolationLevel

	// Limit of how many attempts will be made before delivering the error.
	//
	// The default is to try 3 times.
	MaxAttempts int
}
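
// A minimal configuration sketch (illustrative only; the broker address and
// topic name below are assumptions, not values taken from this package):
//
//	config := ReaderConfig{
//		Brokers:  []string{"localhost:9092"},
//		Topic:    "example-topic",
//		GroupID:  "example-group", // omit to read a single partition instead
//		MinBytes: 1,               // respond as soon as any data is available
//		MaxBytes: 1e6,             // cap each fetch at ~1 MB
//	}
//	if err := config.Validate(); err != nil {
//		// handle the configuration error
//	}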

// Validate method validates ReaderConfig properties.
func (config *ReaderConfig) Validate() error {
	if len(config.Brokers) == 0 {
		return errors.New("cannot create a new kafka reader with an empty list of broker addresses")
	}

	if len(config.Topic) == 0 {
		return errors.New("cannot create a new kafka reader with an empty topic")
	}

	if config.Partition < 0 || config.Partition >= math.MaxInt32 {
		return fmt.Errorf("partition number out of bounds: %d", config.Partition)
	}

	if config.MinBytes < 0 {
		return fmt.Errorf("invalid negative minimum batch size (min = %d)", config.MinBytes)
	}

	if config.MaxBytes < 0 {
		return fmt.Errorf("invalid negative maximum batch size (max = %d)", config.MaxBytes)
	}

	if config.GroupID != "" && config.Partition != 0 {
		return errors.New("either Partition or GroupID may be specified, but not both")
	}

	if config.MinBytes > config.MaxBytes {
		return fmt.Errorf("minimum batch size greater than the maximum (min = %d, max = %d)", config.MinBytes, config.MaxBytes)
	}

	if config.ReadBackoffMax < 0 {
		return fmt.Errorf("ReadBackoffMax out of bounds: %d", config.ReadBackoffMax)
	}

	if config.ReadBackoffMin < 0 {
		return fmt.Errorf("ReadBackoffMin out of bounds: %d", config.ReadBackoffMin)
	}

	return nil
}

// ReaderStats is a data structure returned by a call to Reader.Stats that exposes
// details about the behavior of the reader.
type ReaderStats struct {
	Dials      int64 `metric:"kafka.reader.dial.count" type:"counter"`
	Fetches    int64 `metric:"kafka.reader.fetch.count" type:"counter"`
	Messages   int64 `metric:"kafka.reader.message.count" type:"counter"`
	Bytes      int64 `metric:"kafka.reader.message.bytes" type:"counter"`
	Rebalances int64 `metric:"kafka.reader.rebalance.count" type:"counter"`
	Timeouts   int64 `metric:"kafka.reader.timeout.count" type:"counter"`
	Errors     int64 `metric:"kafka.reader.error.count" type:"counter"`

	DialTime   DurationStats `metric:"kafka.reader.dial.seconds"`
	ReadTime   DurationStats `metric:"kafka.reader.read.seconds"`
	WaitTime   DurationStats `metric:"kafka.reader.wait.seconds"`
	FetchSize  SummaryStats  `metric:"kafka.reader.fetch.size"`
	FetchBytes SummaryStats  `metric:"kafka.reader.fetch.bytes"`

	Offset        int64         `metric:"kafka.reader.offset" type:"gauge"`
	Lag           int64         `metric:"kafka.reader.lag" type:"gauge"`
	MinBytes      int64         `metric:"kafka.reader.fetch_bytes.min" type:"gauge"`
	MaxBytes      int64         `metric:"kafka.reader.fetch_bytes.max" type:"gauge"`
	MaxWait       time.Duration `metric:"kafka.reader.fetch_wait.max" type:"gauge"`
	QueueLength   int64         `metric:"kafka.reader.queue.length" type:"gauge"`
	QueueCapacity int64         `metric:"kafka.reader.queue.capacity" type:"gauge"`

	ClientID  string `tag:"client_id"`
	Topic     string `tag:"topic"`
	Partition string `tag:"partition"`

	// The original `Fetches` field had a typo where the metric name was called
	// "kafak..." instead of "kafka...". In order to offer time to fix monitors
	// that may be relying on this mistake, we are temporarily introducing this
	// field.
	DeprecatedFetchesWithTypo int64 `metric:"kafak.reader.fetch.count" type:"counter"`
}

// readerStats is a struct that contains statistics on a reader.
type readerStats struct {
	dials      counter
	fetches    counter
	messages   counter
	bytes      counter
	rebalances counter
	timeouts   counter
	errors     counter
	dialTime   summary
	readTime   summary
	waitTime   summary
	fetchSize  summary
	fetchBytes summary
	offset     gauge
	lag        gauge
	partition  string
}

// NewReader creates and returns a new Reader configured with config.
// The offset is initialized to FirstOffset.
func NewReader(config ReaderConfig) *Reader {
	if err := config.Validate(); err != nil {
		panic(err)
	}

	if config.GroupID != "" {
		if len(config.GroupBalancers) == 0 {
			config.GroupBalancers = []GroupBalancer{
				RangeGroupBalancer{},
				RoundRobinGroupBalancer{},
			}
		}
	}

	if config.Dialer == nil {
		config.Dialer = DefaultDialer
	}

	if config.MaxBytes == 0 {
		config.MaxBytes = 1e6 // 1 MB
	}

	if config.MinBytes == 0 {
		config.MinBytes = defaultFetchMinBytes
	}

	if config.MaxWait == 0 {
		config.MaxWait = 10 * time.Second
	}

	if config.ReadLagInterval == 0 {
		config.ReadLagInterval = 1 * time.Minute
	}

	if config.ReadBackoffMin == 0 {
		config.ReadBackoffMin = defaultReadBackoffMin
	}

	if config.ReadBackoffMax == 0 {
		config.ReadBackoffMax = defaultReadBackoffMax
	}

	if config.ReadBackoffMax < config.ReadBackoffMin {
		panic(fmt.Errorf("ReadBackoffMax %d smaller than ReadBackoffMin %d", config.ReadBackoffMax, config.ReadBackoffMin))
	}

	if config.QueueCapacity == 0 {
		config.QueueCapacity = 100
	}

	if config.MaxAttempts == 0 {
		config.MaxAttempts = 3
	}

	// when configured as a consumer group; stats should report a partition of -1
	readerStatsPartition := config.Partition
	if config.GroupID != "" {
		readerStatsPartition = -1
	}

	// when configured as a consumer group, start version as 1 to ensure that only
	// the rebalance function will start readers
	version := int64(0)
	if config.GroupID != "" {
		version = 1
	}

	stctx, stop := context.WithCancel(context.Background())
	r := &Reader{
		config:  config,
		msgs:    make(chan readerMessage, config.QueueCapacity),
		cancel:  func() {},
		commits: make(chan commitRequest, config.QueueCapacity),
		stop:    stop,
		offset:  FirstOffset,
		stctx:   stctx,
		stats: &readerStats{
			dialTime:   makeSummary(),
			readTime:   makeSummary(),
			waitTime:   makeSummary(),
			fetchSize:  makeSummary(),
			fetchBytes: makeSummary(),
			// Generate the string representation of the partition number only
			// once when the reader is created.
			partition: strconv.Itoa(readerStatsPartition),
		},
		version: version,
	}

	if r.useConsumerGroup() {
		r.done = make(chan struct{})
		r.runError = make(chan error)
		cg, err := NewConsumerGroup(ConsumerGroupConfig{
			ID:                     r.config.GroupID,
			Brokers:                r.config.Brokers,
			Dialer:                 r.config.Dialer,
			Topics:                 []string{r.config.Topic},
			GroupBalancers:         r.config.GroupBalancers,
			HeartbeatInterval:      r.config.HeartbeatInterval,
			PartitionWatchInterval: r.config.PartitionWatchInterval,
			WatchPartitionChanges:  r.config.WatchPartitionChanges,
			SessionTimeout:         r.config.SessionTimeout,
			RebalanceTimeout:       r.config.RebalanceTimeout,
			JoinGroupBackoff:       r.config.JoinGroupBackoff,
			RetentionTime:          r.config.RetentionTime,
			StartOffset:            r.config.StartOffset,
			Logger:                 r.config.Logger,
			ErrorLogger:            r.config.ErrorLogger,
		})
		if err != nil {
			panic(err)
		}
		go r.run(cg)
	}

	return r
}
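
// A minimal end-to-end sketch (illustrative only; the broker address and
// topic name are assumptions). Reading a single partition, without a
// consumer group:
//
//	r := NewReader(ReaderConfig{
//		Brokers:   []string{"localhost:9092"},
//		Topic:     "example-topic",
//		Partition: 0,
//	})
//	defer r.Close()
//
//	m, err := r.ReadMessage(context.Background())
//	if err == nil {
//		fmt.Printf("offset %d: %s\n", m.Offset, string(m.Value))
//	}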

// Config returns the reader's configuration.
func (r *Reader) Config() ReaderConfig {
	return r.config
}

// Close closes the stream, preventing the program from reading any more
// messages from it.
func (r *Reader) Close() error {
	atomic.StoreUint32(&r.once, 1)

	r.mutex.Lock()
	closed := r.closed
	r.closed = true
	r.mutex.Unlock()

	r.cancel()
	r.stop()
	r.join.Wait()

	if r.done != nil {
		<-r.done
	}

	if !closed {
		close(r.msgs)
	}

	return nil
}

// ReadMessage reads and returns the next message from r. The method call
// blocks until a message becomes available, or an error occurs. The program
// may also specify a context to asynchronously cancel the blocking operation.
//
// The method returns io.EOF to indicate that the reader has been closed.
//
// If consumer groups are used, ReadMessage will automatically commit the
// offset when called. Note that this could result in an offset being committed
// before the message is fully processed.
//
// If more fine-grained control of when offsets are committed is required, it
// is recommended to use FetchMessage with CommitMessages instead.
func (r *Reader) ReadMessage(ctx context.Context) (Message, error) {
	m, err := r.FetchMessage(ctx)
	if err != nil {
		return Message{}, err
	}

	if r.useConsumerGroup() {
		if err := r.CommitMessages(ctx, m); err != nil {
			return Message{}, err
		}
	}

	return m, nil
}
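
// A typical consumer-group read loop might look like the sketch below
// (illustrative; the group and topic names, and the process helper, are
// assumptions):
//
//	r := NewReader(ReaderConfig{
//		Brokers: []string{"localhost:9092"},
//		Topic:   "example-topic",
//		GroupID: "example-group",
//	})
//	for {
//		m, err := r.ReadMessage(context.Background())
//		if err != nil {
//			break // io.EOF once the reader is closed
//		}
//		process(m) // hypothetical handler
//	}
//	r.Close()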

// FetchMessage reads and returns the next message from r. The method call
// blocks until a message becomes available, or an error occurs. The program
// may also specify a context to asynchronously cancel the blocking operation.
//
// The method returns io.EOF to indicate that the reader has been closed.
//
// FetchMessage does not commit offsets automatically when using consumer groups.
// Use CommitMessages to commit the offset.
func (r *Reader) FetchMessage(ctx context.Context) (Message, error) {
	r.activateReadLag()

	for {
		r.mutex.Lock()

		if !r.closed && r.version == 0 {
			r.start(map[int]int64{r.config.Partition: r.offset})
		}

		version := r.version
		r.mutex.Unlock()

		select {
		case <-ctx.Done():
			return Message{}, ctx.Err()

		case err := <-r.runError:
			return Message{}, err

		case m, ok := <-r.msgs:
			if !ok {
				return Message{}, io.EOF
			}

			if m.version >= version {
				r.mutex.Lock()

				switch {
				case m.error != nil:
				case version == r.version:
					r.offset = m.message.Offset + 1
					r.lag = m.watermark - r.offset
				}

				r.mutex.Unlock()

				switch m.error {
				case nil:
				case io.EOF:
					// io.EOF is used as a marker to indicate that the stream
					// has been closed. If it was received from the inner
					// reader we don't want to confuse the program, so we
					// replace the error with io.ErrUnexpectedEOF.
					m.error = io.ErrUnexpectedEOF
				}

				return m.message, m.error
			}
		}
	}
}

// CommitMessages commits the list of messages passed as argument. The program
// may pass a context to asynchronously cancel the commit operation when it was
// configured to be blocking.
func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error {
	if !r.useConsumerGroup() {
		return errOnlyAvailableWithGroup
	}

	var errch <-chan error
	var creq = commitRequest{
		commits: makeCommits(msgs...),
	}

	if r.useSyncCommits() {
		ch := make(chan error, 1)
		errch, creq.errch = ch, ch
	}

	select {
	case r.commits <- creq:
	case <-ctx.Done():
		return ctx.Err()
	case <-r.stctx.Done():
		// This context is used to ensure we don't allow commits after the
		// reader was closed.
		return io.ErrClosedPipe
	}

	if !r.useSyncCommits() {
		return nil
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-errch:
		return err
	}
}
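
// For explicit commit control, FetchMessage can be paired with CommitMessages
// as in this sketch (illustrative; the process helper is an assumption):
//
//	for {
//		m, err := r.FetchMessage(ctx)
//		if err != nil {
//			break
//		}
//		if err := process(m); err != nil { // hypothetical handler
//			continue // not committed; may be redelivered
//		}
//		if err := r.CommitMessages(ctx, m); err != nil {
//			break
//		}
//	}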

// ReadLag returns the current lag of the reader by fetching the last offset of
// the topic and partition and computing the difference between that value and
// the offset of the last message returned by ReadMessage.
//
// This method is intended to be used in cases where a program may be unable to
// call ReadMessage to update the value returned by Lag, but still needs to get
// an up to date estimation of how far behind the reader is. For example when
// the consumer is not ready to process the next message.
//
// The function returns a lag of zero when the reader's current offset is
// negative.
func (r *Reader) ReadLag(ctx context.Context) (lag int64, err error) {
	if r.useConsumerGroup() {
		return 0, errNotAvailableWithGroup
	}

	type offsets struct {
		first int64
		last  int64
	}

	offch := make(chan offsets, 1)
	errch := make(chan error, 1)

	go func() {
		var off offsets
		var err error

		for _, broker := range r.config.Brokers {
			var conn *Conn

			if conn, err = r.config.Dialer.DialLeader(ctx, "tcp", broker, r.config.Topic, r.config.Partition); err != nil {
				continue
			}

			deadline, _ := ctx.Deadline()
			conn.SetDeadline(deadline)

			off.first, off.last, err = conn.ReadOffsets()
			conn.Close()

			if err == nil {
				break
			}
		}

		if err != nil {
			errch <- err
		} else {
			offch <- off
		}
	}()

	select {
	case off := <-offch:
		switch cur := r.Offset(); {
		case cur == FirstOffset:
			lag = off.last - off.first
		case cur == LastOffset:
			lag = 0
		default:
			lag = off.last - cur
		}
	case err = <-errch:
	case <-ctx.Done():
		err = ctx.Err()
	}

	return
}
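
// A sketch of out-of-band lag monitoring (illustrative; the interval and the
// log destination are assumptions):
//
//	go func() {
//		ticker := time.NewTicker(30 * time.Second)
//		defer ticker.Stop()
//		for range ticker.C {
//			lag, err := r.ReadLag(context.Background())
//			if err == nil {
//				fmt.Printf("reader lag: %d\n", lag)
//			}
//		}
//	}()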

// Offset returns the current absolute offset of the reader, or -1
// if r is backed by a consumer group.
func (r *Reader) Offset() int64 {
	if r.useConsumerGroup() {
		return -1
	}

	r.mutex.Lock()
	offset := r.offset
	r.mutex.Unlock()
	r.withLogger(func(log Logger) {
		log.Printf("looking up offset of kafka reader for partition %d of %s: %d", r.config.Partition, r.config.Topic, offset)
	})
	return offset
}

// Lag returns the lag of the last message returned by ReadMessage, or -1
// if r is backed by a consumer group.
func (r *Reader) Lag() int64 {
	if r.useConsumerGroup() {
		return -1
	}

	r.mutex.Lock()
	lag := r.lag
	r.mutex.Unlock()
	return lag
}

// SetOffset changes the offset from which the next batch of messages will be
// read. The method fails with io.ErrClosedPipe if the reader has already been closed.
//
// From version 0.2.0, FirstOffset and LastOffset can be used to indicate the first
// or last available offset in the partition. Please note while -1 and -2 were accepted
// to indicate the first or last offset in previous versions, the meanings of the numbers
// were swapped in 0.2.0 to match the meanings in other libraries and the Kafka protocol
// specification.
func (r *Reader) SetOffset(offset int64) error {
	if r.useConsumerGroup() {
		return errNotAvailableWithGroup
	}

	var err error
	r.mutex.Lock()

	if r.closed {
		err = io.ErrClosedPipe
	} else if offset != r.offset {
		r.withLogger(func(log Logger) {
			log.Printf("setting the offset of the kafka reader for partition %d of %s from %d to %d",
				r.config.Partition, r.config.Topic, r.offset, offset)
		})
		r.offset = offset

		if r.version != 0 {
			r.start(map[int]int64{r.config.Partition: r.offset})
		}

		r.activateReadLag()
	}

	r.mutex.Unlock()
	return err
}
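
// For example, a partition reader can be pointed at the newest available
// message before reading (sketch; error handling elided):
//
//	_ = r.SetOffset(LastOffset) // or an absolute offset such as 42
//	m, _ := r.ReadMessage(context.Background())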

// SetOffsetAt changes the offset from which the next batch of messages will be
// read given the timestamp t.
//
// The method fails if it is unable to connect to the partition leader, unable
// to read the offset for the given timestamp, or if the reader has been closed.
func (r *Reader) SetOffsetAt(ctx context.Context, t time.Time) error {
	r.mutex.Lock()
	if r.closed {
		r.mutex.Unlock()
		return io.ErrClosedPipe
	}
	r.mutex.Unlock()

	for _, broker := range r.config.Brokers {
		conn, err := r.config.Dialer.DialLeader(ctx, "tcp", broker, r.config.Topic, r.config.Partition)
		if err != nil {
			continue
		}

		deadline, _ := ctx.Deadline()
		conn.SetDeadline(deadline)
		offset, err := conn.ReadOffset(t)
		conn.Close()
		if err != nil {
			return err
		}

		return r.SetOffset(offset)
	}

	return fmt.Errorf("error setting offset for timestamp %+v", t)
}
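
// A common use is replaying recent history, e.g. everything produced in the
// last hour (sketch; the one-hour window is an assumption):
//
//	err := r.SetOffsetAt(context.Background(), time.Now().Add(-1*time.Hour))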

// Stats returns a snapshot of the reader stats since the last time the method
// was called, or since the reader was created if it is called for the first
// time.
//
// A typical use of this method is to spawn a goroutine that will periodically
// call Stats on a kafka reader and report the metrics to a stats collection
// system.
func (r *Reader) Stats() ReaderStats {
	stats := ReaderStats{
		Dials:         r.stats.dials.snapshot(),
		Fetches:       r.stats.fetches.snapshot(),
		Messages:      r.stats.messages.snapshot(),
		Bytes:         r.stats.bytes.snapshot(),
		Rebalances:    r.stats.rebalances.snapshot(),
		Timeouts:      r.stats.timeouts.snapshot(),
		Errors:        r.stats.errors.snapshot(),
		DialTime:      r.stats.dialTime.snapshotDuration(),
		ReadTime:      r.stats.readTime.snapshotDuration(),
		WaitTime:      r.stats.waitTime.snapshotDuration(),
		FetchSize:     r.stats.fetchSize.snapshot(),
		FetchBytes:    r.stats.fetchBytes.snapshot(),
		Offset:        r.stats.offset.snapshot(),
		Lag:           r.stats.lag.snapshot(),
		MinBytes:      int64(r.config.MinBytes),
		MaxBytes:      int64(r.config.MaxBytes),
		MaxWait:       r.config.MaxWait,
		QueueLength:   int64(len(r.msgs)),
		QueueCapacity: int64(cap(r.msgs)),
		ClientID:      r.config.Dialer.ClientID,
		Topic:         r.config.Topic,
		Partition:     r.stats.partition,
	}
	// TODO: remove when we get rid of the deprecated field.
	stats.DeprecatedFetchesWithTypo = stats.Fetches
	return stats
}
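
// The periodic reporting pattern described above might look like this sketch
// (illustrative; the interval and the stdout sink are assumptions):
//
//	go func() {
//		ticker := time.NewTicker(time.Minute)
//		defer ticker.Stop()
//		for range ticker.C {
//			s := r.Stats()
//			fmt.Printf("msgs=%d lag=%d queue=%d/%d\n",
//				s.Messages, s.Lag, s.QueueLength, s.QueueCapacity)
//		}
//	}()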

func (r *Reader) withLogger(do func(Logger)) {
	if r.config.Logger != nil {
		do(r.config.Logger)
	}
}

func (r *Reader) withErrorLogger(do func(Logger)) {
	if r.config.ErrorLogger != nil {
		do(r.config.ErrorLogger)
	} else {
		r.withLogger(do)
	}
}

func (r *Reader) activateReadLag() {
	if r.config.ReadLagInterval > 0 && atomic.CompareAndSwapUint32(&r.once, 0, 1) {
		// read lag will only be calculated when not using consumer groups
		// todo discuss how capturing read lag should interact with rebalancing
		if !r.useConsumerGroup() {
			go r.readLag(r.stctx)
		}
	}
}

func (r *Reader) readLag(ctx context.Context) {
	ticker := time.NewTicker(r.config.ReadLagInterval)
	defer ticker.Stop()

	for {
		timeout, cancel := context.WithTimeout(ctx, r.config.ReadLagInterval/2)
		lag, err := r.ReadLag(timeout)
		cancel()

		if err != nil {
			r.stats.errors.observe(1)
			r.withErrorLogger(func(log Logger) {
				log.Printf("kafka reader failed to read lag of partition %d of %s: %s", r.config.Partition, r.config.Topic, err)
			})
		} else {
			r.stats.lag.observe(lag)
		}

		select {
		case <-ticker.C:
		case <-ctx.Done():
			return
		}
	}
}

func (r *Reader) start(offsetsByPartition map[int]int64) {
	if r.closed {
		// don't start child reader if parent Reader is closed
		return
	}

	ctx, cancel := context.WithCancel(context.Background())

	r.cancel() // always cancel the previous reader
	r.cancel = cancel
	r.version++

	r.join.Add(len(offsetsByPartition))
	for partition, offset := range offsetsByPartition {
		go func(ctx context.Context, partition int, offset int64, join *sync.WaitGroup) {
			defer join.Done()

			(&reader{
				dialer:          r.config.Dialer,
				logger:          r.config.Logger,
				errorLogger:     r.config.ErrorLogger,
				brokers:         r.config.Brokers,
				topic:           r.config.Topic,
				partition:       partition,
				minBytes:        r.config.MinBytes,
				maxBytes:        r.config.MaxBytes,
				maxWait:         r.config.MaxWait,
				backoffDelayMin: r.config.ReadBackoffMin,
				backoffDelayMax: r.config.ReadBackoffMax,
				version:         r.version,
				msgs:            r.msgs,
				stats:           r.stats,
				isolationLevel:  r.config.IsolationLevel,
				maxAttempts:     r.config.MaxAttempts,
			}).run(ctx, offset)
		}(ctx, partition, offset, &r.join)
	}
}

// A reader reads messages from kafka and produces them on its channels. It's
// used as a way to asynchronously fetch messages while the main program reads
// them using the high-level reader API.
type reader struct {
	dialer          *Dialer
	logger          Logger
	errorLogger     Logger
	brokers         []string
	topic           string
	partition       int
	minBytes        int
	maxBytes        int
	maxWait         time.Duration
	backoffDelayMin time.Duration
	backoffDelayMax time.Duration
	version         int64
	msgs            chan<- readerMessage
	stats           *readerStats
	isolationLevel  IsolationLevel
	maxAttempts     int
}

type readerMessage struct {
	version   int64
	message   Message
	watermark int64
	error     error
}

func (r *reader) run(ctx context.Context, offset int64) {
	// This is the reader's main loop, it only ends if the context is canceled
	// and will keep attempting to read messages otherwise.
	//
	// Retrying indefinitely has the nice side effect of preventing Read calls
	// on the parent reader from blocking if the connection to the kafka server
	// fails: the reader keeps reporting errors on the error channel, which are
	// then surfaced to the program.
	// If the reader wasn't retrying then the program would block indefinitely
	// on a Read call after reading the first error.
	for attempt := 0; true; attempt++ {
		if attempt != 0 {
			if !sleep(ctx, backoff(attempt, r.backoffDelayMin, r.backoffDelayMax)) {
				return
			}
		}

		r.withLogger(func(log Logger) {
			log.Printf("initializing kafka reader for partition %d of %s starting at offset %d", r.partition, r.topic, offset)
		})

		conn, start, err := r.initialize(ctx, offset)
		switch err {
		case nil:
		case OffsetOutOfRange:
			// This would happen if the requested offset is past the last
			// offset on the partition leader. In that case we're just going
			// to retry later hoping that enough data has been produced.
			r.withErrorLogger(func(log Logger) {
				log.Printf("error initializing the kafka reader for partition %d of %s: %s", r.partition, r.topic, OffsetOutOfRange)
			})
			continue
		default:
			// Perform a configured number of attempts before
			// reporting first errors, this helps mitigate
			// situations where the kafka server is temporarily
			// unavailable.
			if attempt >= r.maxAttempts {
				r.sendError(ctx, err)
			} else {
				r.stats.errors.observe(1)
				r.withErrorLogger(func(log Logger) {
					log.Printf("error initializing the kafka reader for partition %d of %s: %s", r.partition, r.topic, err)
				})
			}
			continue
		}

		// Resetting the attempt counter ensures that if a failure occurs after
		// a successful initialization we don't keep increasing the backoff
		// timeout.
		attempt = 0

		// Now we're sure to have an absolute offset number; should anything
		// happen to the connection, we know we'll want to restart from this
		// offset.
		offset = start

		errcount := 0
	readLoop:
		for {
			if !sleep(ctx, backoff(errcount, r.backoffDelayMin, r.backoffDelayMax)) {
				conn.Close()
				return
			}

			switch offset, err = r.read(ctx, offset, conn); err {
			case nil:
				errcount = 0

			case io.EOF:
				// done with this batch of messages...carry on. note that this
				// block relies on the batch repackaging real io.EOF errors as
				// io.ErrUnexpectedEOF. otherwise, we would end up swallowing
				// real errors here.
				errcount = 0

			case UnknownTopicOrPartition:
				r.withErrorLogger(func(log Logger) {
					log.Printf("failed to read from current broker for partition %d of %s at offset %d, topic or partition not found on this broker, %v", r.partition, r.topic, offset, r.brokers)
				})

				conn.Close()

				// The next call to .initialize will re-establish a connection to the proper
				// topic/partition broker combo.
				r.stats.rebalances.observe(1)
				break readLoop

			case NotLeaderForPartition:
				r.withErrorLogger(func(log Logger) {
					log.Printf("failed to read from current broker for partition %d of %s at offset %d, not the leader", r.partition, r.topic, offset)
				})

				conn.Close()

				// The next call to .initialize will re-establish a connection to the proper
				// partition leader.
				r.stats.rebalances.observe(1)
				break readLoop

			case RequestTimedOut:
				// Timeout on the kafka side, this can be safely retried.
				errcount = 0
				r.withLogger(func(log Logger) {
					log.Printf("no messages received from kafka within the allocated time for partition %d of %s at offset %d", r.partition, r.topic, offset)
				})
				r.stats.timeouts.observe(1)
				continue

			case OffsetOutOfRange:
				first, last, err := r.readOffsets(conn)
				if err != nil {
					r.withErrorLogger(func(log Logger) {
						log.Printf("the kafka reader got an error while attempting to determine whether it was reading before the first offset or after the last offset of partition %d of %s: %s", r.partition, r.topic, err)
					})
					conn.Close()
					break readLoop
				}

				switch {
				case offset < first:
					r.withErrorLogger(func(log Logger) {
						log.Printf("the kafka reader is reading before the first offset for partition %d of %s, skipping from offset %d to %d (%d messages)", r.partition, r.topic, offset, first, first-offset)
					})
					offset, errcount = first, 0
					continue // retry immediately so we don't keep falling behind due to the backoff

				case offset < last:
					errcount = 0
					continue // more messages have already become available, retry immediately

				default:
					// We may be reading past the last offset, will retry later.
					r.withErrorLogger(func(log Logger) {
						log.Printf("the kafka reader is reading past the last offset for partition %d of %s at offset %d", r.partition, r.topic, offset)
					})
				}

			case context.Canceled:
				// Another reader has taken over, we can safely quit.
				conn.Close()
				return

			case errUnknownCodec:
				// The compression codec is either unsupported or has not been
				// imported. This is a fatal error b/c the reader cannot
				// proceed.
				r.sendError(ctx, err)
				break readLoop

			default:
				if _, ok := err.(Error); ok {
					r.sendError(ctx, err)
				} else {
					r.withErrorLogger(func(log Logger) {
						log.Printf("the kafka reader got an unknown error reading partition %d of %s at offset %d: %s", r.partition, r.topic, offset, err)
					})
					r.stats.errors.observe(1)
					conn.Close()
					break readLoop
				}
			}

			errcount++
		}
	}
}

func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, start int64, err error) {
	for i := 0; i != len(r.brokers) && conn == nil; i++ {
		var broker = r.brokers[i]
		var first, last int64

		t0 := time.Now()
		conn, err = r.dialer.DialLeader(ctx, "tcp", broker, r.topic, r.partition)
		t1 := time.Now()
		r.stats.dials.observe(1)
		r.stats.dialTime.observeDuration(t1.Sub(t0))

		if err != nil {
			continue
		}

		if first, last, err = r.readOffsets(conn); err != nil {
			conn.Close()
			conn = nil
			break
		}

		switch {
		case offset == FirstOffset:
			offset = first

		case offset == LastOffset:
			offset = last

		case offset < first:
			offset = first
		}

		r.withLogger(func(log Logger) {
			log.Printf("the kafka reader for partition %d of %s is seeking to offset %d", r.partition, r.topic, offset)
		})

		if start, err = conn.Seek(offset, SeekAbsolute); err != nil {
			conn.Close()
			conn = nil
			break
		}

		conn.SetDeadline(time.Time{})
	}

	return
}

func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
	r.stats.fetches.observe(1)
	r.stats.offset.observe(offset)

	t0 := time.Now()
	conn.SetReadDeadline(t0.Add(r.maxWait))

	batch := conn.ReadBatchWith(ReadBatchConfig{
		MinBytes:       r.minBytes,
		MaxBytes:       r.maxBytes,
		IsolationLevel: r.isolationLevel,
	})
	highWaterMark := batch.HighWaterMark()

	t1 := time.Now()
	r.stats.waitTime.observeDuration(t1.Sub(t0))

	var msg Message
	var err error
	var size int64
	var bytes int64

	const safetyTimeout = 10 * time.Second
	deadline := time.Now().Add(safetyTimeout)
	conn.SetReadDeadline(deadline)

	for {
		if now := time.Now(); deadline.Sub(now) < (safetyTimeout / 2) {
			deadline = now.Add(safetyTimeout)
			conn.SetReadDeadline(deadline)
		}

		if msg, err = batch.ReadMessage(); err != nil {
			batch.Close()
			break
		}

		n := int64(len(msg.Key) + len(msg.Value))
		r.stats.messages.observe(1)
		r.stats.bytes.observe(n)

		if err = r.sendMessage(ctx, msg, highWaterMark); err != nil {
			batch.Close()
			break
		}

		offset = msg.Offset + 1
		r.stats.offset.observe(offset)
		r.stats.lag.observe(highWaterMark - offset)

		size++
		bytes += n
	}

	conn.SetReadDeadline(time.Time{})

	t2 := time.Now()
	r.stats.readTime.observeDuration(t2.Sub(t1))
	r.stats.fetchSize.observe(size)
	r.stats.fetchBytes.observe(bytes)

	return offset, err
}

func (r *reader) readOffsets(conn *Conn) (first, last int64, err error) {
	conn.SetDeadline(time.Now().Add(10 * time.Second))
	return conn.ReadOffsets()
}

func (r *reader) sendMessage(ctx context.Context, msg Message, watermark int64) error {
	select {
	case r.msgs <- readerMessage{version: r.version, message: msg, watermark: watermark}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (r *reader) sendError(ctx context.Context, err error) error {
	select {
	case r.msgs <- readerMessage{version: r.version, error: err}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (r *reader) withLogger(do func(Logger)) {
	if r.logger != nil {
		do(r.logger)
	}
}

func (r *reader) withErrorLogger(do func(Logger)) {
	if r.errorLogger != nil {
		do(r.errorLogger)
	} else {
		r.withLogger(do)
	}
}

// extractTopics returns the unique list of topics represented by the set of
// provided members.
func extractTopics(members []GroupMember) []string {
	var visited = map[string]struct{}{}
	var topics []string

	for _, member := range members {
		for _, topic := range member.Topics {
			if _, seen := visited[topic]; seen {
				continue
			}

			topics = append(topics, topic)
			visited[topic] = struct{}{}
		}
	}

	sort.Strings(topics)
	return topics
}