12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556 |
- package grpc
- import (
- "context"
- "errors"
- "fmt"
- "math"
- "net"
- "reflect"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "google.golang.org/grpc/balancer"
- "google.golang.org/grpc/balancer/base"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/connectivity"
- "google.golang.org/grpc/credentials"
- "google.golang.org/grpc/internal/backoff"
- "google.golang.org/grpc/internal/channelz"
- "google.golang.org/grpc/internal/grpcsync"
- "google.golang.org/grpc/internal/grpcutil"
- "google.golang.org/grpc/internal/transport"
- "google.golang.org/grpc/keepalive"
- "google.golang.org/grpc/resolver"
- "google.golang.org/grpc/serviceconfig"
- "google.golang.org/grpc/status"
- _ "google.golang.org/grpc/balancer/roundrobin"
- _ "google.golang.org/grpc/internal/resolver/dns"
- _ "google.golang.org/grpc/internal/resolver/passthrough"
- )
// Package-level constants used by the client connection logic.
- const (
-
// minConnectTimeout is the default amount of time a single connection
// attempt is given; resetTransport uses it as the dial duration unless
// dopts.minConnectTimeout overrides it or the backoff is longer.
- minConnectTimeout = 20 * time.Second
-
// grpclbName is the balancer name selected when the resolver returns
// addresses of type resolver.GRPCLB.
// NOTE(review): presumably must match the name registered by the grpclb
// balancer package — confirm.
- grpclbName = "grpclb"
- )
// Errors and messages shared by the ClientConn and addrConn code paths.
- var (
-
-
-
-
-
// ErrClientConnClosing is returned (with codes.Canceled) by operations on a
// ClientConn that has already been closed or is closing, e.g. a second call
// to Close, newAddrConn, or waitForResolvedAddrs.
- ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
-
// errConnDrain indicates that a connection is being drained.
// NOTE(review): not referenced in the visible part of this file — confirm
// where it is used.
- errConnDrain = errors.New("grpc: the connection is drained")
-
// errConnClosing is returned by addrConn methods (connect, tryAllAddrs) when
// the addrConn is already in the Shutdown state.
- errConnClosing = errors.New("grpc: the connection is closing")
-
-
// invalidDefaultServiceConfigErrPrefix prefixes the error DialContext
// returns when the default service config JSON fails to parse.
- invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
- )
// Dial-time credential validation errors returned by DialContext.
- var (
-
-
-
// errNoTransportSecurity: the dial is not marked insecure and neither
// TransportCredentials nor a CredsBundle was supplied.
- errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
-
-
// errTransportCredsAndBundle: both TransportCredentials and a CredsBundle
// were supplied on a secure dial; only one may be set.
- errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
-
-
-
// errTransportCredentialsMissing: the dial is insecure but one of the
// PerRPCCredentials reports RequireTransportSecurity().
- errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
-
-
// errCredentialsConflict: the dial is marked insecure but transport
// credentials or a credentials bundle were also supplied.
- errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
- )
// Default client-side size limits.
// NOTE(review): none of these are referenced in the visible part of the
// file; presumably they seed the default dial/call options — confirm.
- const (
// 4 MiB default cap on a received message.
- defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
// Largest int32, i.e. effectively no cap on a sent message.
- defaultClientMaxSendMessageSize = math.MaxInt32
-
// 32 KiB default write and read buffer sizes.
- defaultWriteBufSize = 32 * 1024
- defaultReadBufSize = 32 * 1024
- )
- func Dial(target string, opts ...DialOption) (*ClientConn, error) {
- return DialContext(context.Background(), target, opts...)
- }
// DialContext creates a ClientConn for target, applying opts on top of the
// default dial options.
//
// ctx bounds only the dialing performed by this function (the blocking waits
// below); the ClientConn's own lifetime context is derived from
// context.Background() and is cancelled by Close.
//
// On any error the partially built ClientConn is closed and nil is returned.
// If ctx is done by the time the function returns, the deferred handler
// below rewrites the result so that the caller sees a context error (possibly
// combined with the last connection error when dopts.returnLastError is set).
- func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
- cc := &ClientConn{
- target: target,
- csMgr: &connectivityStateManager{},
- conns: make(map[*addrConn]struct{}),
- dopts: defaultDialOptions(),
- blockingpicker: newPickerWrapper(),
- czData: new(channelzData),
- firstResolveEvent: grpcsync.NewEvent(),
- }
// Seed the atomic value with a typed nil so later loads always see a
// *retryThrottler.
- cc.retryThrottler.Store((*retryThrottler)(nil))
- cc.ctx, cc.cancel = context.WithCancel(context.Background())
- for _, opt := range opts {
- opt.apply(&cc.dopts)
- }
// Collapse the configured interceptors into a single unary and a single
// stream interceptor.
- chainUnaryClientInterceptors(cc)
- chainStreamClientInterceptors(cc)
// Release everything created so far if dialing fails.
- defer func() {
- if err != nil {
- cc.Close()
- }
- }()
// Register the channel with channelz; a non-zero parent ID marks it as a
// nested channel and additionally records a trace event on the parent.
- if channelz.IsOn() {
- if cc.dopts.channelzParentID != 0 {
- cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
- channelz.AddTraceEvent(logger, cc.channelzID, 0, &channelz.TraceEventDesc{
- Desc: "Channel Created",
- Severity: channelz.CtINFO,
- Parent: &channelz.TraceEventDesc{
- Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
- Severity: channelz.CtINFO,
- },
- })
- } else {
- cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
- channelz.Info(logger, cc.channelzID, "Channel Created")
- }
- cc.csMgr.channelzID = cc.channelzID
- }
// Validate the credential options: a secure dial needs exactly one of
// TransportCredentials / CredsBundle; an insecure dial must have neither
// and no per-RPC credentials that require transport security.
- if !cc.dopts.insecure {
- if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
- return nil, errNoTransportSecurity
- }
- if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
- return nil, errTransportCredsAndBundle
- }
- } else {
- if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
- return nil, errCredentialsConflict
- }
- for _, cd := range cc.dopts.copts.PerRPCCredentials {
- if cd.RequireTransportSecurity() {
- return nil, errTransportCredentialsMissing
- }
- }
- }
// Parse the default service config, if one was supplied as raw JSON.
- if cc.dopts.defaultServiceConfigRawJSON != nil {
- scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
- if scpr.Err != nil {
- return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
- }
- cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
- }
// Keep a channel-level copy of the keepalive parameters; adjustParams may
// later raise mkp.Time and tryAllAddrs copies it back into each attempt.
- cc.mkp = cc.dopts.copts.KeepaliveParams
// Install the default net.Dialer-based dialer when none was provided. Note
// that the proxy wrapper is only applied to this default dialer.
- if cc.dopts.copts.Dialer == nil {
- cc.dopts.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
- network, addr := parseDialTarget(addr)
- return (&net.Dialer{}).DialContext(ctx, network, addr)
- }
- if cc.dopts.withProxy {
- cc.dopts.copts.Dialer = newProxyDialer(cc.dopts.copts.Dialer)
- }
- }
// Append the library user agent to any caller-supplied one.
- if cc.dopts.copts.UserAgent != "" {
- cc.dopts.copts.UserAgent += " " + grpcUA
- } else {
- cc.dopts.copts.UserAgent = grpcUA
- }
// A dial timeout option narrows ctx for the remainder of this function.
- if cc.dopts.timeout > 0 {
- var cancel context.CancelFunc
- ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
- defer cancel()
- }
// If ctx is already done when returning, never hand back a connection:
// report the context error, optionally annotated with the underlying error.
- defer func() {
- select {
- case <-ctx.Done():
- switch {
- case ctx.Err() == err:
- conn = nil
- case err == nil || !cc.dopts.returnLastError:
- conn, err = nil, ctx.Err()
- default:
- conn, err = nil, fmt.Errorf("%v: %v", ctx.Err(), err)
- }
- default:
- }
- }()
// Non-blocking attempt to pick up an initial service config from scChan.
- scSet := false
- if cc.dopts.scChan != nil {
-
- select {
- case sc, ok := <-cc.dopts.scChan:
- if ok {
- cc.sc = &sc
- scSet = true
- }
- default:
- }
- }
- if cc.dopts.bs == nil {
- cc.dopts.bs = backoff.DefaultExponential
- }
-
// Determine the resolver from the target's scheme, falling back to the
// default scheme (with the whole target as endpoint) when the parsed scheme
// has no registered builder.
- cc.parsedTarget = grpcutil.ParseTarget(cc.target)
- unixScheme := strings.HasPrefix(cc.target, "unix:")
- channelz.Infof(logger, cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)
- resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
- if resolverBuilder == nil {
-
-
-
- channelz.Infof(logger, cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
- cc.parsedTarget = resolver.Target{
- Scheme: resolver.GetDefaultScheme(),
- Endpoint: target,
- }
- resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
- if resolverBuilder == nil {
- return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
- }
- }
// Choose the authority, in priority order: the credentials' server name,
// the authority dial option (insecure dials only), "localhost" for unix:
// targets, and finally the parsed endpoint.
- creds := cc.dopts.copts.TransportCredentials
- if creds != nil && creds.Info().ServerName != "" {
- cc.authority = creds.Info().ServerName
- } else if cc.dopts.insecure && cc.dopts.authority != "" {
- cc.authority = cc.dopts.authority
- } else if unixScheme {
- cc.authority = "localhost"
- } else {
-
-
- cc.authority = cc.parsedTarget.Endpoint
- }
// If a service config channel exists but yielded nothing above, block until
// the first config arrives (or the channel closes) or ctx is done.
- if cc.dopts.scChan != nil && !scSet {
-
- select {
- case sc, ok := <-cc.dopts.scChan:
- if ok {
- cc.sc = &sc
- }
- case <-ctx.Done():
- return nil, ctx.Err()
- }
- }
// Keep consuming service config updates in the background until the
// channel closes or the ClientConn is closed.
- if cc.dopts.scChan != nil {
- go cc.scWatcher()
- }
// Balancers get their own clone of the transport credentials.
- var credsClone credentials.TransportCredentials
- if creds := cc.dopts.copts.TransportCredentials; creds != nil {
- credsClone = creds.Clone()
- }
- cc.balancerBuildOpts = balancer.BuildOptions{
- DialCreds: credsClone,
- CredsBundle: cc.dopts.copts.CredsBundle,
- Dialer: cc.dopts.copts.Dialer,
- ChannelzParentID: cc.channelzID,
- Target: cc.parsedTarget,
- }
-
// Build the resolver and publish its wrapper under the lock.
- rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
- if err != nil {
- return nil, fmt.Errorf("failed to build resolver: %v", err)
- }
- cc.mu.Lock()
- cc.resolverWrapper = rWrapper
- cc.mu.Unlock()
-
// Blocking dial: wait until the channel reports Ready. With
// FailOnNonTempDialError, a TransientFailure whose last connection error is
// non-temporary aborts the dial immediately.
- if cc.dopts.block {
- for {
- s := cc.GetState()
- if s == connectivity.Ready {
- break
- } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
- if err = cc.connectionError(); err != nil {
- terr, ok := err.(interface {
- Temporary() bool
- })
- if ok && !terr.Temporary() {
- return nil, err
- }
- }
- }
// WaitForStateChange returns false only when ctx is done.
- if !cc.WaitForStateChange(ctx, s) {
-
- if err = cc.connectionError(); err != nil && cc.dopts.returnLastError {
- return nil, err
- }
- return nil, ctx.Err()
- }
- }
- }
- return cc, nil
- }
- func chainUnaryClientInterceptors(cc *ClientConn) {
- interceptors := cc.dopts.chainUnaryInts
-
-
- if cc.dopts.unaryInt != nil {
- interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
- }
- var chainedInt UnaryClientInterceptor
- if len(interceptors) == 0 {
- chainedInt = nil
- } else if len(interceptors) == 1 {
- chainedInt = interceptors[0]
- } else {
- chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
- return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
- }
- }
- cc.dopts.unaryInt = chainedInt
- }
- func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
- if curr == len(interceptors)-1 {
- return finalInvoker
- }
- return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
- return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
- }
- }
- func chainStreamClientInterceptors(cc *ClientConn) {
- interceptors := cc.dopts.chainStreamInts
-
-
- if cc.dopts.streamInt != nil {
- interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
- }
- var chainedInt StreamClientInterceptor
- if len(interceptors) == 0 {
- chainedInt = nil
- } else if len(interceptors) == 1 {
- chainedInt = interceptors[0]
- } else {
- chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
- return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
- }
- }
- cc.dopts.streamInt = chainedInt
- }
- func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
- if curr == len(interceptors)-1 {
- return finalStreamer
- }
- return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
- return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
- }
- }
// connectivityStateManager holds the channel-level connectivity state and
// lets callers wait for it to change.
- type connectivityStateManager struct {
// mu is held by every method that reads or writes state and notifyChan.
- mu sync.Mutex
// state is the current channel connectivity state.
- state connectivity.State
// notifyChan is lazily created by getNotifyChan and closed (then reset to
// nil) by updateState on the next state change.
- notifyChan chan struct{}
// channelzID identifies the owning channel in channelz log messages.
- channelzID int64
- }
- func (csm *connectivityStateManager) updateState(state connectivity.State) {
- csm.mu.Lock()
- defer csm.mu.Unlock()
- if csm.state == connectivity.Shutdown {
- return
- }
- if csm.state == state {
- return
- }
- csm.state = state
- channelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state)
- if csm.notifyChan != nil {
-
- close(csm.notifyChan)
- csm.notifyChan = nil
- }
- }
- func (csm *connectivityStateManager) getState() connectivity.State {
- csm.mu.Lock()
- defer csm.mu.Unlock()
- return csm.state
- }
- func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
- csm.mu.Lock()
- defer csm.mu.Unlock()
- if csm.notifyChan == nil {
- csm.notifyChan = make(chan struct{})
- }
- return csm.notifyChan
- }
// ClientConnInterface is the minimal set of RPC entry points; *ClientConn is
// asserted to implement it below.
- type ClientConnInterface interface {
-
-
// Invoke takes a method name together with request (args) and response
// (reply) values and returns an error.
// NOTE(review): presumably a blocking unary RPC — confirm against the
// implementation, which is outside this view.
- Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
-
// NewStream opens a ClientStream for method as described by desc.
- NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
- }
// Compile-time check that *ClientConn satisfies ClientConnInterface.
- var _ ClientConnInterface = (*ClientConn)(nil)
// ClientConn is a client connection (channel) to a target. It owns the
// resolver wrapper, the balancer wrapper and the set of addrConns created on
// its behalf.
- type ClientConn struct {
// ctx is cancelled by Close; addrConn contexts are derived from it.
- ctx context.Context
- cancel context.CancelFunc
// target is the string passed to Dial; parsedTarget is its parsed form (or
// the default-scheme fallback chosen in DialContext).
- target string
- parsedTarget resolver.Target
// authority is used as the ServerName for addresses that do not carry one.
- authority string
- dopts dialOptions
- csMgr *connectivityStateManager
- balancerBuildOpts balancer.BuildOptions
- blockingpicker *pickerWrapper
// mu is held by the methods in this file when accessing resolverWrapper,
// sc, conns, mkp, curBalancerName and balancerWrapper.
- mu sync.RWMutex
- resolverWrapper *ccResolverWrapper
- sc *ServiceConfig
// conns is the set of live addrConns; Close sets it to nil, which the other
// methods treat as "the ClientConn is closed".
- conns map[*addrConn]struct{}
-
// mkp is the channel-wide keepalive configuration; adjustParams may raise
// mkp.Time after a too-many-pings GOAWAY.
- mkp keepalive.ClientParameters
- curBalancerName string
- balancerWrapper *ccBalancerWrapper
// retryThrottler always holds a *retryThrottler (possibly nil).
- retryThrottler atomic.Value
// firstResolveEvent fires after the first call to updateResolverState.
- firstResolveEvent *grpcsync.Event
- channelzID int64
- czData *channelzData
// NOTE(review): lceMu presumably guards lastConnectionError; the accessors
// are outside this view — confirm.
- lceMu sync.Mutex
- lastConnectionError error
- }
- func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
- ch := cc.csMgr.getNotifyChan()
- if cc.csMgr.getState() != sourceState {
- return true
- }
- select {
- case <-ctx.Done():
- return false
- case <-ch:
- return true
- }
- }
- func (cc *ClientConn) GetState() connectivity.State {
- return cc.csMgr.getState()
- }
- func (cc *ClientConn) scWatcher() {
- for {
- select {
- case sc, ok := <-cc.dopts.scChan:
- if !ok {
- return
- }
- cc.mu.Lock()
-
-
- cc.sc = &sc
- cc.mu.Unlock()
- case <-cc.ctx.Done():
- return
- }
- }
- }
- func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
-
-
- if cc.firstResolveEvent.HasFired() {
- return nil
- }
- select {
- case <-cc.firstResolveEvent.Done():
- return nil
- case <-ctx.Done():
- return status.FromContextError(ctx.Err()).Err()
- case <-cc.ctx.Done():
- return ErrClientConnClosing
- }
- }
- var emptyServiceConfig *ServiceConfig
- func init() {
- cfg := parseServiceConfig("{}")
- if cfg.Err != nil {
- panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
- }
- emptyServiceConfig = cfg.Config.(*ServiceConfig)
- }
- func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
- if cc.sc != nil {
- cc.applyServiceConfigAndBalancer(cc.sc, addrs)
- return
- }
- if cc.dopts.defaultServiceConfig != nil {
- cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, addrs)
- } else {
- cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs)
- }
- }
// updateResolverState processes an update (or error) from the resolver:
// it applies the service config, selects/creates the balancer, and forwards
// the resolved state to the balancer wrapper.
//
// It returns balancer.ErrBadResolverState when the resolver reported an error
// or an unusable service config; otherwise it returns whatever the balancer
// wrapper's updateClientConnState returns. firstResolveEvent is fired on
// every return path.
- func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
- defer cc.firstResolveEvent.Fire()
- cc.mu.Lock()
-
-
-
// A nil conns map means Close has already run; drop the update.
- if cc.conns == nil {
- cc.mu.Unlock()
- return nil
- }
- if err != nil {
-
-
-
// Resolver error: make sure a balancer exists (via the default config, with
// no addresses) and hand it the error.
- cc.maybeApplyDefaultServiceConfig(nil)
- if cc.balancerWrapper != nil {
- cc.balancerWrapper.resolverError(err)
- }
-
- cc.mu.Unlock()
- return balancer.ErrBadResolverState
- }
- var ret error
- if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
// Resolver-provided configs are disabled or absent: keep/apply the default.
- cc.maybeApplyDefaultServiceConfig(s.Addresses)
-
-
- } else {
- if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
- cc.applyServiceConfigAndBalancer(sc, s.Addresses)
- } else {
// Unusable service config. If a balancer already exists, fall through and
// still deliver the addresses to it; otherwise fail RPCs with an error
// picker and move the channel to TransientFailure.
- ret = balancer.ErrBadResolverState
- if cc.balancerWrapper == nil {
- var err error
- if s.ServiceConfig.Err != nil {
- err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
- } else {
- err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
- }
- cc.blockingpicker.updatePicker(base.NewErrPicker(err))
- cc.csMgr.updateState(connectivity.TransientFailure)
- cc.mu.Unlock()
- return ret
- }
- }
- }
// The service config's LB config is only passed along when no balancer was
// forced through the dial options.
- var balCfg serviceconfig.LoadBalancingConfig
- if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
- balCfg = cc.sc.lbConfig.cfg
- }
// Snapshot what is needed and release the lock before calling the balancer.
- cbn := cc.curBalancerName
- bw := cc.balancerWrapper
- cc.mu.Unlock()
- if cbn != grpclbName {
-
// Strip grpclb-type addresses in place when the current balancer is not
// grpclb. This compacts s.Addresses within its existing backing array.
- for i := 0; i < len(s.Addresses); {
- if s.Addresses[i].Type == resolver.GRPCLB {
- copy(s.Addresses[i:], s.Addresses[i+1:])
- s.Addresses = s.Addresses[:len(s.Addresses)-1]
- continue
- }
- i++
- }
- }
- uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
// A bad-service-config result recorded above takes precedence over the
// balancer's own result.
- if ret == nil {
- ret = uccsErr
-
- }
- return ret
- }
- func (cc *ClientConn) switchBalancer(name string) {
- if strings.EqualFold(cc.curBalancerName, name) {
- return
- }
- channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name)
- if cc.dopts.balancerBuilder != nil {
- channelz.Info(logger, cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead")
- return
- }
- if cc.balancerWrapper != nil {
- cc.balancerWrapper.close()
- }
- builder := balancer.Get(name)
- if builder == nil {
- channelz.Warningf(logger, cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)
- channelz.Infof(logger, cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)
- builder = newPickfirstBuilder()
- } else {
- channelz.Infof(logger, cc.channelzID, "Channel switches to new LB policy %q", name)
- }
- cc.curBalancerName = builder.Name()
- cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
- }
- func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
- cc.mu.Lock()
- if cc.conns == nil {
- cc.mu.Unlock()
- return
- }
-
-
- cc.balancerWrapper.handleSubConnStateChange(sc, s, err)
- cc.mu.Unlock()
- }
- func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
- ac := &addrConn{
- state: connectivity.Idle,
- cc: cc,
- addrs: addrs,
- scopts: opts,
- dopts: cc.dopts,
- czData: new(channelzData),
- resetBackoff: make(chan struct{}),
- }
- ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
-
- cc.mu.Lock()
- if cc.conns == nil {
- cc.mu.Unlock()
- return nil, ErrClientConnClosing
- }
- if channelz.IsOn() {
- ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
- channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
- Desc: "Subchannel Created",
- Severity: channelz.CtINFO,
- Parent: &channelz.TraceEventDesc{
- Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
- Severity: channelz.CtINFO,
- },
- })
- }
- cc.conns[ac] = struct{}{}
- cc.mu.Unlock()
- return ac, nil
- }
- func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
- cc.mu.Lock()
- if cc.conns == nil {
- cc.mu.Unlock()
- return
- }
- delete(cc.conns, ac)
- cc.mu.Unlock()
- ac.tearDown(err)
- }
- func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
- return &channelz.ChannelInternalMetric{
- State: cc.GetState(),
- Target: cc.target,
- CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted),
- CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded),
- CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed),
- LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
- }
- }
- func (cc *ClientConn) Target() string {
- return cc.target
- }
- func (cc *ClientConn) incrCallsStarted() {
- atomic.AddInt64(&cc.czData.callsStarted, 1)
- atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
- }
- func (cc *ClientConn) incrCallsSucceeded() {
- atomic.AddInt64(&cc.czData.callsSucceeded, 1)
- }
- func (cc *ClientConn) incrCallsFailed() {
- atomic.AddInt64(&cc.czData.callsFailed, 1)
- }
- func (ac *addrConn) connect() error {
- ac.mu.Lock()
- if ac.state == connectivity.Shutdown {
- ac.mu.Unlock()
- return errConnClosing
- }
- if ac.state != connectivity.Idle {
- ac.mu.Unlock()
- return nil
- }
-
-
- ac.updateConnectivityState(connectivity.Connecting, nil)
- ac.mu.Unlock()
-
- go ac.resetTransport()
- return nil
- }
- func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
- ac.mu.Lock()
- defer ac.mu.Unlock()
- channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
- if ac.state == connectivity.Shutdown ||
- ac.state == connectivity.TransientFailure ||
- ac.state == connectivity.Idle {
- ac.addrs = addrs
- return true
- }
- if ac.state == connectivity.Connecting {
- return false
- }
-
- var curAddrFound bool
- for _, a := range addrs {
- if reflect.DeepEqual(ac.curAddr, a) {
- curAddrFound = true
- break
- }
- }
- channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
- if curAddrFound {
- ac.addrs = addrs
- }
- return curAddrFound
- }
- func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
-
- cc.mu.RLock()
- defer cc.mu.RUnlock()
- if cc.sc == nil {
- return MethodConfig{}
- }
- if m, ok := cc.sc.Methods[method]; ok {
- return m
- }
- i := strings.LastIndex(method, "/")
- if m, ok := cc.sc.Methods[method[:i+1]]; ok {
- return m
- }
- return cc.sc.Methods[""]
- }
- func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
- cc.mu.RLock()
- defer cc.mu.RUnlock()
- if cc.sc == nil {
- return nil
- }
- return cc.sc.healthCheckConfig
- }
- func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
- t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
- Ctx: ctx,
- FullMethodName: method,
- })
- if err != nil {
- return nil, nil, toRPCErr(err)
- }
- return t, done, nil
- }
- func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address) {
- if sc == nil {
-
- return
- }
- cc.sc = sc
- if cc.sc.retryThrottling != nil {
- newThrottler := &retryThrottler{
- tokens: cc.sc.retryThrottling.MaxTokens,
- max: cc.sc.retryThrottling.MaxTokens,
- thresh: cc.sc.retryThrottling.MaxTokens / 2,
- ratio: cc.sc.retryThrottling.TokenRatio,
- }
- cc.retryThrottler.Store(newThrottler)
- } else {
- cc.retryThrottler.Store((*retryThrottler)(nil))
- }
- if cc.dopts.balancerBuilder == nil {
-
-
- var newBalancerName string
- if cc.sc != nil && cc.sc.lbConfig != nil {
- newBalancerName = cc.sc.lbConfig.name
- } else {
- var isGRPCLB bool
- for _, a := range addrs {
- if a.Type == resolver.GRPCLB {
- isGRPCLB = true
- break
- }
- }
- if isGRPCLB {
- newBalancerName = grpclbName
- } else if cc.sc != nil && cc.sc.LB != nil {
- newBalancerName = *cc.sc.LB
- } else {
- newBalancerName = PickFirstBalancerName
- }
- }
- cc.switchBalancer(newBalancerName)
- } else if cc.balancerWrapper == nil {
-
-
- cc.curBalancerName = cc.dopts.balancerBuilder.Name()
- cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
- }
- }
- func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
- cc.mu.RLock()
- r := cc.resolverWrapper
- cc.mu.RUnlock()
- if r == nil {
- return
- }
- go r.resolveNow(o)
- }
- func (cc *ClientConn) ResetConnectBackoff() {
- cc.mu.Lock()
- conns := cc.conns
- cc.mu.Unlock()
- for ac := range conns {
- ac.resetConnectBackoff()
- }
- }
- func (cc *ClientConn) Close() error {
- defer cc.cancel()
- cc.mu.Lock()
- if cc.conns == nil {
- cc.mu.Unlock()
- return ErrClientConnClosing
- }
- conns := cc.conns
- cc.conns = nil
- cc.csMgr.updateState(connectivity.Shutdown)
- rWrapper := cc.resolverWrapper
- cc.resolverWrapper = nil
- bWrapper := cc.balancerWrapper
- cc.balancerWrapper = nil
- cc.mu.Unlock()
- cc.blockingpicker.close()
- if rWrapper != nil {
- rWrapper.close()
- }
- if bWrapper != nil {
- bWrapper.close()
- }
- for ac := range conns {
- ac.tearDown(ErrClientConnClosing)
- }
- if channelz.IsOn() {
- ted := &channelz.TraceEventDesc{
- Desc: "Channel Deleted",
- Severity: channelz.CtINFO,
- }
- if cc.dopts.channelzParentID != 0 {
- ted.Parent = &channelz.TraceEventDesc{
- Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
- Severity: channelz.CtINFO,
- }
- }
- channelz.AddTraceEvent(logger, cc.channelzID, 0, ted)
-
-
- channelz.RemoveEntry(cc.channelzID)
- }
- return nil
- }
// addrConn is a connection to one of a list of addresses. It is created by
// ClientConn.newAddrConn and drives its own (re)connection loop in
// resetTransport.
- type addrConn struct {
// ctx is derived from the owning ClientConn's context.
- ctx context.Context
- cancel context.CancelFunc
- cc *ClientConn
// dopts is a copy of the ClientConn's dial options taken at creation;
// tryAllAddrs refreshes dopts.copts.KeepaliveParams from cc.mkp.
- dopts dialOptions
// acbw is the SubConn handle reported to the balancer on state changes.
- acbw balancer.SubConn
- scopts balancer.NewSubConnOptions
-
-
-
-
// transport is the current transport, or nil while (re)connecting.
- transport transport.ClientTransport
// mu is held by the methods in this file when accessing the fields below.
- mu sync.Mutex
// curAddr is the address of the current transport; addrs is the full list
// that resetTransport tries in order.
- curAddr resolver.Address
- addrs []resolver.Address
-
- state connectivity.State
// backoffIdx counts consecutive failed connection rounds and is reset to 0
// after a successful connect.
- backoffIdx int
// resetBackoff interrupts a pending backoff wait in resetTransport.
- resetBackoff chan struct{}
- channelzID int64
- czData *channelzData
- }
- func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
- if ac.state == s {
- return
- }
- ac.state = s
- channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v", s)
- ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
- }
- func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
- switch r {
- case transport.GoAwayTooManyPings:
- v := 2 * ac.dopts.copts.KeepaliveParams.Time
- ac.cc.mu.Lock()
- if v > ac.cc.mkp.Time {
- ac.cc.mkp.Time = v
- }
- ac.cc.mu.Unlock()
- }
- }
// resetTransport is the addrConn's connection loop. Each iteration tries all
// addresses once; on failure it backs off and retries, on success it waits
// until the transport asks for a reconnect and then loops again. The loop
// ends when the addrConn reaches Shutdown or its context is cancelled.
- func (ac *addrConn) resetTransport() {
- for i := 0; ; i++ {
// Every iteration after the first asks the resolver to re-resolve.
- if i > 0 {
- ac.cc.resolveNow(resolver.ResolveNowOptions{})
- }
- ac.mu.Lock()
- if ac.state == connectivity.Shutdown {
- ac.mu.Unlock()
- return
- }
// Snapshot the address list and compute this round's backoff under the lock.
- addrs := ac.addrs
- backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
-
// The dial budget is the larger of the minimum connect timeout (default or
// overridden through dopts) and the current backoff duration.
- dialDuration := minConnectTimeout
- if ac.dopts.minConnectTimeout != nil {
- dialDuration = ac.dopts.minConnectTimeout()
- }
- if dialDuration < backoffFor {
-
- dialDuration = backoffFor
- }
-
-
-
-
-
-
// One deadline is shared by all addresses tried in this round.
- connectDeadline := time.Now().Add(dialDuration)
- ac.updateConnectivityState(connectivity.Connecting, nil)
- ac.transport = nil
- ac.mu.Unlock()
- newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
- if err != nil {
-
-
// All addresses failed: report TransientFailure, then wait out the backoff.
- ac.mu.Lock()
- if ac.state == connectivity.Shutdown {
- ac.mu.Unlock()
- return
- }
- ac.updateConnectivityState(connectivity.TransientFailure, err)
-
// Capture the resetBackoff channel under the lock; a receive on it cuts the
// wait short without advancing backoffIdx.
- b := ac.resetBackoff
- ac.mu.Unlock()
- timer := time.NewTimer(backoffFor)
- select {
- case <-timer.C:
- ac.mu.Lock()
- ac.backoffIdx++
- ac.mu.Unlock()
- case <-b:
- timer.Stop()
- case <-ac.ctx.Done():
- timer.Stop()
- return
- }
- continue
- }
// Connected. If the addrConn was shut down in the meantime, discard the new
// transport.
- ac.mu.Lock()
- if ac.state == connectivity.Shutdown {
- ac.mu.Unlock()
- newTr.Close()
- return
- }
- ac.curAddr = addr
- ac.transport = newTr
- ac.backoffIdx = 0
// startHealthCheck is called with ac.mu held; its context is cancelled once
// this transport needs to be replaced.
- hctx, hcancel := context.WithCancel(ac.ctx)
- ac.startHealthCheck(hctx)
- ac.mu.Unlock()
-
-
// Block until the transport's GOAWAY/close callback fires the reconnect
// event, then stop health checking and start the next round.
- <-reconnect.Done()
- hcancel()
-
-
-
-
-
-
-
-
-
-
- }
- }
- func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
- var firstConnErr error
- for _, addr := range addrs {
- ac.mu.Lock()
- if ac.state == connectivity.Shutdown {
- ac.mu.Unlock()
- return nil, resolver.Address{}, nil, errConnClosing
- }
- ac.cc.mu.RLock()
- ac.dopts.copts.KeepaliveParams = ac.cc.mkp
- ac.cc.mu.RUnlock()
- copts := ac.dopts.copts
- if ac.scopts.CredsBundle != nil {
- copts.CredsBundle = ac.scopts.CredsBundle
- }
- ac.mu.Unlock()
- channelz.Infof(logger, ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr)
- newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
- if err == nil {
- return newTr, addr, reconnect, nil
- }
- if firstConnErr == nil {
- firstConnErr = err
- }
- ac.cc.updateConnectionError(err)
- }
-
- return nil, resolver.Address{}, nil, firstConnErr
- }
- func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
- prefaceReceived := make(chan struct{})
- onCloseCalled := make(chan struct{})
- reconnect := grpcsync.NewEvent()
-
- if addr.ServerName == "" {
- addr.ServerName = ac.cc.authority
- }
- once := sync.Once{}
- onGoAway := func(r transport.GoAwayReason) {
- ac.mu.Lock()
- ac.adjustParams(r)
- once.Do(func() {
- if ac.state == connectivity.Ready {
-
-
-
-
- ac.updateConnectivityState(connectivity.Connecting, nil)
- }
- })
- ac.mu.Unlock()
- reconnect.Fire()
- }
- onClose := func() {
- ac.mu.Lock()
- once.Do(func() {
- if ac.state == connectivity.Ready {
-
-
-
-
- ac.updateConnectivityState(connectivity.Connecting, nil)
- }
- })
- ac.mu.Unlock()
- close(onCloseCalled)
- reconnect.Fire()
- }
- onPrefaceReceipt := func() {
- close(prefaceReceived)
- }
- connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
- defer cancel()
- if channelz.IsOn() {
- copts.ChannelzParentID = ac.channelzID
- }
- newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onPrefaceReceipt, onGoAway, onClose)
- if err != nil {
-
- channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v. Reconnecting...", addr, err)
- return nil, nil, err
- }
- select {
- case <-time.After(time.Until(connectDeadline)):
-
- newTr.Close()
- channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
- return nil, nil, errors.New("timed out waiting for server handshake")
- case <-prefaceReceived:
-
- case <-onCloseCalled:
-
- return nil, nil, errors.New("connection closed")
-
- }
- return newTr, reconnect, nil
- }
- func (ac *addrConn) startHealthCheck(ctx context.Context) {
- var healthcheckManagingState bool
- defer func() {
- if !healthcheckManagingState {
- ac.updateConnectivityState(connectivity.Ready, nil)
- }
- }()
- if ac.cc.dopts.disableHealthCheck {
- return
- }
- healthCheckConfig := ac.cc.healthCheckConfig()
- if healthCheckConfig == nil {
- return
- }
- if !ac.scopts.HealthCheckEnabled {
- return
- }
- healthCheckFunc := ac.cc.dopts.healthCheckFunc
- if healthCheckFunc == nil {
-
-
-
- channelz.Error(logger, ac.channelzID, "Health check is requested but health check function is not set.")
- return
- }
- healthcheckManagingState = true
-
- currentTr := ac.transport
- newStream := func(method string) (interface{}, error) {
- ac.mu.Lock()
- if ac.transport != currentTr {
- ac.mu.Unlock()
- return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
- }
- ac.mu.Unlock()
- return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
- }
- setConnectivityState := func(s connectivity.State, lastErr error) {
- ac.mu.Lock()
- defer ac.mu.Unlock()
- if ac.transport != currentTr {
- return
- }
- ac.updateConnectivityState(s, lastErr)
- }
-
- go func() {
- err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
- if err != nil {
- if status.Code(err) == codes.Unimplemented {
- channelz.Error(logger, ac.channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled")
- } else {
- channelz.Errorf(logger, ac.channelzID, "HealthCheckFunc exits with unexpected error %v", err)
- }
- }
- }()
- }
- func (ac *addrConn) resetConnectBackoff() {
- ac.mu.Lock()
- close(ac.resetBackoff)
- ac.backoffIdx = 0
- ac.resetBackoff = make(chan struct{})
- ac.mu.Unlock()
- }
- func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
- ac.mu.Lock()
- if ac.state == connectivity.Ready && ac.transport != nil {
- t := ac.transport
- ac.mu.Unlock()
- return t, true
- }
- var idle bool
- if ac.state == connectivity.Idle {
- idle = true
- }
- ac.mu.Unlock()
-
- if idle {
- ac.connect()
- }
- return nil, false
- }
- func (ac *addrConn) tearDown(err error) {
- ac.mu.Lock()
- if ac.state == connectivity.Shutdown {
- ac.mu.Unlock()
- return
- }
- curTr := ac.transport
- ac.transport = nil
-
-
- ac.updateConnectivityState(connectivity.Shutdown, nil)
- ac.cancel()
- ac.curAddr = resolver.Address{}
- if err == errConnDrain && curTr != nil {
-
-
-
-
-
- ac.mu.Unlock()
- curTr.GracefulClose()
- ac.mu.Lock()
- }
- if channelz.IsOn() {
- channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
- Desc: "Subchannel Deleted",
- Severity: channelz.CtINFO,
- Parent: &channelz.TraceEventDesc{
- Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
- Severity: channelz.CtINFO,
- },
- })
-
-
- channelz.RemoveEntry(ac.channelzID)
- }
- ac.mu.Unlock()
- }
- func (ac *addrConn) getState() connectivity.State {
- ac.mu.Lock()
- defer ac.mu.Unlock()
- return ac.state
- }
- func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
- ac.mu.Lock()
- addr := ac.curAddr.Addr
- ac.mu.Unlock()
- return &channelz.ChannelInternalMetric{
- State: ac.getState(),
- Target: addr,
- CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted),
- CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded),
- CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed),
- LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
- }
- }
- func (ac *addrConn) incrCallsStarted() {
- atomic.AddInt64(&ac.czData.callsStarted, 1)
- atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
- }
- func (ac *addrConn) incrCallsSucceeded() {
- atomic.AddInt64(&ac.czData.callsSucceeded, 1)
- }
- func (ac *addrConn) incrCallsFailed() {
- atomic.AddInt64(&ac.czData.callsFailed, 1)
- }
// retryThrottler is a mutex-guarded token counter.
//
// throttle spends one token per call and successfulRPC adds ratio tokens per
// call; the count is kept within [0, max].
type retryThrottler struct {
	max    float64 // upper bound for tokens
	thresh float64 // throttle reports true while tokens <= thresh
	ratio  float64 // tokens added by each successfulRPC call

	mu     sync.Mutex
	tokens float64 // current token count; read and written under mu
}

// throttle subtracts one token (never going below zero) and reports whether
// the resulting count is at or below the threshold. A nil receiver never
// throttles and returns false.
func (rt *retryThrottler) throttle() bool {
	if rt == nil {
		return false
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()

	remaining := rt.tokens - 1
	if remaining < 0 {
		remaining = 0
	}
	rt.tokens = remaining
	return remaining <= rt.thresh
}

// successfulRPC adds ratio tokens, capping the count at max. It is a no-op
// on a nil receiver.
func (rt *retryThrottler) successfulRPC() {
	if rt == nil {
		return
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()

	refilled := rt.tokens + rt.ratio
	if refilled > rt.max {
		refilled = rt.max
	}
	rt.tokens = refilled
}
- type channelzChannel struct {
- cc *ClientConn
- }
- func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
- return c.cc.channelzMetric()
- }
// ErrClientConnTimeout is a sentinel error whose message reports that
// dialing timed out.
// NOTE(review): nothing in the visible part of this file returns it; confirm
// whether it is still used before relying on it.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
- func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
- for _, rb := range cc.dopts.resolvers {
- if scheme == rb.Scheme() {
- return rb
- }
- }
- return resolver.Get(scheme)
- }
- func (cc *ClientConn) updateConnectionError(err error) {
- cc.lceMu.Lock()
- cc.lastConnectionError = err
- cc.lceMu.Unlock()
- }
- func (cc *ClientConn) connectionError() error {
- cc.lceMu.Lock()
- defer cc.lceMu.Unlock()
- return cc.lastConnectionError
- }
|