describeconfigs.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package describeconfigs
  2. import (
  3. "strconv"
  4. "github.com/segmentio/kafka-go/protocol"
  5. )
  // resourceTypeBroker is the resource type value that Broker and Split compare
  // against to recognize a resource that refers to a broker.
  const resourceTypeBroker int8 = 4
  9. func init() {
  10. protocol.Register(&Request{}, &Response{})
  11. }
  12. // Detailed API definition: https://kafka.apache.org/protocol#The_Messages_DescribeConfigs
  13. type Request struct {
  14. Resources []RequestResource `kafka:"min=v0,max=v3"`
  15. IncludeSynonyms bool `kafka:"min=v1,max=v3"`
  16. IncludeDocumentation bool `kafka:"min=v3,max=v3"`
  17. }
  18. func (r *Request) ApiKey() protocol.ApiKey { return protocol.DescribeConfigs }
  19. func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
  20. // Broker metadata requests must be sent to the associated broker
  21. for _, resource := range r.Resources {
  22. if resource.ResourceType == resourceTypeBroker {
  23. brokerID, err := strconv.Atoi(resource.ResourceName)
  24. if err != nil {
  25. return protocol.Broker{}, err
  26. }
  27. return cluster.Brokers[int32(brokerID)], nil
  28. }
  29. }
  30. return cluster.Brokers[cluster.Controller], nil
  31. }
  32. func (r *Request) Split(cluster protocol.Cluster) (
  33. []protocol.Message,
  34. protocol.Merger,
  35. error,
  36. ) {
  37. messages := []protocol.Message{}
  38. topicsMessage := Request{}
  39. for _, resource := range r.Resources {
  40. // Split out broker requests to separate brokers
  41. if resource.ResourceType == resourceTypeBroker {
  42. messages = append(messages, &Request{
  43. Resources: []RequestResource{resource},
  44. })
  45. } else {
  46. topicsMessage.Resources = append(
  47. topicsMessage.Resources, resource,
  48. )
  49. }
  50. }
  51. if len(topicsMessage.Resources) > 0 {
  52. messages = append(messages, &topicsMessage)
  53. }
  54. return messages, new(Response), nil
  55. }
  // RequestResource is one resource entry of a DescribeConfigs Request.
  type RequestResource struct {
  	// ResourceType is the numeric type of the resource; the value
  	// resourceTypeBroker marks a broker resource.
  	ResourceType int8 `kafka:"min=v0,max=v3"`

  	// ResourceName is the name of the resource. For broker resources it is
  	// parsed as a decimal broker ID by Request.Broker.
  	ResourceName string `kafka:"min=v0,max=v3"`

  	// ConfigNames is tagged nullable; presumably a nil slice requests every
  	// config of the resource (confirm against the Kafka docs).
  	ConfigNames []string `kafka:"min=v0,max=v3,nullable"`
  }
  61. type Response struct {
  62. ThrottleTimeMs int32 `kafka:"min=v0,max=v3"`
  63. Resources []ResponseResource `kafka:"min=v0,max=v3"`
  64. }
  65. func (r *Response) ApiKey() protocol.ApiKey { return protocol.DescribeConfigs }
  66. func (r *Response) Merge(requests []protocol.Message, results []interface{}) (
  67. protocol.Message,
  68. error,
  69. ) {
  70. response := &Response{}
  71. for _, result := range results {
  72. brokerResp := result.(*Response)
  73. response.Resources = append(
  74. response.Resources,
  75. brokerResp.Resources...,
  76. )
  77. }
  78. return response, nil
  79. }
  80. type ResponseResource struct {
  81. ErrorCode int16 `kafka:"min=v0,max=v3"`
  82. ErrorMessage string `kafka:"min=v0,max=v3,nullable"`
  83. ResourceType int8 `kafka:"min=v0,max=v3"`
  84. ResourceName string `kafka:"min=v0,max=v3"`
  85. ConfigEntries []ResponseConfigEntry `kafka:"min=v0,max=v3"`
  86. }
  87. type ResponseConfigEntry struct {
  88. ConfigName string `kafka:"min=v0,max=v3"`
  89. ConfigValue string `kafka:"min=v0,max=v3,nullable"`
  90. ReadOnly bool `kafka:"min=v0,max=v3"`
  91. IsDefault bool `kafka:"min=v0,max=v0"`
  92. ConfigSource int8 `kafka:"min=v1,max=v3"`
  93. IsSensitive bool `kafka:"min=v0,max=v3"`
  94. ConfigSynonyms []ResponseConfigSynonym `kafka:"min=v1,max=v3"`
  95. ConfigType int8 `kafka:"min=v3,max=v3"`
  96. ConfigDocumentation string `kafka:"min=v3,max=v3,nullable"`
  97. }
  // ResponseConfigSynonym is one synonym listed in a ResponseConfigEntry. All
  // of its fields are tagged for protocol versions v1 through v3.
  type ResponseConfigSynonym struct {
  	// ConfigName is the name of the synonym config.
  	ConfigName string `kafka:"min=v1,max=v3"`

  	// ConfigValue is the value of the synonym config; tagged nullable.
  	ConfigValue string `kafka:"min=v1,max=v3,nullable"`

  	// ConfigSource is the numeric source of the synonym config.
  	ConfigSource int8 `kafka:"min=v1,max=v3"`
  }
  103. var (
  104. _ protocol.BrokerMessage = (*Request)(nil)
  105. )