123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
- package kafka
- import (
- "context"
- "net"
- "time"
- "github.com/segmentio/kafka-go/protocol/alterpartitionreassignments"
- )
- // AlterPartitionReassignmentsRequest is a request to the AlterPartitionReassignments API.
- type AlterPartitionReassignmentsRequest struct {
- // Address of the kafka broker to send the request to.
- Addr net.Addr
- // Topic is the name of the topic to alter partitions in.
- Topic string
- // Assignments is the list of partition reassignments to submit to the API.
- Assignments []AlterPartitionReassignmentsRequestAssignment
- // Timeout is the amount of time to wait for the request to complete.
- Timeout time.Duration
- }
// AlterPartitionReassignmentsRequestAssignment is the reassignment requested
// for one partition: the partition to change and the brokers that should hold
// its replicas afterwards.
type AlterPartitionReassignmentsRequestAssignment struct {
	// PartitionID identifies the partition being reassigned.
	PartitionID int

	// BrokerIDs holds the IDs of the brokers the partition replicas are set to.
	BrokerIDs []int
}
- // AlterPartitionReassignmentsResponse is a response from the AlterPartitionReassignments API.
- type AlterPartitionReassignmentsResponse struct {
- // Error is set to a non-nil value including the code and message if a top-level
- // error was encountered when doing the update.
- Error error
- // PartitionResults contains the specific results for each partition.
- PartitionResults []AlterPartitionReassignmentsResponsePartitionResult
- }
// AlterPartitionReassignmentsResponsePartitionResult reports the outcome of
// the reassignment for one partition.
type AlterPartitionReassignmentsResponsePartitionResult struct {
	// PartitionID identifies the partition this result refers to.
	PartitionID int

	// Error is non-nil when the update of this partition failed; it carries
	// the error code and message reported for the partition.
	Error error
}
- func (c *Client) AlterPartitionReassignments(
- ctx context.Context,
- req *AlterPartitionReassignmentsRequest,
- ) (*AlterPartitionReassignmentsResponse, error) {
- apiPartitions := []alterpartitionreassignments.RequestPartition{}
- for _, assignment := range req.Assignments {
- replicas := []int32{}
- for _, brokerID := range assignment.BrokerIDs {
- replicas = append(replicas, int32(brokerID))
- }
- apiPartitions = append(
- apiPartitions,
- alterpartitionreassignments.RequestPartition{
- PartitionIndex: int32(assignment.PartitionID),
- Replicas: replicas,
- },
- )
- }
- apiReq := &alterpartitionreassignments.Request{
- TimeoutMs: int32(req.Timeout.Milliseconds()),
- Topics: []alterpartitionreassignments.RequestTopic{
- {
- Name: req.Topic,
- Partitions: apiPartitions,
- },
- },
- }
- protoResp, err := c.roundTrip(
- ctx,
- req.Addr,
- apiReq,
- )
- if err != nil {
- return nil, err
- }
- apiResp := protoResp.(*alterpartitionreassignments.Response)
- resp := &AlterPartitionReassignmentsResponse{
- Error: makeError(apiResp.ErrorCode, apiResp.ErrorMessage),
- }
- for _, topicResult := range apiResp.Results {
- for _, partitionResult := range topicResult.Partitions {
- resp.PartitionResults = append(
- resp.PartitionResults,
- AlterPartitionReassignmentsResponsePartitionResult{
- PartitionID: int(partitionResult.PartitionIndex),
- Error: makeError(partitionResult.ErrorCode, partitionResult.ErrorMessage),
- },
- )
- }
- }
- return resp, nil
- }
|