writer.go

package kafka

import (
	"bytes"
	"context"
	"errors"
	"io"
	"net"
	"sync"
	"sync/atomic"
	"time"

	metadataAPI "github.com/segmentio/kafka-go/protocol/metadata"
)

// The Writer type provides the implementation of a producer of kafka messages
// that automatically distributes messages across partitions of a single topic
// using a configurable balancing policy.
//
// Writers manage the dispatch of messages across partitions of the topic they
// are configured to write to using a Balancer, and aggregate batches to
// optimize the writes to kafka.
//
// Writers may be configured to be used synchronously or asynchronously. When
// used synchronously, calls to WriteMessages block until the messages have been
// written to kafka. In this mode, the program should inspect the error returned
// by the function and test if it is an instance of kafka.WriteErrors in order to
// identify which messages have succeeded or failed, for example:
//
//	// Construct a synchronous writer (the default mode).
//	w := &kafka.Writer{
//		Addr:         kafka.TCP("localhost:9092"),
//		Topic:        "topic-A",
//		RequiredAcks: kafka.RequireAll,
//	}
//
//	...
//
//	// Passing a context can prevent the operation from blocking indefinitely.
//	switch err := w.WriteMessages(ctx, msgs...).(type) {
//	case nil:
//	case kafka.WriteErrors:
//		for i := range msgs {
//			if err[i] != nil {
//				// handle the error writing msgs[i]
//				...
//			}
//		}
//	default:
//		// handle other errors
//		...
//	}
//
// In asynchronous mode, the program may configure a completion handler on the
// writer to receive notifications of messages being written to kafka:
//
//	w := &kafka.Writer{
//		Addr:         kafka.TCP("localhost:9092"),
//		Topic:        "topic-A",
//		RequiredAcks: kafka.RequireAll,
//		Async:        true, // make the writer asynchronous
//		Completion: func(messages []kafka.Message, err error) {
//			...
//		},
//	}
//
//	...
//
//	// Because the writer is asynchronous, there is no need for the context to
//	// be cancelled, the call will never block.
//	if err := w.WriteMessages(context.Background(), msgs...); err != nil {
//		// Only validation errors would be reported in this case.
//		...
//	}
//
// Methods of Writer are safe to use concurrently from multiple goroutines,
// however the writer configuration should not be modified after first use.
type Writer struct {
	// Address of the kafka cluster that this writer is configured to send
	// messages to.
	//
	// This field is required, attempting to write messages to a writer with a
	// nil address will error.
	Addr net.Addr

	// Topic is the name of the topic that the writer will produce messages to.
	//
	// Setting this field or not is a mutually exclusive option. If you set Topic
	// here, you must not set Topic for any produced Message. Otherwise, if you do
	// not set Topic, every Message must have Topic specified (see the commented
	// example after this struct definition).
	Topic string

	// The balancer used to distribute messages across partitions.
	//
	// The default is to use a round-robin distribution.
	Balancer Balancer

	// Limit on how many attempts will be made to deliver a message.
	//
	// The default is to try at most 10 times.
	MaxAttempts int

	// Limit on how many messages will be buffered before being sent to a
	// partition.
	//
	// The default is to use a target batch size of 100 messages.
	BatchSize int

	// Limit the maximum size of a request in bytes before being sent to
	// a partition.
	//
	// The default is to use a kafka default value of 1048576.
	BatchBytes int64

	// Time limit on how often incomplete message batches will be flushed to
	// kafka.
	//
	// The default is to flush at least every second.
	BatchTimeout time.Duration

	// Timeout for read operations performed by the Writer.
	//
	// Defaults to 10 seconds.
	ReadTimeout time.Duration

	// Timeout for write operations performed by the Writer.
	//
	// Defaults to 10 seconds.
	WriteTimeout time.Duration

	// Number of acknowledgements from partition replicas required before
	// receiving a response to a produce request, the following values are
	// supported:
	//
	//	RequireNone (0)  fire-and-forget, do not wait for acknowledgements from the cluster
	//	RequireOne  (1)  wait for the leader to acknowledge the writes
	//	RequireAll  (-1) wait for the full ISR to acknowledge the writes
	//
	RequiredAcks RequiredAcks

	// Setting this flag to true causes the WriteMessages method to never block.
	// It also means that errors are ignored since the caller will not receive
	// the returned value. Use this only if you don't care about guarantees of
	// whether the messages were written to kafka.
	Async bool

	// An optional function called when the writer succeeds or fails the
	// delivery of messages to a kafka partition. When writing the messages
	// fails, the `err` parameter will be non-nil.
	//
	// The messages that the Completion function is called with have their
	// topic, partition, offset, and time set based on the Produce responses
	// received from kafka. All messages passed to a call to the function have
	// been written to the same partition. The keys and values of messages are
	// referencing the original byte slices carried by messages in the calls to
	// WriteMessages.
	//
	// The function is called from goroutines started by the writer. Calls to
	// Close will block on the Completion function calls. When the Writer is
	// not writing asynchronously, the WriteMessages call will also block on
	// the Completion function calls, which is a useful guarantee if the byte
	// slices for the message keys and values are intended to be reused after
	// the WriteMessages call returned.
	//
	// If a completion function panics, the program terminates because the
	// panic is not recovered by the writer and bubbles up to the top of the
	// goroutine's call stack.
	Completion func(messages []Message, err error)

	// Compression sets the compression codec to be used to compress messages.
	Compression Compression

	// If not nil, specifies a logger used to report internal changes within the
	// writer.
	Logger Logger

	// ErrorLogger is the logger used to report errors. If nil, the writer falls
	// back to using Logger instead.
	ErrorLogger Logger

	// A transport used to send messages to kafka clusters.
	//
	// If nil, DefaultTransport is used.
	Transport RoundTripper

	// Atomic flag indicating whether the writer has been closed.
	closed uint32
	group  sync.WaitGroup

	// Manages the current batch being aggregated on the writer.
	mutex   sync.Mutex
	batches map[topicPartition]*writeBatch

	// writer stats are all made of atomic values, no need for synchronization.
	// Use a pointer to ensure 64-bit alignment of the values. The once value is
	// used to lazily create the value when first used, allowing programs to use
	// the zero value of Writer.
	once sync.Once
	*writerStats

	// If no balancer is configured, the writer uses this one. RoundRobin values
	// are safe to use concurrently from multiple goroutines, there is no need
	// for extra synchronization to access this field.
	roundRobin RoundRobin

	// non-nil when a transport was created by NewWriter, remove in 1.0.
	transport *Transport
}
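
// A minimal sketch of the two mutually exclusive ways of selecting a topic
// described on the Topic field above; the broker address and topic names are
// placeholders.
//
//	// Either: one topic for the whole writer, messages carry no topic.
//	w := &kafka.Writer{
//		Addr:  kafka.TCP("localhost:9092"),
//		Topic: "topic-A",
//	}
//	err := w.WriteMessages(ctx, kafka.Message{Value: []byte("hello")})
//
//	// Or: no writer-level topic, every message names its own topic.
//	w = &kafka.Writer{
//		Addr: kafka.TCP("localhost:9092"),
//	}
//	err = w.WriteMessages(ctx,
//		kafka.Message{Topic: "topic-A", Value: []byte("hello")},
//		kafka.Message{Topic: "topic-B", Value: []byte("world")},
//	)
//
// Setting both (or neither) causes WriteMessages to fail with InvalidMessage,
// as implemented by chooseTopic below.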

// WriterConfig is a configuration type used to create new instances of Writer.
//
// DEPRECATED: writer values should be configured directly by assigning their
// exported fields. This type is kept for backward compatibility, and will be
// removed in version 1.0.
type WriterConfig struct {
	// The list of brokers used to discover the partitions available on the
	// kafka cluster.
	//
	// This field is required, attempting to create a writer with an empty list
	// of brokers will panic.
	Brokers []string

	// The topic that the writer will produce messages to.
	//
	// If provided, this will be used to set the topic for all produced messages.
	// If not provided, each Message must specify a topic for itself. The two
	// options are mutually exclusive, otherwise the Writer will return an error.
	Topic string

	// The dialer used by the writer to establish connections to the kafka
	// cluster.
	//
	// If nil, the default dialer is used instead.
	Dialer *Dialer

	// The balancer used to distribute messages across partitions.
	//
	// The default is to use a round-robin distribution.
	Balancer Balancer

	// Limit on how many attempts will be made to deliver a message.
	//
	// The default is to try at most 10 times.
	MaxAttempts int

	// DEPRECATED: in versions prior to 0.4, the writer used channels internally
	// to dispatch messages to partitions. This has been replaced by an in-memory
	// aggregation of batches which uses shared state instead of message passing,
	// making this option unnecessary.
	QueueCapacity int

	// Limit on how many messages will be buffered before being sent to a
	// partition.
	//
	// The default is to use a target batch size of 100 messages.
	BatchSize int

	// Limit the maximum size of a request in bytes before being sent to
	// a partition.
	//
	// The default is to use a kafka default value of 1048576.
	BatchBytes int

	// Time limit on how often incomplete message batches will be flushed to
	// kafka.
	//
	// The default is to flush at least every second.
	BatchTimeout time.Duration

	// Timeout for read operations performed by the Writer.
	//
	// Defaults to 10 seconds.
	ReadTimeout time.Duration

	// Timeout for write operations performed by the Writer.
	//
	// Defaults to 10 seconds.
	WriteTimeout time.Duration

	// DEPRECATED: in versions prior to 0.4, the writer used to maintain a cache
	// of the topic layout. With the change to use a transport to manage
	// connections, the responsibility of syncing the cluster layout has been
	// delegated to the transport.
	RebalanceInterval time.Duration

	// DEPRECATED: in versions prior to 0.4, the writer used to manage connections
	// to the kafka cluster directly. With the change to use a transport to manage
	// connections, the writer has no connections to manage directly anymore.
	IdleConnTimeout time.Duration

	// Number of acknowledgements from partition replicas required before receiving
	// a response to a produce request. The default is -1, which means to wait for
	// all replicas, and a value above 0 is required to indicate how many replicas
	// should acknowledge a message to be considered successful.
	//
	// Versions of kafka-go prior to 0.4 (v0.3 and earlier) did not support 0
	// required acks, due to some internal complexity implementing this with the
	// Kafka protocol; that functionality requires v0.4 or later.
	RequiredAcks int

	// Setting this flag to true causes the WriteMessages method to never block.
	// It also means that errors are ignored since the caller will not receive
	// the returned value. Use this only if you don't care about guarantees of
	// whether the messages were written to kafka.
	Async bool

	// CompressionCodec sets the codec to be used to compress Kafka messages.
	CompressionCodec

	// If not nil, specifies a logger used to report internal changes within the
	// writer.
	Logger Logger

	// ErrorLogger is the logger used to report errors. If nil, the writer falls
	// back to using Logger instead.
	ErrorLogger Logger
}

type topicPartition struct {
	topic     string
	partition int32
}

// Validate method validates WriterConfig properties.
func (config *WriterConfig) Validate() error {
	if len(config.Brokers) == 0 {
		return errors.New("cannot create a kafka writer with an empty list of brokers")
	}
	return nil
}

// WriterStats is a data structure returned by a call to Writer.Stats that
// exposes details about the behavior of the writer.
type WriterStats struct {
	Writes   int64 `metric:"kafka.writer.write.count"   type:"counter"`
	Messages int64 `metric:"kafka.writer.message.count" type:"counter"`
	Bytes    int64 `metric:"kafka.writer.message.bytes" type:"counter"`
	Errors   int64 `metric:"kafka.writer.error.count"   type:"counter"`

	BatchTime  DurationStats `metric:"kafka.writer.batch.seconds"`
	WriteTime  DurationStats `metric:"kafka.writer.write.seconds"`
	WaitTime   DurationStats `metric:"kafka.writer.wait.seconds"`
	Retries    SummaryStats  `metric:"kafka.writer.retries.count"`
	BatchSize  SummaryStats  `metric:"kafka.writer.batch.size"`
	BatchBytes SummaryStats  `metric:"kafka.writer.batch.bytes"`

	MaxAttempts  int64         `metric:"kafka.writer.attempts.max"  type:"gauge"`
	MaxBatchSize int64         `metric:"kafka.writer.batch.max"     type:"gauge"`
	BatchTimeout time.Duration `metric:"kafka.writer.batch.timeout" type:"gauge"`
	ReadTimeout  time.Duration `metric:"kafka.writer.read.timeout"  type:"gauge"`
	WriteTimeout time.Duration `metric:"kafka.writer.write.timeout" type:"gauge"`
	RequiredAcks int64         `metric:"kafka.writer.acks.required" type:"gauge"`
	Async        bool          `metric:"kafka.writer.async"         type:"gauge"`

	Topic string `tag:"topic"`

	// DEPRECATED: these fields will only be reported for backward compatibility
	// if the Writer was constructed with NewWriter.
	Dials    int64         `metric:"kafka.writer.dial.count" type:"counter"`
	DialTime DurationStats `metric:"kafka.writer.dial.seconds"`

	// DEPRECATED: these fields were meaningful prior to kafka-go 0.4, changes
	// to the internal implementation and the introduction of the transport type
	// made them unnecessary.
	//
	// The values will be zero but are left for backward compatibility to avoid
	// breaking programs that used these fields.
	Rebalances        int64
	RebalanceInterval time.Duration
	QueueLength       int64
	QueueCapacity     int64
	ClientID          string
}

// writerStats is a struct that contains statistics on a writer.
//
// Since atomic is used to mutate the statistics, the values must be 64-bit aligned.
// This is easily accomplished by always allocating this struct directly (i.e.
// using a pointer to the struct).
// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
type writerStats struct {
	dials          counter
	writes         counter
	messages       counter
	bytes          counter
	errors         counter
	dialTime       summary
	batchTime      summary
	writeTime      summary
	waitTime       summary
	retries        summary
	batchSize      summary
	batchSizeBytes summary
}

// NewWriter creates and returns a new Writer configured with config.
//
// DEPRECATED: Writer values can be instantiated and configured directly,
// this function is retained for backward compatibility and will be removed
// in version 1.0.
func NewWriter(config WriterConfig) *Writer {
	if err := config.Validate(); err != nil {
		panic(err)
	}

	if config.Dialer == nil {
		config.Dialer = DefaultDialer
	}

	if config.Balancer == nil {
		config.Balancer = &RoundRobin{}
	}

	// Converts the pre-0.4 Dialer API into a Transport.
	kafkaDialer := DefaultDialer
	if config.Dialer != nil {
		kafkaDialer = config.Dialer
	}

	dialer := (&net.Dialer{
		Timeout:       kafkaDialer.Timeout,
		Deadline:      kafkaDialer.Deadline,
		LocalAddr:     kafkaDialer.LocalAddr,
		DualStack:     kafkaDialer.DualStack,
		FallbackDelay: kafkaDialer.FallbackDelay,
		KeepAlive:     kafkaDialer.KeepAlive,
	})

	var resolver Resolver
	if r, ok := kafkaDialer.Resolver.(*net.Resolver); ok {
		dialer.Resolver = r
	} else {
		resolver = kafkaDialer.Resolver
	}

	stats := new(writerStats)

	// For backward compatibility with the pre-0.4 APIs, support custom
	// resolvers by wrapping the dial function.
	dial := func(ctx context.Context, network, addr string) (net.Conn, error) {
		start := time.Now()
		defer func() {
			stats.dials.observe(1)
			stats.dialTime.observe(int64(time.Since(start)))
		}()
		address, err := lookupHost(ctx, addr, resolver)
		if err != nil {
			return nil, err
		}
		return dialer.DialContext(ctx, network, address)
	}

	idleTimeout := config.IdleConnTimeout
	if idleTimeout == 0 {
		// Historical default value of WriterConfig.IdleTimeout, 9 minutes seems
		// like it is way too long when there is no ping mechanism in the kafka
		// protocol.
		idleTimeout = 9 * time.Minute
	}

	metadataTTL := config.RebalanceInterval
	if metadataTTL == 0 {
		// Historical default value of WriterConfig.RebalanceInterval.
		metadataTTL = 15 * time.Second
	}

	transport := &Transport{
		Dial:        dial,
		SASL:        kafkaDialer.SASLMechanism,
		TLS:         kafkaDialer.TLS,
		ClientID:    kafkaDialer.ClientID,
		IdleTimeout: idleTimeout,
		MetadataTTL: metadataTTL,
	}

	w := &Writer{
		Addr:         TCP(config.Brokers...),
		Topic:        config.Topic,
		MaxAttempts:  config.MaxAttempts,
		BatchSize:    config.BatchSize,
		Balancer:     config.Balancer,
		BatchBytes:   int64(config.BatchBytes),
		BatchTimeout: config.BatchTimeout,
		ReadTimeout:  config.ReadTimeout,
		WriteTimeout: config.WriteTimeout,
		RequiredAcks: RequiredAcks(config.RequiredAcks),
		Async:        config.Async,
		Logger:       config.Logger,
		ErrorLogger:  config.ErrorLogger,
		Transport:    transport,
		transport:    transport,
		writerStats:  stats,
	}

	if config.RequiredAcks == 0 {
		// Historically the writers created by NewWriter have used "all" as the
		// default value when 0 was specified.
		w.RequiredAcks = RequireAll
	}

	if config.CompressionCodec != nil {
		w.Compression = Compression(config.CompressionCodec.Code())
	}

	return w
}
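
// A commented sketch of migrating away from the deprecated NewWriter/WriterConfig
// pair, per the deprecation notes above: assign the exported fields of Writer
// directly. The broker address and topic are placeholders.
//
//	// Deprecated style:
//	w := kafka.NewWriter(kafka.WriterConfig{
//		Brokers: []string{"localhost:9092"},
//		Topic:   "topic-A",
//	})
//
//	// Preferred style:
//	w := &kafka.Writer{
//		Addr:  kafka.TCP("localhost:9092"),
//		Topic: "topic-A",
//	}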

// Close flushes pending writes, and waits for all writes to complete before
// returning. Calling Close also prevents new writes from being submitted to
// the writer, further calls to WriteMessages and the like will fail with
// io.ErrClosedPipe.
func (w *Writer) Close() error {
	w.markClosed()

	// If batches are pending, trigger them so messages get sent.
	w.mutex.Lock()
	for _, batch := range w.batches {
		batch.trigger()
	}
	for partition := range w.batches {
		delete(w.batches, partition)
	}
	w.mutex.Unlock()

	w.group.Wait()

	if w.transport != nil {
		w.transport.CloseIdleConnections()
	}

	return nil
}
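
// A minimal usage sketch for Close: programs typically defer it right after
// constructing the writer so pending batches are flushed on shutdown (the
// variable names below are placeholders).
//
//	w := &kafka.Writer{Addr: kafka.TCP("localhost:9092"), Topic: "topic-A"}
//	defer func() {
//		if err := w.Close(); err != nil {
//			log.Printf("closing kafka writer: %v", err)
//		}
//	}()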

// WriteMessages writes a batch of messages to the kafka topic configured on this
// writer.
//
// Unless the writer was configured to write messages asynchronously, the method
// blocks until all messages have been written, or until the maximum number of
// attempts was reached.
//
// When sending synchronously and the writer's batch size is configured to be
// greater than 1, this method blocks until either a full batch can be assembled
// or the batch timeout is reached. The batch size and timeouts are evaluated
// per partition, so the choice of Balancer can also influence the flushing
// behavior. For example, the Hash balancer will require on average N * batch
// size messages to trigger a flush where N is the number of partitions. The
// best way to achieve good batching behavior is to share one Writer amongst
// multiple goroutines.
//
// When the method returns an error, it may be of type kafka.WriteErrors to allow
// the caller to determine the status of each message.
//
// The context passed as first argument may also be used to asynchronously
// cancel the operation. Note that in this case there are no guarantees made on
// whether messages were written to kafka. The program should assume that the
// whole batch failed and re-write the messages later (which could then cause
// duplicates).
func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
	if w.Addr == nil {
		return errors.New("kafka.(*Writer).WriteMessages: cannot create a kafka writer with a nil address")
	}

	w.group.Add(1)
	defer w.group.Done()

	if w.isClosed() {
		return io.ErrClosedPipe
	}

	if len(msgs) == 0 {
		return nil
	}

	balancer := w.balancer()
	batchBytes := w.batchBytes()

	for i := range msgs {
		n := int64(msgs[i].size())
		if n > batchBytes {
			// This error is left for backward compatibility with historical
			// behavior, but it can yield O(N^2) behaviors. The expectation
			// is that the program will check if WriteMessages returned a
			// MessageTooLargeError, discard the message that was exceeding
			// the maximum size, and try again (see the commented sketch after
			// this function).
			return messageTooLarge(msgs, i)
		}
	}

	// We use int32 here to halve the memory footprint (compared to using int
	// on 64 bits architectures). We map lists of the message indexes instead
	// of the message values for the same reason, int32 is 4 bytes, vs a full
	// Message value which is 100+ bytes and contains pointers and contributes
	// to increasing GC work.
	assignments := make(map[topicPartition][]int32)

	for i, msg := range msgs {
		topic, err := w.chooseTopic(msg)
		if err != nil {
			return err
		}

		numPartitions, err := w.partitions(ctx, topic)
		if err != nil {
			return err
		}

		partition := balancer.Balance(msg, loadCachedPartitions(numPartitions)...)

		key := topicPartition{
			topic:     topic,
			partition: int32(partition),
		}

		assignments[key] = append(assignments[key], int32(i))
	}

	batches := w.batchMessages(msgs, assignments)
	if w.Async {
		return nil
	}

	done := ctx.Done()
	hasErrors := false
	for batch := range batches {
		select {
		case <-done:
			return ctx.Err()
		case <-batch.done:
			if batch.err != nil {
				hasErrors = true
			}
		}
	}

	if !hasErrors {
		return nil
	}

	werr := make(WriteErrors, len(msgs))

	for batch, indexes := range batches {
		for _, i := range indexes {
			werr[i] = batch.err
		}
	}
	return werr
}
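
// A commented sketch of the retry pattern described in the MessageTooLargeError
// note inside WriteMessages above: drop the oversized message and resubmit the
// rest. The Message and Remaining fields are assumed from this package's
// MessageTooLargeError type; adjust if the error type differs.
//
//	err := w.WriteMessages(ctx, msgs...)
//	var tooLarge MessageTooLargeError
//	if errors.As(err, &tooLarge) {
//		// tooLarge.Message exceeded BatchBytes; log or drop it, then retry
//		// with the messages that were not written.
//		err = w.WriteMessages(ctx, tooLarge.Remaining...)
//	}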

func (w *Writer) batchMessages(messages []Message, assignments map[topicPartition][]int32) map[*writeBatch][]int32 {
	var batches map[*writeBatch][]int32
	if !w.Async {
		batches = make(map[*writeBatch][]int32, len(assignments))
	}

	batchSize := w.batchSize()
	batchBytes := w.batchBytes()

	w.mutex.Lock()
	defer w.mutex.Unlock()

	if w.batches == nil {
		w.batches = map[topicPartition]*writeBatch{}
	}

	for key, indexes := range assignments {
		for _, i := range indexes {
		assignMessage:
			batch := w.batches[key]
			if batch == nil {
				batch = w.newWriteBatch(key)
				w.batches[key] = batch
			}
			if !batch.add(messages[i], batchSize, batchBytes) {
				batch.trigger()
				delete(w.batches, key)
				goto assignMessage
			}
			if batch.full(batchSize, batchBytes) {
				batch.trigger()
				delete(w.batches, key)
			}
			if !w.Async {
				batches[batch] = append(batches[batch], i)
			}
		}
	}

	return batches
}

func (w *Writer) newWriteBatch(key topicPartition) *writeBatch {
	batch := newWriteBatch(time.Now(), w.batchTimeout())
	w.group.Add(1)
	go func() {
		defer w.group.Done()
		w.writeBatch(key, batch)
	}()
	return batch
}

func (w *Writer) writeBatch(key topicPartition, batch *writeBatch) {
	// This goroutine has taken ownership of the batch, it is responsible
	// for waiting for the batch to be ready (because it became full), or
	// to timeout.
	select {
	case <-batch.timer.C:
		// The batch timed out, we want to detach it from the writer to
		// prevent more messages from being added.
		w.mutex.Lock()
		if batch == w.batches[key] {
			delete(w.batches, key)
		}
		w.mutex.Unlock()

	case <-batch.ready:
		// The batch became full, it was removed from the writer and its
		// ready channel was closed. We need to close the timer to avoid
		// having it leak until it expires.
		batch.timer.Stop()
	}

	stats := w.stats()
	stats.batchTime.observe(int64(time.Since(batch.time)))
	stats.batchSize.observe(int64(len(batch.msgs)))
	stats.batchSizeBytes.observe(batch.bytes)

	var res *ProduceResponse
	var err error

	for attempt, maxAttempts := 0, w.maxAttempts(); attempt < maxAttempts; attempt++ {
		if attempt != 0 {
			stats.retries.observe(1)
			// TODO: should there be a way to asynchronously cancel this
			// operation?
			//
			// * If all goroutines that added messages to this batch have stopped
			//   waiting for it, should we abort?
			//
			// * If the writer has been closed? It reduces the durability
			//   guarantees to abort, but may be better to avoid long wait times
			//   on close.
			//
			delay := backoff(attempt, 100*time.Millisecond, 1*time.Second)
			w.withLogger(func(log Logger) {
				log.Printf("backing off %s writing %d messages to %s (partition: %d)", delay, len(batch.msgs), key.topic, key.partition)
			})
			time.Sleep(delay)
		}

		w.withLogger(func(log Logger) {
			log.Printf("writing %d messages to %s (partition: %d)", len(batch.msgs), key.topic, key.partition)
		})

		start := time.Now()
		res, err = w.produce(key, batch)

		stats.writes.observe(1)
		stats.messages.observe(int64(len(batch.msgs)))
		stats.bytes.observe(batch.bytes)
		// stats.writeTime used to report the duration of WriteMessages, but the
		// implementation was broken and reporting values in the nanoseconds
		// range. In kafka-go 0.4, we recycled this value to instead report the
		// duration of produce requests, and changed the stats.waitTime value to
		// report the time that kafka has throttled the requests for.
		stats.writeTime.observe(int64(time.Since(start)))

		if res != nil {
			err = res.Error
			stats.waitTime.observe(int64(res.Throttle))
		}

		if err == nil {
			break
		}

		stats.errors.observe(1)

		w.withErrorLogger(func(log Logger) {
			log.Printf("error writing messages to %s (partition %d): %s", key.topic, key.partition, err)
		})

		if !isTemporary(err) {
			break
		}
	}

	if res != nil {
		for i := range batch.msgs {
			m := &batch.msgs[i]
			m.Topic = key.topic
			m.Partition = int(key.partition)
			m.Offset = res.BaseOffset + int64(i)

			if m.Time.IsZero() {
				m.Time = res.LogAppendTime
			}
		}
	}

	if w.Completion != nil {
		w.Completion(batch.msgs, err)
	}

	batch.complete(err)
}

func (w *Writer) produce(key topicPartition, batch *writeBatch) (*ProduceResponse, error) {
	timeout := w.writeTimeout()

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	return w.client(timeout).Produce(ctx, &ProduceRequest{
		Partition:    int(key.partition),
		Topic:        key.topic,
		RequiredAcks: w.RequiredAcks,
		Compression:  w.Compression,
		Records: &writerRecords{
			msgs: batch.msgs,
		},
	})
}

func (w *Writer) partitions(ctx context.Context, topic string) (int, error) {
	client := w.client(w.readTimeout())
	// Here we use the transport directly as an optimization to avoid the
	// construction of temporary request and response objects made by the
	// (*Client).Metadata API.
	//
	// It is expected that the transport will optimize this request by
	// caching recent results (the kafka.Transport type does).
	r, err := client.transport().RoundTrip(ctx, client.Addr, &metadataAPI.Request{
		TopicNames: []string{topic},
	})
	if err != nil {
		return 0, err
	}
	for _, t := range r.(*metadataAPI.Response).Topics {
		if t.Name == topic {
			// This should always hit, unless kafka has a bug.
			if t.ErrorCode != 0 {
				return 0, Error(t.ErrorCode)
			}
			return len(t.Partitions), nil
		}
	}
	return 0, UnknownTopicOrPartition
}

func (w *Writer) markClosed() {
	atomic.StoreUint32(&w.closed, 1)
}

func (w *Writer) isClosed() bool {
	return atomic.LoadUint32(&w.closed) != 0
}

func (w *Writer) client(timeout time.Duration) *Client {
	return &Client{
		Addr:      w.Addr,
		Transport: w.Transport,
		Timeout:   timeout,
	}
}

func (w *Writer) balancer() Balancer {
	if w.Balancer != nil {
		return w.Balancer
	}
	return &w.roundRobin
}

func (w *Writer) maxAttempts() int {
	if w.MaxAttempts > 0 {
		return w.MaxAttempts
	}
	// TODO: this is a very high default, if something has failed 9 times it
	// seems unlikely it will succeed on the 10th attempt. However, it does
	// carry the risk to greatly increase the volume of requests sent to the
	// kafka cluster. We should consider reducing this default (3?).
	return 10
}

func (w *Writer) batchSize() int {
	if w.BatchSize > 0 {
		return w.BatchSize
	}
	return 100
}

func (w *Writer) batchBytes() int64 {
	if w.BatchBytes > 0 {
		return w.BatchBytes
	}
	return 1048576
}

func (w *Writer) batchTimeout() time.Duration {
	if w.BatchTimeout > 0 {
		return w.BatchTimeout
	}
	return 1 * time.Second
}

func (w *Writer) readTimeout() time.Duration {
	if w.ReadTimeout > 0 {
		return w.ReadTimeout
	}
	return 10 * time.Second
}

func (w *Writer) writeTimeout() time.Duration {
	if w.WriteTimeout > 0 {
		return w.WriteTimeout
	}
	return 10 * time.Second
}

func (w *Writer) withLogger(do func(Logger)) {
	if w.Logger != nil {
		do(w.Logger)
	}
}

func (w *Writer) withErrorLogger(do func(Logger)) {
	if w.ErrorLogger != nil {
		do(w.ErrorLogger)
	} else {
		w.withLogger(do)
	}
}

func (w *Writer) stats() *writerStats {
	w.once.Do(func() {
		// This field is not nil when the writer was constructed with NewWriter
		// to share the value with the dial function and count dials.
		if w.writerStats == nil {
			w.writerStats = new(writerStats)
		}
	})
	return w.writerStats
}

// Stats returns a snapshot of the writer stats since the last time the method
// was called, or since the writer was created if it is called for the first
// time.
//
// A typical use of this method is to spawn a goroutine that will periodically
// call Stats on a kafka writer and report the metrics to a stats collection
// system.
func (w *Writer) Stats() WriterStats {
	stats := w.stats()
	return WriterStats{
		Dials:        stats.dials.snapshot(),
		Writes:       stats.writes.snapshot(),
		Messages:     stats.messages.snapshot(),
		Bytes:        stats.bytes.snapshot(),
		Errors:       stats.errors.snapshot(),
		DialTime:     stats.dialTime.snapshotDuration(),
		BatchTime:    stats.batchTime.snapshotDuration(),
		WriteTime:    stats.writeTime.snapshotDuration(),
		WaitTime:     stats.waitTime.snapshotDuration(),
		Retries:      stats.retries.snapshot(),
		BatchSize:    stats.batchSize.snapshot(),
		BatchBytes:   stats.batchSizeBytes.snapshot(),
		MaxAttempts:  int64(w.MaxAttempts),
		MaxBatchSize: int64(w.BatchSize),
		BatchTimeout: w.BatchTimeout,
		ReadTimeout:  w.ReadTimeout,
		WriteTimeout: w.WriteTimeout,
		RequiredAcks: int64(w.RequiredAcks),
		Async:        w.Async,
		Topic:        w.Topic,
	}
}
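
// A commented sketch of the periodic reporting pattern described in the Stats
// documentation above; the ticker interval and the report function are
// placeholders for whatever stats collection system the program uses.
//
//	go func() {
//		ticker := time.NewTicker(10 * time.Second)
//		defer ticker.Stop()
//		for range ticker.C {
//			s := w.Stats()
//			report("kafka.writer.write.count", s.Writes)
//			report("kafka.writer.error.count", s.Errors)
//		}
//	}()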

func (w *Writer) chooseTopic(msg Message) (string, error) {
	// w.Topic and msg.Topic are mutually exclusive, meaning exactly one of them
	// must be set, otherwise we return an error.
	if (w.Topic != "" && msg.Topic != "") || (w.Topic == "" && msg.Topic == "") {
		return "", InvalidMessage
	}

	// now we choose the topic, depending on which one is not empty
	if msg.Topic != "" {
		return msg.Topic, nil
	}

	return w.Topic, nil
}

type writeBatch struct {
	time  time.Time
	msgs  []Message
	size  int
	bytes int64
	ready chan struct{}
	done  chan struct{}
	timer *time.Timer
	err   error // result of the batch completion
}

func newWriteBatch(now time.Time, timeout time.Duration) *writeBatch {
	return &writeBatch{
		time:  now,
		ready: make(chan struct{}),
		done:  make(chan struct{}),
		timer: time.NewTimer(timeout),
	}
}

func (b *writeBatch) add(msg Message, maxSize int, maxBytes int64) bool {
	bytes := int64(msg.size())

	if b.size > 0 && (b.bytes+bytes) > maxBytes {
		return false
	}

	if cap(b.msgs) == 0 {
		b.msgs = make([]Message, 0, maxSize)
	}

	b.msgs = append(b.msgs, msg)
	b.size++
	b.bytes += bytes
	return true
}

func (b *writeBatch) full(maxSize int, maxBytes int64) bool {
	return b.size >= maxSize || b.bytes >= maxBytes
}

func (b *writeBatch) trigger() {
	close(b.ready)
}

func (b *writeBatch) complete(err error) {
	b.err = err
	close(b.done)
}

type writerRecords struct {
	msgs   []Message
	index  int
	record Record
	key    bytesReadCloser
	value  bytesReadCloser
}

func (r *writerRecords) ReadRecord() (*Record, error) {
	if r.index >= 0 && r.index < len(r.msgs) {
		m := &r.msgs[r.index]
		r.index++

		r.record = Record{
			Time:    m.Time,
			Headers: m.Headers,
		}

		if m.Key != nil {
			r.key.Reset(m.Key)
			r.record.Key = &r.key
		}

		if m.Value != nil {
			r.value.Reset(m.Value)
			r.record.Value = &r.value
		}

		return &r.record, nil
	}
	return nil, io.EOF
}

type bytesReadCloser struct{ bytes.Reader }

func (*bytesReadCloser) Close() error { return nil }

// A cache of []int values passed to balancers of writers, used to amortize the
// heap allocation of the partition index lists.
//
// With hindsight, the use of `...int` to pass the partition list to Balancers
// was not the best design choice: kafka partition numbers are monotonically
// increasing, we could have simply passed the number of partitions instead.
// If we ever revisit this API, we can hopefully remove this cache.
var partitionsCache atomic.Value

func loadCachedPartitions(numPartitions int) []int {
	partitions, ok := partitionsCache.Load().([]int)
	if ok && len(partitions) >= numPartitions {
		return partitions[:numPartitions]
	}

	const alignment = 128
	n := ((numPartitions / alignment) + 1) * alignment
	partitions = make([]int, n)

	for i := range partitions {
		partitions[i] = i
	}

	partitionsCache.Store(partitions)
	return partitions[:numPartitions]
}