produce.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. package kafka
  2. import (
  3. "bufio"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "net"
  8. "time"
  9. "github.com/segmentio/kafka-go/protocol"
  10. produceAPI "github.com/segmentio/kafka-go/protocol/produce"
  11. )
  12. type RequiredAcks int
  13. const (
  14. RequireNone RequiredAcks = 0
  15. RequireOne RequiredAcks = 1
  16. RequireAll RequiredAcks = -1
  17. )
  18. func (acks RequiredAcks) String() string {
  19. switch acks {
  20. case RequireNone:
  21. return "none"
  22. case RequireOne:
  23. return "one"
  24. case RequireAll:
  25. return "all"
  26. default:
  27. return "unknown"
  28. }
  29. }
  30. // ProduceRequest represents a request sent to a kafka broker to produce records
  31. // to a topic partition.
  32. type ProduceRequest struct {
  33. // Address of the kafka broker to send the request to.
  34. Addr net.Addr
  35. // The topic to produce the records to.
  36. Topic string
  37. // The partition to produce the records to.
  38. Partition int
  39. // The level of required acknowledgements to ask the kafka broker for.
  40. RequiredAcks RequiredAcks
  41. // The message format version used when encoding the records.
  42. //
  43. // By default, the client automatically determine which version should be
  44. // used based on the version of the Produce API supported by the server.
  45. MessageVersion int
  46. // An optional transaction id when producing to the kafka broker is part of
  47. // a transaction.
  48. TransactionalID string
  49. // The sequence of records to produce to the topic partition.
  50. Records RecordReader
  51. // An optional compression algorithm to apply to the batch of records sent
  52. // to the kafka broker.
  53. Compression Compression
  54. }
  55. // ProduceResponse represents a response from a kafka broker to a produce
  56. // request.
  57. type ProduceResponse struct {
  58. // The amount of time that the broker throttled the request.
  59. Throttle time.Duration
  60. // An error that may have occurred while attempting to produce the records.
  61. //
  62. // The error contains both the kafka error code, and an error message
  63. // returned by the kafka broker. Programs may use the standard errors.Is
  64. // function to test the error against kafka error codes.
  65. Error error
  66. // Offset of the first record that was written to the topic partition.
  67. //
  68. // This field will be zero if the kafka broker did no support the Produce
  69. // API in version 3 or above.
  70. BaseOffset int64
  71. // Time at which the broker wrote the records to the topic partition.
  72. //
  73. // This field will be zero if the kafka broker did no support the Produce
  74. // API in version 2 or above.
  75. LogAppendTime time.Time
  76. // First offset in the topic partition that the records were written to.
  77. //
  78. // This field will be zero if the kafka broker did no support the Produce
  79. // API in version 5 or above (or if the first offset is zero).
  80. LogStartOffset int64
  81. // If errors occurred writing specific records, they will be reported in
  82. // this map.
  83. //
  84. // This field will always be empty if the kafka broker did no support the
  85. // Produce API in version 8 or above.
  86. RecordErrors map[int]error
  87. }
  88. // Produce sends a produce request to a kafka broker and returns the response.
  89. //
  90. // If the request contained no records, an error wrapping protocol.ErrNoRecord
  91. // is returned.
  92. //
  93. // When the request is configured with RequiredAcks=none, both the response and
  94. // the error will be nil on success.
  95. func (c *Client) Produce(ctx context.Context, req *ProduceRequest) (*ProduceResponse, error) {
  96. attributes := protocol.Attributes(req.Compression) & 0x7
  97. m, err := c.roundTrip(ctx, req.Addr, &produceAPI.Request{
  98. TransactionalID: req.TransactionalID,
  99. Acks: int16(req.RequiredAcks),
  100. Timeout: c.timeoutMs(ctx, defaultProduceTimeout),
  101. Topics: []produceAPI.RequestTopic{{
  102. Topic: req.Topic,
  103. Partitions: []produceAPI.RequestPartition{{
  104. Partition: int32(req.Partition),
  105. RecordSet: protocol.RecordSet{
  106. Attributes: attributes,
  107. Records: req.Records,
  108. },
  109. }},
  110. }},
  111. })
  112. switch {
  113. case err == nil:
  114. case errors.Is(err, protocol.ErrNoRecord):
  115. return new(ProduceResponse), nil
  116. default:
  117. return nil, fmt.Errorf("kafka.(*Client).Produce: %w", err)
  118. }
  119. if req.RequiredAcks == RequireNone {
  120. return nil, nil
  121. }
  122. res := m.(*produceAPI.Response)
  123. if len(res.Topics) == 0 {
  124. return nil, fmt.Errorf("kafka.(*Client).Produce: %w", protocol.ErrNoTopic)
  125. }
  126. topic := &res.Topics[0]
  127. if len(topic.Partitions) == 0 {
  128. return nil, fmt.Errorf("kafka.(*Client).Produce: %w", protocol.ErrNoPartition)
  129. }
  130. partition := &topic.Partitions[0]
  131. ret := &ProduceResponse{
  132. Throttle: makeDuration(res.ThrottleTimeMs),
  133. Error: makeError(partition.ErrorCode, partition.ErrorMessage),
  134. BaseOffset: partition.BaseOffset,
  135. LogAppendTime: makeTime(partition.LogAppendTime),
  136. LogStartOffset: partition.LogStartOffset,
  137. }
  138. if len(partition.RecordErrors) != 0 {
  139. ret.RecordErrors = make(map[int]error, len(partition.RecordErrors))
  140. for _, recErr := range partition.RecordErrors {
  141. ret.RecordErrors[int(recErr.BatchIndex)] = errors.New(recErr.BatchIndexErrorMessage)
  142. }
  143. }
  144. return ret, nil
  145. }
  146. type produceRequestV2 struct {
  147. RequiredAcks int16
  148. Timeout int32
  149. Topics []produceRequestTopicV2
  150. }
  151. func (r produceRequestV2) size() int32 {
  152. return 2 + 4 + sizeofArray(len(r.Topics), func(i int) int32 { return r.Topics[i].size() })
  153. }
  154. func (r produceRequestV2) writeTo(wb *writeBuffer) {
  155. wb.writeInt16(r.RequiredAcks)
  156. wb.writeInt32(r.Timeout)
  157. wb.writeArray(len(r.Topics), func(i int) { r.Topics[i].writeTo(wb) })
  158. }
  159. type produceRequestTopicV2 struct {
  160. TopicName string
  161. Partitions []produceRequestPartitionV2
  162. }
  163. func (t produceRequestTopicV2) size() int32 {
  164. return sizeofString(t.TopicName) +
  165. sizeofArray(len(t.Partitions), func(i int) int32 { return t.Partitions[i].size() })
  166. }
  167. func (t produceRequestTopicV2) writeTo(wb *writeBuffer) {
  168. wb.writeString(t.TopicName)
  169. wb.writeArray(len(t.Partitions), func(i int) { t.Partitions[i].writeTo(wb) })
  170. }
  171. type produceRequestPartitionV2 struct {
  172. Partition int32
  173. MessageSetSize int32
  174. MessageSet messageSet
  175. }
  176. func (p produceRequestPartitionV2) size() int32 {
  177. return 4 + 4 + p.MessageSet.size()
  178. }
  179. func (p produceRequestPartitionV2) writeTo(wb *writeBuffer) {
  180. wb.writeInt32(p.Partition)
  181. wb.writeInt32(p.MessageSetSize)
  182. p.MessageSet.writeTo(wb)
  183. }
  184. type produceResponseV2 struct {
  185. ThrottleTime int32
  186. Topics []produceResponseTopicV2
  187. }
  188. func (r produceResponseV2) size() int32 {
  189. return 4 + sizeofArray(len(r.Topics), func(i int) int32 { return r.Topics[i].size() })
  190. }
  191. func (r produceResponseV2) writeTo(wb *writeBuffer) {
  192. wb.writeInt32(r.ThrottleTime)
  193. wb.writeArray(len(r.Topics), func(i int) { r.Topics[i].writeTo(wb) })
  194. }
  195. type produceResponseTopicV2 struct {
  196. TopicName string
  197. Partitions []produceResponsePartitionV2
  198. }
  199. func (t produceResponseTopicV2) size() int32 {
  200. return sizeofString(t.TopicName) +
  201. sizeofArray(len(t.Partitions), func(i int) int32 { return t.Partitions[i].size() })
  202. }
  203. func (t produceResponseTopicV2) writeTo(wb *writeBuffer) {
  204. wb.writeString(t.TopicName)
  205. wb.writeArray(len(t.Partitions), func(i int) { t.Partitions[i].writeTo(wb) })
  206. }
  207. type produceResponsePartitionV2 struct {
  208. Partition int32
  209. ErrorCode int16
  210. Offset int64
  211. Timestamp int64
  212. }
  213. func (p produceResponsePartitionV2) size() int32 {
  214. return 4 + 2 + 8 + 8
  215. }
  216. func (p produceResponsePartitionV2) writeTo(wb *writeBuffer) {
  217. wb.writeInt32(p.Partition)
  218. wb.writeInt16(p.ErrorCode)
  219. wb.writeInt64(p.Offset)
  220. wb.writeInt64(p.Timestamp)
  221. }
  222. func (p *produceResponsePartitionV2) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
  223. if remain, err = readInt32(r, sz, &p.Partition); err != nil {
  224. return
  225. }
  226. if remain, err = readInt16(r, remain, &p.ErrorCode); err != nil {
  227. return
  228. }
  229. if remain, err = readInt64(r, remain, &p.Offset); err != nil {
  230. return
  231. }
  232. if remain, err = readInt64(r, remain, &p.Timestamp); err != nil {
  233. return
  234. }
  235. return
  236. }
  237. type produceResponsePartitionV7 struct {
  238. Partition int32
  239. ErrorCode int16
  240. Offset int64
  241. Timestamp int64
  242. StartOffset int64
  243. }
  244. func (p produceResponsePartitionV7) size() int32 {
  245. return 4 + 2 + 8 + 8 + 8
  246. }
  247. func (p produceResponsePartitionV7) writeTo(wb *writeBuffer) {
  248. wb.writeInt32(p.Partition)
  249. wb.writeInt16(p.ErrorCode)
  250. wb.writeInt64(p.Offset)
  251. wb.writeInt64(p.Timestamp)
  252. wb.writeInt64(p.StartOffset)
  253. }
  254. func (p *produceResponsePartitionV7) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
  255. if remain, err = readInt32(r, sz, &p.Partition); err != nil {
  256. return
  257. }
  258. if remain, err = readInt16(r, remain, &p.ErrorCode); err != nil {
  259. return
  260. }
  261. if remain, err = readInt64(r, remain, &p.Offset); err != nil {
  262. return
  263. }
  264. if remain, err = readInt64(r, remain, &p.Timestamp); err != nil {
  265. return
  266. }
  267. if remain, err = readInt64(r, remain, &p.StartOffset); err != nil {
  268. return
  269. }
  270. return
  271. }