metadata.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. package kafka
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "time"
  7. metadataAPI "github.com/segmentio/kafka-go/protocol/metadata"
  8. )
  9. // MetadataRequest represents a request sent to a kafka broker to retrieve its
  10. // cluster metadata.
  11. type MetadataRequest struct {
  12. // Address of the kafka broker to send the request to.
  13. Addr net.Addr
  14. // The list of topics to retrieve metadata for.
  15. Topics []string
  16. }
  17. // MetadatResponse represents a response from a kafka broker to a metadata
  18. // request.
  19. type MetadataResponse struct {
  20. // The amount of time that the broker throttled the request.
  21. Throttle time.Duration
  22. // Name of the kafka cluster that client retrieved metadata from.
  23. ClusterID string
  24. // The broker which is currently the controller for the cluster.
  25. Controller Broker
  26. // The list of brokers registered to the cluster.
  27. Brokers []Broker
  28. // The list of topics available on the cluster.
  29. Topics []Topic
  30. }
  31. // Metadata sends a metadata request to a kafka broker and returns the response.
  32. func (c *Client) Metadata(ctx context.Context, req *MetadataRequest) (*MetadataResponse, error) {
  33. m, err := c.roundTrip(ctx, req.Addr, &metadataAPI.Request{
  34. TopicNames: req.Topics,
  35. })
  36. if err != nil {
  37. return nil, fmt.Errorf("kafka.(*Client).Metadata: %w", err)
  38. }
  39. res := m.(*metadataAPI.Response)
  40. ret := &MetadataResponse{
  41. Throttle: makeDuration(res.ThrottleTimeMs),
  42. Brokers: make([]Broker, len(res.Brokers)),
  43. Topics: make([]Topic, len(res.Topics)),
  44. ClusterID: res.ClusterID,
  45. }
  46. brokers := make(map[int32]Broker, len(res.Brokers))
  47. for i, b := range res.Brokers {
  48. broker := Broker{
  49. Host: b.Host,
  50. Port: int(b.Port),
  51. ID: int(b.NodeID),
  52. Rack: b.Rack,
  53. }
  54. ret.Brokers[i] = broker
  55. brokers[b.NodeID] = broker
  56. if b.NodeID == res.ControllerID {
  57. ret.Controller = broker
  58. }
  59. }
  60. for i, t := range res.Topics {
  61. ret.Topics[i] = Topic{
  62. Name: t.Name,
  63. Internal: t.IsInternal,
  64. Partitions: make([]Partition, len(t.Partitions)),
  65. Error: makeError(t.ErrorCode, ""),
  66. }
  67. for j, p := range t.Partitions {
  68. partition := Partition{
  69. Topic: t.Name,
  70. ID: int(p.PartitionIndex),
  71. Leader: brokers[p.LeaderID],
  72. Replicas: make([]Broker, len(p.ReplicaNodes)),
  73. Isr: make([]Broker, len(p.IsrNodes)),
  74. Error: makeError(p.ErrorCode, ""),
  75. }
  76. for i, id := range p.ReplicaNodes {
  77. partition.Replicas[i] = brokers[id]
  78. }
  79. for i, id := range p.IsrNodes {
  80. partition.Isr[i] = brokers[id]
  81. }
  82. ret.Topics[i].Partitions[j] = partition
  83. }
  84. }
  85. return ret, nil
  86. }
  87. type topicMetadataRequestV1 []string
  88. func (r topicMetadataRequestV1) size() int32 {
  89. return sizeofStringArray([]string(r))
  90. }
  91. func (r topicMetadataRequestV1) writeTo(wb *writeBuffer) {
  92. // communicate nil-ness to the broker by passing -1 as the array length.
  93. // for this particular request, the broker interpets a zero length array
  94. // as a request for no topics whereas a nil array is for all topics.
  95. if r == nil {
  96. wb.writeArrayLen(-1)
  97. } else {
  98. wb.writeStringArray([]string(r))
  99. }
  100. }
  101. type metadataResponseV1 struct {
  102. Brokers []brokerMetadataV1
  103. ControllerID int32
  104. Topics []topicMetadataV1
  105. }
  106. func (r metadataResponseV1) size() int32 {
  107. n1 := sizeofArray(len(r.Brokers), func(i int) int32 { return r.Brokers[i].size() })
  108. n2 := sizeofArray(len(r.Topics), func(i int) int32 { return r.Topics[i].size() })
  109. return 4 + n1 + n2
  110. }
  111. func (r metadataResponseV1) writeTo(wb *writeBuffer) {
  112. wb.writeArray(len(r.Brokers), func(i int) { r.Brokers[i].writeTo(wb) })
  113. wb.writeInt32(r.ControllerID)
  114. wb.writeArray(len(r.Topics), func(i int) { r.Topics[i].writeTo(wb) })
  115. }
  116. type brokerMetadataV1 struct {
  117. NodeID int32
  118. Host string
  119. Port int32
  120. Rack string
  121. }
  122. func (b brokerMetadataV1) size() int32 {
  123. return 4 + 4 + sizeofString(b.Host) + sizeofString(b.Rack)
  124. }
  125. func (b brokerMetadataV1) writeTo(wb *writeBuffer) {
  126. wb.writeInt32(b.NodeID)
  127. wb.writeString(b.Host)
  128. wb.writeInt32(b.Port)
  129. wb.writeString(b.Rack)
  130. }
  131. type topicMetadataV1 struct {
  132. TopicErrorCode int16
  133. TopicName string
  134. Internal bool
  135. Partitions []partitionMetadataV1
  136. }
  137. func (t topicMetadataV1) size() int32 {
  138. return 2 + 1 +
  139. sizeofString(t.TopicName) +
  140. sizeofArray(len(t.Partitions), func(i int) int32 { return t.Partitions[i].size() })
  141. }
  142. func (t topicMetadataV1) writeTo(wb *writeBuffer) {
  143. wb.writeInt16(t.TopicErrorCode)
  144. wb.writeString(t.TopicName)
  145. wb.writeBool(t.Internal)
  146. wb.writeArray(len(t.Partitions), func(i int) { t.Partitions[i].writeTo(wb) })
  147. }
  148. type partitionMetadataV1 struct {
  149. PartitionErrorCode int16
  150. PartitionID int32
  151. Leader int32
  152. Replicas []int32
  153. Isr []int32
  154. }
  155. func (p partitionMetadataV1) size() int32 {
  156. return 2 + 4 + 4 + sizeofInt32Array(p.Replicas) + sizeofInt32Array(p.Isr)
  157. }
  158. func (p partitionMetadataV1) writeTo(wb *writeBuffer) {
  159. wb.writeInt16(p.PartitionErrorCode)
  160. wb.writeInt32(p.PartitionID)
  161. wb.writeInt32(p.Leader)
  162. wb.writeInt32Array(p.Replicas)
  163. wb.writeInt32Array(p.Isr)
  164. }