createpartitions.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. package kafka
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "time"
  7. "github.com/segmentio/kafka-go/protocol/createpartitions"
  8. )
  9. // CreatePartitionsRequest represents a request sent to a kafka broker to create
  10. // and update topic parititions.
  11. type CreatePartitionsRequest struct {
  12. // Address of the kafka broker to send the request to.
  13. Addr net.Addr
  14. // List of topics to create and their configuration.
  15. Topics []TopicPartitionsConfig
  16. // When set to true, topics are not created but the configuration is
  17. // validated as if they were.
  18. ValidateOnly bool
  19. }
  20. // CreatePartitionsResponse represents a response from a kafka broker to a partition
  21. // creation request.
  22. type CreatePartitionsResponse struct {
  23. // The amount of time that the broker throttled the request.
  24. Throttle time.Duration
  25. // Mapping of topic names to errors that occurred while attempting to create
  26. // the topics.
  27. //
  28. // The errors contain the kafka error code. Programs may use the standard
  29. // errors.Is function to test the error against kafka error codes.
  30. Errors map[string]error
  31. }
  32. // CreatePartitions sends a partitions creation request to a kafka broker and returns the
  33. // response.
  34. func (c *Client) CreatePartitions(ctx context.Context, req *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
  35. topics := make([]createpartitions.RequestTopic, len(req.Topics))
  36. for i, t := range req.Topics {
  37. topics[i] = createpartitions.RequestTopic{
  38. Name: t.Name,
  39. Count: t.Count,
  40. Assignments: t.assignments(),
  41. }
  42. }
  43. m, err := c.roundTrip(ctx, req.Addr, &createpartitions.Request{
  44. Topics: topics,
  45. TimeoutMs: c.timeoutMs(ctx, defaultCreatePartitionsTimeout),
  46. ValidateOnly: req.ValidateOnly,
  47. })
  48. if err != nil {
  49. return nil, fmt.Errorf("kafka.(*Client).CreatePartitions: %w", err)
  50. }
  51. res := m.(*createpartitions.Response)
  52. ret := &CreatePartitionsResponse{
  53. Throttle: makeDuration(res.ThrottleTimeMs),
  54. Errors: make(map[string]error, len(res.Results)),
  55. }
  56. for _, t := range res.Results {
  57. ret.Errors[t.Name] = makeError(t.ErrorCode, t.ErrorMessage)
  58. }
  59. return ret, nil
  60. }
  61. type TopicPartitionsConfig struct {
  62. // Topic name
  63. Name string
  64. // Topic partition's count.
  65. Count int32
  66. // TopicPartitionAssignments among kafka brokers for this topic partitions.
  67. TopicPartitionAssignments []TopicPartitionAssignment
  68. }
  69. func (t *TopicPartitionsConfig) assignments() []createpartitions.RequestAssignment {
  70. if len(t.TopicPartitionAssignments) == 0 {
  71. return nil
  72. }
  73. assignments := make([]createpartitions.RequestAssignment, len(t.TopicPartitionAssignments))
  74. for i, a := range t.TopicPartitionAssignments {
  75. assignments[i] = createpartitions.RequestAssignment{
  76. BrokerIDs: a.BrokerIDs,
  77. }
  78. }
  79. return assignments
  80. }
  81. type TopicPartitionAssignment struct {
  82. // Broker IDs
  83. BrokerIDs []int32
  84. }