// incrementalalterconfigs.go (2.2 KB)

  1. package incrementalalterconfigs
  2. import (
  3. "errors"
  4. "strconv"
  5. "github.com/segmentio/kafka-go/protocol"
  6. )
  // resourceTypeBroker is the ResourceType value that Request.Broker treats
  // as identifying a broker resource.
  const resourceTypeBroker int8 = 4
  10. func init() {
  11. protocol.Register(&Request{}, &Response{})
  12. }
  13. // Detailed API definition: https://kafka.apache.org/protocol#The_Messages_IncrementalAlterConfigs
  14. type Request struct {
  15. Resources []RequestResource `kafka:"min=v0,max=v0"`
  16. ValidateOnly bool `kafka:"min=v0,max=v0"`
  17. }
  18. type RequestResource struct {
  19. ResourceType int8 `kafka:"min=v0,max=v0"`
  20. ResourceName string `kafka:"min=v0,max=v0"`
  21. Configs []RequestConfig `kafka:"min=v0,max=v0"`
  22. }
  // RequestConfig is a single config entry of a RequestResource.
  type RequestConfig struct {
  	// Name is the config entry's name.
  	Name string `kafka:"min=v0,max=v0"`

  	// ConfigOperation is the numeric operation code for this entry; the
  	// meaning of each code is not defined in this file (see the Kafka
  	// protocol definition).
  	ConfigOperation int8 `kafka:"min=v0,max=v0"`

  	// Value is the config entry's value; the field is tagged nullable.
  	Value string `kafka:"min=v0,max=v0,nullable"`
  }
  28. func (r *Request) ApiKey() protocol.ApiKey { return protocol.IncrementalAlterConfigs }
  29. func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
  30. // Check that at most only one broker is being updated.
  31. //
  32. // TODO: Support updating multiple brokers in a single request.
  33. brokers := map[string]struct{}{}
  34. for _, resource := range r.Resources {
  35. if resource.ResourceType == resourceTypeBroker {
  36. brokers[resource.ResourceName] = struct{}{}
  37. }
  38. }
  39. if len(brokers) > 1 {
  40. return protocol.Broker{},
  41. errors.New("Updating more than one broker in a single request is not supported yet")
  42. }
  43. for _, resource := range r.Resources {
  44. if resource.ResourceType == resourceTypeBroker {
  45. brokerID, err := strconv.Atoi(resource.ResourceName)
  46. if err != nil {
  47. return protocol.Broker{}, err
  48. }
  49. return cluster.Brokers[int32(brokerID)], nil
  50. }
  51. }
  52. return cluster.Brokers[cluster.Controller], nil
  53. }
  54. type Response struct {
  55. ThrottleTimeMs int32 `kafka:"min=v0,max=v0"`
  56. Responses []ResponseAlterResponse `kafka:"min=v0,max=v0"`
  57. }
  // ResponseAlterResponse is a single per-resource entry of a Response.
  type ResponseAlterResponse struct {
  	// ErrorCode is the numeric error code reported for the resource.
  	ErrorCode int16 `kafka:"min=v0,max=v0"`

  	// ErrorMessage is the accompanying error text; the field is tagged
  	// nullable.
  	ErrorMessage string `kafka:"min=v0,max=v0,nullable"`

  	// ResourceType is the numeric type of the resource this entry refers to.
  	ResourceType int8 `kafka:"min=v0,max=v0"`

  	// ResourceName is the name of the resource this entry refers to.
  	ResourceName string `kafka:"min=v0,max=v0"`
  }
  64. func (r *Response) ApiKey() protocol.ApiKey { return protocol.IncrementalAlterConfigs }