12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098 |
- package kafka
- import (
- "bytes"
- "context"
- "errors"
- "io"
- "net"
- "sync"
- "sync/atomic"
- "time"
- metadataAPI "github.com/segmentio/kafka-go/protocol/metadata"
- )
// Writer batches messages per topic/partition and produces them to a kafka
// cluster. Every open batch is flushed by its own goroutine once it is full,
// once a message no longer fits, or once its batch timeout elapses. Writers
// can be built with NewWriter; when a field below is left at its zero value,
// the unexported accessor methods apply the default noted on the field.
type Writer struct {
	// Addr is the address of the kafka cluster that messages are sent to.
	// WriteMessages returns an error while it is nil.
	Addr net.Addr

	// Topic is the default destination topic. For every message, exactly one
	// of Writer.Topic and Message.Topic must be set; otherwise WriteMessages
	// fails with InvalidMessage.
	Topic string

	// Balancer assigns each message to a partition of its topic. When nil,
	// the writer's internal round-robin balancer is used.
	Balancer Balancer

	// MaxAttempts limits how many times a batch is produced before giving
	// up. Only errors for which isTemporary reports true are retried. Values
	// <= 0 mean 10.
	MaxAttempts int

	// BatchSize is the maximum number of messages per batch. Values <= 0
	// mean 100.
	BatchSize int

	// BatchBytes is the maximum size of a batch in bytes, as measured by
	// Message.size. WriteMessages rejects any single message larger than
	// this. Values <= 0 mean 1048576 (1 MiB).
	BatchBytes int64

	// BatchTimeout is how long a batch may stay open before it is flushed,
	// even if it is not full. Values <= 0 mean 1 second.
	BatchTimeout time.Duration

	// ReadTimeout is the client timeout used when fetching topic metadata
	// to count partitions. Values <= 0 mean 10 seconds.
	ReadTimeout time.Duration

	// WriteTimeout bounds each produce request. Values <= 0 mean 10 seconds.
	WriteTimeout time.Duration

	// RequiredAcks is sent with every produce request. NewWriter replaces a
	// zero WriterConfig.RequiredAcks with RequireAll; a Writer built any
	// other way sends this field as-is.
	RequiredAcks RequiredAcks

	// Async makes WriteMessages return as soon as the messages are queued
	// into batches, without waiting for them to be produced. Write outcomes
	// are then only observable through Completion, the loggers, and Stats.
	Async bool

	// Completion, when set, is called once per batch after its final
	// attempt. It runs on the batch's goroutine and receives the batch
	// messages and the final error (nil on success). If a response was
	// received, the messages have Topic, Partition and Offset filled in, and
	// Time as well when it was zero.
	Completion func(messages []Message, err error)

	// Compression is the codec sent with every produce request.
	Compression Compression

	// Logger, when set, receives a progress message for each write attempt
	// and each backoff.
	Logger Logger

	// ErrorLogger, when set, receives write errors; otherwise they go to
	// Logger.
	ErrorLogger Logger

	// Transport sends requests on behalf of the writer. NewWriter sets it to
	// a dedicated *Transport. NOTE(review): the behavior when nil is decided
	// by Client, which is not shown here.
	Transport RoundTripper

	// closed is set to 1 atomically by Close (markClosed) and read via
	// isClosed.
	closed uint32

	// group tracks in-flight WriteMessages calls and batch goroutines so
	// that Close can wait for them.
	group sync.WaitGroup

	// mutex guards batches.
	mutex sync.Mutex

	// batches holds the currently open batch for each topic/partition.
	batches map[topicPartition]*writeBatch

	// once guards the lazy allocation of writerStats in stats().
	once sync.Once

	// writerStats holds the live counters reported by Stats.
	*writerStats

	// roundRobin is the balancer used when Balancer is nil.
	roundRobin RoundRobin

	// transport is the Transport created by NewWriter, kept so that Close
	// can release its idle connections; Close skips it when nil.
	transport *Transport
}
// WriterConfig is the configuration accepted by NewWriter, which translates
// it into a Writer backed by a dedicated Transport.
type WriterConfig struct {
	// Brokers lists the broker addresses to connect to. It must not be
	// empty (see Validate).
	Brokers []string

	// Topic is copied to Writer.Topic.
	Topic string

	// Dialer supplies the connection settings used by the writer's
	// transport: timeouts, local address, keep-alive, resolver, SASL, TLS
	// and client ID. Defaults to DefaultDialer.
	Dialer *Dialer

	// Balancer is copied to Writer.Balancer. Defaults to a new RoundRobin.
	Balancer Balancer

	// MaxAttempts is copied to Writer.MaxAttempts.
	MaxAttempts int

	// QueueCapacity is not read by NewWriter in this file.
	// NOTE(review): possibly kept only for backward compatibility — confirm.
	QueueCapacity int

	// BatchSize is copied to Writer.BatchSize.
	BatchSize int

	// BatchBytes is copied to Writer.BatchBytes.
	BatchBytes int

	// BatchTimeout is copied to Writer.BatchTimeout.
	BatchTimeout time.Duration

	// ReadTimeout is copied to Writer.ReadTimeout.
	ReadTimeout time.Duration

	// WriteTimeout is copied to Writer.WriteTimeout.
	WriteTimeout time.Duration

	// RebalanceInterval is used as the transport's metadata TTL. Zero means
	// 15 seconds.
	RebalanceInterval time.Duration

	// IdleConnTimeout is the transport's idle connection timeout. Zero
	// means 9 minutes.
	IdleConnTimeout time.Duration

	// RequiredAcks is converted to Writer.RequiredAcks; zero is replaced
	// with RequireAll.
	RequiredAcks int

	// Async is copied to Writer.Async.
	Async bool

	// CompressionCodec, when non-nil, sets Writer.Compression from its
	// Code().
	CompressionCodec

	// Logger is copied to Writer.Logger.
	Logger Logger

	// ErrorLogger is copied to Writer.ErrorLogger.
	ErrorLogger Logger
}
// topicPartition identifies one destination partition. It keys both the
// writer's map of open batches and WriteMessages' per-partition assignments.
type topicPartition struct {
	topic     string
	partition int32
}
- func (config *WriterConfig) Validate() error {
- if len(config.Brokers) == 0 {
- return errors.New("cannot create a kafka writer with an empty list of brokers")
- }
- return nil
- }
// WriterStats is a snapshot of a Writer's statistics and configuration, as
// returned by (*Writer).Stats. The `metric`, `type` and `tag` struct tags
// describe how each field is reported; the consumer of these tags is not in
// this file.
type WriterStats struct {
	// Produce attempt counters. They are observed once per attempt, so a
	// retried batch is counted again.
	Writes   int64 `metric:"kafka.writer.write.count" type:"counter"`
	Messages int64 `metric:"kafka.writer.message.count" type:"counter"`
	Bytes    int64 `metric:"kafka.writer.message.bytes" type:"counter"`
	Errors   int64 `metric:"kafka.writer.error.count" type:"counter"`

	// BatchTime: age of each batch when it is flushed. WriteTime: duration
	// of each produce request. WaitTime: throttle value carried by produce
	// responses.
	BatchTime DurationStats `metric:"kafka.writer.batch.seconds"`
	WriteTime DurationStats `metric:"kafka.writer.write.seconds"`
	WaitTime  DurationStats `metric:"kafka.writer.wait.seconds"`

	// Retries: one observation per retry. BatchSize/BatchBytes: message
	// count and byte size of each flushed batch.
	Retries    SummaryStats `metric:"kafka.writer.retries.count"`
	BatchSize  SummaryStats `metric:"kafka.writer.batch.size"`
	BatchBytes SummaryStats `metric:"kafka.writer.batch.bytes"`

	// Writer configuration, as reported by Stats.
	MaxAttempts  int64         `metric:"kafka.writer.attempts.max" type:"gauge"`
	MaxBatchSize int64         `metric:"kafka.writer.batch.max" type:"gauge"`
	BatchTimeout time.Duration `metric:"kafka.writer.batch.timeout" type:"gauge"`
	ReadTimeout  time.Duration `metric:"kafka.writer.read.timeout" type:"gauge"`
	WriteTimeout time.Duration `metric:"kafka.writer.write.timeout" type:"gauge"`
	RequiredAcks int64         `metric:"kafka.writer.acks.required" type:"gauge"`
	Async        bool          `metric:"kafka.writer.async" type:"gauge"`

	// Topic is Writer.Topic; it is empty when topics are set per message.
	Topic string `tag:"topic"`

	// Dial count and duration. They are observed for every dial made by the
	// transport that NewWriter builds, whether or not the dial succeeds.
	Dials    int64         `metric:"kafka.writer.dial.count" type:"counter"`
	DialTime DurationStats `metric:"kafka.writer.dial.seconds"`

	// The fields below carry no metric tags and are not populated by Stats
	// in this file. NOTE(review): possibly retained for backward
	// compatibility — confirm.
	Rebalances        int64
	RebalanceInterval time.Duration
	QueueLength       int64
	QueueCapacity     int64
	ClientID          string
}
// writerStats holds the live accumulators behind WriterStats. For Writers
// built by NewWriter, the same value is also updated by the transport's dial
// function.
type writerStats struct {
	dials          counter // dial attempts
	writes         counter // produce attempts
	messages       counter // messages sent, counted per attempt
	bytes          counter // batch bytes sent, counted per attempt
	errors         counter // failed produce attempts
	dialTime       summary // duration of each dial
	batchTime      summary // batch age when it is flushed
	writeTime      summary // duration of each produce request
	waitTime       summary // throttle value from produce responses
	retries        summary // one observation per retry
	batchSize      summary // messages per flushed batch
	batchSizeBytes summary // bytes per flushed batch
}
- func NewWriter(config WriterConfig) *Writer {
- if err := config.Validate(); err != nil {
- panic(err)
- }
- if config.Dialer == nil {
- config.Dialer = DefaultDialer
- }
- if config.Balancer == nil {
- config.Balancer = &RoundRobin{}
- }
-
- kafkaDialer := DefaultDialer
- if config.Dialer != nil {
- kafkaDialer = config.Dialer
- }
- dialer := (&net.Dialer{
- Timeout: kafkaDialer.Timeout,
- Deadline: kafkaDialer.Deadline,
- LocalAddr: kafkaDialer.LocalAddr,
- DualStack: kafkaDialer.DualStack,
- FallbackDelay: kafkaDialer.FallbackDelay,
- KeepAlive: kafkaDialer.KeepAlive,
- })
- var resolver Resolver
- if r, ok := kafkaDialer.Resolver.(*net.Resolver); ok {
- dialer.Resolver = r
- } else {
- resolver = kafkaDialer.Resolver
- }
- stats := new(writerStats)
-
-
- dial := func(ctx context.Context, network, addr string) (net.Conn, error) {
- start := time.Now()
- defer func() {
- stats.dials.observe(1)
- stats.dialTime.observe(int64(time.Since(start)))
- }()
- address, err := lookupHost(ctx, addr, resolver)
- if err != nil {
- return nil, err
- }
- return dialer.DialContext(ctx, network, address)
- }
- idleTimeout := config.IdleConnTimeout
- if idleTimeout == 0 {
-
-
-
- idleTimeout = 9 * time.Minute
- }
- metadataTTL := config.RebalanceInterval
- if metadataTTL == 0 {
-
- metadataTTL = 15 * time.Second
- }
- transport := &Transport{
- Dial: dial,
- SASL: kafkaDialer.SASLMechanism,
- TLS: kafkaDialer.TLS,
- ClientID: kafkaDialer.ClientID,
- IdleTimeout: idleTimeout,
- MetadataTTL: metadataTTL,
- }
- w := &Writer{
- Addr: TCP(config.Brokers...),
- Topic: config.Topic,
- MaxAttempts: config.MaxAttempts,
- BatchSize: config.BatchSize,
- Balancer: config.Balancer,
- BatchBytes: int64(config.BatchBytes),
- BatchTimeout: config.BatchTimeout,
- ReadTimeout: config.ReadTimeout,
- WriteTimeout: config.WriteTimeout,
- RequiredAcks: RequiredAcks(config.RequiredAcks),
- Async: config.Async,
- Logger: config.Logger,
- ErrorLogger: config.ErrorLogger,
- Transport: transport,
- transport: transport,
- writerStats: stats,
- }
- if config.RequiredAcks == 0 {
-
-
- w.RequiredAcks = RequireAll
- }
- if config.CompressionCodec != nil {
- w.Compression = Compression(config.CompressionCodec.Code())
- }
- return w
- }
- func (w *Writer) Close() error {
- w.markClosed()
-
- w.mutex.Lock()
- for _, batch := range w.batches {
- batch.trigger()
- }
- for partition := range w.batches {
- delete(w.batches, partition)
- }
- w.mutex.Unlock()
- w.group.Wait()
- if w.transport != nil {
- w.transport.CloseIdleConnections()
- }
- return nil
- }
- func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
- if w.Addr == nil {
- return errors.New("kafka.(*Writer).WriteMessages: cannot create a kafka writer with a nil address")
- }
- w.group.Add(1)
- defer w.group.Done()
- if w.isClosed() {
- return io.ErrClosedPipe
- }
- if len(msgs) == 0 {
- return nil
- }
- balancer := w.balancer()
- batchBytes := w.batchBytes()
- for i := range msgs {
- n := int64(msgs[i].size())
- if n > batchBytes {
-
-
-
-
-
- return messageTooLarge(msgs, i)
- }
- }
-
-
-
-
-
- assignments := make(map[topicPartition][]int32)
- for i, msg := range msgs {
- topic, err := w.chooseTopic(msg)
- if err != nil {
- return err
- }
- numPartitions, err := w.partitions(ctx, topic)
- if err != nil {
- return err
- }
- partition := balancer.Balance(msg, loadCachedPartitions(numPartitions)...)
- key := topicPartition{
- topic: topic,
- partition: int32(partition),
- }
- assignments[key] = append(assignments[key], int32(i))
- }
- batches := w.batchMessages(msgs, assignments)
- if w.Async {
- return nil
- }
- done := ctx.Done()
- hasErrors := false
- for batch := range batches {
- select {
- case <-done:
- return ctx.Err()
- case <-batch.done:
- if batch.err != nil {
- hasErrors = true
- }
- }
- }
- if !hasErrors {
- return nil
- }
- werr := make(WriteErrors, len(msgs))
- for batch, indexes := range batches {
- for _, i := range indexes {
- werr[i] = batch.err
- }
- }
- return werr
- }
- func (w *Writer) batchMessages(messages []Message, assignments map[topicPartition][]int32) map[*writeBatch][]int32 {
- var batches map[*writeBatch][]int32
- if !w.Async {
- batches = make(map[*writeBatch][]int32, len(assignments))
- }
- batchSize := w.batchSize()
- batchBytes := w.batchBytes()
- w.mutex.Lock()
- defer w.mutex.Unlock()
- if w.batches == nil {
- w.batches = map[topicPartition]*writeBatch{}
- }
- for key, indexes := range assignments {
- for _, i := range indexes {
- assignMessage:
- batch := w.batches[key]
- if batch == nil {
- batch = w.newWriteBatch(key)
- w.batches[key] = batch
- }
- if !batch.add(messages[i], batchSize, batchBytes) {
- batch.trigger()
- delete(w.batches, key)
- goto assignMessage
- }
- if batch.full(batchSize, batchBytes) {
- batch.trigger()
- delete(w.batches, key)
- }
- if !w.Async {
- batches[batch] = append(batches[batch], i)
- }
- }
- }
- return batches
- }
- func (w *Writer) newWriteBatch(key topicPartition) *writeBatch {
- batch := newWriteBatch(time.Now(), w.batchTimeout())
- w.group.Add(1)
- go func() {
- defer w.group.Done()
- w.writeBatch(key, batch)
- }()
- return batch
- }
// writeBatch runs on the goroutine started by (*Writer).newWriteBatch and
// drives one batch for key through its whole lifecycle:
//
//  1. Wait until the batch is flushed explicitly (batch.ready closed by
//     trigger) or its timer fires.
//  2. Produce the batch, retrying up to maxAttempts() times while the error
//     is temporary (isTemporary), with a backoff sleep between attempts.
//  3. If a response was received, stamp each message with its topic,
//     partition and offset, plus Time when it was zero.
//  4. Call w.Completion, if set, then release WriteMessages callers blocked
//     on batch.done via batch.complete.
//
// The writes, messages and bytes counters are observed once per attempt, so
// retries count again.
func (w *Writer) writeBatch(key topicPartition, batch *writeBatch) {
	// Wait for whichever comes first: an explicit flush or the batch timeout.
	select {
	case <-batch.timer.C:
		// The batch timed out while possibly still open. Detach it so that
		// batchMessages starts a new batch for later messages. The identity
		// check avoids deleting a newer batch that already replaced this one.
		w.mutex.Lock()
		if batch == w.batches[key] {
			delete(w.batches, key)
		}
		w.mutex.Unlock()
	case <-batch.ready:
		// The batch was flushed explicitly. Every trigger() call in this file
		// is paired with a delete from w.batches under w.mutex, so only the
		// timer needs to be released here.
		batch.timer.Stop()
	}
	stats := w.stats()
	stats.batchTime.observe(int64(time.Since(batch.time)))
	stats.batchSize.observe(int64(len(batch.msgs)))
	stats.batchSizeBytes.observe(batch.bytes)
	// res and err always describe the most recent attempt.
	var res *ProduceResponse
	var err error
	for attempt, maxAttempts := 0, w.maxAttempts(); attempt < maxAttempts; attempt++ {
		if attempt != 0 {
			stats.retries.observe(1)
			// backoff is defined elsewhere. Presumably the delay grows with
			// attempt and stays between 100ms and 1s — confirm. The sleep
			// cannot be interrupted, so Close may wait through it.
			delay := backoff(attempt, 100*time.Millisecond, 1*time.Second)
			w.withLogger(func(log Logger) {
				log.Printf("backing off %s writing %d messages to %s (partition: %d)", delay, len(batch.msgs), key.topic, key.partition)
			})
			time.Sleep(delay)
		}
		w.withLogger(func(log Logger) {
			log.Printf("writing %d messages to %s (partition: %d)", len(batch.msgs), key.topic, key.partition)
		})
		start := time.Now()
		res, err = w.produce(key, batch)
		stats.writes.observe(1)
		stats.messages.observe(int64(len(batch.msgs)))
		stats.bytes.observe(batch.bytes)
		// writeTime covers only the produce request of this attempt.
		stats.writeTime.observe(int64(time.Since(start)))
		if res != nil {
			// The response-level error decides the outcome.
			// NOTE(review): if produce ever returned both a response and an
			// error, the transport error would be overwritten here, possibly
			// with nil.
			err = res.Error
			stats.waitTime.observe(int64(res.Throttle))
		}
		if err == nil {
			break
		}
		stats.errors.observe(1)
		w.withErrorLogger(func(log Logger) {
			log.Printf("error writing messages to %s (partition %d): %s", key.topic, key.partition, err)
		})
		// Non-temporary errors stop retrying immediately.
		if !isTemporary(err) {
			break
		}
	}
	// Offsets are assigned contiguously from BaseOffset in batch order.
	// NOTE(review): this also runs when the last response carried an error;
	// confirm what BaseOffset holds in that case.
	if res != nil {
		for i := range batch.msgs {
			m := &batch.msgs[i]
			m.Topic = key.topic
			m.Partition = int(key.partition)
			m.Offset = res.BaseOffset + int64(i)
			if m.Time.IsZero() {
				m.Time = res.LogAppendTime
			}
		}
	}
	// Completion runs before waiters on batch.done are released.
	if w.Completion != nil {
		w.Completion(batch.msgs, err)
	}
	batch.complete(err)
}
- func (w *Writer) produce(key topicPartition, batch *writeBatch) (*ProduceResponse, error) {
- timeout := w.writeTimeout()
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- return w.client(timeout).Produce(ctx, &ProduceRequest{
- Partition: int(key.partition),
- Topic: key.topic,
- RequiredAcks: w.RequiredAcks,
- Compression: w.Compression,
- Records: &writerRecords{
- msgs: batch.msgs,
- },
- })
- }
- func (w *Writer) partitions(ctx context.Context, topic string) (int, error) {
- client := w.client(w.readTimeout())
-
-
-
-
-
-
- r, err := client.transport().RoundTrip(ctx, client.Addr, &metadataAPI.Request{
- TopicNames: []string{topic},
- })
- if err != nil {
- return 0, err
- }
- for _, t := range r.(*metadataAPI.Response).Topics {
- if t.Name == topic {
-
- if t.ErrorCode != 0 {
- return 0, Error(t.ErrorCode)
- }
- return len(t.Partitions), nil
- }
- }
- return 0, UnknownTopicOrPartition
- }
- func (w *Writer) markClosed() {
- atomic.StoreUint32(&w.closed, 1)
- }
- func (w *Writer) isClosed() bool {
- return atomic.LoadUint32(&w.closed) != 0
- }
- func (w *Writer) client(timeout time.Duration) *Client {
- return &Client{
- Addr: w.Addr,
- Transport: w.Transport,
- Timeout: timeout,
- }
- }
- func (w *Writer) balancer() Balancer {
- if w.Balancer != nil {
- return w.Balancer
- }
- return &w.roundRobin
- }
- func (w *Writer) maxAttempts() int {
- if w.MaxAttempts > 0 {
- return w.MaxAttempts
- }
-
-
-
-
- return 10
- }
- func (w *Writer) batchSize() int {
- if w.BatchSize > 0 {
- return w.BatchSize
- }
- return 100
- }
- func (w *Writer) batchBytes() int64 {
- if w.BatchBytes > 0 {
- return w.BatchBytes
- }
- return 1048576
- }
- func (w *Writer) batchTimeout() time.Duration {
- if w.BatchTimeout > 0 {
- return w.BatchTimeout
- }
- return 1 * time.Second
- }
- func (w *Writer) readTimeout() time.Duration {
- if w.ReadTimeout > 0 {
- return w.ReadTimeout
- }
- return 10 * time.Second
- }
- func (w *Writer) writeTimeout() time.Duration {
- if w.WriteTimeout > 0 {
- return w.WriteTimeout
- }
- return 10 * time.Second
- }
- func (w *Writer) withLogger(do func(Logger)) {
- if w.Logger != nil {
- do(w.Logger)
- }
- }
- func (w *Writer) withErrorLogger(do func(Logger)) {
- if w.ErrorLogger != nil {
- do(w.ErrorLogger)
- } else {
- w.withLogger(do)
- }
- }
- func (w *Writer) stats() *writerStats {
- w.once.Do(func() {
-
-
- if w.writerStats == nil {
- w.writerStats = new(writerStats)
- }
- })
- return w.writerStats
- }
- func (w *Writer) Stats() WriterStats {
- stats := w.stats()
- return WriterStats{
- Dials: stats.dials.snapshot(),
- Writes: stats.writes.snapshot(),
- Messages: stats.messages.snapshot(),
- Bytes: stats.bytes.snapshot(),
- Errors: stats.errors.snapshot(),
- DialTime: stats.dialTime.snapshotDuration(),
- BatchTime: stats.batchTime.snapshotDuration(),
- WriteTime: stats.writeTime.snapshotDuration(),
- WaitTime: stats.waitTime.snapshotDuration(),
- Retries: stats.retries.snapshot(),
- BatchSize: stats.batchSize.snapshot(),
- BatchBytes: stats.batchSizeBytes.snapshot(),
- MaxAttempts: int64(w.MaxAttempts),
- MaxBatchSize: int64(w.BatchSize),
- BatchTimeout: w.BatchTimeout,
- ReadTimeout: w.ReadTimeout,
- WriteTimeout: w.WriteTimeout,
- RequiredAcks: int64(w.RequiredAcks),
- Async: w.Async,
- Topic: w.Topic,
- }
- }
- func (w *Writer) chooseTopic(msg Message) (string, error) {
-
-
- if (w.Topic != "" && msg.Topic != "") || (w.Topic == "" && msg.Topic == "") {
- return "", InvalidMessage
- }
-
- if msg.Topic != "" {
- return msg.Topic, nil
- }
- return w.Topic, nil
- }
// writeBatch accumulates messages for one topic/partition until it is
// flushed, then carries the outcome of the write back to waiters.
type writeBatch struct {
	time  time.Time     // when the batch was created
	msgs  []Message     // queued messages, in order
	size  int           // number of messages, maintained by add
	bytes int64         // sum of msg.size() over msgs
	ready chan struct{} // closed by trigger to flush before the timer fires
	done  chan struct{} // closed by complete once the write has finished
	timer *time.Timer   // fires after the batch timeout to force a flush
	err   error         // final outcome; set by complete before done closes
}
- func newWriteBatch(now time.Time, timeout time.Duration) *writeBatch {
- return &writeBatch{
- time: now,
- ready: make(chan struct{}),
- done: make(chan struct{}),
- timer: time.NewTimer(timeout),
- }
- }
- func (b *writeBatch) add(msg Message, maxSize int, maxBytes int64) bool {
- bytes := int64(msg.size())
- if b.size > 0 && (b.bytes+bytes) > maxBytes {
- return false
- }
- if cap(b.msgs) == 0 {
- b.msgs = make([]Message, 0, maxSize)
- }
- b.msgs = append(b.msgs, msg)
- b.size++
- b.bytes += bytes
- return true
- }
- func (b *writeBatch) full(maxSize int, maxBytes int64) bool {
- return b.size >= maxSize || b.bytes >= maxBytes
- }
- func (b *writeBatch) trigger() {
- close(b.ready)
- }
- func (b *writeBatch) complete(err error) {
- b.err = err
- close(b.done)
- }
// writerRecords exposes a batch's messages as a sequence of Records; produce
// assigns it to ProduceRequest.Records. The record, key and value fields are
// scratch storage that every ReadRecord call reuses.
type writerRecords struct {
	msgs   []Message       // messages to emit, in order
	index  int             // position of the next message to emit
	record Record          // value returned by the latest ReadRecord call
	key    bytesReadCloser // reader over the current message's Key
	value  bytesReadCloser // reader over the current message's Value
}
- func (r *writerRecords) ReadRecord() (*Record, error) {
- if r.index >= 0 && r.index < len(r.msgs) {
- m := &r.msgs[r.index]
- r.index++
- r.record = Record{
- Time: m.Time,
- Headers: m.Headers,
- }
- if m.Key != nil {
- r.key.Reset(m.Key)
- r.record.Key = &r.key
- }
- if m.Value != nil {
- r.value.Reset(m.Value)
- r.record.Value = &r.value
- }
- return &r.record, nil
- }
- return nil, io.EOF
- }
// bytesReadCloser is a bytes.Reader with a no-op Close. It exposes message
// keys and values as record readers; Reset points it at the existing byte
// slice without copying it.
type bytesReadCloser struct{ bytes.Reader }
- func (*bytesReadCloser) Close() error { return nil }
// partitionsCache holds a shared []int of the form {0, 1, 2, ...}. It lets
// WriteMessages hand the balancer a partition list without allocating one
// per message.
var partitionsCache atomic.Value

// loadCachedPartitions returns {0, 1, ..., numPartitions-1} as a prefix of the
// cached slice. When the cache is missing or too short, it replaces the cache
// with a new slice whose length is numPartitions rounded up to the next
// multiple of 128 (always at least one extra block). The returned slice
// shares storage with the cache, so callers must not modify it.
func loadCachedPartitions(numPartitions int) []int {
	if cached, ok := partitionsCache.Load().([]int); ok && numPartitions <= len(cached) {
		return cached[:numPartitions]
	}

	const alignment = 128
	size := alignment * (1 + numPartitions/alignment)
	fresh := make([]int, size)
	for p := 0; p < size; p++ {
		fresh[p] = p
	}
	partitionsCache.Store(fresh)
	return fresh[:numPartitions]
}
|