offsetfetch.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. package kafka
  2. import (
  3. "bufio"
  4. "context"
  5. "fmt"
  6. "net"
  7. "time"
  8. "github.com/segmentio/kafka-go/protocol/offsetfetch"
  9. )
// OffsetFetchRequest represents a request sent to a kafka broker to read the
// currently committed offsets of topic partitions.
type OffsetFetchRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// ID of the consumer group to retrieve the offsets for.
	GroupID string

	// Set of topic partitions to retrieve the offsets for, keyed by topic
	// name. Each value lists the partition IDs requested for that topic.
	Topics map[string][]int
}
// OffsetFetchResponse represents a response from a kafka broker to an offset
// fetch request.
type OffsetFetchResponse struct {
	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// Set of topic partitions that the kafka broker has returned offsets for,
	// keyed by topic name.
	Topics map[string][]OffsetFetchPartition

	// An error that may have occurred while attempting to retrieve consumer
	// group offsets.
	//
	// The error contains both the kafka error code, and an error message
	// returned by the kafka broker. Programs may use the standard errors.Is
	// function to test the error against kafka error codes.
	Error error
}
// OffsetFetchPartition represents the state of a single partition in a consumer
// group.
type OffsetFetchPartition struct {
	// ID of the partition.
	Partition int

	// Last committed offsets on the partition when the request was served by
	// the kafka broker.
	CommittedOffset int64

	// Consumer group metadata for this partition.
	Metadata string

	// An error that may have occurred while attempting to retrieve consumer
	// group offsets for this partition.
	//
	// The error contains both the kafka error code, and an error message
	// returned by the kafka broker. Programs may use the standard errors.Is
	// function to test the error against kafka error codes.
	Error error
}
  53. // OffsetFetch sends an offset fetch request to a kafka broker and returns the
  54. // response.
  55. func (c *Client) OffsetFetch(ctx context.Context, req *OffsetFetchRequest) (*OffsetFetchResponse, error) {
  56. topics := make([]offsetfetch.RequestTopic, 0, len(req.Topics))
  57. for topicName, partitions := range req.Topics {
  58. indexes := make([]int32, len(partitions))
  59. for i, p := range partitions {
  60. indexes[i] = int32(p)
  61. }
  62. topics = append(topics, offsetfetch.RequestTopic{
  63. Name: topicName,
  64. PartitionIndexes: indexes,
  65. })
  66. }
  67. m, err := c.roundTrip(ctx, req.Addr, &offsetfetch.Request{
  68. GroupID: req.GroupID,
  69. Topics: topics,
  70. })
  71. if err != nil {
  72. return nil, fmt.Errorf("kafka.(*Client).OffsetFetch: %w", err)
  73. }
  74. res := m.(*offsetfetch.Response)
  75. ret := &OffsetFetchResponse{
  76. Throttle: makeDuration(res.ThrottleTimeMs),
  77. Topics: make(map[string][]OffsetFetchPartition, len(res.Topics)),
  78. Error: makeError(res.ErrorCode, ""),
  79. }
  80. for _, t := range res.Topics {
  81. partitions := make([]OffsetFetchPartition, len(t.Partitions))
  82. for i, p := range t.Partitions {
  83. partitions[i] = OffsetFetchPartition{
  84. Partition: int(p.PartitionIndex),
  85. CommittedOffset: p.CommittedOffset,
  86. Metadata: p.Metadata,
  87. Error: makeError(p.ErrorCode, ""),
  88. }
  89. }
  90. ret.Topics[t.Name] = partitions
  91. }
  92. return ret, nil
  93. }
// offsetFetchRequestV1Topic is one topic entry of an offsetFetchRequestV1: a
// topic name and the partitions whose offsets are requested.
type offsetFetchRequestV1Topic struct {
	// Topic name
	Topic string

	// Partitions to fetch offsets
	Partitions []int32
}
  100. func (t offsetFetchRequestV1Topic) size() int32 {
  101. return sizeofString(t.Topic) +
  102. sizeofInt32Array(t.Partitions)
  103. }
  104. func (t offsetFetchRequestV1Topic) writeTo(wb *writeBuffer) {
  105. wb.writeString(t.Topic)
  106. wb.writeInt32Array(t.Partitions)
  107. }
// offsetFetchRequestV1 is the request body asking for the committed offsets
// of a consumer group on a set of topic partitions.
type offsetFetchRequestV1 struct {
	// GroupID holds the unique group identifier
	GroupID string

	// Topics to fetch offsets.
	Topics []offsetFetchRequestV1Topic
}
  114. func (t offsetFetchRequestV1) size() int32 {
  115. return sizeofString(t.GroupID) +
  116. sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() })
  117. }
  118. func (t offsetFetchRequestV1) writeTo(wb *writeBuffer) {
  119. wb.writeString(t.GroupID)
  120. wb.writeArray(len(t.Topics), func(i int) { t.Topics[i].writeTo(wb) })
  121. }
// offsetFetchResponseV1PartitionResponse is the per-partition entry of an
// offset fetch response: the partition ID, its committed offset, the stored
// metadata and an error code.
type offsetFetchResponseV1PartitionResponse struct {
	// Partition ID
	Partition int32

	// Offset of last committed message
	Offset int64

	// Metadata client wants to keep
	Metadata string

	// ErrorCode holds response error code
	ErrorCode int16
}
  132. func (t offsetFetchResponseV1PartitionResponse) size() int32 {
  133. return sizeofInt32(t.Partition) +
  134. sizeofInt64(t.Offset) +
  135. sizeofString(t.Metadata) +
  136. sizeofInt16(t.ErrorCode)
  137. }
  138. func (t offsetFetchResponseV1PartitionResponse) writeTo(wb *writeBuffer) {
  139. wb.writeInt32(t.Partition)
  140. wb.writeInt64(t.Offset)
  141. wb.writeString(t.Metadata)
  142. wb.writeInt16(t.ErrorCode)
  143. }
  144. func (t *offsetFetchResponseV1PartitionResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  145. if remain, err = readInt32(r, size, &t.Partition); err != nil {
  146. return
  147. }
  148. if remain, err = readInt64(r, remain, &t.Offset); err != nil {
  149. return
  150. }
  151. if remain, err = readString(r, remain, &t.Metadata); err != nil {
  152. return
  153. }
  154. if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil {
  155. return
  156. }
  157. return
  158. }
// offsetFetchResponseV1Response is the per-topic entry of an offset fetch
// response: a topic name and the partition entries returned for it.
type offsetFetchResponseV1Response struct {
	// Topic name
	Topic string

	// PartitionResponses holds offsets by partition
	PartitionResponses []offsetFetchResponseV1PartitionResponse
}
  165. func (t offsetFetchResponseV1Response) size() int32 {
  166. return sizeofString(t.Topic) +
  167. sizeofArray(len(t.PartitionResponses), func(i int) int32 { return t.PartitionResponses[i].size() })
  168. }
  169. func (t offsetFetchResponseV1Response) writeTo(wb *writeBuffer) {
  170. wb.writeString(t.Topic)
  171. wb.writeArray(len(t.PartitionResponses), func(i int) { t.PartitionResponses[i].writeTo(wb) })
  172. }
  173. func (t *offsetFetchResponseV1Response) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  174. if remain, err = readString(r, size, &t.Topic); err != nil {
  175. return
  176. }
  177. fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
  178. item := offsetFetchResponseV1PartitionResponse{}
  179. if fnRemain, fnErr = (&item).readFrom(r, size); err != nil {
  180. return
  181. }
  182. t.PartitionResponses = append(t.PartitionResponses, item)
  183. return
  184. }
  185. if remain, err = readArrayWith(r, remain, fn); err != nil {
  186. return
  187. }
  188. return
  189. }
// offsetFetchResponseV1 is the response body of an offset fetch request: one
// entry per topic, each holding the offsets of its partitions.
type offsetFetchResponseV1 struct {
	// Responses holds topic partition offsets
	Responses []offsetFetchResponseV1Response
}
  194. func (t offsetFetchResponseV1) size() int32 {
  195. return sizeofArray(len(t.Responses), func(i int) int32 { return t.Responses[i].size() })
  196. }
  197. func (t offsetFetchResponseV1) writeTo(wb *writeBuffer) {
  198. wb.writeArray(len(t.Responses), func(i int) { t.Responses[i].writeTo(wb) })
  199. }
  200. func (t *offsetFetchResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  201. fn := func(r *bufio.Reader, withSize int) (fnRemain int, fnErr error) {
  202. item := offsetFetchResponseV1Response{}
  203. if fnRemain, fnErr = (&item).readFrom(r, withSize); fnErr != nil {
  204. return
  205. }
  206. t.Responses = append(t.Responses, item)
  207. return
  208. }
  209. if remain, err = readArrayWith(r, size, fn); err != nil {
  210. return
  211. }
  212. return
  213. }
  214. func findOffset(topic string, partition int32, response offsetFetchResponseV1) (int64, bool) {
  215. for _, r := range response.Responses {
  216. if r.Topic != topic {
  217. continue
  218. }
  219. for _, pr := range r.PartitionResponses {
  220. if pr.Partition == partition {
  221. return pr.Offset, true
  222. }
  223. }
  224. }
  225. return 0, false
  226. }