incrementalalterconfigs.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package kafka
  2. import (
  3. "context"
  4. "net"
  5. "github.com/segmentio/kafka-go/protocol/incrementalalterconfigs"
  6. )
  7. type ConfigOperation int8
  8. const (
  9. ConfigOperationSet ConfigOperation = 0
  10. ConfigOperationDelete ConfigOperation = 1
  11. ConfigOperationAppend ConfigOperation = 2
  12. ConfigOperationSubtract ConfigOperation = 3
  13. )
  14. // IncrementalAlterConfigsRequest is a request to the IncrementalAlterConfigs API.
  15. type IncrementalAlterConfigsRequest struct {
  16. // Addr is the address of the kafka broker to send the request to.
  17. Addr net.Addr
  18. // Resources contains the list of resources to update configs for.
  19. Resources []IncrementalAlterConfigsRequestResource
  20. // ValidateOnly indicates whether Kafka should validate the changes without actually
  21. // applying them.
  22. ValidateOnly bool
  23. }
  24. // IncrementalAlterConfigsRequestResource contains the details of a single resource type whose
  25. // configs should be altered.
  26. type IncrementalAlterConfigsRequestResource struct {
  27. // ResourceType is the type of resource to update.
  28. ResourceType ResourceType
  29. // ResourceName is the name of the resource to update (i.e., topic name or broker ID).
  30. ResourceName string
  31. // Configs contains the list of config key/values to update.
  32. Configs []IncrementalAlterConfigsRequestConfig
  33. }
  34. // IncrementalAlterConfigsRequestConfig describes a single config key/value pair that should
  35. // be altered.
  36. type IncrementalAlterConfigsRequestConfig struct {
  37. // Name is the name of the config.
  38. Name string
  39. // Value is the value to set for this config.
  40. Value string
  41. // ConfigOperation indicates how this config should be updated (e.g., add, delete, etc.).
  42. ConfigOperation ConfigOperation
  43. }
  44. // IncrementalAlterConfigsResponse is a response from the IncrementalAlterConfigs API.
  45. type IncrementalAlterConfigsResponse struct {
  46. // Resources contains details of each resource config that was updated.
  47. Resources []IncrementalAlterConfigsResponseResource
  48. }
  49. // IncrementalAlterConfigsResponseResource contains the response details for a single resource
  50. // whose configs were updated.
  51. type IncrementalAlterConfigsResponseResource struct {
  52. // Error is set to a non-nil value if an error occurred while updating this specific
  53. // config.
  54. Error error
  55. // ResourceType is the type of resource that was updated.
  56. ResourceType ResourceType
  57. // ResourceName is the name of the resource that was updated.
  58. ResourceName string
  59. }
  60. func (c *Client) IncrementalAlterConfigs(
  61. ctx context.Context,
  62. req *IncrementalAlterConfigsRequest,
  63. ) (*IncrementalAlterConfigsResponse, error) {
  64. apiReq := &incrementalalterconfigs.Request{
  65. ValidateOnly: req.ValidateOnly,
  66. }
  67. for _, res := range req.Resources {
  68. apiRes := incrementalalterconfigs.RequestResource{
  69. ResourceType: int8(res.ResourceType),
  70. ResourceName: res.ResourceName,
  71. }
  72. for _, config := range res.Configs {
  73. apiRes.Configs = append(
  74. apiRes.Configs,
  75. incrementalalterconfigs.RequestConfig{
  76. Name: config.Name,
  77. Value: config.Value,
  78. ConfigOperation: int8(config.ConfigOperation),
  79. },
  80. )
  81. }
  82. apiReq.Resources = append(
  83. apiReq.Resources,
  84. apiRes,
  85. )
  86. }
  87. protoResp, err := c.roundTrip(
  88. ctx,
  89. req.Addr,
  90. apiReq,
  91. )
  92. if err != nil {
  93. return nil, err
  94. }
  95. resp := &IncrementalAlterConfigsResponse{}
  96. apiResp := protoResp.(*incrementalalterconfigs.Response)
  97. for _, res := range apiResp.Responses {
  98. resp.Resources = append(
  99. resp.Resources,
  100. IncrementalAlterConfigsResponseResource{
  101. Error: makeError(res.ErrorCode, res.ErrorMessage),
  102. ResourceType: ResourceType(res.ResourceType),
  103. ResourceName: res.ResourceName,
  104. },
  105. )
  106. }
  107. return resp, nil
  108. }