fetch.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package fetch
  2. import (
  3. "fmt"
  4. "github.com/segmentio/kafka-go/protocol"
  5. )
  6. func init() {
  7. protocol.Register(&Request{}, &Response{})
  8. }
  9. type Request struct {
  10. ReplicaID int32 `kafka:"min=v0,max=v11"`
  11. MaxWaitTime int32 `kafka:"min=v0,max=v11"`
  12. MinBytes int32 `kafka:"min=v0,max=v11"`
  13. MaxBytes int32 `kafka:"min=v3,max=v11"`
  14. IsolationLevel int8 `kafka:"min=v4,max=v11"`
  15. SessionID int32 `kafka:"min=v7,max=v11"`
  16. SessionEpoch int32 `kafka:"min=v7,max=v11"`
  17. Topics []RequestTopic `kafka:"min=v0,max=v11"`
  18. ForgottenTopics []RequestForgottenTopic `kafka:"min=v7,max=v11"`
  19. RackID string `kafka:"min=v11,max=v11"`
  20. }
  21. func (r *Request) ApiKey() protocol.ApiKey { return protocol.Fetch }
  22. func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
  23. broker := protocol.Broker{ID: -1}
  24. for i := range r.Topics {
  25. t := &r.Topics[i]
  26. topic, ok := cluster.Topics[t.Topic]
  27. if !ok {
  28. return broker, NewError(protocol.NewErrNoTopic(t.Topic))
  29. }
  30. for j := range t.Partitions {
  31. p := &t.Partitions[j]
  32. partition, ok := topic.Partitions[p.Partition]
  33. if !ok {
  34. return broker, NewError(protocol.NewErrNoPartition(t.Topic, p.Partition))
  35. }
  36. if b, ok := cluster.Brokers[partition.Leader]; !ok {
  37. return broker, NewError(protocol.NewErrNoLeader(t.Topic, p.Partition))
  38. } else if broker.ID < 0 {
  39. broker = b
  40. } else if b.ID != broker.ID {
  41. return broker, NewError(fmt.Errorf("mismatching leaders (%d!=%d)", b.ID, broker.ID))
  42. }
  43. }
  44. }
  45. return broker, nil
  46. }
  47. type RequestTopic struct {
  48. Topic string `kafka:"min=v0,max=v11"`
  49. Partitions []RequestPartition `kafka:"min=v0,max=v11"`
  50. }
  51. type RequestPartition struct {
  52. Partition int32 `kafka:"min=v0,max=v11"`
  53. CurrentLeaderEpoch int32 `kafka:"min=v9,max=v11"`
  54. FetchOffset int64 `kafka:"min=v0,max=v11"`
  55. LogStartOffset int64 `kafka:"min=v5,max=v11"`
  56. PartitionMaxBytes int32 `kafka:"min=v0,max=v11"`
  57. }
  58. type RequestForgottenTopic struct {
  59. Topic string `kafka:"min=v7,max=v11"`
  60. Partitions []int32 `kafka:"min=v7,max=v11"`
  61. }
  62. type Response struct {
  63. ThrottleTimeMs int32 `kafka:"min=v1,max=v11"`
  64. ErrorCode int16 `kafka:"min=v7,max=v11"`
  65. SessionID int32 `kafka:"min=v7,max=v11"`
  66. Topics []ResponseTopic `kafka:"min=v0,max=v11"`
  67. }
  68. func (r *Response) ApiKey() protocol.ApiKey { return protocol.Fetch }
  69. type ResponseTopic struct {
  70. Topic string `kafka:"min=v0,max=v11"`
  71. Partitions []ResponsePartition `kafka:"min=v0,max=v11"`
  72. }
  73. type ResponsePartition struct {
  74. Partition int32 `kafka:"min=v0,max=v11"`
  75. ErrorCode int16 `kafka:"min=v0,max=v11"`
  76. HighWatermark int64 `kafka:"min=v0,max=v11"`
  77. LastStableOffset int64 `kafka:"min=v4,max=v11"`
  78. LogStartOffset int64 `kafka:"min=v5,max=v11"`
  79. AbortedTransactions []ResponseTransaction `kafka:"min=v4,max=v11"`
  80. PreferredReadReplica int32 `kafka:"min=v11,max=v11"`
  81. RecordSet protocol.RecordSet `kafka:"min=v0,max=v11"`
  82. }
  83. type ResponseTransaction struct {
  84. ProducerID int64 `kafka:"min=v4,max=v11"`
  85. FirstOffset int64 `kafka:"min=v4,max=v11"`
  86. }
  87. var (
  88. _ protocol.BrokerMessage = (*Request)(nil)
  89. )
  90. type Error struct {
  91. Err error
  92. }
  93. func NewError(err error) *Error {
  94. return &Error{Err: err}
  95. }
  96. func (e *Error) Error() string {
  97. return fmt.Sprintf("fetch request error: %v", e.Err)
  98. }
  99. func (e *Error) Unwrap() error {
  100. return e.Err
  101. }