listoffset.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. package kafka
  2. import (
  3. "bufio"
  4. "context"
  5. "fmt"
  6. "net"
  7. "time"
  8. "github.com/segmentio/kafka-go/protocol/listoffsets"
  9. )
  10. // OffsetRequest represents a request to retrieve a single partition offset.
  11. type OffsetRequest struct {
  12. Partition int
  13. Timestamp int64
  14. }
  15. // FirstOffsetOf constructs an OffsetRequest which asks for the first offset of
  16. // the parition given as argument.
  17. func FirstOffsetOf(partition int) OffsetRequest {
  18. return OffsetRequest{Partition: partition, Timestamp: FirstOffset}
  19. }
  20. // LastOffsetOf constructs an OffsetRequest which asks for the last offset of
  21. // the partition given as argument.
  22. func LastOffsetOf(partition int) OffsetRequest {
  23. return OffsetRequest{Partition: partition, Timestamp: LastOffset}
  24. }
  25. // TimeOffsetOf constructs an OffsetRequest which asks for a partition offset
  26. // at a given time.
  27. func TimeOffsetOf(partition int, at time.Time) OffsetRequest {
  28. return OffsetRequest{Partition: partition, Timestamp: timestamp(at)}
  29. }
  30. // PartitionOffsets carries information about offsets available in a topic
  31. // partition.
  32. type PartitionOffsets struct {
  33. Partition int
  34. FirstOffset int64
  35. LastOffset int64
  36. Offsets map[int64]time.Time
  37. Error error
  38. }
  39. // ListOffsetsRequest represents a request sent to a kafka broker to list of the
  40. // offsets of topic partitions.
  41. type ListOffsetsRequest struct {
  42. // Address of the kafka broker to send the request to.
  43. Addr net.Addr
  44. // A mapping of topic names to list of partitions that the program wishes to
  45. // get the offsets for.
  46. Topics map[string][]OffsetRequest
  47. // The isolation level for the request.
  48. //
  49. // Defaults to ReadUncommitted.
  50. //
  51. // This field requires the kafka broker to support the ListOffsets API in
  52. // version 2 or above (otherwise the value is ignored).
  53. IsolationLevel IsolationLevel
  54. }
  55. // ListOffsetsResponse represents a response from a kafka broker to a offset
  56. // listing request.
  57. type ListOffsetsResponse struct {
  58. // The amount of time that the broker throttled the request.
  59. Throttle time.Duration
  60. // Mappings of topics names to partition offsets, there will be one entry
  61. // for each topic in the request.
  62. Topics map[string][]PartitionOffsets
  63. }
  64. // ListOffsets sends an offset request to a kafka broker and returns the
  65. // response.
  66. func (c *Client) ListOffsets(ctx context.Context, req *ListOffsetsRequest) (*ListOffsetsResponse, error) {
  67. type topicPartition struct {
  68. topic string
  69. partition int
  70. }
  71. partitionOffsets := make(map[topicPartition]PartitionOffsets)
  72. for topicName, requests := range req.Topics {
  73. for _, r := range requests {
  74. key := topicPartition{
  75. topic: topicName,
  76. partition: r.Partition,
  77. }
  78. partition, ok := partitionOffsets[key]
  79. if !ok {
  80. partition = PartitionOffsets{
  81. Partition: r.Partition,
  82. FirstOffset: -1,
  83. LastOffset: -1,
  84. Offsets: make(map[int64]time.Time),
  85. }
  86. }
  87. switch r.Timestamp {
  88. case FirstOffset:
  89. partition.FirstOffset = 0
  90. case LastOffset:
  91. partition.LastOffset = 0
  92. }
  93. partitionOffsets[topicPartition{
  94. topic: topicName,
  95. partition: r.Partition,
  96. }] = partition
  97. }
  98. }
  99. topics := make([]listoffsets.RequestTopic, 0, len(req.Topics))
  100. for topicName, requests := range req.Topics {
  101. partitions := make([]listoffsets.RequestPartition, len(requests))
  102. for i, r := range requests {
  103. partitions[i] = listoffsets.RequestPartition{
  104. Partition: int32(r.Partition),
  105. CurrentLeaderEpoch: -1,
  106. Timestamp: r.Timestamp,
  107. }
  108. }
  109. topics = append(topics, listoffsets.RequestTopic{
  110. Topic: topicName,
  111. Partitions: partitions,
  112. })
  113. }
  114. m, err := c.roundTrip(ctx, req.Addr, &listoffsets.Request{
  115. ReplicaID: -1,
  116. IsolationLevel: int8(req.IsolationLevel),
  117. Topics: topics,
  118. })
  119. if err != nil {
  120. return nil, fmt.Errorf("kafka.(*Client).ListOffsets: %w", err)
  121. }
  122. res := m.(*listoffsets.Response)
  123. ret := &ListOffsetsResponse{
  124. Throttle: makeDuration(res.ThrottleTimeMs),
  125. Topics: make(map[string][]PartitionOffsets, len(res.Topics)),
  126. }
  127. for _, t := range res.Topics {
  128. for _, p := range t.Partitions {
  129. key := topicPartition{
  130. topic: t.Topic,
  131. partition: int(p.Partition),
  132. }
  133. partition := partitionOffsets[key]
  134. switch p.Timestamp {
  135. case FirstOffset:
  136. partition.FirstOffset = p.Offset
  137. case LastOffset:
  138. partition.LastOffset = p.Offset
  139. default:
  140. partition.Offsets[p.Offset] = makeTime(p.Timestamp)
  141. }
  142. if p.ErrorCode != 0 {
  143. partition.Error = Error(p.ErrorCode)
  144. }
  145. partitionOffsets[key] = partition
  146. }
  147. }
  148. for key, partition := range partitionOffsets {
  149. ret.Topics[key.topic] = append(ret.Topics[key.topic], partition)
  150. }
  151. return ret, nil
  152. }
  153. type listOffsetRequestV1 struct {
  154. ReplicaID int32
  155. Topics []listOffsetRequestTopicV1
  156. }
  157. func (r listOffsetRequestV1) size() int32 {
  158. return 4 + sizeofArray(len(r.Topics), func(i int) int32 { return r.Topics[i].size() })
  159. }
  160. func (r listOffsetRequestV1) writeTo(wb *writeBuffer) {
  161. wb.writeInt32(r.ReplicaID)
  162. wb.writeArray(len(r.Topics), func(i int) { r.Topics[i].writeTo(wb) })
  163. }
  164. type listOffsetRequestTopicV1 struct {
  165. TopicName string
  166. Partitions []listOffsetRequestPartitionV1
  167. }
  168. func (t listOffsetRequestTopicV1) size() int32 {
  169. return sizeofString(t.TopicName) +
  170. sizeofArray(len(t.Partitions), func(i int) int32 { return t.Partitions[i].size() })
  171. }
  172. func (t listOffsetRequestTopicV1) writeTo(wb *writeBuffer) {
  173. wb.writeString(t.TopicName)
  174. wb.writeArray(len(t.Partitions), func(i int) { t.Partitions[i].writeTo(wb) })
  175. }
  176. type listOffsetRequestPartitionV1 struct {
  177. Partition int32
  178. Time int64
  179. }
  180. func (p listOffsetRequestPartitionV1) size() int32 {
  181. return 4 + 8
  182. }
  183. func (p listOffsetRequestPartitionV1) writeTo(wb *writeBuffer) {
  184. wb.writeInt32(p.Partition)
  185. wb.writeInt64(p.Time)
  186. }
  187. type listOffsetResponseV1 []listOffsetResponseTopicV1
  188. func (r listOffsetResponseV1) size() int32 {
  189. return sizeofArray(len(r), func(i int) int32 { return r[i].size() })
  190. }
  191. func (r listOffsetResponseV1) writeTo(wb *writeBuffer) {
  192. wb.writeArray(len(r), func(i int) { r[i].writeTo(wb) })
  193. }
  194. type listOffsetResponseTopicV1 struct {
  195. TopicName string
  196. PartitionOffsets []partitionOffsetV1
  197. }
  198. func (t listOffsetResponseTopicV1) size() int32 {
  199. return sizeofString(t.TopicName) +
  200. sizeofArray(len(t.PartitionOffsets), func(i int) int32 { return t.PartitionOffsets[i].size() })
  201. }
  202. func (t listOffsetResponseTopicV1) writeTo(wb *writeBuffer) {
  203. wb.writeString(t.TopicName)
  204. wb.writeArray(len(t.PartitionOffsets), func(i int) { t.PartitionOffsets[i].writeTo(wb) })
  205. }
  206. type partitionOffsetV1 struct {
  207. Partition int32
  208. ErrorCode int16
  209. Timestamp int64
  210. Offset int64
  211. }
  212. func (p partitionOffsetV1) size() int32 {
  213. return 4 + 2 + 8 + 8
  214. }
  215. func (p partitionOffsetV1) writeTo(wb *writeBuffer) {
  216. wb.writeInt32(p.Partition)
  217. wb.writeInt16(p.ErrorCode)
  218. wb.writeInt64(p.Timestamp)
  219. wb.writeInt64(p.Offset)
  220. }
  221. func (p *partitionOffsetV1) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
  222. if remain, err = readInt32(r, sz, &p.Partition); err != nil {
  223. return
  224. }
  225. if remain, err = readInt16(r, remain, &p.ErrorCode); err != nil {
  226. return
  227. }
  228. if remain, err = readInt64(r, remain, &p.Timestamp); err != nil {
  229. return
  230. }
  231. if remain, err = readInt64(r, remain, &p.Offset); err != nil {
  232. return
  233. }
  234. return
  235. }