client.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. package kafka
  2. import (
  3. "context"
  4. "errors"
  5. "net"
  6. "time"
  7. "github.com/segmentio/kafka-go/protocol"
  8. )
  9. const (
  10. defaultCreateTopicsTimeout = 2 * time.Second
  11. defaultDeleteTopicsTimeout = 2 * time.Second
  12. defaultCreatePartitionsTimeout = 2 * time.Second
  13. defaultProduceTimeout = 500 * time.Millisecond
  14. defaultMaxWait = 500 * time.Millisecond
  15. )
  16. // Client is a high-level API to interract with kafka brokers.
  17. //
  18. // All methods of the Client type accept a context as first argument, which may
  19. // be used to asynchronously cancel the requests.
  20. //
  21. // Clients are safe to use concurrently from multiple goroutines, as long as
  22. // their configuration is not changed after first use.
  23. type Client struct {
  24. // Address of the kafka cluster (or specific broker) that the client will be
  25. // sending requests to.
  26. //
  27. // This field is optional, the address may be provided in each request
  28. // instead. The request address takes precedence if both were specified.
  29. Addr net.Addr
  30. // Time limit for requests sent by this client.
  31. //
  32. // If zero, no timeout is applied.
  33. Timeout time.Duration
  34. // A transport used to communicate with the kafka brokers.
  35. //
  36. // If nil, DefaultTransport is used.
  37. Transport RoundTripper
  38. }
  39. // A ConsumerGroup and Topic as these are both strings we define a type for
  40. // clarity when passing to the Client as a function argument
  41. //
  42. // N.B TopicAndGroup is currently experimental! Therefore, it is subject to
  43. // change, including breaking changes between MINOR and PATCH releases.
  44. //
  45. // DEPRECATED: this type will be removed in version 1.0, programs should
  46. // migrate to use kafka.(*Client).OffsetFetch instead.
  47. type TopicAndGroup struct {
  48. Topic string
  49. GroupId string
  50. }
  51. // ConsumerOffsets returns a map[int]int64 of partition to committed offset for
  52. // a consumer group id and topic.
  53. //
  54. // DEPRECATED: this method will be removed in version 1.0, programs should
  55. // migrate to use kafka.(*Client).OffsetFetch instead.
  56. func (c *Client) ConsumerOffsets(ctx context.Context, tg TopicAndGroup) (map[int]int64, error) {
  57. metadata, err := c.Metadata(ctx, &MetadataRequest{
  58. Topics: []string{tg.Topic},
  59. })
  60. if err != nil {
  61. return nil, err
  62. }
  63. topic := metadata.Topics[0]
  64. partitions := make([]int, len(topic.Partitions))
  65. for i := range topic.Partitions {
  66. partitions[i] = topic.Partitions[i].ID
  67. }
  68. offsets, err := c.OffsetFetch(ctx, &OffsetFetchRequest{
  69. GroupID: tg.GroupId,
  70. Topics: map[string][]int{
  71. tg.Topic: partitions,
  72. },
  73. })
  74. if err != nil {
  75. return nil, err
  76. }
  77. topicOffsets := offsets.Topics[topic.Name]
  78. partitionOffsets := make(map[int]int64, len(topicOffsets))
  79. for _, off := range topicOffsets {
  80. partitionOffsets[off.Partition] = off.CommittedOffset
  81. }
  82. return partitionOffsets, nil
  83. }
  84. func (c *Client) roundTrip(ctx context.Context, addr net.Addr, msg protocol.Message) (protocol.Message, error) {
  85. if c.Timeout > 0 {
  86. var cancel context.CancelFunc
  87. ctx, cancel = context.WithTimeout(ctx, c.Timeout)
  88. defer cancel()
  89. }
  90. if addr == nil {
  91. if addr = c.Addr; addr == nil {
  92. return nil, errors.New("no address was given for the kafka cluster in the request or on the client")
  93. }
  94. }
  95. return c.transport().RoundTrip(ctx, addr, msg)
  96. }
  97. func (c *Client) transport() RoundTripper {
  98. if c.Transport != nil {
  99. return c.Transport
  100. }
  101. return DefaultTransport
  102. }
  103. func (c *Client) timeout(ctx context.Context, defaultTimeout time.Duration) time.Duration {
  104. timeout := c.Timeout
  105. if deadline, ok := ctx.Deadline(); ok {
  106. if remain := time.Until(deadline); remain < timeout {
  107. timeout = remain
  108. }
  109. }
  110. if timeout > 0 {
  111. // Half the timeout because it is communicated to kafka in multiple
  112. // requests (e.g. Fetch, Produce, etc...), this adds buffer to account
  113. // for network latency when waiting for the response from kafka.
  114. return timeout / 2
  115. }
  116. return defaultTimeout
  117. }
  118. func (c *Client) timeoutMs(ctx context.Context, defaultTimeout time.Duration) int32 {
  119. return milliseconds(c.timeout(ctx, defaultTimeout))
  120. }