package kafka

import (
	"context"
	"errors"
	"net"
	"time"

	"github.com/segmentio/kafka-go/protocol"
)

const (
	defaultCreateTopicsTimeout     = 2 * time.Second
	defaultDeleteTopicsTimeout     = 2 * time.Second
	defaultCreatePartitionsTimeout = 2 * time.Second
	defaultProduceTimeout          = 500 * time.Millisecond
	defaultMaxWait                 = 500 * time.Millisecond
)

// Client is a high-level API to interact with kafka brokers.
//
// All methods of the Client type accept a context as their first argument,
// which may be used to asynchronously cancel requests.
//
// Clients are safe to use concurrently from multiple goroutines, as long as
// their configuration is not changed after first use.
type Client struct {
	// Address of the kafka cluster (or specific broker) that the client will be
	// sending requests to.
	//
	// This field is optional; the address may be provided in each request
	// instead. The request address takes precedence if both were specified.
	Addr net.Addr

	// Time limit for requests sent by this client.
	//
	// If zero, no timeout is applied.
	Timeout time.Duration

	// A transport used to communicate with the kafka brokers.
	//
	// If nil, DefaultTransport is used.
	Transport RoundTripper
}
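
// exampleClientMetadata is a usage sketch added for illustration and is not
// part of the original file: it configures a Client against a caller-supplied
// broker address and issues a Metadata request for a single topic. The 10s
// timeout and the "my-topic" name are placeholder values.
func exampleClientMetadata(ctx context.Context, addr net.Addr) ([]string, error) {
	client := &Client{
		Addr:    addr,             // may be left nil and provided per request instead
		Timeout: 10 * time.Second, // upper bound applied to each request sent by this client
	}

	metadata, err := client.Metadata(ctx, &MetadataRequest{
		Topics: []string{"my-topic"},
	})
	if err != nil {
		return nil, err
	}

	// Collect the topic names reported back by the cluster.
	names := make([]string, 0, len(metadata.Topics))
	for _, topic := range metadata.Topics {
		names = append(names, topic.Name)
	}
	return names, nil
}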

// TopicAndGroup holds a topic name and a consumer group id. Both are plain
// strings, so this dedicated type keeps them from being confused when passed
// to Client methods as arguments.
//
// N.B. TopicAndGroup is currently experimental! Therefore, it is subject to
// change, including breaking changes between MINOR and PATCH releases.
//
// DEPRECATED: this type will be removed in version 1.0; programs should
// migrate to use kafka.(*Client).OffsetFetch instead.
type TopicAndGroup struct {
	Topic   string
	GroupId string
}

// ConsumerOffsets returns a map[int]int64 of partition to committed offset for
// a consumer group id and topic.
//
// DEPRECATED: this method will be removed in version 1.0; programs should
// migrate to use kafka.(*Client).OffsetFetch instead.
func (c *Client) ConsumerOffsets(ctx context.Context, tg TopicAndGroup) (map[int]int64, error) {
	metadata, err := c.Metadata(ctx, &MetadataRequest{
		Topics: []string{tg.Topic},
	})
	if err != nil {
		return nil, err
	}

	topic := metadata.Topics[0]
	partitions := make([]int, len(topic.Partitions))

	for i := range topic.Partitions {
		partitions[i] = topic.Partitions[i].ID
	}

	offsets, err := c.OffsetFetch(ctx, &OffsetFetchRequest{
		GroupID: tg.GroupId,
		Topics: map[string][]int{
			tg.Topic: partitions,
		},
	})
	if err != nil {
		return nil, err
	}

	topicOffsets := offsets.Topics[topic.Name]
	partitionOffsets := make(map[int]int64, len(topicOffsets))

	for _, off := range topicOffsets {
		partitionOffsets[off.Partition] = off.CommittedOffset
	}

	return partitionOffsets, nil
}
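
// exampleOffsetFetch is a sketch added for illustration and is not part of the
// original file: it shows the direct OffsetFetch call that the deprecation
// notes above recommend migrating to. The group id, topic name, and partition
// numbers are placeholder values.
func exampleOffsetFetch(ctx context.Context, c *Client) (map[int]int64, error) {
	res, err := c.OffsetFetch(ctx, &OffsetFetchRequest{
		GroupID: "my-group",
		Topics: map[string][]int{
			"my-topic": {0, 1, 2},
		},
	})
	if err != nil {
		return nil, err
	}

	// Flatten the per-topic response into a partition -> committed offset map,
	// mirroring what ConsumerOffsets returns.
	committed := make(map[int]int64)
	for _, off := range res.Topics["my-topic"] {
		committed[off.Partition] = off.CommittedOffset
	}
	return committed, nil
}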

func (c *Client) roundTrip(ctx context.Context, addr net.Addr, msg protocol.Message) (protocol.Message, error) {
	if c.Timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, c.Timeout)
		defer cancel()
	}

	if addr == nil {
		if addr = c.Addr; addr == nil {
			return nil, errors.New("no address was given for the kafka cluster in the request or on the client")
		}
	}

	return c.transport().RoundTrip(ctx, addr, msg)
}

func (c *Client) transport() RoundTripper {
	if c.Transport != nil {
		return c.Transport
	}
	return DefaultTransport
}

func (c *Client) timeout(ctx context.Context, defaultTimeout time.Duration) time.Duration {
	timeout := c.Timeout

	if deadline, ok := ctx.Deadline(); ok {
		if remain := time.Until(deadline); remain < timeout {
			timeout = remain
		}
	}

	if timeout > 0 {
		// Halve the timeout because it is communicated to kafka in requests
		// such as Fetch and Produce; keeping the other half in reserve gives
		// the client a buffer for network latency while it waits for the
		// response from kafka.
		return timeout / 2
	}

	return defaultTimeout
}

func (c *Client) timeoutMs(ctx context.Context, defaultTimeout time.Duration) int32 {
	return milliseconds(c.timeout(ctx, defaultTimeout))
}
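
// exampleTimeoutMs is a worked example added for illustration and is not part
// of the original file: it shows the value the helpers above would report to
// the broker for a client configured with a 10s timeout.
func exampleTimeoutMs() int32 {
	c := &Client{Timeout: 10 * time.Second}
	// No context deadline applies here, so timeout() halves the limit to 5s
	// and milliseconds() converts it to 5000 for the wire request.
	return c.timeoutMs(context.Background(), defaultProduceTimeout)
}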