12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297 |
- package kafka
- import (
- "context"
- "crypto/tls"
- "errors"
- "fmt"
- "io"
- "math/rand"
- "net"
- "runtime/pprof"
- "sort"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "github.com/segmentio/kafka-go/protocol"
- "github.com/segmentio/kafka-go/protocol/apiversions"
- "github.com/segmentio/kafka-go/protocol/createtopics"
- "github.com/segmentio/kafka-go/protocol/findcoordinator"
- meta "github.com/segmentio/kafka-go/protocol/metadata"
- "github.com/segmentio/kafka-go/protocol/saslauthenticate"
- "github.com/segmentio/kafka-go/protocol/saslhandshake"
- "github.com/segmentio/kafka-go/sasl"
- )
// Request is an interface implemented by types that represent messages sent
// from kafka clients to brokers.
//
// It is declared as an alias of protocol.Message.
type Request = protocol.Message
// Response is an interface implemented by types that represent messages sent
// from kafka brokers in response to client requests.
//
// It is declared as an alias of protocol.Message.
type Response = protocol.Message
// RoundTripper is an interface implemented by types which support interacting
// with kafka brokers.
type RoundTripper interface {
	// RoundTrip sends a request to a kafka broker and returns the response that
	// was received, or a non-nil error.
	//
	// The context passed as first argument can be used to asynchronously abort
	// the call if needed.
	RoundTrip(context.Context, net.Addr, Request) (Response, error)
}
// Transport is an implementation of the RoundTripper interface.
//
// Transport values manage a pool of connections and automatically discover the
// cluster's layout to route requests to the appropriate brokers.
//
// Transport values are safe to use concurrently from multiple goroutines.
//
// Note: The intent is for the Transport to become the underlying layer of the
// kafka.Reader and kafka.Writer types.
type Transport struct {
	// A function used to establish connections to the kafka cluster.
	//
	// When nil, a package-level default dialer is used.
	Dial func(context.Context, string, string) (net.Conn, error)

	// Time limit set for establishing connections to the kafka cluster. This
	// limit includes all round trips done to establish the connections (TLS
	// handshake, SASL negotiation, etc...).
	//
	// Defaults to 5s.
	DialTimeout time.Duration

	// Maximum amount of time that connections will remain open and unused.
	// The transport will manage to automatically close connections that have
	// been idle for too long, and re-open them on demand when the transport is
	// used again.
	//
	// Defaults to 30s.
	IdleTimeout time.Duration

	// TTL for the metadata cached by this transport. Note that the value
	// configured here is an upper bound, the transport randomizes the TTLs to
	// avoid getting into states where multiple clients end up synchronized and
	// cause bursts of requests to the kafka broker.
	//
	// Defaults to 6s.
	MetadataTTL time.Duration

	// Unique identifier that the transport communicates to the brokers when it
	// sends requests.
	ClientID string

	// An optional configuration for TLS connections established by this
	// transport.
	//
	// If the ServerName field is empty and InsecureSkipVerify is false, the
	// transport works on a clone of the configuration whose ServerName is set
	// to the host part of the address that was dialed.
	TLS *tls.Config

	// SASL configures the Transport to use SASL authentication.
	SASL sasl.Mechanism

	// An optional resolver used to translate broker host names into network
	// addresses.
	//
	// The resolver will be called for every request (not every connection),
	// making it possible to implement ACL policies by validating that the
	// program is allowed to connect to the kafka broker. This also means that
	// the resolver should probably provide a caching layer to avoid storming
	// the service discovery backend with requests.
	//
	// When set, the Dial function is not responsible for performing name
	// resolution, and is always called with a pre-resolved address.
	Resolver BrokerResolver

	// The background context used to control goroutines started internally by
	// the transport.
	//
	// If nil, context.Background() is used instead.
	Context context.Context

	// mutex guards pools, which holds one connection pool per distinct
	// (network, address) pair the transport was asked to reach. The map is
	// allocated lazily the first time a pool is created.
	mutex sync.RWMutex
	pools map[networkAddress]*connPool
}
- // DefaultTransport is the default transport used by kafka clients in this
- // package.
- var DefaultTransport RoundTripper = &Transport{
- Dial: (&net.Dialer{
- Timeout: 3 * time.Second,
- DualStack: true,
- }).DialContext,
- }
- // CloseIdleConnections closes all idle connections immediately, and marks all
- // connections that are in use to be closed when they become idle again.
- func (t *Transport) CloseIdleConnections() {
- t.mutex.Lock()
- defer t.mutex.Unlock()
- for _, pool := range t.pools {
- pool.unref()
- }
- for k := range t.pools {
- delete(t.pools, k)
- }
- }
- // RoundTrip sends a request to a kafka cluster and returns the response, or an
- // error if no responses were received.
- //
- // Message types are available in sub-packages of the protocol package. Each
- // kafka API is implemented in a different sub-package. For example, the request
- // and response types for the Fetch API are available in the protocol/fetch
- // package.
- //
- // The type of the response message will match the type of the request. For
- // exmple, if RoundTrip was called with a *fetch.Request as argument, the value
- // returned will be of type *fetch.Response. It is safe for the program to do a
- // type assertion after checking that no error was returned.
- //
- // This example illustrates the way this method is expected to be used:
- //
- // r, err := transport.RoundTrip(ctx, addr, &fetch.Request{ ... })
- // if err != nil {
- // ...
- // } else {
- // res := r.(*fetch.Response)
- // ...
- // }
- //
- // The transport automatically selects the highest version of the API that is
- // supported by both the kafka-go package and the kafka broker. The negotiation
- // happens transparently once when connections are established.
- //
- // This API was introduced in version 0.4 as a way to leverage the lower-level
- // features of the kafka protocol, but also provide a more efficient way of
- // managing connections to kafka brokers.
- func (t *Transport) RoundTrip(ctx context.Context, addr net.Addr, req Request) (Response, error) {
- p := t.grabPool(addr)
- defer p.unref()
- return p.roundTrip(ctx, req)
- }
- func (t *Transport) dial() func(context.Context, string, string) (net.Conn, error) {
- if t.Dial != nil {
- return t.Dial
- }
- return defaultDialer.DialContext
- }
- func (t *Transport) dialTimeout() time.Duration {
- if t.DialTimeout > 0 {
- return t.DialTimeout
- }
- return 5 * time.Second
- }
- func (t *Transport) idleTimeout() time.Duration {
- if t.IdleTimeout > 0 {
- return t.IdleTimeout
- }
- return 30 * time.Second
- }
- func (t *Transport) metadataTTL() time.Duration {
- if t.MetadataTTL > 0 {
- return t.MetadataTTL
- }
- return 6 * time.Second
- }
- func (t *Transport) grabPool(addr net.Addr) *connPool {
- k := networkAddress{
- network: addr.Network(),
- address: addr.String(),
- }
- t.mutex.RLock()
- p := t.pools[k]
- if p != nil {
- p.ref()
- }
- t.mutex.RUnlock()
- if p != nil {
- return p
- }
- t.mutex.Lock()
- defer t.mutex.Unlock()
- if p := t.pools[k]; p != nil {
- p.ref()
- return p
- }
- ctx, cancel := context.WithCancel(t.context())
- p = &connPool{
- refc: 2,
- dial: t.dial(),
- dialTimeout: t.dialTimeout(),
- idleTimeout: t.idleTimeout(),
- metadataTTL: t.metadataTTL(),
- clientID: t.ClientID,
- tls: t.TLS,
- sasl: t.SASL,
- resolver: t.Resolver,
- ready: make(event),
- wake: make(chan event),
- conns: make(map[int32]*connGroup),
- cancel: cancel,
- }
- p.ctrl = p.newConnGroup(addr)
- go p.discover(ctx, p.wake)
- if t.pools == nil {
- t.pools = make(map[networkAddress]*connPool)
- }
- t.pools[k] = p
- return p
- }
- func (t *Transport) context() context.Context {
- if t.Context != nil {
- return t.Context
- }
- return context.Background()
- }
// event is a signaling channel that carries no data: observers wait by
// receiving from it, and it fires when the channel is closed.
type event chan struct{}

// trigger fires the event by closing the underlying channel. As with any
// channel close, calling it a second time on the same event panics.
func (e event) trigger() {
	close(e)
}
// connPool holds the connections and the cached cluster state that a Transport
// maintains for one cluster address.
type connPool struct {
	// Reference counter, only manipulated atomically through ref and unref.
	// The pool is shut down when the counter drops to zero.
	refc uintptr

	// Immutable fields of the connection pool. Connections access these fields
	// on their parent pool in a read-only fashion, so no synchronization is
	// required.
	dial        func(context.Context, string, string) (net.Conn, error)
	dialTimeout time.Duration
	idleTimeout time.Duration
	metadataTTL time.Duration
	clientID    string
	tls         *tls.Config
	sasl        sasl.Mechanism
	resolver    BrokerResolver

	// Signaling mechanisms to orchestrate communications between the pool and
	// the rest of the program.
	once   sync.Once  // ensure that `ready` is triggered only once
	ready  event      // triggered after the first metadata update
	wake   chan event // used to force metadata updates
	cancel context.CancelFunc

	// Mutable fields of the connection pool, access must be synchronized.
	mutex sync.RWMutex
	conns map[int32]*connGroup // data connections used for produce/fetch/etc...
	ctrl  *connGroup           // control connections used for metadata requests
	state atomic.Value         // cached cluster state
}
// connPoolState is the snapshot of the cluster state cached by a connPool. It
// is stored and loaded by value through the pool's atomic state field.
type connPoolState struct {
	metadata *meta.Response   // last metadata response seen by the pool
	err      error            // last error from metadata requests
	layout   protocol.Cluster // cluster layout built from metadata response
}
- func (p *connPool) grabState() connPoolState {
- state, _ := p.state.Load().(connPoolState)
- return state
- }
// setState atomically replaces the cached cluster state with the value passed
// as argument.
func (p *connPool) setState(state connPoolState) {
	p.state.Store(state)
}
- func (p *connPool) ref() {
- atomic.AddUintptr(&p.refc, +1)
- }
- func (p *connPool) unref() {
- if atomic.AddUintptr(&p.refc, ^uintptr(0)) == 0 {
- p.mutex.Lock()
- defer p.mutex.Unlock()
- for _, conns := range p.conns {
- conns.closeIdleConns()
- }
- p.ctrl.closeIdleConns()
- p.cancel()
- }
- }
- func (p *connPool) roundTrip(ctx context.Context, req Request) (Response, error) {
- // This first select should never block after the first metadata response
- // that would mark the pool as `ready`.
- select {
- case <-p.ready:
- case <-ctx.Done():
- return nil, ctx.Err()
- }
- var expectTopics []string
- defer func() {
- if len(expectTopics) != 0 {
- p.refreshMetadata(ctx, expectTopics)
- }
- }()
- state := p.grabState()
- var response promise
- switch m := req.(type) {
- case *meta.Request:
- // We serve metadata requests directly from the transport cache.
- //
- // This reduces the number of round trips to kafka brokers while keeping
- // the logic simple when applying partitioning strategies.
- if state.err != nil {
- return nil, state.err
- }
- return filterMetadataResponse(m, state.metadata), nil
- case *createtopics.Request:
- // Force an update of the metadata when adding topics,
- // otherwise the cached state would get out of sync.
- expectTopics = make([]string, len(m.Topics))
- for i := range m.Topics {
- expectTopics[i] = m.Topics[i].Name
- }
- case protocol.Splitter:
- // Messages that implement the Splitter interface trigger the creation of
- // multiple requests that are all merged back into a single results by
- // a merger.
- messages, merger, err := m.Split(state.layout)
- if err != nil {
- return nil, err
- }
- promises := make([]promise, len(messages))
- for i, m := range messages {
- promises[i] = p.sendRequest(ctx, m, state)
- }
- response = join(promises, messages, merger)
- }
- if response == nil {
- response = p.sendRequest(ctx, req, state)
- }
- return response.await(ctx)
- }
- // refreshMetadata forces an update of the cached cluster metadata, and waits
- // for the given list of topics to appear. This waiting mechanism is necessary
- // to account for the fact that topic creation is asynchronous in kafka, and
- // causes subsequent requests to fail while the cluster state is propagated to
- // all the brokers.
- func (p *connPool) refreshMetadata(ctx context.Context, expectTopics []string) {
- minBackoff := 100 * time.Millisecond
- maxBackoff := 2 * time.Second
- cancel := ctx.Done()
- for ctx.Err() == nil {
- notify := make(event)
- select {
- case <-cancel:
- return
- case p.wake <- notify:
- select {
- case <-notify:
- case <-cancel:
- return
- }
- }
- state := p.grabState()
- found := 0
- for _, topic := range expectTopics {
- if _, ok := state.layout.Topics[topic]; ok {
- found++
- }
- }
- if found == len(expectTopics) {
- return
- }
- if delay := time.Duration(rand.Int63n(int64(minBackoff))); delay > 0 {
- timer := time.NewTimer(minBackoff)
- select {
- case <-cancel:
- case <-timer.C:
- }
- timer.Stop()
- if minBackoff *= 2; minBackoff > maxBackoff {
- minBackoff = maxBackoff
- }
- }
- }
- }
// setReady triggers the pool's ready event. The sync.Once guard makes repeated
// calls harmless: the event is only triggered the first time.
func (p *connPool) setReady() {
	p.once.Do(p.ready.trigger)
}
// update is called periodically by the goroutine running the discover method
// to refresh the cluster layout information used by the transport to route
// requests to brokers.
//
// metadata is the latest metadata response (nil on failure) and err the error
// of the metadata request, if any. On success the cached state is replaced and
// the broker connection groups are reconciled with the new layout. On failure
// the error is only recorded if no metadata was ever cached.
func (p *connPool) update(ctx context.Context, metadata *meta.Response, err error) {
	var layout protocol.Cluster

	if metadata != nil {
		// NOTE(review): presumably reset because this response is cached and
		// served to later callers, for whom the throttle time is meaningless
		// — confirm.
		metadata.ThrottleTimeMs = 0

		// Normalize the lists so we can apply binary search on them.
		sortMetadataBrokers(metadata.Brokers)
		sortMetadataTopics(metadata.Topics)

		for i := range metadata.Topics {
			t := &metadata.Topics[i]
			sortMetadataPartitions(t.Partitions)
		}

		layout = makeLayout(metadata)
	}

	state := p.grabState()
	// Broker ids whose connection group must be created / torn down. A broker
	// whose description changed appears in both sets.
	addBrokers := make(map[int32]struct{})
	delBrokers := make(map[int32]struct{})

	if err != nil {
		// Only update the error on the transport if the cluster layout was
		// unknown. This ensures that we prioritize a previously known state
		// of the cluster to reduce the impact of transient failures.
		if state.metadata != nil {
			// Returning here happens before the defers below are registered,
			// so the cached state is left untouched.
			return
		}
		state.err = err
	} else {
		for id, b2 := range layout.Brokers {
			if b1, ok := state.layout.Brokers[id]; !ok {
				addBrokers[id] = struct{}{}
			} else if b1 != b2 {
				addBrokers[id] = struct{}{}
				delBrokers[id] = struct{}{}
			}
		}

		for id := range state.layout.Brokers {
			if _, ok := layout.Brokers[id]; !ok {
				delBrokers[id] = struct{}{}
			}
		}

		state.metadata, state.layout = metadata, layout
		state.err = nil
	}

	// Deferred calls run in reverse order: the new state is stored before the
	// pool is marked ready. The state argument is captured here, at the defer
	// statement.
	defer p.setReady()
	defer p.setState(state)

	if len(addBrokers) != 0 || len(delBrokers) != 0 {
		// Only acquire the lock when there is a change of layout. This is an
		// infrequent event so we don't risk introducing regular contention on
		// the mutex if we were to lock it on every update.
		p.mutex.Lock()
		defer p.mutex.Unlock()

		if ctx.Err() != nil {
			return // the pool has been closed, no need to update
		}

		// Deletions are applied first so that a changed broker gets a fresh
		// connection group in the loop that follows.
		for id := range delBrokers {
			if broker := p.conns[id]; broker != nil {
				broker.closeIdleConns()
				delete(p.conns, id)
			}
		}

		for id := range addBrokers {
			broker := layout.Brokers[id]
			p.conns[id] = p.newBrokerConnGroup(Broker{
				Rack: broker.Rack,
				Host: broker.Host,
				Port: int(broker.Port),
				ID:   int(broker.ID),
			})
		}
	}
}
// discover is the entry point of an internal goroutine for the transport which
// periodically requests updates of the cluster metadata and refreshes the
// transport cached cluster layout.
//
// The loop runs until ctx is done. Each iteration fetches the metadata over
// the control connection, applies it through update, then sleeps until either
// the randomized TTL timer fires or an event is received on wake. An event
// received on wake is triggered after the next update completes.
func (p *connPool) discover(ctx context.Context, wake <-chan event) {
	prng := rand.New(rand.NewSource(time.Now().UnixNano()))

	// Random duration in [0, p.metadataTTL), so the configured TTL acts as an
	// upper bound.
	metadataTTL := func() time.Duration {
		return time.Duration(prng.Int63n(int64(p.metadataTTL)))
	}

	timer := time.NewTimer(metadataTTL())
	defer timer.Stop()

	// Event received from wake, to be triggered once the update it asked for
	// has been applied.
	var notify event
	done := ctx.Done()

	for {
		c, err := p.grabClusterConn(ctx)
		if err != nil {
			p.update(ctx, nil, err)
		} else {
			res := make(async, 1)
			req := &meta.Request{
				IncludeClusterAuthorizedOperations: true,
				IncludeTopicAuthorizedOperations:   true,
			}

			// The metadata request is bounded by the configured TTL.
			deadline, cancel := context.WithTimeout(ctx, p.metadataTTL)
			c.reqs <- connRequest{
				ctx: deadline,
				req: req,
				res: res,
			}
			r, err := res.await(deadline)
			cancel()
			// Stop when the failure is the cancellation of the pool context
			// itself; a timeout of the derived deadline context does not
			// match and is reported to update instead.
			if err != nil && err == ctx.Err() {
				return
			}

			// ret stays nil when r does not hold a *meta.Response.
			ret, _ := r.(*meta.Response)
			p.update(ctx, ret, err)
		}

		if notify != nil {
			notify.trigger()
			notify = nil
		}

		select {
		case <-timer.C:
			timer.Reset(metadataTTL())
		case <-done:
			return
		case notify = <-wake:
		}
	}
}
- // grabBrokerConn returns a connection to a specific broker represented by the
- // broker id passed as argument. If the broker id was not known, an error is
- // returned.
- func (p *connPool) grabBrokerConn(ctx context.Context, brokerID int32) (*conn, error) {
- p.mutex.RLock()
- g := p.conns[brokerID]
- p.mutex.RUnlock()
- if g == nil {
- return nil, BrokerNotAvailable
- }
- return g.grabConnOrConnect(ctx)
- }
// grabClusterConn returns the connection to the kafka cluster that the pool is
// configured to connect to.
//
// The transport uses a shared `control` connection to the cluster for any
// requests that aren't supposed to be sent to specific brokers (unlike Fetch
// or Produce requests, for example). Requests intended to be routed to
// specific brokers are dispatched on a separate pool of connections that the
// transport maintains. This split helps avoid head-of-line blocking situations
// where control requests like Metadata would be queued behind large responses
// from Fetch requests for example.
//
// In either case, the requests are multiplexed so we can keep a minimal number
// of connections open (N+1, where N is the number of brokers in the cluster).
func (p *connPool) grabClusterConn(ctx context.Context) (*conn, error) {
	return p.ctrl.grabConnOrConnect(ctx)
}
- func (p *connPool) sendRequest(ctx context.Context, req Request, state connPoolState) promise {
- brokerID := int32(-1)
- switch m := req.(type) {
- case protocol.BrokerMessage:
- // Some requests are supposed to be sent to specific brokers (e.g. the
- // partition leaders). They implement the BrokerMessage interface to
- // delegate the routing decision to each message type.
- broker, err := m.Broker(state.layout)
- if err != nil {
- return reject(err)
- }
- brokerID = broker.ID
- case protocol.GroupMessage:
- // Some requests are supposed to be sent to a group coordinator,
- // look up which broker is currently the coordinator for the group
- // so we can get a connection to that broker.
- //
- // TODO: should we cache the coordinator info?
- p := p.sendRequest(ctx, &findcoordinator.Request{Key: m.Group()}, state)
- r, err := p.await(ctx)
- if err != nil {
- return reject(err)
- }
- brokerID = r.(*findcoordinator.Response).NodeID
- }
- var c *conn
- var err error
- if brokerID >= 0 {
- c, err = p.grabBrokerConn(ctx, brokerID)
- } else {
- c, err = p.grabClusterConn(ctx)
- }
- if err != nil {
- return reject(err)
- }
- res := make(async, 1)
- c.reqs <- connRequest{
- ctx: ctx,
- req: req,
- res: res,
- }
- return res
- }
- func filterMetadataResponse(req *meta.Request, res *meta.Response) *meta.Response {
- ret := *res
- if req.TopicNames != nil {
- ret.Topics = make([]meta.ResponseTopic, len(req.TopicNames))
- for i, topicName := range req.TopicNames {
- j, ok := findMetadataTopic(res.Topics, topicName)
- if ok {
- ret.Topics[i] = res.Topics[j]
- } else {
- ret.Topics[i] = meta.ResponseTopic{
- ErrorCode: int16(UnknownTopicOrPartition),
- Name: topicName,
- }
- }
- }
- }
- return &ret
- }
- func findMetadataTopic(topics []meta.ResponseTopic, topicName string) (int, bool) {
- i := sort.Search(len(topics), func(i int) bool {
- return topics[i].Name >= topicName
- })
- return i, i >= 0 && i < len(topics) && topics[i].Name == topicName
- }
- func sortMetadataBrokers(brokers []meta.ResponseBroker) {
- sort.Slice(brokers, func(i, j int) bool {
- return brokers[i].NodeID < brokers[j].NodeID
- })
- }
- func sortMetadataTopics(topics []meta.ResponseTopic) {
- sort.Slice(topics, func(i, j int) bool {
- return topics[i].Name < topics[j].Name
- })
- }
- func sortMetadataPartitions(partitions []meta.ResponsePartition) {
- sort.Slice(partitions, func(i, j int) bool {
- return partitions[i].PartitionIndex < partitions[j].PartitionIndex
- })
- }
- func makeLayout(metadataResponse *meta.Response) protocol.Cluster {
- layout := protocol.Cluster{
- Controller: metadataResponse.ControllerID,
- Brokers: make(map[int32]protocol.Broker),
- Topics: make(map[string]protocol.Topic),
- }
- for _, broker := range metadataResponse.Brokers {
- layout.Brokers[broker.NodeID] = protocol.Broker{
- Rack: broker.Rack,
- Host: broker.Host,
- Port: broker.Port,
- ID: broker.NodeID,
- }
- }
- for _, topic := range metadataResponse.Topics {
- if topic.IsInternal {
- continue // TODO: do we need to expose those?
- }
- layout.Topics[topic.Name] = protocol.Topic{
- Name: topic.Name,
- Error: topic.ErrorCode,
- Partitions: makePartitions(topic.Partitions),
- }
- }
- return layout
- }
- func makePartitions(metadataPartitions []meta.ResponsePartition) map[int32]protocol.Partition {
- protocolPartitions := make(map[int32]protocol.Partition, len(metadataPartitions))
- numBrokerIDs := 0
- for _, p := range metadataPartitions {
- numBrokerIDs += len(p.ReplicaNodes) + len(p.IsrNodes) + len(p.OfflineReplicas)
- }
- // Reduce the memory footprint a bit by allocating a single buffer to write
- // all broker ids.
- brokerIDs := make([]int32, 0, numBrokerIDs)
- for _, p := range metadataPartitions {
- var rep, isr, off []int32
- brokerIDs, rep = appendBrokerIDs(brokerIDs, p.ReplicaNodes)
- brokerIDs, isr = appendBrokerIDs(brokerIDs, p.IsrNodes)
- brokerIDs, off = appendBrokerIDs(brokerIDs, p.OfflineReplicas)
- protocolPartitions[p.PartitionIndex] = protocol.Partition{
- ID: p.PartitionIndex,
- Error: p.ErrorCode,
- Leader: p.LeaderID,
- Replicas: rep,
- ISR: isr,
- Offline: off,
- }
- }
- return protocolPartitions
- }
// appendBrokerIDs appends brokers to ids and returns the grown slice along
// with the sub-slice holding the values just appended. The capacity of the
// sub-slice is clamped to its length, so appending to it can never overwrite
// the values that follow it in the shared buffer.
func appendBrokerIDs(ids, brokers []int32) ([]int32, []int32) {
	start := len(ids)
	ids = append(ids, brokers...)
	end := len(ids)
	return ids, ids[start:end:end]
}
- func (p *connPool) newConnGroup(a net.Addr) *connGroup {
- return &connGroup{
- addr: a,
- pool: p,
- broker: Broker{
- ID: -1,
- },
- }
- }
- func (p *connPool) newBrokerConnGroup(broker Broker) *connGroup {
- return &connGroup{
- addr: &networkAddress{
- network: "tcp",
- address: net.JoinHostPort(broker.Host, strconv.Itoa(broker.Port)),
- },
- pool: p,
- broker: broker,
- }
- }
// connRequest is the unit of work sent on a connection's reqs channel.
type connRequest struct {
	ctx context.Context // context of the caller that issued the request
	req Request         // message to send to the broker
	res async           // promise on which the outcome is delivered
}
// The promise interface is used as a message passing abstraction to coordinate
// between goroutines that handle requests and responses.
//
// The implementations in this file are async, rejected and joined.
type promise interface {
	// Waits until the promise is resolved, rejected, or the context canceled.
	await(context.Context) (Response, error)
}
- // async is an implementation of the promise interface which supports resolving
- // or rejecting the await call asynchronously.
- type async chan interface{}
- func (p async) await(ctx context.Context) (Response, error) {
- select {
- case x := <-p:
- switch v := x.(type) {
- case nil:
- return nil, nil // A nil response is ok (e.g. when RequiredAcks is None)
- case Response:
- return v, nil
- case error:
- return nil, v
- default:
- panic(fmt.Errorf("BUG: promise resolved with impossible value of type %T", v))
- }
- case <-ctx.Done():
- return nil, ctx.Err()
- }
- }
- func (p async) resolve(res Response) { p <- res }
- func (p async) reject(err error) { p <- err }
- // rejected is an implementation of the promise interface which is always
- // returns an error. Values of this type are constructed using the reject
- // function.
- type rejected struct{ err error }
- func reject(err error) promise { return &rejected{err: err} }
- func (p *rejected) await(ctx context.Context) (Response, error) {
- return nil, p.err
- }
- // joined is an implementation of the promise interface which merges results
- // from multiple promises into one await call using a merger.
- type joined struct {
- promises []promise
- requests []Request
- merger protocol.Merger
- }
- func join(promises []promise, requests []Request, merger protocol.Merger) promise {
- return &joined{
- promises: promises,
- requests: requests,
- merger: merger,
- }
- }
- func (p *joined) await(ctx context.Context) (Response, error) {
- results := make([]interface{}, len(p.promises))
- for i, sub := range p.promises {
- m, err := sub.await(ctx)
- if err != nil {
- results[i] = err
- } else {
- results[i] = m
- }
- }
- return p.merger.Merge(p.requests, results)
- }
// Default dialer used by the transport connections when no Dial function
// was configured by the program.
//
// The deprecated DualStack field is intentionally left unset: fast fallback
// between address families is enabled by default since Go 1.12, so setting it
// has no effect other than triggering deprecation warnings.
var defaultDialer = net.Dialer{
	Timeout: 3 * time.Second,
}
// connGroup represents a logical connection group to a kafka broker. The
// actual network connections are lazily open before sending requests, and
// closed if they are unused for longer than the idle timeout.
type connGroup struct {
	addr   net.Addr // address the connections of this group are dialed to
	broker Broker   // broker served by this group; ID is -1 for the control group
	// Immutable state of the connection.
	pool *connPool
	// Shared state of the connection group. The methods of the group hold the
	// mutex whenever they read or write the fields below.
	mutex     sync.Mutex
	closed    bool    // set by closeIdleConns; released connections are then refused
	idleConns []*conn // stack of idle connections
}
- func (g *connGroup) closeIdleConns() {
- g.mutex.Lock()
- conns := g.idleConns
- g.idleConns = nil
- g.closed = true
- g.mutex.Unlock()
- for _, c := range conns {
- c.close()
- }
- }
// grabConnOrConnect returns a connection of the group, reusing an idle one
// when possible and establishing a new one otherwise.
//
// Without a resolver, the most recently released idle connection is reused.
// With a resolver, the broker is resolved first and only an idle connection to
// one of the resolved addresses is reused.
//
// An error is returned when the address cannot be parsed, when the resolver
// fails, when connecting fails, or when ctx is done before a connection was
// obtained.
func (g *connGroup) grabConnOrConnect(ctx context.Context) (*conn, error) {
	rslv := g.pool.resolver
	addr := g.addr

	var c *conn

	if rslv == nil {
		c = g.grabConn()
	} else {
		var err error
		broker := g.broker

		// The control group has no broker description (ID < 0): derive the
		// host and port to resolve from the group's address.
		if broker.ID < 0 {
			host, port, err := net.SplitHostPort(addr.String())
			if err != nil {
				return nil, fmt.Errorf("%s: %w", addr, err)
			}
			portNumber, err := strconv.Atoi(port)
			if err != nil {
				return nil, fmt.Errorf("%s: %w", addr, err)
			}
			broker.Host = host
			broker.Port = portNumber
		}

		ipAddrs, err := rslv.LookupBrokerIPAddr(ctx, broker)
		if err != nil {
			return nil, err
		}

		// Reuse the first idle connection that matches a resolved address.
		for _, ipAddr := range ipAddrs {
			network := addr.Network()
			address := net.JoinHostPort(ipAddr.String(), strconv.Itoa(broker.Port))

			if c = g.grabConnTo(network, address); c != nil {
				break
			}
		}
	}

	if c == nil {
		// NOTE(review): the new connection is dialed to the group's original
		// address, not to one of the resolved IP addresses — confirm this is
		// intended when a resolver is configured.
		connChan := make(chan *conn)
		errChan := make(chan error)

		// Connecting happens on a separate goroutine so this call can return
		// as soon as ctx is done. Both channels are unbuffered, so the
		// goroutine also watches ctx to avoid blocking forever once the
		// caller has given up.
		go func() {
			c, err := g.connect(ctx, addr)
			if err != nil {
				select {
				case errChan <- err:
				case <-ctx.Done():
				}
			} else {
				select {
				case connChan <- c:
				case <-ctx.Done():
					// Nobody is waiting anymore: keep the connection as idle
					// for later use, or close it if the group refuses it.
					if !g.releaseConn(c) {
						c.close()
					}
				}
			}
		}()

		select {
		case c = <-connChan:
		case err := <-errChan:
			return nil, err
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}

	return c, nil
}
- func (g *connGroup) grabConnTo(network, address string) *conn {
- g.mutex.Lock()
- defer g.mutex.Unlock()
- for i := len(g.idleConns) - 1; i >= 0; i-- {
- c := g.idleConns[i]
- if c.network == network && c.address == address {
- copy(g.idleConns[i:], g.idleConns[i+1:])
- n := len(g.idleConns) - 1
- g.idleConns[n] = nil
- g.idleConns = g.idleConns[:n]
- if c.timer != nil {
- c.timer.Stop()
- }
- return c
- }
- }
- return nil
- }
- func (g *connGroup) grabConn() *conn {
- g.mutex.Lock()
- defer g.mutex.Unlock()
- if len(g.idleConns) == 0 {
- return nil
- }
- n := len(g.idleConns) - 1
- c := g.idleConns[n]
- g.idleConns[n] = nil
- g.idleConns = g.idleConns[:n]
- if c.timer != nil {
- c.timer.Stop()
- }
- return c
- }
- func (g *connGroup) removeConn(c *conn) bool {
- g.mutex.Lock()
- defer g.mutex.Unlock()
- if c.timer != nil {
- c.timer.Stop()
- }
- for i, x := range g.idleConns {
- if x == c {
- copy(g.idleConns[i:], g.idleConns[i+1:])
- n := len(g.idleConns) - 1
- g.idleConns[n] = nil
- g.idleConns = g.idleConns[:n]
- return true
- }
- }
- return false
- }
- func (g *connGroup) releaseConn(c *conn) bool {
- idleTimeout := g.pool.idleTimeout
- g.mutex.Lock()
- defer g.mutex.Unlock()
- if g.closed {
- return false
- }
- if c.timer != nil {
- c.timer.Reset(idleTimeout)
- } else {
- c.timer = time.AfterFunc(idleTimeout, func() {
- if g.removeConn(c) {
- c.close()
- }
- })
- }
- g.idleConns = append(g.idleConns, c)
- return true
- }
- func (g *connGroup) connect(ctx context.Context, addr net.Addr) (*conn, error) {
- deadline := time.Now().Add(g.pool.dialTimeout)
- ctx, cancel := context.WithDeadline(ctx, deadline)
- defer cancel()
- network := strings.Split(addr.Network(), ",")
- address := strings.Split(addr.String(), ",")
- var netConn net.Conn
- var netAddr net.Addr
- var err error
- if len(address) > 1 {
- // Shuffle the list of addresses to randomize the order in which
- // connections are attempted. This prevents routing all connections
- // to the first broker (which will usually succeed).
- rand.Shuffle(len(address), func(i, j int) {
- network[i], network[j] = network[j], network[i]
- address[i], address[j] = address[j], address[i]
- })
- }
- for i := range address {
- netConn, err = g.pool.dial(ctx, network[i], address[i])
- if err == nil {
- netAddr = &networkAddress{
- network: network[i],
- address: address[i],
- }
- break
- }
- }
- if err != nil {
- return nil, err
- }
- defer func() {
- if netConn != nil {
- netConn.Close()
- }
- }()
- if tlsConfig := g.pool.tls; tlsConfig != nil {
- if tlsConfig.ServerName == "" && !tlsConfig.InsecureSkipVerify {
- host, _, _ := net.SplitHostPort(netAddr.String())
- tlsConfig = tlsConfig.Clone()
- tlsConfig.ServerName = host
- }
- netConn = tls.Client(netConn, tlsConfig)
- }
- pc := protocol.NewConn(netConn, g.pool.clientID)
- pc.SetDeadline(deadline)
- r, err := pc.RoundTrip(new(apiversions.Request))
- if err != nil {
- return nil, err
- }
- res := r.(*apiversions.Response)
- ver := make(map[protocol.ApiKey]int16, len(res.ApiKeys))
- if res.ErrorCode != 0 {
- return nil, fmt.Errorf("negotating API versions with kafka broker at %s: %w", g.addr, Error(res.ErrorCode))
- }
- for _, r := range res.ApiKeys {
- apiKey := protocol.ApiKey(r.ApiKey)
- ver[apiKey] = apiKey.SelectVersion(r.MinVersion, r.MaxVersion)
- }
- pc.SetVersions(ver)
- pc.SetDeadline(time.Time{})
- if g.pool.sasl != nil {
- if err := authenticateSASL(ctx, pc, g.pool.sasl); err != nil {
- return nil, err
- }
- }
- reqs := make(chan connRequest)
- c := &conn{
- network: netAddr.Network(),
- address: netAddr.String(),
- reqs: reqs,
- group: g,
- }
- go c.run(pc, reqs)
- netConn = nil
- return c, nil
- }
- type conn struct {
- reqs chan<- connRequest
- network string
- address string
- once sync.Once
- group *connGroup
- timer *time.Timer
- }
- func (c *conn) close() {
- c.once.Do(func() { close(c.reqs) })
- }
- func (c *conn) run(pc *protocol.Conn, reqs <-chan connRequest) {
- defer pc.Close()
- for cr := range reqs {
- r, err := c.roundTrip(cr.ctx, pc, cr.req)
- if err != nil {
- cr.res.reject(err)
- if !errors.Is(err, protocol.ErrNoRecord) {
- break
- }
- } else {
- cr.res.resolve(r)
- }
- if !c.group.releaseConn(c) {
- break
- }
- }
- }
- func (c *conn) roundTrip(ctx context.Context, pc *protocol.Conn, req Request) (Response, error) {
- pprof.SetGoroutineLabels(ctx)
- defer pprof.SetGoroutineLabels(context.Background())
- if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
- pc.SetDeadline(deadline)
- defer pc.SetDeadline(time.Time{})
- }
- return pc.RoundTrip(req)
- }
- // authenticateSASL performs all of the required requests to authenticate this
- // connection. If any step fails, this function returns with an error. A nil
- // error indicates successful authentication.
- func authenticateSASL(ctx context.Context, pc *protocol.Conn, mechanism sasl.Mechanism) error {
- if err := saslHandshakeRoundTrip(pc, mechanism.Name()); err != nil {
- return err
- }
- sess, state, err := mechanism.Start(ctx)
- if err != nil {
- return err
- }
- for completed := false; !completed; {
- challenge, err := saslAuthenticateRoundTrip(pc, state)
- switch err {
- case nil:
- case io.EOF:
- // the broker may communicate a failed exchange by closing the
- // connection (esp. in the case where we're passing opaque sasl
- // data over the wire since there's no protocol info).
- return SASLAuthenticationFailed
- default:
- return err
- }
- completed, state, err = sess.Next(ctx, challenge)
- if err != nil {
- return err
- }
- }
- return nil
- }
- // saslHandshake sends the SASL handshake message. This will determine whether
- // the Mechanism is supported by the cluster. If it's not, this function will
- // error out with UnsupportedSASLMechanism.
- //
- // If the mechanism is unsupported, the handshake request will reply with the
- // list of the cluster's configured mechanisms, which could potentially be used
- // to facilitate negotiation. At the moment, we are not negotiating the
- // mechanism as we believe that brokers are usually known to the client, and
- // therefore the client should already know which mechanisms are supported.
- //
- // See http://kafka.apache.org/protocol.html#The_Messages_SaslHandshake
- func saslHandshakeRoundTrip(pc *protocol.Conn, mechanism string) error {
- msg, err := pc.RoundTrip(&saslhandshake.Request{
- Mechanism: mechanism,
- })
- if err != nil {
- return err
- }
- res := msg.(*saslhandshake.Response)
- if res.ErrorCode != 0 {
- err = Error(res.ErrorCode)
- }
- return err
- }
- // saslAuthenticate sends the SASL authenticate message. This function must
- // be immediately preceded by a successful saslHandshake.
- //
- // See http://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate
- func saslAuthenticateRoundTrip(pc *protocol.Conn, data []byte) ([]byte, error) {
- msg, err := pc.RoundTrip(&saslauthenticate.Request{
- AuthBytes: data,
- })
- if err != nil {
- return nil, err
- }
- res := msg.(*saslauthenticate.Response)
- if res.ErrorCode != 0 {
- err = makeError(res.ErrorCode, res.ErrorMessage)
- }
- return res.AuthBytes, err
- }
- var _ RoundTripper = (*Transport)(nil)
|