electleaders.go 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package kafka
  2. import (
  3. "context"
  4. "net"
  5. "time"
  6. "github.com/segmentio/kafka-go/protocol/electleaders"
  7. )
  8. // ElectLeadersRequest is a request to the ElectLeaders API.
  9. type ElectLeadersRequest struct {
  10. // Addr is the address of the kafka broker to send the request to.
  11. Addr net.Addr
  12. // Topic is the name of the topic to do the leader elections in.
  13. Topic string
  14. // Partitions is the list of partitions to run leader elections for.
  15. Partitions []int
  16. // Timeout is the amount of time to wait for the election to run.
  17. Timeout time.Duration
  18. }
  19. // ElectLeadersResponse is a response from the ElectLeaders API.
  20. type ElectLeadersResponse struct {
  21. // ErrorCode is set to a non-nil value if a top-level error occurred.
  22. Error error
  23. // PartitionResults contains the results for each partition leader election.
  24. PartitionResults []ElectLeadersResponsePartitionResult
  25. }
  26. // ElectLeadersResponsePartitionResult contains the response details for a single partition.
  27. type ElectLeadersResponsePartitionResult struct {
  28. // Partition is the ID of the partition.
  29. Partition int
  30. // Error is set to a non-nil value if an error occurred electing leaders
  31. // for this partition.
  32. Error error
  33. }
  34. func (c *Client) ElectLeaders(
  35. ctx context.Context,
  36. req *ElectLeadersRequest,
  37. ) (*ElectLeadersResponse, error) {
  38. partitions32 := []int32{}
  39. for _, partition := range req.Partitions {
  40. partitions32 = append(partitions32, int32(partition))
  41. }
  42. protoResp, err := c.roundTrip(
  43. ctx,
  44. req.Addr,
  45. &electleaders.Request{
  46. TopicPartitions: []electleaders.RequestTopicPartitions{
  47. {
  48. Topic: req.Topic,
  49. PartitionIDs: partitions32,
  50. },
  51. },
  52. TimeoutMs: int32(req.Timeout.Milliseconds()),
  53. },
  54. )
  55. if err != nil {
  56. return nil, err
  57. }
  58. apiResp := protoResp.(*electleaders.Response)
  59. resp := &ElectLeadersResponse{
  60. Error: makeError(apiResp.ErrorCode, ""),
  61. }
  62. for _, topicResult := range apiResp.ReplicaElectionResults {
  63. for _, partitionResult := range topicResult.PartitionResults {
  64. resp.PartitionResults = append(
  65. resp.PartitionResults,
  66. ElectLeadersResponsePartitionResult{
  67. Partition: int(partitionResult.PartitionID),
  68. Error: makeError(partitionResult.ErrorCode, partitionResult.ErrorMessage),
  69. },
  70. )
  71. }
  72. }
  73. return resp, nil
  74. }