|
- package kafka
- import (
- "bufio"
- "errors"
- "fmt"
- "io"
- "math"
- "net"
- "os"
- "path/filepath"
- "runtime"
- "sync"
- "sync/atomic"
- "time"
- )
// Errors reported by the write path when a Message tries to pick its own
// destination: a Conn is bound to a single topic and partition.
var (
	// errInvalidWriteTopic rejects messages naming a topic other than the
	// connection's.
	errInvalidWriteTopic = errors.New("writes must NOT set Topic on kafka.Message")
	// errInvalidWritePartition rejects messages with a non-zero Partition.
	errInvalidWritePartition = errors.New("writes must NOT set Partition on kafka.Message")
)
- // Conn represents a connection to a kafka broker.
- //
- // Instances of Conn are safe to use concurrently from multiple goroutines.
- type Conn struct {
- // base network connection
- conn net.Conn
- // number of inflight requests on the connection.
- inflight int32
- // offset management (synchronized on the mutex field)
- mutex sync.Mutex
- offset int64
- // read buffer (synchronized on rlock)
- rlock sync.Mutex
- rbuf bufio.Reader
- // write buffer (synchronized on wlock)
- wlock sync.Mutex
- wbuf bufio.Writer
- wb writeBuffer
- // deadline management
- wdeadline connDeadline
- rdeadline connDeadline
- // immutable values of the connection object
- clientID string
- topic string
- partition int32
- fetchMaxBytes int32
- fetchMinSize int32
- // correlation ID generator (synchronized on wlock)
- correlationID int32
- // number of replica acks required when publishing to a partition
- requiredAcks int32
- // lazily loaded API versions used by this connection
- apiVersions atomic.Value // apiVersionMap
- transactionalID *string
- }
- type apiVersionMap map[apiKey]ApiVersion
- func (v apiVersionMap) negotiate(key apiKey, sortedSupportedVersions ...apiVersion) apiVersion {
- x := v[key]
- for i := len(sortedSupportedVersions) - 1; i >= 0; i-- {
- s := sortedSupportedVersions[i]
- if apiVersion(x.MaxVersion) >= s {
- return s
- }
- }
- return -1
- }
// ConnConfig is a configuration object used to create new instances of Conn.
type ConnConfig struct {
	// ClientID identifies this client in the requests sent to the broker.
	// NewConnWith substitutes DefaultClientID when it is left empty.
	ClientID string

	// Topic and Partition select the partition the connection is bound to.
	Topic     string
	Partition int

	// TransactionalID is the transactional id to use for transactional
	// delivery; idempotent delivery should be enabled when it is set. See the
	// transactional.id description at
	// http://kafka.apache.org/documentation.html#producerconfigs
	//
	// The empty string means the connection can't be transactional.
	TransactionalID string
}
- // ReadBatchConfig is a configuration object used for reading batches of messages.
- type ReadBatchConfig struct {
- MinBytes int
- MaxBytes int
- // IsolationLevel controls the visibility of transactional records.
- // ReadUncommitted makes all records visible. With ReadCommitted only
- // non-transactional and committed records are visible.
- IsolationLevel IsolationLevel
- // MaxWait is the amount of time for the broker while waiting to hit the
- // min/max byte targets. This setting is independent of any network-level
- // timeouts or deadlines.
- //
- // For backward compatibility, when this field is left zero, kafka-go will
- // infer the max wait from the connection's read deadline.
- MaxWait time.Duration
- }
// IsolationLevel selects which transactional records a read can observe.
type IsolationLevel int8

const (
	// ReadUncommitted makes all records visible.
	ReadUncommitted IsolationLevel = iota
	// ReadCommitted only exposes non-transactional and committed records.
	ReadCommitted
)
// DefaultClientID is the default value used as ClientID of kafka
// connections. It is set at package initialization to
// "<program name>@<hostname> (github.com/segmentio/kafka-go)".
var DefaultClientID string

func init() {
	// The hostname only decorates the client ID, so a lookup failure is
	// tolerated and simply leaves whatever os.Hostname returned.
	hostname, _ := os.Hostname()
	DefaultClientID = fmt.Sprintf("%s@%s (github.com/segmentio/kafka-go)", filepath.Base(os.Args[0]), hostname)
}
- // NewConn returns a new kafka connection for the given topic and partition.
- func NewConn(conn net.Conn, topic string, partition int) *Conn {
- return NewConnWith(conn, ConnConfig{
- Topic: topic,
- Partition: partition,
- })
- }
// emptyToNullable returns nil for the empty string and otherwise a pointer to
// a copy of transactionalID, so an unset ID is represented as nil.
func emptyToNullable(transactionalID string) (result *string) {
	if transactionalID == "" {
		return nil
	}
	return &transactionalID
}
- // NewConnWith returns a new kafka connection configured with config.
- // The offset is initialized to FirstOffset.
- func NewConnWith(conn net.Conn, config ConnConfig) *Conn {
- if len(config.ClientID) == 0 {
- config.ClientID = DefaultClientID
- }
- if config.Partition < 0 || config.Partition > math.MaxInt32 {
- panic(fmt.Sprintf("invalid partition number: %d", config.Partition))
- }
- c := &Conn{
- conn: conn,
- rbuf: *bufio.NewReader(conn),
- wbuf: *bufio.NewWriter(conn),
- clientID: config.ClientID,
- topic: config.Topic,
- partition: int32(config.Partition),
- offset: FirstOffset,
- requiredAcks: -1,
- transactionalID: emptyToNullable(config.TransactionalID),
- }
- c.wb.w = &c.wbuf
- // The fetch request needs to ask for a MaxBytes value that is at least
- // enough to load the control data of the response. To avoid having to
- // recompute it on every read, it is cached here in the Conn value.
- c.fetchMinSize = (fetchResponseV2{
- Topics: []fetchResponseTopicV2{{
- TopicName: config.Topic,
- Partitions: []fetchResponsePartitionV2{{
- Partition: int32(config.Partition),
- MessageSet: messageSet{{}},
- }},
- }},
- }).size()
- c.fetchMaxBytes = math.MaxInt32 - c.fetchMinSize
- return c
- }
- func (c *Conn) negotiateVersion(key apiKey, sortedSupportedVersions ...apiVersion) (apiVersion, error) {
- v, err := c.loadVersions()
- if err != nil {
- return -1, err
- }
- a := v.negotiate(key, sortedSupportedVersions...)
- if a < 0 {
- return -1, fmt.Errorf("no matching versions were found between the client and the broker for API key %d", key)
- }
- return a, nil
- }
- func (c *Conn) loadVersions() (apiVersionMap, error) {
- v, _ := c.apiVersions.Load().(apiVersionMap)
- if v != nil {
- return v, nil
- }
- brokerVersions, err := c.ApiVersions()
- if err != nil {
- return nil, err
- }
- v = make(apiVersionMap, len(brokerVersions))
- for _, a := range brokerVersions {
- v[apiKey(a.ApiKey)] = a
- }
- c.apiVersions.Store(v)
- return v, nil
- }
- // Controller requests kafka for the current controller and returns its URL
- func (c *Conn) Controller() (broker Broker, err error) {
- err = c.readOperation(
- func(deadline time.Time, id int32) error {
- return c.writeRequest(metadata, v1, id, topicMetadataRequestV1([]string{}))
- },
- func(deadline time.Time, size int) error {
- var res metadataResponseV1
- if err := c.readResponse(size, &res); err != nil {
- return err
- }
- for _, brokerMeta := range res.Brokers {
- if brokerMeta.NodeID == res.ControllerID {
- broker = Broker{ID: int(brokerMeta.NodeID),
- Port: int(brokerMeta.Port),
- Host: brokerMeta.Host,
- Rack: brokerMeta.Rack}
- break
- }
- }
- return nil
- },
- )
- return broker, err
- }
- // Brokers retrieve the broker list from the Kafka metadata
- func (c *Conn) Brokers() ([]Broker, error) {
- var brokers []Broker
- err := c.readOperation(
- func(deadline time.Time, id int32) error {
- return c.writeRequest(metadata, v1, id, topicMetadataRequestV1([]string{}))
- },
- func(deadline time.Time, size int) error {
- var res metadataResponseV1
- if err := c.readResponse(size, &res); err != nil {
- return err
- }
- brokers = make([]Broker, len(res.Brokers))
- for i, brokerMeta := range res.Brokers {
- brokers[i] = Broker{
- ID: int(brokerMeta.NodeID),
- Port: int(brokerMeta.Port),
- Host: brokerMeta.Host,
- Rack: brokerMeta.Rack,
- }
- }
- return nil
- },
- )
- return brokers, err
- }
- // DeleteTopics deletes the specified topics.
- func (c *Conn) DeleteTopics(topics ...string) error {
- _, err := c.deleteTopics(deleteTopicsRequestV0{
- Topics: topics,
- })
- return err
- }
- // describeGroups retrieves the specified groups
- //
- // See http://kafka.apache.org/protocol.html#The_Messages_DescribeGroups
- func (c *Conn) describeGroups(request describeGroupsRequestV0) (describeGroupsResponseV0, error) {
- var response describeGroupsResponseV0
- err := c.readOperation(
- func(deadline time.Time, id int32) error {
- return c.writeRequest(describeGroups, v0, id, request)
- },
- func(deadline time.Time, size int) error {
- return expectZeroSize(func() (remain int, err error) {
- return (&response).readFrom(&c.rbuf, size)
- }())
- },
- )
- if err != nil {
- return describeGroupsResponseV0{}, err
- }
- for _, group := range response.Groups {
- if group.ErrorCode != 0 {
- return describeGroupsResponseV0{}, Error(group.ErrorCode)
- }
- }
- return response, nil
- }
- // findCoordinator finds the coordinator for the specified group or transaction
- //
- // See http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator
- func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinatorResponseV0, error) {
- var response findCoordinatorResponseV0
- err := c.readOperation(
- func(deadline time.Time, id int32) error {
- return c.writeRequest(findCoordinator, v0, id, request)
- },
- func(deadline time.Time, size int) error {
- return expectZeroSize(func() (remain int, err error) {
- return (&response).readFrom(&c.rbuf, size)
- }())
- },
- )
- if err != nil {
- return findCoordinatorResponseV0{}, err
- }
- if response.ErrorCode != 0 {
- return findCoordinatorResponseV0{}, Error(response.ErrorCode)
- }
- return response, nil
- }
- // heartbeat sends a heartbeat message required by consumer groups
- //
- // See http://kafka.apache.org/protocol.html#The_Messages_Heartbeat
- func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error) {
- var response heartbeatResponseV0
- err := c.writeOperation(
- func(deadline time.Time, id int32) error {
- return c.writeRequest(heartbeat, v0, id, request)
- },
- func(deadline time.Time, size int) error {
- return expectZeroSize(func() (remain int, err error) {
- return (&response).readFrom(&c.rbuf, size)
- }())
- },
- )
- if err != nil {
- return heartbeatResponseV0{}, err
- }
- if response.ErrorCode != 0 {
- return heartbeatResponseV0{}, Error(response.ErrorCode)
- }
- return response, nil
- }
- // joinGroup attempts to join a consumer group
- //
- // See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup
- func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error) {
- var response joinGroupResponseV1
- err := c.writeOperation(
- func(deadline time.Time, id int32) error {
- return c.writeRequest(joinGroup, v1, id, request)
- },
- func(deadline time.Time, size int) error {
- return expectZeroSize(func() (remain int, err error) {
- return (&response).readFrom(&c.rbuf, size)
- }())
- },
- )
- if err != nil {
- return joinGroupResponseV1{}, err
- }
- if response.ErrorCode != 0 {
- return joinGroupResponseV1{}, Error(response.ErrorCode)
- }
- return response, nil
- }
- // leaveGroup leaves the consumer from the consumer group
- //
- // See http://kafka.apache.org/protocol.html#The_Messages_LeaveGroup
- func (c *Conn) leaveGroup(request leaveGroupRequestV0) (leaveGroupResponseV0, error) {
- var response leaveGroupResponseV0
- err := c.writeOperation(
- func(deadline time.Time, id int32) error {
- return c.writeRequest(leaveGroup, v0, id, request)
- },
- func(deadline time.Time, size int) error {
- return expectZeroSize(func() (remain int, err error) {
- return (&response).readFrom(&c.rbuf, size)
- }())
- },
- )
- if err != nil {
- return leaveGroupResponseV0{}, err
- }
- if response.ErrorCode != 0 {
- return leaveGroupResponseV0{}, Error(response.ErrorCode)
- }
- return response, nil
- }
- // listGroups lists all the consumer groups
- //
- // See http://kafka.apache.org/protocol.html#The_Messages_ListGroups
- func (c *Conn) listGroups(request listGroupsRequestV1) (listGroupsResponseV1, error) {
- var response listGroupsResponseV1
- err := c.readOperation(
- func(deadline time.Time, id int32) error {
- return c.writeRequest(listGroups, v1, id, request)
- },
- func(deadline time.Time, size int) error {
- return expectZeroSize(func() (remain int, err error) {
- return (&response).readFrom(&c.rbuf, size)
- }())
- },
- )
- if err != nil {
- return listGroupsResponseV1{}, err
- }
- if response.ErrorCode != 0 {
- return listGroupsResponseV1{}, Error(response.ErrorCode)
- }
- return response, nil
- }
- // offsetCommit commits the specified topic partition offsets
- //
- // See http://kafka.apache.org/protocol.html#The_Messages_OffsetCommit
- func (c *Conn) offsetCommit(request offsetCommitRequestV2) (offsetCommitResponseV2, error) {
- var response offsetCommitResponseV2
- err := c.writeOperation(
- func(deadline time.Time, id int32) error {
- return c.writeRequest(offsetCommit, v2, id, request)
- },
- func(deadline time.Time, size int) error {
- return expectZeroSize(func() (remain int, err error) {
- return (&response).readFrom(&c.rbuf, size)
- }())
- },
- )
- if err != nil {
- return offsetCommitResponseV2{}, err
- }
- for _, r := range response.Responses {
- for _, pr := range r.PartitionResponses {
- if pr.ErrorCode != 0 {
- return offsetCommitResponseV2{}, Error(pr.ErrorCode)
- }
- }
- }
- return response, nil
- }
- // offsetFetch fetches the offsets for the specified topic partitions.
- // -1 indicates that there is no offset saved for the partition.
- //
- // See http://kafka.apache.org/protocol.html#The_Messages_OffsetFetch
- func (c *Conn) offsetFetch(request offsetFetchRequestV1) (offsetFetchResponseV1, error) {
- var response offsetFetchResponseV1
- err := c.readOperation(
- func(deadline time.Time, id int32) error {
- return c.writeRequest(offsetFetch, v1, id, request)
- },
- func(deadline time.Time, size int) error {
- return expectZeroSize(func() (remain int, err error) {
- return (&response).readFrom(&c.rbuf, size)
- }())
- },
- )
- if err != nil {
- return offsetFetchResponseV1{}, err
- }
- for _, r := range response.Responses {
- for _, pr := range r.PartitionResponses {
- if pr.ErrorCode != 0 {
- return offsetFetchResponseV1{}, Error(pr.ErrorCode)
- }
- }
- }
- return response, nil
- }
- // syncGroup completes the handshake to join a consumer group
- //
- // See http://kafka.apache.org/protocol.html#The_Messages_SyncGroup
- func (c *Conn) syncGroup(request syncGroupRequestV0) (syncGroupResponseV0, error) {
- var response syncGroupResponseV0
- err := c.readOperation(
- func(deadline time.Time, id int32) error {
- return c.writeRequest(syncGroup, v0, id, request)
- },
- func(deadline time.Time, size int) error {
- return expectZeroSize(func() (remain int, err error) {
- return (&response).readFrom(&c.rbuf, size)
- }())
- },
- )
- if err != nil {
- return syncGroupResponseV0{}, err
- }
- if response.ErrorCode != 0 {
- return syncGroupResponseV0{}, Error(response.ErrorCode)
- }
- return response, nil
- }
- // Close closes the kafka connection.
- func (c *Conn) Close() error {
- return c.conn.Close()
- }
- // LocalAddr returns the local network address.
- func (c *Conn) LocalAddr() net.Addr {
- return c.conn.LocalAddr()
- }
- // RemoteAddr returns the remote network address.
- func (c *Conn) RemoteAddr() net.Addr {
- return c.conn.RemoteAddr()
- }
- // SetDeadline sets the read and write deadlines associated with the connection.
- // It is equivalent to calling both SetReadDeadline and SetWriteDeadline.
- //
- // A deadline is an absolute time after which I/O operations fail with a timeout
- // (see type Error) instead of blocking. The deadline applies to all future and
- // pending I/O, not just the immediately following call to Read or Write. After
- // a deadline has been exceeded, the connection may be closed if it was found to
- // be in an unrecoverable state.
- //
- // A zero value for t means I/O operations will not time out.
- func (c *Conn) SetDeadline(t time.Time) error {
- c.rdeadline.setDeadline(t)
- c.wdeadline.setDeadline(t)
- return nil
- }
- // SetReadDeadline sets the deadline for future Read calls and any
- // currently-blocked Read call.
- // A zero value for t means Read will not time out.
- func (c *Conn) SetReadDeadline(t time.Time) error {
- c.rdeadline.setDeadline(t)
- return nil
- }
- // SetWriteDeadline sets the deadline for future Write calls and any
- // currently-blocked Write call.
- // Even if write times out, it may return n > 0, indicating that some of the
- // data was successfully written.
- // A zero value for t means Write will not time out.
- func (c *Conn) SetWriteDeadline(t time.Time) error {
- c.wdeadline.setDeadline(t)
- return nil
- }
- // Offset returns the current offset of the connection as pair of integers,
- // where the first one is an offset value and the second one indicates how
- // to interpret it.
- //
- // See Seek for more details about the offset and whence values.
- func (c *Conn) Offset() (offset int64, whence int) {
- c.mutex.Lock()
- offset = c.offset
- c.mutex.Unlock()
- switch offset {
- case FirstOffset:
- offset = 0
- whence = SeekStart
- case LastOffset:
- offset = 0
- whence = SeekEnd
- default:
- whence = SeekAbsolute
- }
- return
- }
// Whence values accepted by Conn.Seek. For historical reasons they do not
// match the whence constants of lseek(2) or os.Seek.
const (
	SeekStart    = iota // Seek relative to the first offset available in the partition.
	SeekAbsolute        // Seek to an absolute offset.
	SeekEnd             // Seek relative to the last offset available in the partition.
	SeekCurrent         // Seek relative to the current offset.

	// SeekDontCheck may be combined with SeekAbsolute or SeekCurrent to skip
	// the bound check the connection would otherwise perform. Programs can
	// use it to avoid a metadata request to the kafka broker for the current
	// first and last offsets of the partition.
	SeekDontCheck = 1 << 30
)
- // Seek sets the offset for the next read or write operation according to whence, which
- // should be one of SeekStart, SeekAbsolute, SeekEnd, or SeekCurrent.
- // When seeking relative to the end, the offset is subtracted from the current offset.
- // Note that for historical reasons, these do not align with the usual whence constants
- // as in lseek(2) or os.Seek.
- // The method returns the new absolute offset of the connection.
- func (c *Conn) Seek(offset int64, whence int) (int64, error) {
- seekDontCheck := (whence & SeekDontCheck) != 0
- whence &= ^SeekDontCheck
- switch whence {
- case SeekStart, SeekAbsolute, SeekEnd, SeekCurrent:
- default:
- return 0, fmt.Errorf("whence must be one of 0, 1, 2, or 3. (whence = %d)", whence)
- }
- if seekDontCheck {
- if whence == SeekAbsolute {
- c.mutex.Lock()
- c.offset = offset
- c.mutex.Unlock()
- return offset, nil
- }
- if whence == SeekCurrent {
- c.mutex.Lock()
- c.offset += offset
- offset = c.offset
- c.mutex.Unlock()
- return offset, nil
- }
- }
- if whence == SeekAbsolute {
- c.mutex.Lock()
- unchanged := offset == c.offset
- c.mutex.Unlock()
- if unchanged {
- return offset, nil
- }
- }
- if whence == SeekCurrent {
- c.mutex.Lock()
- offset = c.offset + offset
- c.mutex.Unlock()
- }
- first, last, err := c.ReadOffsets()
- if err != nil {
- return 0, err
- }
- switch whence {
- case SeekStart:
- offset = first + offset
- case SeekEnd:
- offset = last - offset
- }
- if offset < first || offset > last {
- return 0, OffsetOutOfRange
- }
- c.mutex.Lock()
- c.offset = offset
- c.mutex.Unlock()
- return offset, nil
- }
- // Read reads the message at the current offset from the connection, advancing
- // the offset on success so the next call to a read method will produce the next
- // message.
- // The method returns the number of bytes read, or an error if something went
- // wrong.
- //
- // While it is safe to call Read concurrently from multiple goroutines it may
- // be hard for the program to predict the results as the connection offset will
- // be read and written by multiple goroutines, they could read duplicates, or
- // messages may be seen by only some of the goroutines.
- //
- // The method fails with io.ErrShortBuffer if the buffer passed as argument is
- // too small to hold the message value.
- //
- // This method is provided to satisfy the net.Conn interface but is much less
- // efficient than using the more general purpose ReadBatch method.
- func (c *Conn) Read(b []byte) (int, error) {
- batch := c.ReadBatch(1, len(b))
- n, err := batch.Read(b)
- return n, coalesceErrors(silentEOF(err), batch.Close())
- }
- // ReadMessage reads the message at the current offset from the connection,
- // advancing the offset on success so the next call to a read method will
- // produce the next message.
- //
- // Because this method allocate memory buffers for the message key and value
- // it is less memory-efficient than Read, but has the advantage of never
- // failing with io.ErrShortBuffer.
- //
- // While it is safe to call Read concurrently from multiple goroutines it may
- // be hard for the program to predict the results as the connection offset will
- // be read and written by multiple goroutines, they could read duplicates, or
- // messages may be seen by only some of the goroutines.
- //
- // This method is provided for convenience purposes but is much less efficient
- // than using the more general purpose ReadBatch method.
- func (c *Conn) ReadMessage(maxBytes int) (Message, error) {
- batch := c.ReadBatch(1, maxBytes)
- msg, err := batch.ReadMessage()
- return msg, coalesceErrors(silentEOF(err), batch.Close())
- }
- // ReadBatch reads a batch of messages from the kafka server. The method always
- // returns a non-nil Batch value. If an error occurred, either sending the fetch
- // request or reading the response, the error will be made available by the
- // returned value of the batch's Close method.
- //
- // While it is safe to call ReadBatch concurrently from multiple goroutines it
- // may be hard for the program to predict the results as the connection offset
- // will be read and written by multiple goroutines, they could read duplicates,
- // or messages may be seen by only some of the goroutines.
- //
- // A program doesn't specify the number of messages in wants from a batch, but
- // gives the minimum and maximum number of bytes that it wants to receive from
- // the kafka server.
- func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch {
- return c.ReadBatchWith(ReadBatchConfig{
- MinBytes: minBytes,
- MaxBytes: maxBytes,
- })
- }
- // ReadBatchWith in every way is similar to ReadBatch. ReadBatch is configured
- // with the default values in ReadBatchConfig except for minBytes and maxBytes.
- func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
- var adjustedDeadline time.Time
- var maxFetch = int(c.fetchMaxBytes)
- if cfg.MinBytes < 0 || cfg.MinBytes > maxFetch {
- return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes of %d out of [1,%d] bounds", cfg.MinBytes, maxFetch)}
- }
- if cfg.MaxBytes < 0 || cfg.MaxBytes > maxFetch {
- return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: maxBytes of %d out of [1,%d] bounds", cfg.MaxBytes, maxFetch)}
- }
- if cfg.MinBytes > cfg.MaxBytes {
- return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes (%d) > maxBytes (%d)", cfg.MinBytes, cfg.MaxBytes)}
- }
- offset, whence := c.Offset()
- offset, err := c.Seek(offset, whence|SeekDontCheck)
- if err != nil {
- return &Batch{err: dontExpectEOF(err)}
- }
- fetchVersion, err := c.negotiateVersion(fetch, v2, v5, v10)
- if err != nil {
- return &Batch{err: dontExpectEOF(err)}
- }
- id, err := c.doRequest(&c.rdeadline, func(deadline time.Time, id int32) error {
- now := time.Now()
- var timeout time.Duration
- if cfg.MaxWait > 0 {
- // explicitly-configured case: no changes are made to the deadline,
- // and the timeout is sent exactly as specified.
- timeout = cfg.MaxWait
- } else {
- // default case: use the original logic to adjust the conn's
- // deadline.T
- deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
- timeout = deadlineToTimeout(deadline, now)
- }
- // save this variable outside of the closure for later use in detecting
- // truncated messages.
- adjustedDeadline = deadline
- switch fetchVersion {
- case v10:
- return c.wb.writeFetchRequestV10(
- id,
- c.clientID,
- c.topic,
- c.partition,
- offset,
- cfg.MinBytes,
- cfg.MaxBytes+int(c.fetchMinSize),
- timeout,
- int8(cfg.IsolationLevel),
- )
- case v5:
- return c.wb.writeFetchRequestV5(
- id,
- c.clientID,
- c.topic,
- c.partition,
- offset,
- cfg.MinBytes,
- cfg.MaxBytes+int(c.fetchMinSize),
- timeout,
- int8(cfg.IsolationLevel),
- )
- default:
- return c.wb.writeFetchRequestV2(
- id,
- c.clientID,
- c.topic,
- c.partition,
- offset,
- cfg.MinBytes,
- cfg.MaxBytes+int(c.fetchMinSize),
- timeout,
- )
- }
- })
- if err != nil {
- return &Batch{err: dontExpectEOF(err)}
- }
- _, size, lock, err := c.waitResponse(&c.rdeadline, id)
- if err != nil {
- return &Batch{err: dontExpectEOF(err)}
- }
- var throttle int32
- var highWaterMark int64
- var remain int
- switch fetchVersion {
- case v10:
- throttle, highWaterMark, remain, err = readFetchResponseHeaderV10(&c.rbuf, size)
- case v5:
- throttle, highWaterMark, remain, err = readFetchResponseHeaderV5(&c.rbuf, size)
- default:
- throttle, highWaterMark, remain, err = readFetchResponseHeaderV2(&c.rbuf, size)
- }
- if err == errShortRead {
- err = checkTimeoutErr(adjustedDeadline)
- }
- var msgs *messageSetReader
- if err == nil {
- if highWaterMark == offset {
- msgs = &messageSetReader{empty: true}
- } else {
- msgs, err = newMessageSetReader(&c.rbuf, remain)
- }
- }
- if err == errShortRead {
- err = checkTimeoutErr(adjustedDeadline)
- }
- return &Batch{
- conn: c,
- msgs: msgs,
- deadline: adjustedDeadline,
- throttle: makeDuration(throttle),
- lock: lock,
- topic: c.topic, // topic is copied to Batch to prevent race with Batch.close
- partition: int(c.partition), // partition is copied to Batch to prevent race with Batch.close
- offset: offset,
- highWaterMark: highWaterMark,
- // there shouldn't be a short read on initially setting up the batch.
- // as such, any io.EOF is re-mapped to an io.ErrUnexpectedEOF so that we
- // don't accidentally signal that we successfully reached the end of the
- // batch.
- err: dontExpectEOF(err),
- }
- }
- // ReadOffset returns the offset of the first message with a timestamp equal or
- // greater to t.
- func (c *Conn) ReadOffset(t time.Time) (int64, error) {
- return c.readOffset(timestamp(t))
- }
- // ReadFirstOffset returns the first offset available on the connection.
- func (c *Conn) ReadFirstOffset() (int64, error) {
- return c.readOffset(FirstOffset)
- }
- // ReadLastOffset returns the last offset available on the connection.
- func (c *Conn) ReadLastOffset() (int64, error) {
- return c.readOffset(LastOffset)
- }
- // ReadOffsets returns the absolute first and last offsets of the topic used by
- // the connection.
- func (c *Conn) ReadOffsets() (first, last int64, err error) {
- // We have to submit two different requests to fetch the first and last
- // offsets because kafka refuses requests that ask for multiple offsets
- // on the same topic and partition.
- if first, err = c.ReadFirstOffset(); err != nil {
- return
- }
- if last, err = c.ReadLastOffset(); err != nil {
- first = 0 // don't leak the value on error
- return
- }
- return
- }
- func (c *Conn) readOffset(t int64) (offset int64, err error) {
- err = c.readOperation(
- func(deadline time.Time, id int32) error {
- return c.wb.writeListOffsetRequestV1(id, c.clientID, c.topic, c.partition, t)
- },
- func(deadline time.Time, size int) error {
- return expectZeroSize(readArrayWith(&c.rbuf, size, func(r *bufio.Reader, size int) (int, error) {
- // We skip the topic name because we've made a request for
- // a single topic.
- size, err := discardString(r, size)
- if err != nil {
- return size, err
- }
- // Reading the array of partitions, there will be only one
- // partition which gives the offset we're looking for.
- return readArrayWith(r, size, func(r *bufio.Reader, size int) (int, error) {
- var p partitionOffsetV1
- size, err := p.readFrom(r, size)
- if err != nil {
- return size, err
- }
- if p.ErrorCode != 0 {
- return size, Error(p.ErrorCode)
- }
- offset = p.Offset
- return size, nil
- })
- }))
- },
- )
- return
- }
- // ReadPartitions returns the list of available partitions for the given list of
- // topics.
- //
- // If the method is called with no topic, it uses the topic configured on the
- // connection. If there are none, the method fetches all partitions of the kafka
- // cluster.
- func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err error) {
- if len(topics) == 0 {
- if len(c.topic) != 0 {
- defaultTopics := [...]string{c.topic}
- topics = defaultTopics[:]
- } else {
- // topics needs to be explicitly nil-ed out or the broker will
- // interpret it as a request for 0 partitions instead of all.
- topics = nil
- }
- }
- err = c.readOperation(
- func(deadline time.Time, id int32) error {
- return c.writeRequest(metadata, v1, id, topicMetadataRequestV1(topics))
- },
- func(deadline time.Time, size int) error {
- var res metadataResponseV1
- if err := c.readResponse(size, &res); err != nil {
- return err
- }
- brokers := make(map[int32]Broker, len(res.Brokers))
- for _, b := range res.Brokers {
- brokers[b.NodeID] = Broker{
- Host: b.Host,
- Port: int(b.Port),
- ID: int(b.NodeID),
- Rack: b.Rack,
- }
- }
- makeBrokers := func(ids ...int32) []Broker {
- b := make([]Broker, len(ids))
- for i, id := range ids {
- b[i] = brokers[id]
- }
- return b
- }
- for _, t := range res.Topics {
- if t.TopicErrorCode != 0 && (c.topic == "" || t.TopicName == c.topic) {
- // We only report errors if they happened for the topic of
- // the connection, otherwise the topic will simply have no
- // partitions in the result set.
- return Error(t.TopicErrorCode)
- }
- for _, p := range t.Partitions {
- partitions = append(partitions, Partition{
- Topic: t.TopicName,
- Leader: brokers[p.Leader],
- Replicas: makeBrokers(p.Replicas...),
- Isr: makeBrokers(p.Isr...),
- ID: int(p.PartitionID),
- })
- }
- }
- return nil
- },
- )
- return
- }
- // Write writes a message to the kafka broker that this connection was
- // established to. The method returns the number of bytes written, or an error
- // if something went wrong.
- //
- // The operation either succeeds or fail, it never partially writes the message.
- //
- // This method is exposed to satisfy the net.Conn interface but is less efficient
- // than the more general purpose WriteMessages method.
- func (c *Conn) Write(b []byte) (int, error) {
- return c.WriteCompressedMessages(nil, Message{Value: b})
- }
- // WriteMessages writes a batch of messages to the connection's topic and
- // partition, returning the number of bytes written. The write is an atomic
- // operation, it either fully succeeds or fails.
- func (c *Conn) WriteMessages(msgs ...Message) (int, error) {
- return c.WriteCompressedMessages(nil, msgs...)
- }
- // WriteCompressedMessages writes a batch of messages to the connection's topic
- // and partition, returning the number of bytes written. The write is an atomic
- // operation, it either fully succeeds or fails.
- //
- // If the compression codec is not nil, the messages will be compressed.
- func (c *Conn) WriteCompressedMessages(codec CompressionCodec, msgs ...Message) (nbytes int, err error) {
- nbytes, _, _, _, err = c.writeCompressedMessages(codec, msgs...)
- return
- }
- // WriteCompressedMessagesAt writes a batch of messages to the connection's topic
- // and partition, returning the number of bytes written, partition and offset numbers
- // and timestamp assigned by the kafka broker to the message set. The write is an atomic
- // operation, it either fully succeeds or fails.
- //
- // If the compression codec is not nil, the messages will be compressed.
- func (c *Conn) WriteCompressedMessagesAt(codec CompressionCodec, msgs ...Message) (nbytes int, partition int32, offset int64, appendTime time.Time, err error) {
- return c.writeCompressedMessages(codec, msgs...)
- }
- func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message) (nbytes int, partition int32, offset int64, appendTime time.Time, err error) {
- if len(msgs) == 0 {
- return
- }
- writeTime := time.Now()
- for i, msg := range msgs {
- // users may believe they can set the Topic and/or Partition
- // on the kafka message.
- if msg.Topic != "" && msg.Topic != c.topic {
- err = errInvalidWriteTopic
- return
- }
- if msg.Partition != 0 {
- err = errInvalidWritePartition
- return
- }
- if msg.Time.IsZero() {
- msgs[i].Time = writeTime
- }
- nbytes += len(msg.Key) + len(msg.Value)
- }
- var produceVersion apiVersion
- if produceVersion, err = c.negotiateVersion(produce, v2, v3, v7); err != nil {
- return
- }
- err = c.writeOperation(
- func(deadline time.Time, id int32) error {
- now := time.Now()
- deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
- switch produceVersion {
- case v7:
- recordBatch, err :=
- newRecordBatch(
- codec,
- msgs...,
- )
- if err != nil {
- return err
- }
- return c.wb.writeProduceRequestV7(
- id,
- c.clientID,
- c.topic,
- c.partition,
- deadlineToTimeout(deadline, now),
- int16(atomic.LoadInt32(&c.requiredAcks)),
- c.transactionalID,
- recordBatch,
- )
- case v3:
- recordBatch, err :=
- newRecordBatch(
- codec,
- msgs...,
- )
- if err != nil {
- return err
- }
- return c.wb.writeProduceRequestV3(
- id,
- c.clientID,
- c.topic,
- c.partition,
- deadlineToTimeout(deadline, now),
- int16(atomic.LoadInt32(&c.requiredAcks)),
- c.transactionalID,
- recordBatch,
- )
- default:
- return c.wb.writeProduceRequestV2(
- codec,
- id,
- c.clientID,
- c.topic,
- c.partition,
- deadlineToTimeout(deadline, now),
- int16(atomic.LoadInt32(&c.requiredAcks)),
- msgs...,
- )
- }
- },
- func(deadline time.Time, size int) error {
- return expectZeroSize(readArrayWith(&c.rbuf, size, func(r *bufio.Reader, size int) (int, error) {
- // Skip the topic, we've produced the message to only one topic,
- // no need to waste resources loading it in memory.
- size, err := discardString(r, size)
- if err != nil {
- return size, err
- }
- // Read the list of partitions, there should be only one since
- // we've produced a message to a single partition.
- size, err = readArrayWith(r, size, func(r *bufio.Reader, size int) (int, error) {
- switch produceVersion {
- case v7:
- var p produceResponsePartitionV7
- size, err := p.readFrom(r, size)
- if err == nil && p.ErrorCode != 0 {
- err = Error(p.ErrorCode)
- }
- if err == nil {
- partition = p.Partition
- offset = p.Offset
- appendTime = time.Unix(0, p.Timestamp*int64(time.Millisecond))
- }
- return size, err
- default:
- var p produceResponsePartitionV2
- size, err := p.readFrom(r, size)
- if err == nil && p.ErrorCode != 0 {
- err = Error(p.ErrorCode)
- }
- if err == nil {
- partition = p.Partition
- offset = p.Offset
- appendTime = time.Unix(0, p.Timestamp*int64(time.Millisecond))
- }
- return size, err
- }
- })
- if err != nil {
- return size, err
- }
- // The response is trailed by the throttle time, also skipping
- // since it's not interesting here.
- return discardInt32(r, size)
- }))
- },
- )
- if err != nil {
- nbytes = 0
- }
- return
- }
- // SetRequiredAcks sets the number of acknowledges from replicas that the
- // connection requests when producing messages.
- func (c *Conn) SetRequiredAcks(n int) error {
- switch n {
- case -1, 1:
- atomic.StoreInt32(&c.requiredAcks, int32(n))
- return nil
- default:
- return InvalidRequiredAcks
- }
- }
- func (c *Conn) writeRequestHeader(apiKey apiKey, apiVersion apiVersion, correlationID int32, size int32) {
- hdr := c.requestHeader(apiKey, apiVersion, correlationID)
- hdr.Size = (hdr.size() + size) - 4
- hdr.writeTo(&c.wb)
- }
- func (c *Conn) writeRequest(apiKey apiKey, apiVersion apiVersion, correlationID int32, req request) error {
- hdr := c.requestHeader(apiKey, apiVersion, correlationID)
- hdr.Size = (hdr.size() + req.size()) - 4
- hdr.writeTo(&c.wb)
- req.writeTo(&c.wb)
- return c.wbuf.Flush()
- }
- func (c *Conn) readResponse(size int, res interface{}) error {
- size, err := read(&c.rbuf, size, res)
- switch err.(type) {
- case Error:
- var e error
- if size, e = discardN(&c.rbuf, size, size); e != nil {
- err = e
- }
- }
- return expectZeroSize(size, err)
- }
- func (c *Conn) peekResponseSizeAndID() (int32, int32, error) {
- b, err := c.rbuf.Peek(8)
- if err != nil {
- return 0, 0, err
- }
- size, id := makeInt32(b[:4]), makeInt32(b[4:])
- return size, id, nil
- }
- func (c *Conn) skipResponseSizeAndID() {
- c.rbuf.Discard(8)
- }
- func (c *Conn) readDeadline() time.Time {
- return c.rdeadline.deadline()
- }
- func (c *Conn) writeDeadline() time.Time {
- return c.wdeadline.deadline()
- }
- func (c *Conn) readOperation(write func(time.Time, int32) error, read func(time.Time, int) error) error {
- return c.do(&c.rdeadline, write, read)
- }
- func (c *Conn) writeOperation(write func(time.Time, int32) error, read func(time.Time, int) error) error {
- return c.do(&c.wdeadline, write, read)
- }
- func (c *Conn) enter() {
- atomic.AddInt32(&c.inflight, +1)
- }
- func (c *Conn) leave() {
- atomic.AddInt32(&c.inflight, -1)
- }
- func (c *Conn) concurrency() int {
- return int(atomic.LoadInt32(&c.inflight))
- }
- func (c *Conn) do(d *connDeadline, write func(time.Time, int32) error, read func(time.Time, int) error) error {
- id, err := c.doRequest(d, write)
- if err != nil {
- return err
- }
- deadline, size, lock, err := c.waitResponse(d, id)
- if err != nil {
- return err
- }
- if err = read(deadline, size); err != nil {
- switch err.(type) {
- case Error:
- default:
- c.conn.Close()
- }
- }
- d.unsetConnReadDeadline()
- lock.Unlock()
- return err
- }
- func (c *Conn) doRequest(d *connDeadline, write func(time.Time, int32) error) (id int32, err error) {
- c.enter()
- c.wlock.Lock()
- c.correlationID++
- id = c.correlationID
- err = write(d.setConnWriteDeadline(c.conn), id)
- d.unsetConnWriteDeadline()
- if err != nil {
- // When an error occurs there's no way to know if the connection is in a
- // recoverable state so we're better off just giving up at this point to
- // avoid any risk of corrupting the following operations.
- c.conn.Close()
- c.leave()
- }
- c.wlock.Unlock()
- return
- }
- func (c *Conn) waitResponse(d *connDeadline, id int32) (deadline time.Time, size int, lock *sync.Mutex, err error) {
- for {
- var rsz int32
- var rid int32
- c.rlock.Lock()
- deadline = d.setConnReadDeadline(c.conn)
- rsz, rid, err = c.peekResponseSizeAndID()
- if err != nil {
- d.unsetConnReadDeadline()
- c.conn.Close()
- c.rlock.Unlock()
- break
- }
- if id == rid {
- c.skipResponseSizeAndID()
- size, lock = int(rsz-4), &c.rlock
- // Don't unlock the read mutex to yield ownership to the caller.
- break
- }
- if c.concurrency() == 1 {
- // If the goroutine is the only one waiting on this connection it
- // should be impossible to read a correlation id different from the
- // one it expects. This is a sign that the data we are reading on
- // the wire is corrupted and the connection needs to be closed.
- err = io.ErrNoProgress
- c.rlock.Unlock()
- break
- }
- // Optimistically release the read lock if a response has already
- // been received but the current operation is not the target for it.
- c.rlock.Unlock()
- runtime.Gosched()
- }
- c.leave()
- return
- }
- func (c *Conn) requestHeader(apiKey apiKey, apiVersion apiVersion, correlationID int32) requestHeader {
- return requestHeader{
- ApiKey: int16(apiKey),
- ApiVersion: int16(apiVersion),
- CorrelationID: correlationID,
- ClientID: c.clientID,
- }
- }
- func (c *Conn) ApiVersions() ([]ApiVersion, error) {
- deadline := &c.rdeadline
- if deadline.deadline().IsZero() {
- // ApiVersions is called automatically when API version negotiation
- // needs to happen, so we are not guaranteed that a read deadline has
- // been set yet. Fallback to use the write deadline in case it was
- // set, for example when version negotiation is initiated during a
- // produce request.
- deadline = &c.wdeadline
- }
- id, err := c.doRequest(deadline, func(_ time.Time, id int32) error {
- h := requestHeader{
- ApiKey: int16(apiVersions),
- ApiVersion: int16(v0),
- CorrelationID: id,
- ClientID: c.clientID,
- }
- h.Size = (h.size() - 4)
- h.writeTo(&c.wb)
- return c.wbuf.Flush()
- })
- if err != nil {
- return nil, err
- }
- _, size, lock, err := c.waitResponse(deadline, id)
- if err != nil {
- return nil, err
- }
- defer lock.Unlock()
- var errorCode int16
- if size, err = readInt16(&c.rbuf, size, &errorCode); err != nil {
- return nil, err
- }
- var arrSize int32
- if size, err = readInt32(&c.rbuf, size, &arrSize); err != nil {
- return nil, err
- }
- r := make([]ApiVersion, arrSize)
- for i := 0; i < int(arrSize); i++ {
- if size, err = readInt16(&c.rbuf, size, &r[i].ApiKey); err != nil {
- return nil, err
- }
- if size, err = readInt16(&c.rbuf, size, &r[i].MinVersion); err != nil {
- return nil, err
- }
- if size, err = readInt16(&c.rbuf, size, &r[i].MaxVersion); err != nil {
- return nil, err
- }
- }
- if errorCode != 0 {
- return r, Error(errorCode)
- }
- return r, nil
- }
// connDeadline is a helper type to implement read/write deadline management on
// the kafka connection.
//
// It stores one deadline value. While an operation runs, the net.Conn it uses
// is registered in rconn (reads) or wconn (writes). A concurrent setDeadline
// is then applied to that live connection right away.
type connDeadline struct {
	mutex sync.Mutex // guards every field below

	value        time.Time // configured deadline; the zero Time means none
	rconn, wconn net.Conn  // conns with a registered read / write, or nil
}

// deadline returns the configured deadline.
func (d *connDeadline) deadline() time.Time {
	d.mutex.Lock()
	defer d.mutex.Unlock()
	return d.value
}

// setDeadline stores t and forwards it to whichever conns are registered:
// as a read deadline on rconn and as a write deadline on wconn.
func (d *connDeadline) setDeadline(t time.Time) {
	d.mutex.Lock()
	d.value = t
	if rc := d.rconn; rc != nil {
		rc.SetReadDeadline(t)
	}
	if wc := d.wconn; wc != nil {
		wc.SetWriteDeadline(t)
	}
	d.mutex.Unlock()
}

// setConnReadDeadline registers conn as reading, applies the configured
// deadline to it as its read deadline, and returns that deadline.
func (d *connDeadline) setConnReadDeadline(conn net.Conn) time.Time {
	d.mutex.Lock()
	d.rconn = conn
	current := d.value
	conn.SetReadDeadline(current)
	d.mutex.Unlock()
	return current
}

// setConnWriteDeadline registers conn as writing, applies the configured
// deadline to it as its write deadline, and returns that deadline.
func (d *connDeadline) setConnWriteDeadline(conn net.Conn) time.Time {
	d.mutex.Lock()
	d.wconn = conn
	current := d.value
	conn.SetWriteDeadline(current)
	d.mutex.Unlock()
	return current
}

// unsetConnReadDeadline forgets the registered read conn. Later setDeadline
// calls no longer touch it. Its current read deadline is left as is.
func (d *connDeadline) unsetConnReadDeadline() {
	d.mutex.Lock()
	defer d.mutex.Unlock()
	d.rconn = nil
}

// unsetConnWriteDeadline forgets the registered write conn. Later setDeadline
// calls no longer touch it. Its current write deadline is left as is.
func (d *connDeadline) unsetConnWriteDeadline() {
	d.mutex.Lock()
	defer d.mutex.Unlock()
	d.wconn = nil
}
- // saslHandshake sends the SASL handshake message. This will determine whether
- // the Mechanism is supported by the cluster. If it's not, this function will
- // error out with UnsupportedSASLMechanism.
- //
- // If the mechanism is unsupported, the handshake request will reply with the
- // list of the cluster's configured mechanisms, which could potentially be used
- // to facilitate negotiation. At the moment, we are not negotiating the
- // mechanism as we believe that brokers are usually known to the client, and
- // therefore the client should already know which mechanisms are supported.
- //
- // See http://kafka.apache.org/protocol.html#The_Messages_SaslHandshake
- func (c *Conn) saslHandshake(mechanism string) error {
- // The wire format for V0 and V1 is identical, but the version
- // number will affect how the SASL authentication
- // challenge/responses are sent
- var resp saslHandshakeResponseV0
- version, err := c.negotiateVersion(saslHandshake, v0, v1)
- if err != nil {
- return err
- }
- err = c.writeOperation(
- func(deadline time.Time, id int32) error {
- return c.writeRequest(saslHandshake, version, id, &saslHandshakeRequestV0{Mechanism: mechanism})
- },
- func(deadline time.Time, size int) error {
- return expectZeroSize(func() (int, error) {
- return (&resp).readFrom(&c.rbuf, size)
- }())
- },
- )
- if err == nil && resp.ErrorCode != 0 {
- err = Error(resp.ErrorCode)
- }
- return err
- }
- // saslAuthenticate sends the SASL authenticate message. This function must
- // be immediately preceded by a successful saslHandshake.
- //
- // See http://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate
- func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) {
- // if we sent a v1 handshake, then we must encapsulate the authentication
- // request in a saslAuthenticateRequest. otherwise, we read and write raw
- // bytes.
- version, err := c.negotiateVersion(saslHandshake, v0, v1)
- if err != nil {
- return nil, err
- }
- if version == v1 {
- var request = saslAuthenticateRequestV0{Data: data}
- var response saslAuthenticateResponseV0
- err := c.writeOperation(
- func(deadline time.Time, id int32) error {
- return c.writeRequest(saslAuthenticate, v0, id, request)
- },
- func(deadline time.Time, size int) error {
- return expectZeroSize(func() (remain int, err error) {
- return (&response).readFrom(&c.rbuf, size)
- }())
- },
- )
- if err == nil && response.ErrorCode != 0 {
- err = Error(response.ErrorCode)
- }
- return response.Data, err
- }
- // fall back to opaque bytes on the wire. the broker is expecting these if
- // it just processed a v0 sasl handshake.
- c.wb.writeInt32(int32(len(data)))
- if _, err := c.wb.Write(data); err != nil {
- return nil, err
- }
- if err := c.wb.Flush(); err != nil {
- return nil, err
- }
- var respLen int32
- if _, err := readInt32(&c.rbuf, 4, &respLen); err != nil {
- return nil, err
- }
- resp, _, err := readNewBytes(&c.rbuf, int(respLen), int(respLen))
- return resp, err
- }
|