123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- package kafka
- import (
- "context"
- "fmt"
- "net"
- "time"
- metadataAPI "github.com/segmentio/kafka-go/protocol/metadata"
- )
- // MetadataRequest represents a request sent to a kafka broker to retrieve its
- // cluster metadata.
- type MetadataRequest struct {
- // Address of the kafka broker to send the request to.
- Addr net.Addr
- // The list of topics to retrieve metadata for.
- Topics []string
- }
- // MetadatResponse represents a response from a kafka broker to a metadata
- // request.
- type MetadataResponse struct {
- // The amount of time that the broker throttled the request.
- Throttle time.Duration
- // Name of the kafka cluster that client retrieved metadata from.
- ClusterID string
- // The broker which is currently the controller for the cluster.
- Controller Broker
- // The list of brokers registered to the cluster.
- Brokers []Broker
- // The list of topics available on the cluster.
- Topics []Topic
- }
- // Metadata sends a metadata request to a kafka broker and returns the response.
- func (c *Client) Metadata(ctx context.Context, req *MetadataRequest) (*MetadataResponse, error) {
- m, err := c.roundTrip(ctx, req.Addr, &metadataAPI.Request{
- TopicNames: req.Topics,
- })
- if err != nil {
- return nil, fmt.Errorf("kafka.(*Client).Metadata: %w", err)
- }
- res := m.(*metadataAPI.Response)
- ret := &MetadataResponse{
- Throttle: makeDuration(res.ThrottleTimeMs),
- Brokers: make([]Broker, len(res.Brokers)),
- Topics: make([]Topic, len(res.Topics)),
- ClusterID: res.ClusterID,
- }
- brokers := make(map[int32]Broker, len(res.Brokers))
- for i, b := range res.Brokers {
- broker := Broker{
- Host: b.Host,
- Port: int(b.Port),
- ID: int(b.NodeID),
- Rack: b.Rack,
- }
- ret.Brokers[i] = broker
- brokers[b.NodeID] = broker
- if b.NodeID == res.ControllerID {
- ret.Controller = broker
- }
- }
- for i, t := range res.Topics {
- ret.Topics[i] = Topic{
- Name: t.Name,
- Internal: t.IsInternal,
- Partitions: make([]Partition, len(t.Partitions)),
- Error: makeError(t.ErrorCode, ""),
- }
- for j, p := range t.Partitions {
- partition := Partition{
- Topic: t.Name,
- ID: int(p.PartitionIndex),
- Leader: brokers[p.LeaderID],
- Replicas: make([]Broker, len(p.ReplicaNodes)),
- Isr: make([]Broker, len(p.IsrNodes)),
- Error: makeError(p.ErrorCode, ""),
- }
- for i, id := range p.ReplicaNodes {
- partition.Replicas[i] = brokers[id]
- }
- for i, id := range p.IsrNodes {
- partition.Isr[i] = brokers[id]
- }
- ret.Topics[i].Partitions[j] = partition
- }
- }
- return ret, nil
- }
- type topicMetadataRequestV1 []string
- func (r topicMetadataRequestV1) size() int32 {
- return sizeofStringArray([]string(r))
- }
- func (r topicMetadataRequestV1) writeTo(wb *writeBuffer) {
- // communicate nil-ness to the broker by passing -1 as the array length.
- // for this particular request, the broker interpets a zero length array
- // as a request for no topics whereas a nil array is for all topics.
- if r == nil {
- wb.writeArrayLen(-1)
- } else {
- wb.writeStringArray([]string(r))
- }
- }
- type metadataResponseV1 struct {
- Brokers []brokerMetadataV1
- ControllerID int32
- Topics []topicMetadataV1
- }
- func (r metadataResponseV1) size() int32 {
- n1 := sizeofArray(len(r.Brokers), func(i int) int32 { return r.Brokers[i].size() })
- n2 := sizeofArray(len(r.Topics), func(i int) int32 { return r.Topics[i].size() })
- return 4 + n1 + n2
- }
- func (r metadataResponseV1) writeTo(wb *writeBuffer) {
- wb.writeArray(len(r.Brokers), func(i int) { r.Brokers[i].writeTo(wb) })
- wb.writeInt32(r.ControllerID)
- wb.writeArray(len(r.Topics), func(i int) { r.Topics[i].writeTo(wb) })
- }
- type brokerMetadataV1 struct {
- NodeID int32
- Host string
- Port int32
- Rack string
- }
- func (b brokerMetadataV1) size() int32 {
- return 4 + 4 + sizeofString(b.Host) + sizeofString(b.Rack)
- }
- func (b brokerMetadataV1) writeTo(wb *writeBuffer) {
- wb.writeInt32(b.NodeID)
- wb.writeString(b.Host)
- wb.writeInt32(b.Port)
- wb.writeString(b.Rack)
- }
- type topicMetadataV1 struct {
- TopicErrorCode int16
- TopicName string
- Internal bool
- Partitions []partitionMetadataV1
- }
- func (t topicMetadataV1) size() int32 {
- return 2 + 1 +
- sizeofString(t.TopicName) +
- sizeofArray(len(t.Partitions), func(i int) int32 { return t.Partitions[i].size() })
- }
- func (t topicMetadataV1) writeTo(wb *writeBuffer) {
- wb.writeInt16(t.TopicErrorCode)
- wb.writeString(t.TopicName)
- wb.writeBool(t.Internal)
- wb.writeArray(len(t.Partitions), func(i int) { t.Partitions[i].writeTo(wb) })
- }
- type partitionMetadataV1 struct {
- PartitionErrorCode int16
- PartitionID int32
- Leader int32
- Replicas []int32
- Isr []int32
- }
- func (p partitionMetadataV1) size() int32 {
- return 2 + 4 + 4 + sizeofInt32Array(p.Replicas) + sizeofInt32Array(p.Isr)
- }
- func (p partitionMetadataV1) writeTo(wb *writeBuffer) {
- wb.writeInt16(p.PartitionErrorCode)
- wb.writeInt32(p.PartitionID)
- wb.writeInt32(p.Leader)
- wb.writeInt32Array(p.Replicas)
- wb.writeInt32Array(p.Isr)
- }
|