package kafka

import (
	"bufio"
	"context"
	"errors"
	"fmt"
	"net"
	"time"

	"github.com/segmentio/kafka-go/protocol"
	produceAPI "github.com/segmentio/kafka-go/protocol/produce"
)
// RequiredAcks represents the level of acknowledgement a producer asks the
// kafka broker for before a produce request is considered complete: no
// acknowledgement, acknowledgement by the partition leader only, or
// acknowledgement by all in-sync replicas.
type RequiredAcks int

const (
	RequireNone RequiredAcks = 0
	RequireOne  RequiredAcks = 1
	RequireAll  RequiredAcks = -1
)

func (acks RequiredAcks) String() string {
	switch acks {
	case RequireNone:
		return "none"
	case RequireOne:
		return "one"
	case RequireAll:
		return "all"
	default:
		return "unknown"
	}
}
// ProduceRequest represents a request sent to a kafka broker to produce records
// to a topic partition.
type ProduceRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// The topic to produce the records to.
	Topic string

	// The partition to produce the records to.
	Partition int

	// The level of required acknowledgements to ask the kafka broker for.
	RequiredAcks RequiredAcks

	// The message format version used when encoding the records.
	//
	// By default, the client automatically determines which version should be
	// used based on the version of the Produce API supported by the server.
	MessageVersion int

	// An optional transaction id, set when the produce request is part of a
	// transaction.
	TransactionalID string

	// The sequence of records to produce to the topic partition.
	Records RecordReader

	// An optional compression algorithm to apply to the batch of records sent
	// to the kafka broker.
	Compression Compression
}
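
// produceRequestSketch is an illustrative sketch only, not part of the
// package API: it shows one plausible way to fill in a ProduceRequest for a
// single record. The broker address ("localhost:9092") and topic name
// ("my-topic") are hypothetical; the sketch assumes the TCP address helper
// defined elsewhere in this package and the protocol package's record
// constructors.
func produceRequestSketch() *ProduceRequest {
	return &ProduceRequest{
		Addr:         TCP("localhost:9092"), // hypothetical broker address
		Topic:        "my-topic",            // hypothetical topic
		Partition:    0,
		RequiredAcks: RequireAll,
		Records: protocol.NewRecordReader(
			protocol.Record{
				Time:  time.Now(),
				Value: protocol.NewBytes([]byte("hello world")),
			},
		),
	}
}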
// ProduceResponse represents a response from a kafka broker to a produce
// request.
type ProduceResponse struct {
	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// An error that may have occurred while attempting to produce the records.
	//
	// The error contains both the kafka error code, and an error message
	// returned by the kafka broker. Programs may use the standard errors.Is
	// function to test the error against kafka error codes.
	Error error

	// Offset of the first record that was written to the topic partition.
	//
	// This field will be zero if the kafka broker did not support the Produce
	// API in version 3 or above.
	BaseOffset int64

	// Time at which the broker wrote the records to the topic partition.
	//
	// This field will be zero if the kafka broker did not support the Produce
	// API in version 2 or above.
	LogAppendTime time.Time

	// First offset in the topic partition that the records were written to.
	//
	// This field will be zero if the kafka broker did not support the Produce
	// API in version 5 or above (or if the first offset is zero).
	LogStartOffset int64

	// If errors occurred writing specific records, they will be reported in
	// this map.
	//
	// This field will always be empty if the kafka broker did not support the
	// Produce API in version 8 or above.
	RecordErrors map[int]error
}
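
// inspectProduceResponseSketch is an illustrative sketch only, not part of
// the package API: it shows how a caller might examine a ProduceResponse,
// testing the error against a kafka error code with errors.Is (as documented
// on the Error field) and walking the per-record errors reported by newer
// brokers. It assumes the UnknownTopicOrPartition error code defined
// elsewhere in this package.
func inspectProduceResponseSketch(res *ProduceResponse) {
	if res.Error != nil {
		if errors.Is(res.Error, UnknownTopicOrPartition) {
			fmt.Println("the topic or partition does not exist")
		} else {
			fmt.Println("produce failed:", res.Error)
		}
		return
	}
	fmt.Println("records written starting at offset", res.BaseOffset)
	for index, err := range res.RecordErrors {
		fmt.Printf("record at batch index %d was rejected: %v\n", index, err)
	}
}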
// Produce sends a produce request to a kafka broker and returns the response.
//
// If the request contained no records, an empty response is returned without
// error (the protocol.ErrNoRecord condition reported by the protocol layer is
// handled internally).
//
// When the request is configured with RequiredAcks=none, both the response and
// the error will be nil on success.
func (c *Client) Produce(ctx context.Context, req *ProduceRequest) (*ProduceResponse, error) {
	attributes := protocol.Attributes(req.Compression) & 0x7

	m, err := c.roundTrip(ctx, req.Addr, &produceAPI.Request{
		TransactionalID: req.TransactionalID,
		Acks:            int16(req.RequiredAcks),
		Timeout:         c.timeoutMs(ctx, defaultProduceTimeout),
		Topics: []produceAPI.RequestTopic{{
			Topic: req.Topic,
			Partitions: []produceAPI.RequestPartition{{
				Partition: int32(req.Partition),
				RecordSet: protocol.RecordSet{
					Attributes: attributes,
					Records:    req.Records,
				},
			}},
		}},
	})

	switch {
	case err == nil:
	case errors.Is(err, protocol.ErrNoRecord):
		return new(ProduceResponse), nil
	default:
		return nil, fmt.Errorf("kafka.(*Client).Produce: %w", err)
	}

	if req.RequiredAcks == RequireNone {
		return nil, nil
	}

	res := m.(*produceAPI.Response)
	if len(res.Topics) == 0 {
		return nil, fmt.Errorf("kafka.(*Client).Produce: %w", protocol.ErrNoTopic)
	}
	topic := &res.Topics[0]
	if len(topic.Partitions) == 0 {
		return nil, fmt.Errorf("kafka.(*Client).Produce: %w", protocol.ErrNoPartition)
	}
	partition := &topic.Partitions[0]

	ret := &ProduceResponse{
		Throttle:       makeDuration(res.ThrottleTimeMs),
		Error:          makeError(partition.ErrorCode, partition.ErrorMessage),
		BaseOffset:     partition.BaseOffset,
		LogAppendTime:  makeTime(partition.LogAppendTime),
		LogStartOffset: partition.LogStartOffset,
	}

	if len(partition.RecordErrors) != 0 {
		ret.RecordErrors = make(map[int]error, len(partition.RecordErrors))

		for _, recErr := range partition.RecordErrors {
			ret.RecordErrors[int(recErr.BatchIndex)] = errors.New(recErr.BatchIndexErrorMessage)
		}
	}

	return ret, nil
}
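
// produceSketch is an illustrative sketch only, not part of the package API:
// it sends the request built by produceRequestSketch (defined above) through
// Client.Produce and handles the three documented outcomes: a transport or
// protocol error, a nil response when RequiredAcks is RequireNone, and a
// regular response whose Error field must still be checked.
func produceSketch(ctx context.Context, client *Client) error {
	res, err := client.Produce(ctx, produceRequestSketch())
	if err != nil {
		return err
	}
	if res == nil {
		// Only happens with RequiredAcks=RequireNone: the broker sends no
		// response, so there is nothing further to inspect.
		return nil
	}
	if res.Error != nil {
		return res.Error
	}
	fmt.Println("first record written at offset", res.BaseOffset)
	return nil
}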
type produceRequestV2 struct {
	RequiredAcks int16
	Timeout      int32
	Topics       []produceRequestTopicV2
}

func (r produceRequestV2) size() int32 {
	return 2 + 4 + sizeofArray(len(r.Topics), func(i int) int32 { return r.Topics[i].size() })
}

func (r produceRequestV2) writeTo(wb *writeBuffer) {
	wb.writeInt16(r.RequiredAcks)
	wb.writeInt32(r.Timeout)
	wb.writeArray(len(r.Topics), func(i int) { r.Topics[i].writeTo(wb) })
}

type produceRequestTopicV2 struct {
	TopicName  string
	Partitions []produceRequestPartitionV2
}

func (t produceRequestTopicV2) size() int32 {
	return sizeofString(t.TopicName) +
		sizeofArray(len(t.Partitions), func(i int) int32 { return t.Partitions[i].size() })
}

func (t produceRequestTopicV2) writeTo(wb *writeBuffer) {
	wb.writeString(t.TopicName)
	wb.writeArray(len(t.Partitions), func(i int) { t.Partitions[i].writeTo(wb) })
}

type produceRequestPartitionV2 struct {
	Partition      int32
	MessageSetSize int32
	MessageSet     messageSet
}

func (p produceRequestPartitionV2) size() int32 {
	return 4 + 4 + p.MessageSet.size()
}

func (p produceRequestPartitionV2) writeTo(wb *writeBuffer) {
	wb.writeInt32(p.Partition)
	wb.writeInt32(p.MessageSetSize)
	p.MessageSet.writeTo(wb)
}
type produceResponseV2 struct {
	ThrottleTime int32
	Topics       []produceResponseTopicV2
}

func (r produceResponseV2) size() int32 {
	return 4 + sizeofArray(len(r.Topics), func(i int) int32 { return r.Topics[i].size() })
}

func (r produceResponseV2) writeTo(wb *writeBuffer) {
	wb.writeInt32(r.ThrottleTime)
	wb.writeArray(len(r.Topics), func(i int) { r.Topics[i].writeTo(wb) })
}

type produceResponseTopicV2 struct {
	TopicName  string
	Partitions []produceResponsePartitionV2
}

func (t produceResponseTopicV2) size() int32 {
	return sizeofString(t.TopicName) +
		sizeofArray(len(t.Partitions), func(i int) int32 { return t.Partitions[i].size() })
}

func (t produceResponseTopicV2) writeTo(wb *writeBuffer) {
	wb.writeString(t.TopicName)
	wb.writeArray(len(t.Partitions), func(i int) { t.Partitions[i].writeTo(wb) })
}

type produceResponsePartitionV2 struct {
	Partition int32
	ErrorCode int16
	Offset    int64
	Timestamp int64
}

func (p produceResponsePartitionV2) size() int32 {
	return 4 + 2 + 8 + 8
}

func (p produceResponsePartitionV2) writeTo(wb *writeBuffer) {
	wb.writeInt32(p.Partition)
	wb.writeInt16(p.ErrorCode)
	wb.writeInt64(p.Offset)
	wb.writeInt64(p.Timestamp)
}

func (p *produceResponsePartitionV2) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
	if remain, err = readInt32(r, sz, &p.Partition); err != nil {
		return
	}
	if remain, err = readInt16(r, remain, &p.ErrorCode); err != nil {
		return
	}
	if remain, err = readInt64(r, remain, &p.Offset); err != nil {
		return
	}
	if remain, err = readInt64(r, remain, &p.Timestamp); err != nil {
		return
	}
	return
}
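
// decodeProduceResponsePartitionV2Sketch is an illustrative sketch only, not
// used by the package: it decodes the fixed-width, big-endian fields of a v2
// partition response directly from a byte slice, making explicit the wire
// layout that the size and readFrom methods above rely on
// (4 + 2 + 8 + 8 = 22 bytes).
func decodeProduceResponsePartitionV2Sketch(b []byte) (p produceResponsePartitionV2, err error) {
	if len(b) < 22 {
		return p, fmt.Errorf("decoding produce response partition v2: need 22 bytes, have %d", len(b))
	}
	be32 := func(b []byte) int32 {
		return int32(b[0])<<24 | int32(b[1])<<16 | int32(b[2])<<8 | int32(b[3])
	}
	be64 := func(b []byte) int64 {
		return int64(be32(b))<<32 | int64(uint32(be32(b[4:])))
	}
	p.Partition = be32(b[0:4])                 // 4 bytes: partition id
	p.ErrorCode = int16(b[4])<<8 | int16(b[5]) // 2 bytes: error code
	p.Offset = be64(b[6:14])                   // 8 bytes: base offset
	p.Timestamp = be64(b[14:22])               // 8 bytes: timestamp
	return p, nil
}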
type produceResponsePartitionV7 struct {
	Partition   int32
	ErrorCode   int16
	Offset      int64
	Timestamp   int64
	StartOffset int64
}

func (p produceResponsePartitionV7) size() int32 {
	return 4 + 2 + 8 + 8 + 8
}

func (p produceResponsePartitionV7) writeTo(wb *writeBuffer) {
	wb.writeInt32(p.Partition)
	wb.writeInt16(p.ErrorCode)
	wb.writeInt64(p.Offset)
	wb.writeInt64(p.Timestamp)
	wb.writeInt64(p.StartOffset)
}

func (p *produceResponsePartitionV7) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
	if remain, err = readInt32(r, sz, &p.Partition); err != nil {
		return
	}
	if remain, err = readInt16(r, remain, &p.ErrorCode); err != nil {
		return
	}
	if remain, err = readInt64(r, remain, &p.Offset); err != nil {
		return
	}
	if remain, err = readInt64(r, remain, &p.Timestamp); err != nil {
		return
	}
	if remain, err = readInt64(r, remain, &p.StartOffset); err != nil {
		return
	}
	return
}