1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- package kafka
- import (
- "context"
- "net"
- "time"
- "github.com/segmentio/kafka-go/protocol/electleaders"
- )
- // ElectLeadersRequest is a request to the ElectLeaders API.
- type ElectLeadersRequest struct {
- // Addr is the address of the kafka broker to send the request to.
- Addr net.Addr
- // Topic is the name of the topic to do the leader elections in.
- Topic string
- // Partitions is the list of partitions to run leader elections for.
- Partitions []int
- // Timeout is the amount of time to wait for the election to run.
- Timeout time.Duration
- }
- // ElectLeadersResponse is a response from the ElectLeaders API.
- type ElectLeadersResponse struct {
- // ErrorCode is set to a non-nil value if a top-level error occurred.
- Error error
- // PartitionResults contains the results for each partition leader election.
- PartitionResults []ElectLeadersResponsePartitionResult
- }
- // ElectLeadersResponsePartitionResult contains the response details for a single partition.
- type ElectLeadersResponsePartitionResult struct {
- // Partition is the ID of the partition.
- Partition int
- // Error is set to a non-nil value if an error occurred electing leaders
- // for this partition.
- Error error
- }
- func (c *Client) ElectLeaders(
- ctx context.Context,
- req *ElectLeadersRequest,
- ) (*ElectLeadersResponse, error) {
- partitions32 := []int32{}
- for _, partition := range req.Partitions {
- partitions32 = append(partitions32, int32(partition))
- }
- protoResp, err := c.roundTrip(
- ctx,
- req.Addr,
- &electleaders.Request{
- TopicPartitions: []electleaders.RequestTopicPartitions{
- {
- Topic: req.Topic,
- PartitionIDs: partitions32,
- },
- },
- TimeoutMs: int32(req.Timeout.Milliseconds()),
- },
- )
- if err != nil {
- return nil, err
- }
- apiResp := protoResp.(*electleaders.Response)
- resp := &ElectLeadersResponse{
- Error: makeError(apiResp.ErrorCode, ""),
- }
- for _, topicResult := range apiResp.ReplicaElectionResults {
- for _, partitionResult := range topicResult.PartitionResults {
- resp.PartitionResults = append(
- resp.PartitionResults,
- ElectLeadersResponsePartitionResult{
- Partition: int(partitionResult.PartitionID),
- Error: makeError(partitionResult.ErrorCode, partitionResult.ErrorMessage),
- },
- )
- }
- }
- return resp, nil
- }
|