alterpartitionreassignments.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. package kafka
  2. import (
  3. "context"
  4. "net"
  5. "time"
  6. "github.com/segmentio/kafka-go/protocol/alterpartitionreassignments"
  7. )
  8. // AlterPartitionReassignmentsRequest is a request to the AlterPartitionReassignments API.
  9. type AlterPartitionReassignmentsRequest struct {
  10. // Address of the kafka broker to send the request to.
  11. Addr net.Addr
  12. // Topic is the name of the topic to alter partitions in.
  13. Topic string
  14. // Assignments is the list of partition reassignments to submit to the API.
  15. Assignments []AlterPartitionReassignmentsRequestAssignment
  16. // Timeout is the amount of time to wait for the request to complete.
  17. Timeout time.Duration
  18. }
  19. // AlterPartitionReassignmentsRequestAssignment contains the requested reassignments for a single
  20. // partition.
  21. type AlterPartitionReassignmentsRequestAssignment struct {
  22. // PartitionID is the ID of the partition to make the reassignments in.
  23. PartitionID int
  24. // BrokerIDs is a slice of brokers to set the partition replicas to.
  25. BrokerIDs []int
  26. }
  27. // AlterPartitionReassignmentsResponse is a response from the AlterPartitionReassignments API.
  28. type AlterPartitionReassignmentsResponse struct {
  29. // Error is set to a non-nil value including the code and message if a top-level
  30. // error was encountered when doing the update.
  31. Error error
  32. // PartitionResults contains the specific results for each partition.
  33. PartitionResults []AlterPartitionReassignmentsResponsePartitionResult
  34. }
  35. // AlterPartitionReassignmentsResponsePartitionResult contains the detailed result of
  36. // doing reassignments for a single partition.
  37. type AlterPartitionReassignmentsResponsePartitionResult struct {
  38. // PartitionID is the ID of the partition that was altered.
  39. PartitionID int
  40. // Error is set to a non-nil value including the code and message if an error was encountered
  41. // during the update for this partition.
  42. Error error
  43. }
  44. func (c *Client) AlterPartitionReassignments(
  45. ctx context.Context,
  46. req *AlterPartitionReassignmentsRequest,
  47. ) (*AlterPartitionReassignmentsResponse, error) {
  48. apiPartitions := []alterpartitionreassignments.RequestPartition{}
  49. for _, assignment := range req.Assignments {
  50. replicas := []int32{}
  51. for _, brokerID := range assignment.BrokerIDs {
  52. replicas = append(replicas, int32(brokerID))
  53. }
  54. apiPartitions = append(
  55. apiPartitions,
  56. alterpartitionreassignments.RequestPartition{
  57. PartitionIndex: int32(assignment.PartitionID),
  58. Replicas: replicas,
  59. },
  60. )
  61. }
  62. apiReq := &alterpartitionreassignments.Request{
  63. TimeoutMs: int32(req.Timeout.Milliseconds()),
  64. Topics: []alterpartitionreassignments.RequestTopic{
  65. {
  66. Name: req.Topic,
  67. Partitions: apiPartitions,
  68. },
  69. },
  70. }
  71. protoResp, err := c.roundTrip(
  72. ctx,
  73. req.Addr,
  74. apiReq,
  75. )
  76. if err != nil {
  77. return nil, err
  78. }
  79. apiResp := protoResp.(*alterpartitionreassignments.Response)
  80. resp := &AlterPartitionReassignmentsResponse{
  81. Error: makeError(apiResp.ErrorCode, apiResp.ErrorMessage),
  82. }
  83. for _, topicResult := range apiResp.Results {
  84. for _, partitionResult := range topicResult.Partitions {
  85. resp.PartitionResults = append(
  86. resp.PartitionResults,
  87. AlterPartitionReassignmentsResponsePartitionResult{
  88. PartitionID: int(partitionResult.PartitionIndex),
  89. Error: makeError(partitionResult.ErrorCode, partitionResult.ErrorMessage),
  90. },
  91. )
  92. }
  93. }
  94. return resp, nil
  95. }