123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605 |
- package grpc
- import (
- "context"
- "fmt"
- "net"
- "time"
- "google.golang.org/grpc/backoff"
- "google.golang.org/grpc/balancer"
- "google.golang.org/grpc/credentials"
- "google.golang.org/grpc/internal"
- internalbackoff "google.golang.org/grpc/internal/backoff"
- "google.golang.org/grpc/internal/envconfig"
- "google.golang.org/grpc/internal/transport"
- "google.golang.org/grpc/keepalive"
- "google.golang.org/grpc/resolver"
- "google.golang.org/grpc/stats"
- )
- type dialOptions struct {
- unaryInt UnaryClientInterceptor
- streamInt StreamClientInterceptor
- chainUnaryInts []UnaryClientInterceptor
- chainStreamInts []StreamClientInterceptor
- cp Compressor
- dc Decompressor
- bs internalbackoff.Strategy
- block bool
- returnLastError bool
- insecure bool
- timeout time.Duration
- scChan <-chan ServiceConfig
- authority string
- copts transport.ConnectOptions
- callOptions []CallOption
-
- balancerBuilder balancer.Builder
- channelzParentID int64
- disableServiceConfig bool
- disableRetry bool
- disableHealthCheck bool
- healthCheckFunc internal.HealthChecker
- minConnectTimeout func() time.Duration
- defaultServiceConfig *ServiceConfig
- defaultServiceConfigRawJSON *string
-
-
-
- resolveNowBackoff func(int) time.Duration
- resolvers []resolver.Builder
- withProxy bool
- }
- type DialOption interface {
- apply(*dialOptions)
- }
- type EmptyDialOption struct{}
- func (EmptyDialOption) apply(*dialOptions) {}
- type funcDialOption struct {
- f func(*dialOptions)
- }
- func (fdo *funcDialOption) apply(do *dialOptions) {
- fdo.f(do)
- }
- func newFuncDialOption(f func(*dialOptions)) *funcDialOption {
- return &funcDialOption{
- f: f,
- }
- }
- func WithWriteBufferSize(s int) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.copts.WriteBufferSize = s
- })
- }
- func WithReadBufferSize(s int) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.copts.ReadBufferSize = s
- })
- }
- func WithInitialWindowSize(s int32) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.copts.InitialWindowSize = s
- })
- }
- func WithInitialConnWindowSize(s int32) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.copts.InitialConnWindowSize = s
- })
- }
- func WithMaxMsgSize(s int) DialOption {
- return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
- }
- func WithDefaultCallOptions(cos ...CallOption) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.callOptions = append(o.callOptions, cos...)
- })
- }
- func WithCodec(c Codec) DialOption {
- return WithDefaultCallOptions(CallCustomCodec(c))
- }
- func WithCompressor(cp Compressor) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.cp = cp
- })
- }
- func WithDecompressor(dc Decompressor) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.dc = dc
- })
- }
- func WithBalancerName(balancerName string) DialOption {
- builder := balancer.Get(balancerName)
- if builder == nil {
- panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName))
- }
- return newFuncDialOption(func(o *dialOptions) {
- o.balancerBuilder = builder
- })
- }
- func WithServiceConfig(c <-chan ServiceConfig) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.scChan = c
- })
- }
- func WithConnectParams(p ConnectParams) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.bs = internalbackoff.Exponential{Config: p.Backoff}
- o.minConnectTimeout = func() time.Duration {
- return p.MinConnectTimeout
- }
- })
- }
- func WithBackoffMaxDelay(md time.Duration) DialOption {
- return WithBackoffConfig(BackoffConfig{MaxDelay: md})
- }
- func WithBackoffConfig(b BackoffConfig) DialOption {
- bc := backoff.DefaultConfig
- bc.MaxDelay = b.MaxDelay
- return withBackoff(internalbackoff.Exponential{Config: bc})
- }
- func withBackoff(bs internalbackoff.Strategy) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.bs = bs
- })
- }
- func WithBlock() DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.block = true
- })
- }
- func WithReturnConnectionError() DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.block = true
- o.returnLastError = true
- })
- }
- func WithInsecure() DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.insecure = true
- })
- }
- func WithNoProxy() DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.withProxy = false
- })
- }
- func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.copts.TransportCredentials = creds
- })
- }
- func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
- })
- }
- func WithCredentialsBundle(b credentials.Bundle) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.copts.CredsBundle = b
- })
- }
- func WithTimeout(d time.Duration) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.timeout = d
- })
- }
- func WithContextDialer(f func(context.Context, string) (net.Conn, error)) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.copts.Dialer = f
- })
- }
- func init() {
- internal.WithHealthCheckFunc = withHealthCheckFunc
- }
- func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
- return WithContextDialer(
- func(ctx context.Context, addr string) (net.Conn, error) {
- if deadline, ok := ctx.Deadline(); ok {
- return f(addr, time.Until(deadline))
- }
- return f(addr, 0)
- })
- }
- func WithStatsHandler(h stats.Handler) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.copts.StatsHandler = h
- })
- }
- func FailOnNonTempDialError(f bool) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.copts.FailOnNonTempDialError = f
- })
- }
- func WithUserAgent(s string) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.copts.UserAgent = s
- })
- }
- func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
- if kp.Time < internal.KeepaliveMinPingTime {
- logger.Warningf("Adjusting keepalive ping interval to minimum period of %v", internal.KeepaliveMinPingTime)
- kp.Time = internal.KeepaliveMinPingTime
- }
- return newFuncDialOption(func(o *dialOptions) {
- o.copts.KeepaliveParams = kp
- })
- }
- func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.unaryInt = f
- })
- }
- func WithChainUnaryInterceptor(interceptors ...UnaryClientInterceptor) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
- })
- }
- func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.streamInt = f
- })
- }
- func WithChainStreamInterceptor(interceptors ...StreamClientInterceptor) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.chainStreamInts = append(o.chainStreamInts, interceptors...)
- })
- }
- func WithAuthority(a string) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.authority = a
- })
- }
- func WithChannelzParentID(id int64) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.channelzParentID = id
- })
- }
- func WithDisableServiceConfig() DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.disableServiceConfig = true
- })
- }
- func WithDefaultServiceConfig(s string) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.defaultServiceConfigRawJSON = &s
- })
- }
- func WithDisableRetry() DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.disableRetry = true
- })
- }
- func WithMaxHeaderListSize(s uint32) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.copts.MaxHeaderListSize = &s
- })
- }
- func WithDisableHealthCheck() DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.disableHealthCheck = true
- })
- }
- func withHealthCheckFunc(f internal.HealthChecker) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.healthCheckFunc = f
- })
- }
- func defaultDialOptions() dialOptions {
- return dialOptions{
- disableRetry: !envconfig.Retry,
- healthCheckFunc: internal.HealthCheckFunc,
- copts: transport.ConnectOptions{
- WriteBufferSize: defaultWriteBufSize,
- ReadBufferSize: defaultReadBufSize,
- },
- resolveNowBackoff: internalbackoff.DefaultExponential.Backoff,
- withProxy: true,
- }
- }
- func withMinConnectDeadline(f func() time.Duration) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.minConnectTimeout = f
- })
- }
- func withResolveNowBackoff(f func(int) time.Duration) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.resolveNowBackoff = f
- })
- }
- func WithResolvers(rs ...resolver.Builder) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.resolvers = append(o.resolvers, rs...)
- })
- }
|