123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- package kafka
- import (
- "context"
- "fmt"
- "net"
- "time"
- "github.com/segmentio/kafka-go/protocol/alterconfigs"
- )
- // AlterConfigsRequest represents a request sent to a kafka broker to alter configs
- type AlterConfigsRequest struct {
- // Address of the kafka broker to send the request to.
- Addr net.Addr
- // List of resources to update.
- Resources []AlterConfigRequestResource
- // When set to true, topics are not created but the configuration is
- // validated as if they were.
- ValidateOnly bool
- }
- type AlterConfigRequestResource struct {
- // Resource Type
- ResourceType ResourceType
- // Resource Name
- ResourceName string
- // Configs is a list of configuration updates.
- Configs []AlterConfigRequestConfig
- }
// AlterConfigRequestConfig is a single configuration update: a key and the
// value to assign to it.
type AlterConfigRequestConfig struct {
	// Name is the configuration key name.
	Name string

	// Value is the value to set for the configuration key.
	Value string
}
- // AlterConfigsResponse represents a response from a kafka broker to an alter config request.
- type AlterConfigsResponse struct {
- // Duration for which the request was throttled due to a quota violation.
- Throttle time.Duration
- // Mapping of topic names to errors that occurred while attempting to create
- // the topics.
- //
- // The errors contain the kafka error code. Programs may use the standard
- // errors.Is function to test the error against kafka error codes.
- Errors map[AlterConfigsResponseResource]error
- }
// AlterConfigsResponseResource identifies a resource in an
// AlterConfigsResponse by its type and name. It is used as the key of the
// response's Errors map.
type AlterConfigsResponseResource struct {
	// Type is the resource type as reported in the broker's response.
	Type int8

	// Name is the resource name as reported in the broker's response.
	Name string
}
- // AlterConfigs sends a config altering request to a kafka broker and returns the
- // response.
- func (c *Client) AlterConfigs(ctx context.Context, req *AlterConfigsRequest) (*AlterConfigsResponse, error) {
- resources := make([]alterconfigs.RequestResources, len(req.Resources))
- for i, t := range req.Resources {
- configs := make([]alterconfigs.RequestConfig, len(t.Configs))
- for j, v := range t.Configs {
- configs[j] = alterconfigs.RequestConfig{
- Name: v.Name,
- Value: v.Value,
- }
- }
- resources[i] = alterconfigs.RequestResources{
- ResourceType: int8(t.ResourceType),
- ResourceName: t.ResourceName,
- Configs: configs,
- }
- }
- m, err := c.roundTrip(ctx, req.Addr, &alterconfigs.Request{
- Resources: resources,
- ValidateOnly: req.ValidateOnly,
- })
- if err != nil {
- return nil, fmt.Errorf("kafka.(*Client).AlterConfigs: %w", err)
- }
- res := m.(*alterconfigs.Response)
- ret := &AlterConfigsResponse{
- Throttle: makeDuration(res.ThrottleTimeMs),
- Errors: make(map[AlterConfigsResponseResource]error, len(res.Responses)),
- }
- for _, t := range res.Responses {
- ret.Errors[AlterConfigsResponseResource{
- Type: t.ResourceType,
- Name: t.ResourceName,
- }] = makeError(t.ErrorCode, t.ErrorMessage)
- }
- return ret, nil
- }
|