123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- package kafka
- import (
- "context"
- "fmt"
- "net"
- "time"
- "github.com/segmentio/kafka-go/protocol/createpartitions"
- )
- // CreatePartitionsRequest represents a request sent to a kafka broker to create
- // and update topic parititions.
- type CreatePartitionsRequest struct {
- // Address of the kafka broker to send the request to.
- Addr net.Addr
- // List of topics to create and their configuration.
- Topics []TopicPartitionsConfig
- // When set to true, topics are not created but the configuration is
- // validated as if they were.
- ValidateOnly bool
- }
- // CreatePartitionsResponse represents a response from a kafka broker to a partition
- // creation request.
- type CreatePartitionsResponse struct {
- // The amount of time that the broker throttled the request.
- Throttle time.Duration
- // Mapping of topic names to errors that occurred while attempting to create
- // the topics.
- //
- // The errors contain the kafka error code. Programs may use the standard
- // errors.Is function to test the error against kafka error codes.
- Errors map[string]error
- }
- // CreatePartitions sends a partitions creation request to a kafka broker and returns the
- // response.
- func (c *Client) CreatePartitions(ctx context.Context, req *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
- topics := make([]createpartitions.RequestTopic, len(req.Topics))
- for i, t := range req.Topics {
- topics[i] = createpartitions.RequestTopic{
- Name: t.Name,
- Count: t.Count,
- Assignments: t.assignments(),
- }
- }
- m, err := c.roundTrip(ctx, req.Addr, &createpartitions.Request{
- Topics: topics,
- TimeoutMs: c.timeoutMs(ctx, defaultCreatePartitionsTimeout),
- ValidateOnly: req.ValidateOnly,
- })
- if err != nil {
- return nil, fmt.Errorf("kafka.(*Client).CreatePartitions: %w", err)
- }
- res := m.(*createpartitions.Response)
- ret := &CreatePartitionsResponse{
- Throttle: makeDuration(res.ThrottleTimeMs),
- Errors: make(map[string]error, len(res.Results)),
- }
- for _, t := range res.Results {
- ret.Errors[t.Name] = makeError(t.ErrorCode, t.ErrorMessage)
- }
- return ret, nil
- }
- type TopicPartitionsConfig struct {
- // Topic name
- Name string
- // Topic partition's count.
- Count int32
- // TopicPartitionAssignments among kafka brokers for this topic partitions.
- TopicPartitionAssignments []TopicPartitionAssignment
- }
- func (t *TopicPartitionsConfig) assignments() []createpartitions.RequestAssignment {
- if len(t.TopicPartitionAssignments) == 0 {
- return nil
- }
- assignments := make([]createpartitions.RequestAssignment, len(t.TopicPartitionAssignments))
- for i, a := range t.TopicPartitionAssignments {
- assignments[i] = createpartitions.RequestAssignment{
- BrokerIDs: a.BrokerIDs,
- }
- }
- return assignments
- }
- type TopicPartitionAssignment struct {
- // Broker IDs
- BrokerIDs []int32
- }
|