fetch.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. package kafka
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "net"
  7. "time"
  8. "github.com/segmentio/kafka-go/protocol"
  9. fetchAPI "github.com/segmentio/kafka-go/protocol/fetch"
  10. )
  11. // FetchRequest represents a request sent to a kafka broker to retrieve records
  12. // from a topic partition.
  13. type FetchRequest struct {
  14. // Address of the kafka broker to send the request to.
  15. Addr net.Addr
  16. // Topic, partition, and offset to retrieve records from.
  17. Topic string
  18. Partition int
  19. Offset int64
  20. // Size and time limits of the response returned by the broker.
  21. MinBytes int64
  22. MaxBytes int64
  23. MaxWait time.Duration
  24. // The isolation level for the request.
  25. //
  26. // Defaults to ReadUncommitted.
  27. //
  28. // This field requires the kafka broker to support the Fetch API in version
  29. // 4 or above (otherwise the value is ignored).
  30. IsolationLevel IsolationLevel
  31. }
  32. // FetchResponse represents a response from a kafka broker to a fetch request.
  33. type FetchResponse struct {
  34. // The amount of time that the broker throttled the request.
  35. Throttle time.Duration
  36. // The topic and partition that the response came for (will match the values
  37. // in the request).
  38. Topic string
  39. Partition int
  40. // Informations about the topic partition layout returned from the broker.
  41. //
  42. // LastStableOffset requires the kafka broker to support the Fetch API in
  43. // version 4 or above (otherwise the value is zero).
  44. //
  45. /// LogStartOffset requires the kafka broker to support the Fetch API in
  46. // version 5 or above (otherwise the value is zero).
  47. HighWatermark int64
  48. LastStableOffset int64
  49. LogStartOffset int64
  50. // An error that may have occurred while attempting to fetch the records.
  51. //
  52. // The error contains both the kafka error code, and an error message
  53. // returned by the kafka broker. Programs may use the standard errors.Is
  54. // function to test the error against kafka error codes.
  55. Error error
  56. // The set of records returned in the response.
  57. //
  58. // The program is expected to call the RecordSet's Close method when it
  59. // finished reading the records.
  60. //
  61. // Note that kafka may return record batches that start at an offset before
  62. // the one that was requested. It is the program's responsibility to skip
  63. // the offsets that it is not interested in.
  64. Records RecordReader
  65. }
  66. // Fetch sends a fetch request to a kafka broker and returns the response.
  67. //
  68. // If the broker returned an invalid response with no topics, an error wrapping
  69. // protocol.ErrNoTopic is returned.
  70. //
  71. // If the broker returned an invalid response with no partitions, an error
  72. // wrapping ErrNoPartitions is returned.
  73. func (c *Client) Fetch(ctx context.Context, req *FetchRequest) (*FetchResponse, error) {
  74. timeout := c.timeout(ctx, math.MaxInt64)
  75. maxWait := req.maxWait()
  76. if maxWait < timeout {
  77. timeout = maxWait
  78. }
  79. m, err := c.roundTrip(ctx, req.Addr, &fetchAPI.Request{
  80. ReplicaID: -1,
  81. MaxWaitTime: milliseconds(timeout),
  82. MinBytes: int32(req.MinBytes),
  83. MaxBytes: int32(req.MaxBytes),
  84. IsolationLevel: int8(req.IsolationLevel),
  85. SessionID: -1,
  86. SessionEpoch: -1,
  87. Topics: []fetchAPI.RequestTopic{{
  88. Topic: req.Topic,
  89. Partitions: []fetchAPI.RequestPartition{{
  90. Partition: int32(req.Partition),
  91. CurrentLeaderEpoch: -1,
  92. FetchOffset: req.Offset,
  93. LogStartOffset: -1,
  94. PartitionMaxBytes: int32(req.MaxBytes),
  95. }},
  96. }},
  97. })
  98. if err != nil {
  99. return nil, fmt.Errorf("kafka.(*Client).Fetch: %w", err)
  100. }
  101. res := m.(*fetchAPI.Response)
  102. if len(res.Topics) == 0 {
  103. return nil, fmt.Errorf("kafka.(*Client).Fetch: %w", protocol.ErrNoTopic)
  104. }
  105. topic := &res.Topics[0]
  106. if len(topic.Partitions) == 0 {
  107. return nil, fmt.Errorf("kafka.(*Client).Fetch: %w", protocol.ErrNoPartition)
  108. }
  109. partition := &topic.Partitions[0]
  110. ret := &FetchResponse{
  111. Throttle: makeDuration(res.ThrottleTimeMs),
  112. Topic: topic.Topic,
  113. Partition: int(partition.Partition),
  114. Error: makeError(res.ErrorCode, ""),
  115. HighWatermark: partition.HighWatermark,
  116. LastStableOffset: partition.LastStableOffset,
  117. LogStartOffset: partition.LogStartOffset,
  118. Records: partition.RecordSet.Records,
  119. }
  120. if partition.ErrorCode != 0 {
  121. ret.Error = makeError(partition.ErrorCode, "")
  122. }
  123. if ret.Records == nil {
  124. ret.Records = NewRecordReader()
  125. }
  126. return ret, nil
  127. }
  128. func (req *FetchRequest) maxWait() time.Duration {
  129. if req.MaxWait > 0 {
  130. return req.MaxWait
  131. }
  132. return defaultMaxWait
  133. }
  134. type fetchRequestV2 struct {
  135. ReplicaID int32
  136. MaxWaitTime int32
  137. MinBytes int32
  138. Topics []fetchRequestTopicV2
  139. }
  140. func (r fetchRequestV2) size() int32 {
  141. return 4 + 4 + 4 + sizeofArray(len(r.Topics), func(i int) int32 { return r.Topics[i].size() })
  142. }
  143. func (r fetchRequestV2) writeTo(wb *writeBuffer) {
  144. wb.writeInt32(r.ReplicaID)
  145. wb.writeInt32(r.MaxWaitTime)
  146. wb.writeInt32(r.MinBytes)
  147. wb.writeArray(len(r.Topics), func(i int) { r.Topics[i].writeTo(wb) })
  148. }
  149. type fetchRequestTopicV2 struct {
  150. TopicName string
  151. Partitions []fetchRequestPartitionV2
  152. }
  153. func (t fetchRequestTopicV2) size() int32 {
  154. return sizeofString(t.TopicName) +
  155. sizeofArray(len(t.Partitions), func(i int) int32 { return t.Partitions[i].size() })
  156. }
  157. func (t fetchRequestTopicV2) writeTo(wb *writeBuffer) {
  158. wb.writeString(t.TopicName)
  159. wb.writeArray(len(t.Partitions), func(i int) { t.Partitions[i].writeTo(wb) })
  160. }
  161. type fetchRequestPartitionV2 struct {
  162. Partition int32
  163. FetchOffset int64
  164. MaxBytes int32
  165. }
  166. func (p fetchRequestPartitionV2) size() int32 {
  167. return 4 + 8 + 4
  168. }
  169. func (p fetchRequestPartitionV2) writeTo(wb *writeBuffer) {
  170. wb.writeInt32(p.Partition)
  171. wb.writeInt64(p.FetchOffset)
  172. wb.writeInt32(p.MaxBytes)
  173. }
  174. type fetchResponseV2 struct {
  175. ThrottleTime int32
  176. Topics []fetchResponseTopicV2
  177. }
  178. func (r fetchResponseV2) size() int32 {
  179. return 4 + sizeofArray(len(r.Topics), func(i int) int32 { return r.Topics[i].size() })
  180. }
  181. func (r fetchResponseV2) writeTo(wb *writeBuffer) {
  182. wb.writeInt32(r.ThrottleTime)
  183. wb.writeArray(len(r.Topics), func(i int) { r.Topics[i].writeTo(wb) })
  184. }
  185. type fetchResponseTopicV2 struct {
  186. TopicName string
  187. Partitions []fetchResponsePartitionV2
  188. }
  189. func (t fetchResponseTopicV2) size() int32 {
  190. return sizeofString(t.TopicName) +
  191. sizeofArray(len(t.Partitions), func(i int) int32 { return t.Partitions[i].size() })
  192. }
  193. func (t fetchResponseTopicV2) writeTo(wb *writeBuffer) {
  194. wb.writeString(t.TopicName)
  195. wb.writeArray(len(t.Partitions), func(i int) { t.Partitions[i].writeTo(wb) })
  196. }
  197. type fetchResponsePartitionV2 struct {
  198. Partition int32
  199. ErrorCode int16
  200. HighwaterMarkOffset int64
  201. MessageSetSize int32
  202. MessageSet messageSet
  203. }
  204. func (p fetchResponsePartitionV2) size() int32 {
  205. return 4 + 2 + 8 + 4 + p.MessageSet.size()
  206. }
  207. func (p fetchResponsePartitionV2) writeTo(wb *writeBuffer) {
  208. wb.writeInt32(p.Partition)
  209. wb.writeInt16(p.ErrorCode)
  210. wb.writeInt64(p.HighwaterMarkOffset)
  211. wb.writeInt32(p.MessageSetSize)
  212. p.MessageSet.writeTo(wb)
  213. }