describeconfigs.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package kafka
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "time"
  7. "github.com/segmentio/kafka-go/protocol/describeconfigs"
  8. )
// DescribeConfigsRequest represents a request sent to a kafka broker to
// describe the configuration of a list of resources.
type DescribeConfigsRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// List of resources to describe.
	Resources []DescribeConfigRequestResource

	// Passed through unchanged to the protocol request.
	// Ignored if API version is less than v1
	IncludeSynonyms bool

	// Passed through unchanged to the protocol request.
	// Ignored if API version is less than v3
	IncludeDocumentation bool
}
// ResourceType identifies the kind of resource named in a
// DescribeConfigRequestResource. It is converted to its int8 value when the
// protocol request is built.
type ResourceType int8

// Known resource type codes.
const (
	// See https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java#L36
	// NOTE(review): the numeric values are presumably taken from the Java
	// enum linked above — confirm against that file before adding new ones.
	ResourceTypeUnknown ResourceType = 0
	ResourceTypeTopic   ResourceType = 2
	ResourceTypeBroker  ResourceType = 4
)
// DescribeConfigRequestResource names a single resource whose configuration
// should be described, together with the configuration names of interest.
type DescribeConfigRequestResource struct {
	// Resource Type
	ResourceType ResourceType

	// Resource Name
	ResourceName string

	// ConfigNames is a list of configurations to describe.
	// NOTE(review): the slice is forwarded as-is to the protocol request; how
	// the broker treats a nil or empty list is not visible here — confirm
	// against the Kafka protocol documentation.
	ConfigNames []string
}
// DescribeConfigsResponse represents a response from a kafka broker to a describe config request.
type DescribeConfigsResponse struct {
	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// Resources holds one entry per resource contained in the broker's
	// response.
	Resources []DescribeConfigResponseResource
}
// DescribeConfigResponseResource holds the result of describing a single
// resource: its identity, the error reported for it, and its configuration
// entries.
type DescribeConfigResponseResource struct {
	// Resource Type, as the raw int8 code carried in the response (not
	// converted to the ResourceType type).
	ResourceType int8

	// Resource Name
	ResourceName string

	// Error is built by makeError from the error code and error message that
	// the response carries for this resource.
	Error error

	// ConfigEntries holds one entry per configuration returned for this
	// resource.
	ConfigEntries []DescribeConfigResponseConfigEntry
}
// DescribeConfigResponseConfigEntry describes a single configuration of a
// resource. Every field is copied unchanged from the corresponding field of
// the protocol response entry.
type DescribeConfigResponseConfigEntry struct {
	ConfigName  string
	ConfigValue string
	ReadOnly    bool

	// Ignored if API version is greater than v0
	IsDefault bool

	// Ignored if API version is less than v1
	ConfigSource int8

	IsSensitive bool

	// Ignored if API version is less than v1
	ConfigSynonyms []DescribeConfigResponseConfigSynonym

	// Ignored if API version is less than v3
	ConfigType int8

	// Ignored if API version is less than v3
	ConfigDocumentation string
}
// DescribeConfigResponseConfigSynonym is one element of the ConfigSynonyms
// list of a DescribeConfigResponseConfigEntry. Every field is copied
// unchanged from the corresponding field of the protocol response.
type DescribeConfigResponseConfigSynonym struct {
	// Ignored if API version is less than v1
	ConfigName string

	// Ignored if API version is less than v1
	ConfigValue string

	// Ignored if API version is less than v1
	ConfigSource int8
}
  79. // DescribeConfigs sends a config altering request to a kafka broker and returns the
  80. // response.
  81. func (c *Client) DescribeConfigs(ctx context.Context, req *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
  82. resources := make([]describeconfigs.RequestResource, len(req.Resources))
  83. for i, t := range req.Resources {
  84. resources[i] = describeconfigs.RequestResource{
  85. ResourceType: int8(t.ResourceType),
  86. ResourceName: t.ResourceName,
  87. ConfigNames: t.ConfigNames,
  88. }
  89. }
  90. m, err := c.roundTrip(ctx, req.Addr, &describeconfigs.Request{
  91. Resources: resources,
  92. IncludeSynonyms: req.IncludeSynonyms,
  93. IncludeDocumentation: req.IncludeDocumentation,
  94. })
  95. if err != nil {
  96. return nil, fmt.Errorf("kafka.(*Client).DescribeConfigs: %w", err)
  97. }
  98. res := m.(*describeconfigs.Response)
  99. ret := &DescribeConfigsResponse{
  100. Throttle: makeDuration(res.ThrottleTimeMs),
  101. Resources: make([]DescribeConfigResponseResource, len(res.Resources)),
  102. }
  103. for i, t := range res.Resources {
  104. configEntries := make([]DescribeConfigResponseConfigEntry, len(t.ConfigEntries))
  105. for j, v := range t.ConfigEntries {
  106. configSynonyms := make([]DescribeConfigResponseConfigSynonym, len(v.ConfigSynonyms))
  107. for k, cs := range v.ConfigSynonyms {
  108. configSynonyms[k] = DescribeConfigResponseConfigSynonym{
  109. ConfigName: cs.ConfigName,
  110. ConfigValue: cs.ConfigValue,
  111. ConfigSource: cs.ConfigSource,
  112. }
  113. }
  114. configEntries[j] = DescribeConfigResponseConfigEntry{
  115. ConfigName: v.ConfigName,
  116. ConfigValue: v.ConfigValue,
  117. ReadOnly: v.ReadOnly,
  118. ConfigSource: v.ConfigSource,
  119. IsDefault: v.IsDefault,
  120. IsSensitive: v.IsSensitive,
  121. ConfigSynonyms: configSynonyms,
  122. ConfigType: v.ConfigType,
  123. ConfigDocumentation: v.ConfigDocumentation,
  124. }
  125. }
  126. ret.Resources[i] = DescribeConfigResponseResource{
  127. ResourceType: t.ResourceType,
  128. ResourceName: t.ResourceName,
  129. Error: makeError(t.ErrorCode, t.ErrorMessage),
  130. ConfigEntries: configEntries,
  131. }
  132. }
  133. return ret, nil
  134. }