aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/google.golang.org/grpc/clientconn.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/clientconn.go')
-rw-r--r--vendor/google.golang.org/grpc/clientconn.go1508
1 files changed, 1508 insertions, 0 deletions
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
new file mode 100644
index 000000000..84b6dbe3e
--- /dev/null
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -0,0 +1,1508 @@
+/*
+ *
+ * Copyright 2014 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package grpc
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "math"
+ "net"
+ "reflect"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "google.golang.org/grpc/balancer"
+ _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/connectivity"
+ "google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/internal"
+ "google.golang.org/grpc/internal/backoff"
+ "google.golang.org/grpc/internal/channelz"
+ "google.golang.org/grpc/internal/envconfig"
+ "google.golang.org/grpc/internal/grpcsync"
+ "google.golang.org/grpc/internal/transport"
+ "google.golang.org/grpc/keepalive"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/resolver"
+ _ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
+ _ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
+ "google.golang.org/grpc/status"
+)
+
+const (
+ // minimum time to give a connection to complete
+ minConnectTimeout = 20 * time.Second
+ // must match grpclbName in grpclb/grpclb.go
+ grpclbName = "grpclb"
+)
+
+var (
+ // ErrClientConnClosing indicates that the operation is illegal because
+ // the ClientConn is closing.
+ //
+ // Deprecated: this error should not be relied upon by users; use the status
+ // code of Canceled instead.
+ ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
+ // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
+ errConnDrain = errors.New("grpc: the connection is drained")
+ // errConnClosing indicates that the connection is closing.
+ errConnClosing = errors.New("grpc: the connection is closing")
+ // errBalancerClosed indicates that the balancer is closed.
+ errBalancerClosed = errors.New("grpc: balancer is closed")
+ // We use an accessor so that minConnectTimeout can be
+ // atomically read and updated while testing.
+ getMinConnectTimeout = func() time.Duration {
+ return minConnectTimeout
+ }
+)
+
+// The following errors are returned from Dial and DialContext
+var (
+ // errNoTransportSecurity indicates that there is no transport security
+ // being set for ClientConn. Users should either set one or explicitly
+ // call WithInsecure DialOption to disable security.
+ errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
+ // errTransportCredsAndBundle indicates that creds bundle is used together
+ // with other individual Transport Credentials.
+ errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
+ // errTransportCredentialsMissing indicates that users want to transmit security
+ // information (e.g., oauth2 token) which requires secure connection on an insecure
+ // connection.
+ errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
+ // errCredentialsConflict indicates that grpc.WithTransportCredentials()
+ // and grpc.WithInsecure() are both called for a connection.
+ errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
+)
+
+const (
+ defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
+ defaultClientMaxSendMessageSize = math.MaxInt32
+ // http2IOBufSize specifies the buffer size for sending frames.
+ defaultWriteBufSize = 32 * 1024
+ defaultReadBufSize = 32 * 1024
+)
+
+// Dial creates a client connection to the given target.
+func Dial(target string, opts ...DialOption) (*ClientConn, error) {
+ return DialContext(context.Background(), target, opts...)
+}
+
+// DialContext creates a client connection to the given target. By default, it's
+// a non-blocking dial (the function won't wait for connections to be
+// established, and connecting happens in the background). To make it a blocking
+// dial, use WithBlock() dial option.
+//
+// In the non-blocking case, the ctx does not act against the connection. It
+// only controls the setup steps.
+//
+// In the blocking case, ctx can be used to cancel or expire the pending
+// connection. Once this function returns, the cancellation and expiration of
+// ctx will be noop. Users should call ClientConn.Close to terminate all the
+// pending operations after this function returns.
+//
+// The target name syntax is defined in
+// https://github.com/grpc/grpc/blob/master/doc/naming.md.
+// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
+func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
+ cc := &ClientConn{
+ target: target,
+ csMgr: &connectivityStateManager{},
+ conns: make(map[*addrConn]struct{}),
+ dopts: defaultDialOptions(),
+ blockingpicker: newPickerWrapper(),
+ czData: new(channelzData),
+ firstResolveEvent: grpcsync.NewEvent(),
+ }
+ cc.retryThrottler.Store((*retryThrottler)(nil))
+ cc.ctx, cc.cancel = context.WithCancel(context.Background())
+
+ for _, opt := range opts {
+ opt.apply(&cc.dopts)
+ }
+
+ if channelz.IsOn() {
+ if cc.dopts.channelzParentID != 0 {
+ cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
+ channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
+ Desc: "Channel Created",
+ Severity: channelz.CtINFO,
+ Parent: &channelz.TraceEventDesc{
+ Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
+ Severity: channelz.CtINFO,
+ },
+ })
+ } else {
+ cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
+ channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
+ Desc: "Channel Created",
+ Severity: channelz.CtINFO,
+ })
+ }
+ cc.csMgr.channelzID = cc.channelzID
+ }
+
+ if !cc.dopts.insecure {
+ if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
+ return nil, errNoTransportSecurity
+ }
+ if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
+ return nil, errTransportCredsAndBundle
+ }
+ } else {
+ if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
+ return nil, errCredentialsConflict
+ }
+ for _, cd := range cc.dopts.copts.PerRPCCredentials {
+ if cd.RequireTransportSecurity() {
+ return nil, errTransportCredentialsMissing
+ }
+ }
+ }
+
+ cc.mkp = cc.dopts.copts.KeepaliveParams
+
+ if cc.dopts.copts.Dialer == nil {
+ cc.dopts.copts.Dialer = newProxyDialer(
+ func(ctx context.Context, addr string) (net.Conn, error) {
+ network, addr := parseDialTarget(addr)
+ return (&net.Dialer{}).DialContext(ctx, network, addr)
+ },
+ )
+ }
+
+ if cc.dopts.copts.UserAgent != "" {
+ cc.dopts.copts.UserAgent += " " + grpcUA
+ } else {
+ cc.dopts.copts.UserAgent = grpcUA
+ }
+
+ if cc.dopts.timeout > 0 {
+ var cancel context.CancelFunc
+ ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
+ defer cancel()
+ }
+
+ defer func() {
+ select {
+ case <-ctx.Done():
+ conn, err = nil, ctx.Err()
+ default:
+ }
+
+ if err != nil {
+ cc.Close()
+ }
+ }()
+
+ scSet := false
+ if cc.dopts.scChan != nil {
+ // Try to get an initial service config.
+ select {
+ case sc, ok := <-cc.dopts.scChan:
+ if ok {
+ cc.sc = sc
+ scSet = true
+ }
+ default:
+ }
+ }
+ if cc.dopts.bs == nil {
+ cc.dopts.bs = backoff.Exponential{
+ MaxDelay: DefaultBackoffConfig.MaxDelay,
+ }
+ }
+ if cc.dopts.resolverBuilder == nil {
+ // Only try to parse target when resolver builder is not already set.
+ cc.parsedTarget = parseTarget(cc.target)
+ grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
+ cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
+ if cc.dopts.resolverBuilder == nil {
+ // If resolver builder is still nil, the parse target's scheme is
+ // not registered. Fallback to default resolver and set Endpoint to
+ // the original unparsed target.
+ grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
+ cc.parsedTarget = resolver.Target{
+ Scheme: resolver.GetDefaultScheme(),
+ Endpoint: target,
+ }
+ cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
+ }
+ } else {
+ cc.parsedTarget = resolver.Target{Endpoint: target}
+ }
+ creds := cc.dopts.copts.TransportCredentials
+ if creds != nil && creds.Info().ServerName != "" {
+ cc.authority = creds.Info().ServerName
+ } else if cc.dopts.insecure && cc.dopts.authority != "" {
+ cc.authority = cc.dopts.authority
+ } else {
+ // Use endpoint from "scheme://authority/endpoint" as the default
+ // authority for ClientConn.
+ cc.authority = cc.parsedTarget.Endpoint
+ }
+
+ if cc.dopts.scChan != nil && !scSet {
+ // Blocking wait for the initial service config.
+ select {
+ case sc, ok := <-cc.dopts.scChan:
+ if ok {
+ cc.sc = sc
+ }
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ }
+ }
+ if cc.dopts.scChan != nil {
+ go cc.scWatcher()
+ }
+
+ var credsClone credentials.TransportCredentials
+ if creds := cc.dopts.copts.TransportCredentials; creds != nil {
+ credsClone = creds.Clone()
+ }
+ cc.balancerBuildOpts = balancer.BuildOptions{
+ DialCreds: credsClone,
+ CredsBundle: cc.dopts.copts.CredsBundle,
+ Dialer: cc.dopts.copts.Dialer,
+ ChannelzParentID: cc.channelzID,
+ }
+
+ // Build the resolver.
+ rWrapper, err := newCCResolverWrapper(cc)
+ if err != nil {
+ return nil, fmt.Errorf("failed to build resolver: %v", err)
+ }
+
+ cc.mu.Lock()
+ cc.resolverWrapper = rWrapper
+ cc.mu.Unlock()
+ // A blocking dial blocks until the clientConn is ready.
+ if cc.dopts.block {
+ for {
+ s := cc.GetState()
+ if s == connectivity.Ready {
+ break
+ } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
+ if err = cc.blockingpicker.connectionError(); err != nil {
+ terr, ok := err.(interface {
+ Temporary() bool
+ })
+ if ok && !terr.Temporary() {
+ return nil, err
+ }
+ }
+ }
+ if !cc.WaitForStateChange(ctx, s) {
+ // ctx got timeout or canceled.
+ return nil, ctx.Err()
+ }
+ }
+ }
+
+ return cc, nil
+}
+
+// connectivityStateManager keeps the connectivity.State of ClientConn.
+// This struct will eventually be exported so the balancers can access it.
+type connectivityStateManager struct {
+ mu sync.Mutex
+ state connectivity.State
+ notifyChan chan struct{}
+ channelzID int64
+}
+
+// updateState updates the connectivity.State of ClientConn.
+// If there's a change it notifies goroutines waiting on state change to
+// happen.
+func (csm *connectivityStateManager) updateState(state connectivity.State) {
+ csm.mu.Lock()
+ defer csm.mu.Unlock()
+ if csm.state == connectivity.Shutdown {
+ return
+ }
+ if csm.state == state {
+ return
+ }
+ csm.state = state
+ if channelz.IsOn() {
+ channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{
+ Desc: fmt.Sprintf("Channel Connectivity change to %v", state),
+ Severity: channelz.CtINFO,
+ })
+ }
+ if csm.notifyChan != nil {
+ // There are other goroutines waiting on this channel.
+ close(csm.notifyChan)
+ csm.notifyChan = nil
+ }
+}
+
+func (csm *connectivityStateManager) getState() connectivity.State {
+ csm.mu.Lock()
+ defer csm.mu.Unlock()
+ return csm.state
+}
+
+func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
+ csm.mu.Lock()
+ defer csm.mu.Unlock()
+ if csm.notifyChan == nil {
+ csm.notifyChan = make(chan struct{})
+ }
+ return csm.notifyChan
+}
+
+// ClientConn represents a client connection to an RPC server.
+type ClientConn struct {
+ ctx context.Context
+ cancel context.CancelFunc
+
+ target string
+ parsedTarget resolver.Target
+ authority string
+ dopts dialOptions
+ csMgr *connectivityStateManager
+
+ balancerBuildOpts balancer.BuildOptions
+ blockingpicker *pickerWrapper
+
+ mu sync.RWMutex
+ resolverWrapper *ccResolverWrapper
+ sc ServiceConfig
+ scRaw string
+ conns map[*addrConn]struct{}
+ // Keepalive parameter can be updated if a GoAway is received.
+ mkp keepalive.ClientParameters
+ curBalancerName string
+ preBalancerName string // previous balancer name.
+ curAddresses []resolver.Address
+ balancerWrapper *ccBalancerWrapper
+ retryThrottler atomic.Value
+
+ firstResolveEvent *grpcsync.Event
+
+ channelzID int64 // channelz unique identification number
+ czData *channelzData
+}
+
+// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
+// ctx expires. A true value is returned in former case and false in latter.
+// This is an EXPERIMENTAL API.
+func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
+ ch := cc.csMgr.getNotifyChan()
+ if cc.csMgr.getState() != sourceState {
+ return true
+ }
+ select {
+ case <-ctx.Done():
+ return false
+ case <-ch:
+ return true
+ }
+}
+
+// GetState returns the connectivity.State of ClientConn.
+// This is an EXPERIMENTAL API.
+func (cc *ClientConn) GetState() connectivity.State {
+ return cc.csMgr.getState()
+}
+
+func (cc *ClientConn) scWatcher() {
+ for {
+ select {
+ case sc, ok := <-cc.dopts.scChan:
+ if !ok {
+ return
+ }
+ cc.mu.Lock()
+ // TODO: load balance policy runtime change is ignored.
+ // We may revist this decision in the future.
+ cc.sc = sc
+ cc.scRaw = ""
+ cc.mu.Unlock()
+ case <-cc.ctx.Done():
+ return
+ }
+ }
+}
+
+// waitForResolvedAddrs blocks until the resolver has provided addresses or the
+// context expires. Returns nil unless the context expires first; otherwise
+// returns a status error based on the context.
+func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
+ // This is on the RPC path, so we use a fast path to avoid the
+ // more-expensive "select" below after the resolver has returned once.
+ if cc.firstResolveEvent.HasFired() {
+ return nil
+ }
+ select {
+ case <-cc.firstResolveEvent.Done():
+ return nil
+ case <-ctx.Done():
+ return status.FromContextError(ctx.Err()).Err()
+ case <-cc.ctx.Done():
+ return ErrClientConnClosing
+ }
+}
+
+func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
+ cc.mu.Lock()
+ defer cc.mu.Unlock()
+ if cc.conns == nil {
+ // cc was closed.
+ return
+ }
+
+ if reflect.DeepEqual(cc.curAddresses, addrs) {
+ return
+ }
+
+ cc.curAddresses = addrs
+ cc.firstResolveEvent.Fire()
+
+ if cc.dopts.balancerBuilder == nil {
+ // Only look at balancer types and switch balancer if balancer dial
+ // option is not set.
+ var isGRPCLB bool
+ for _, a := range addrs {
+ if a.Type == resolver.GRPCLB {
+ isGRPCLB = true
+ break
+ }
+ }
+ var newBalancerName string
+ if isGRPCLB {
+ newBalancerName = grpclbName
+ } else {
+ // Address list doesn't contain grpclb address. Try to pick a
+ // non-grpclb balancer.
+ newBalancerName = cc.curBalancerName
+ // If current balancer is grpclb, switch to the previous one.
+ if newBalancerName == grpclbName {
+ newBalancerName = cc.preBalancerName
+ }
+ // The following could be true in two cases:
+ // - the first time handling resolved addresses
+ // (curBalancerName="")
+ // - the first time handling non-grpclb addresses
+ // (curBalancerName="grpclb", preBalancerName="")
+ if newBalancerName == "" {
+ newBalancerName = PickFirstBalancerName
+ }
+ }
+ cc.switchBalancer(newBalancerName)
+ } else if cc.balancerWrapper == nil {
+ // Balancer dial option was set, and this is the first time handling
+ // resolved addresses. Build a balancer with dopts.balancerBuilder.
+ cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
+ }
+
+ cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
+}
+
+// switchBalancer starts the switching from current balancer to the balancer
+// with the given name.
+//
+// It will NOT send the current address list to the new balancer. If needed,
+// caller of this function should send address list to the new balancer after
+// this function returns.
+//
+// Caller must hold cc.mu.
+func (cc *ClientConn) switchBalancer(name string) {
+ if cc.conns == nil {
+ return
+ }
+
+ if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) {
+ return
+ }
+
+ grpclog.Infof("ClientConn switching balancer to %q", name)
+ if cc.dopts.balancerBuilder != nil {
+ grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
+ return
+ }
+ // TODO(bar switching) change this to two steps: drain and close.
+ // Keep track of sc in wrapper.
+ if cc.balancerWrapper != nil {
+ cc.balancerWrapper.close()
+ }
+
+ builder := balancer.Get(name)
+ // TODO(yuxuanli): If user send a service config that does not contain a valid balancer name, should
+ // we reuse previous one?
+ if channelz.IsOn() {
+ if builder == nil {
+ channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
+ Desc: fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName),
+ Severity: channelz.CtWarning,
+ })
+ } else {
+ channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
+ Desc: fmt.Sprintf("Channel switches to new LB policy %q", name),
+ Severity: channelz.CtINFO,
+ })
+ }
+ }
+ if builder == nil {
+ grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
+ builder = newPickfirstBuilder()
+ }
+
+ cc.preBalancerName = cc.curBalancerName
+ cc.curBalancerName = builder.Name()
+ cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
+}
+
+func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
+ cc.mu.Lock()
+ if cc.conns == nil {
+ cc.mu.Unlock()
+ return
+ }
+ // TODO(bar switching) send updates to all balancer wrappers when balancer
+ // gracefully switching is supported.
+ cc.balancerWrapper.handleSubConnStateChange(sc, s)
+ cc.mu.Unlock()
+}
+
+// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
+//
+// Caller needs to make sure len(addrs) > 0.
+func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
+ ac := &addrConn{
+ cc: cc,
+ addrs: addrs,
+ scopts: opts,
+ dopts: cc.dopts,
+ czData: new(channelzData),
+ successfulHandshake: true, // make the first nextAddr() call _not_ move addrIdx up by 1
+ resetBackoff: make(chan struct{}),
+ }
+ ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
+ // Track ac in cc. This needs to be done before any getTransport(...) is called.
+ cc.mu.Lock()
+ if cc.conns == nil {
+ cc.mu.Unlock()
+ return nil, ErrClientConnClosing
+ }
+ if channelz.IsOn() {
+ ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
+ channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
+ Desc: "Subchannel Created",
+ Severity: channelz.CtINFO,
+ Parent: &channelz.TraceEventDesc{
+ Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
+ Severity: channelz.CtINFO,
+ },
+ })
+ }
+ cc.conns[ac] = struct{}{}
+ cc.mu.Unlock()
+ return ac, nil
+}
+
+// removeAddrConn removes the addrConn in the subConn from clientConn.
+// It also tears down the ac with the given error.
+func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
+ cc.mu.Lock()
+ if cc.conns == nil {
+ cc.mu.Unlock()
+ return
+ }
+ delete(cc.conns, ac)
+ cc.mu.Unlock()
+ ac.tearDown(err)
+}
+
+func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
+ return &channelz.ChannelInternalMetric{
+ State: cc.GetState(),
+ Target: cc.target,
+ CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted),
+ CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded),
+ CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed),
+ LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
+ }
+}
+
+// Target returns the target string of the ClientConn.
+// This is an EXPERIMENTAL API.
+func (cc *ClientConn) Target() string {
+ return cc.target
+}
+
+func (cc *ClientConn) incrCallsStarted() {
+ atomic.AddInt64(&cc.czData.callsStarted, 1)
+ atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
+}
+
+func (cc *ClientConn) incrCallsSucceeded() {
+ atomic.AddInt64(&cc.czData.callsSucceeded, 1)
+}
+
+func (cc *ClientConn) incrCallsFailed() {
+ atomic.AddInt64(&cc.czData.callsFailed, 1)
+}
+
+// connect starts creating a transport.
+// It does nothing if the ac is not IDLE.
+// TODO(bar) Move this to the addrConn section.
+func (ac *addrConn) connect() error {
+ ac.mu.Lock()
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ return errConnClosing
+ }
+ if ac.state != connectivity.Idle {
+ ac.mu.Unlock()
+ return nil
+ }
+ ac.updateConnectivityState(connectivity.Connecting)
+ ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
+ ac.mu.Unlock()
+
+ // Start a goroutine connecting to the server asynchronously.
+ go ac.resetTransport(false)
+ return nil
+}
+
+// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
+//
+// It checks whether current connected address of ac is in the new addrs list.
+// - If true, it updates ac.addrs and returns true. The ac will keep using
+// the existing connection.
+// - If false, it does nothing and returns false.
+func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
+ ac.mu.Lock()
+ defer ac.mu.Unlock()
+ grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
+ if ac.state == connectivity.Shutdown {
+ ac.addrs = addrs
+ return true
+ }
+
+ var curAddrFound bool
+ for _, a := range addrs {
+ if reflect.DeepEqual(ac.curAddr, a) {
+ curAddrFound = true
+ break
+ }
+ }
+ grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
+ if curAddrFound {
+ ac.addrs = addrs
+ ac.addrIdx = 0 // Start reconnecting from beginning in the new list.
+ }
+
+ return curAddrFound
+}
+
+// GetMethodConfig gets the method config of the input method.
+// If there's an exact match for input method (i.e. /service/method), we return
+// the corresponding MethodConfig.
+// If there isn't an exact match for the input method, we look for the default config
+// under the service (i.e /service/). If there is a default MethodConfig for
+// the service, we return it.
+// Otherwise, we return an empty MethodConfig.
+func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
+ // TODO: Avoid the locking here.
+ cc.mu.RLock()
+ defer cc.mu.RUnlock()
+ m, ok := cc.sc.Methods[method]
+ if !ok {
+ i := strings.LastIndex(method, "/")
+ m = cc.sc.Methods[method[:i+1]]
+ }
+ return m
+}
+
+func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
+ cc.mu.RLock()
+ defer cc.mu.RUnlock()
+ return cc.sc.healthCheckConfig
+}
+
+func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
+ hdr, _ := metadata.FromOutgoingContext(ctx)
+ t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
+ FullMethodName: method,
+ Header: hdr,
+ })
+ if err != nil {
+ return nil, nil, toRPCErr(err)
+ }
+ return t, done, nil
+}
+
+// handleServiceConfig parses the service config string in JSON format to Go native
+// struct ServiceConfig, and store both the struct and the JSON string in ClientConn.
+func (cc *ClientConn) handleServiceConfig(js string) error {
+ if cc.dopts.disableServiceConfig {
+ return nil
+ }
+ if cc.scRaw == js {
+ return nil
+ }
+ if channelz.IsOn() {
+ channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
+ // The special formatting of \"%s\" instead of %q is to provide nice printing of service config
+ // for human consumption.
+ Desc: fmt.Sprintf("Channel has a new service config \"%s\"", js),
+ Severity: channelz.CtINFO,
+ })
+ }
+ sc, err := parseServiceConfig(js)
+ if err != nil {
+ return err
+ }
+ cc.mu.Lock()
+ // Check if the ClientConn is already closed. Some fields (e.g.
+ // balancerWrapper) are set to nil when closing the ClientConn, and could
+ // cause nil pointer panic if we don't have this check.
+ if cc.conns == nil {
+ cc.mu.Unlock()
+ return nil
+ }
+ cc.scRaw = js
+ cc.sc = sc
+
+ if sc.retryThrottling != nil {
+ newThrottler := &retryThrottler{
+ tokens: sc.retryThrottling.MaxTokens,
+ max: sc.retryThrottling.MaxTokens,
+ thresh: sc.retryThrottling.MaxTokens / 2,
+ ratio: sc.retryThrottling.TokenRatio,
+ }
+ cc.retryThrottler.Store(newThrottler)
+ } else {
+ cc.retryThrottler.Store((*retryThrottler)(nil))
+ }
+
+ if sc.LB != nil && *sc.LB != grpclbName { // "grpclb" is not a valid balancer option in service config.
+ if cc.curBalancerName == grpclbName {
+ // If current balancer is grpclb, there's at least one grpclb
+ // balancer address in the resolved list. Don't switch the balancer,
+ // but change the previous balancer name, so if a new resolved
+ // address list doesn't contain grpclb address, balancer will be
+ // switched to *sc.LB.
+ cc.preBalancerName = *sc.LB
+ } else {
+ cc.switchBalancer(*sc.LB)
+ cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil)
+ }
+ }
+
+ cc.mu.Unlock()
+ return nil
+}
+
+func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) {
+ cc.mu.RLock()
+ r := cc.resolverWrapper
+ cc.mu.RUnlock()
+ if r == nil {
+ return
+ }
+ go r.resolveNow(o)
+}
+
+// ResetConnectBackoff wakes up all subchannels in transient failure and causes
+// them to attempt another connection immediately. It also resets the backoff
+// times used for subsequent attempts regardless of the current state.
+//
+// In general, this function should not be used. Typical service or network
+// outages result in a reasonable client reconnection strategy by default.
+// However, if a previously unavailable network becomes available, this may be
+// used to trigger an immediate reconnect.
+//
+// This API is EXPERIMENTAL.
+func (cc *ClientConn) ResetConnectBackoff() {
+ cc.mu.Lock()
+ defer cc.mu.Unlock()
+ for ac := range cc.conns {
+ ac.resetConnectBackoff()
+ }
+}
+
+// Close tears down the ClientConn and all underlying connections.
+func (cc *ClientConn) Close() error {
+ defer cc.cancel()
+
+ cc.mu.Lock()
+ if cc.conns == nil {
+ cc.mu.Unlock()
+ return ErrClientConnClosing
+ }
+ conns := cc.conns
+ cc.conns = nil
+ cc.csMgr.updateState(connectivity.Shutdown)
+
+ rWrapper := cc.resolverWrapper
+ cc.resolverWrapper = nil
+ bWrapper := cc.balancerWrapper
+ cc.balancerWrapper = nil
+ cc.mu.Unlock()
+
+ cc.blockingpicker.close()
+
+ if rWrapper != nil {
+ rWrapper.close()
+ }
+ if bWrapper != nil {
+ bWrapper.close()
+ }
+
+ for ac := range conns {
+ ac.tearDown(ErrClientConnClosing)
+ }
+ if channelz.IsOn() {
+ ted := &channelz.TraceEventDesc{
+ Desc: "Channel Deleted",
+ Severity: channelz.CtINFO,
+ }
+ if cc.dopts.channelzParentID != 0 {
+ ted.Parent = &channelz.TraceEventDesc{
+ Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
+ Severity: channelz.CtINFO,
+ }
+ }
+ channelz.AddTraceEvent(cc.channelzID, ted)
+ // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
+ // the entity beng deleted, and thus prevent it from being deleted right away.
+ channelz.RemoveEntry(cc.channelzID)
+ }
+ return nil
+}
+
+// addrConn is a network connection to a given address.
+type addrConn struct {
+ ctx context.Context
+ cancel context.CancelFunc
+
+ cc *ClientConn
+ dopts dialOptions
+ acbw balancer.SubConn
+ scopts balancer.NewSubConnOptions
+
+ // transport is set when there's a viable transport (note: ac state may not be READY as LB channel
+ // health checking may require server to report healthy to set ac to READY), and is reset
+ // to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
+ // is received, transport is closed, ac has been torn down).
+ transport transport.ClientTransport // The current transport.
+
+ mu sync.Mutex
+ addrIdx int // The index in addrs list to start reconnecting from.
+ curAddr resolver.Address // The current address.
+ addrs []resolver.Address // All addresses that the resolver resolved to.
+
+ // Use updateConnectivityState for updating addrConn's connectivity state.
+ state connectivity.State
+
+ tearDownErr error // The reason this addrConn is torn down.
+
+ backoffIdx int
+ // backoffDeadline is the time until which resetTransport needs to
+ // wait before increasing backoffIdx count.
+ backoffDeadline time.Time
+ // connectDeadline is the time by which all connection
+ // negotiations must complete.
+ connectDeadline time.Time
+
+ resetBackoff chan struct{}
+
+ channelzID int64 // channelz unique identification number
+ czData *channelzData
+
+ successfulHandshake bool
+
+ healthCheckEnabled bool
+}
+
+// Note: this requires a lock on ac.mu.
+func (ac *addrConn) updateConnectivityState(s connectivity.State) {
+ ac.state = s
+ if channelz.IsOn() {
+ channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
+ Desc: fmt.Sprintf("Subchannel Connectivity change to %v", s),
+ Severity: channelz.CtINFO,
+ })
+ }
+}
+
+// adjustParams updates parameters used to create transports upon
+// receiving a GoAway.
+func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
+ switch r {
+ case transport.GoAwayTooManyPings:
+ v := 2 * ac.dopts.copts.KeepaliveParams.Time
+ ac.cc.mu.Lock()
+ if v > ac.cc.mkp.Time {
+ ac.cc.mkp.Time = v
+ }
+ ac.cc.mu.Unlock()
+ }
+}
+
+// resetTransport makes sure that a healthy ac.transport exists.
+//
+// The transport will close itself when it encounters an error, or on GOAWAY, or on deadline waiting for handshake, or
+// when the clientconn is closed. Each iteration creating a new transport will try a different address that the balancer
+// assigned to the addrConn, until it has tried all addresses. Once it has tried all addresses, it will re-resolve to
+// get a new address list. If an error is received, the list is re-resolved and the next reset attempt will try from the
+// beginning. This method has backoff built in. The backoff amount starts at 0 and increases each time resolution occurs
+// (addresses are exhausted). The backoff amount is reset to 0 each time a handshake is received.
+//
+// If the DialOption WithWaitForHandshake was set, resetTransport returns successfully only after handshake is received.
+func (ac *addrConn) resetTransport(resolveNow bool) {
+ for {
+ // If this is the first in a line of resets, we want to resolve immediately. The only other time we
+ // want to reset is if we have tried all the addresses handed to us.
+ if resolveNow {
+ ac.mu.Lock()
+ ac.cc.resolveNow(resolver.ResolveNowOption{})
+ ac.mu.Unlock()
+ }
+
+ ac.mu.Lock()
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ return
+ }
+
+ // The transport that was used before is no longer viable.
+ ac.transport = nil
+ // If the connection is READY, a failure must have occurred.
+ // Otherwise, we'll consider this is a transient failure when:
+ // We've exhausted all addresses
+ // We're in CONNECTING
+ // And it's not the very first addr to try TODO(deklerk) find a better way to do this than checking ac.successfulHandshake
+ if ac.state == connectivity.Ready || (ac.addrIdx == len(ac.addrs)-1 && ac.state == connectivity.Connecting && !ac.successfulHandshake) {
+ ac.updateConnectivityState(connectivity.TransientFailure)
+ ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
+ }
+ ac.transport = nil
+ ac.mu.Unlock()
+
+ if err := ac.nextAddr(); err != nil {
+ return
+ }
+
+ ac.mu.Lock()
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ return
+ }
+
+ backoffIdx := ac.backoffIdx
+ backoffFor := ac.dopts.bs.Backoff(backoffIdx)
+
+ // This will be the duration that dial gets to finish.
+ dialDuration := getMinConnectTimeout()
+ if backoffFor > dialDuration {
+ // Give dial more time as we keep failing to connect.
+ dialDuration = backoffFor
+ }
+ start := time.Now()
+ connectDeadline := start.Add(dialDuration)
+ ac.backoffDeadline = start.Add(backoffFor)
+ ac.connectDeadline = connectDeadline
+
+ ac.mu.Unlock()
+
+ ac.cc.mu.RLock()
+ ac.dopts.copts.KeepaliveParams = ac.cc.mkp
+ ac.cc.mu.RUnlock()
+
+ ac.mu.Lock()
+
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ return
+ }
+
+ if ac.state != connectivity.Connecting {
+ ac.updateConnectivityState(connectivity.Connecting)
+ ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
+ }
+
+ addr := ac.addrs[ac.addrIdx]
+ copts := ac.dopts.copts
+ if ac.scopts.CredsBundle != nil {
+ copts.CredsBundle = ac.scopts.CredsBundle
+ }
+ ac.mu.Unlock()
+
+ if channelz.IsOn() {
+ channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
+ Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
+ Severity: channelz.CtINFO,
+ })
+ }
+
+ if err := ac.createTransport(backoffIdx, addr, copts, connectDeadline); err != nil {
+ continue
+ }
+
+ return
+ }
+}
+
+// createTransport creates a connection to one of the backends in addrs.
+func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
+ oneReset := sync.Once{}
+ skipReset := make(chan struct{})
+ allowedToReset := make(chan struct{})
+ prefaceReceived := make(chan struct{})
+ onCloseCalled := make(chan struct{})
+
+ var prefaceMu sync.Mutex
+ var serverPrefaceReceived bool
+ var clientPrefaceWrote bool
+
+ hcCtx, hcCancel := context.WithCancel(ac.ctx)
+
+ onGoAway := func(r transport.GoAwayReason) {
+ hcCancel()
+ ac.mu.Lock()
+ ac.adjustParams(r)
+ ac.mu.Unlock()
+ select {
+ case <-skipReset: // The outer resetTransport loop will handle reconnection.
+ return
+ case <-allowedToReset: // We're in the clear to reset.
+ go oneReset.Do(func() { ac.resetTransport(false) })
+ }
+ }
+
+ prefaceTimer := time.NewTimer(connectDeadline.Sub(time.Now()))
+
+ onClose := func() {
+ hcCancel()
+ close(onCloseCalled)
+ prefaceTimer.Stop()
+
+ select {
+ case <-skipReset: // The outer resetTransport loop will handle reconnection.
+ return
+ case <-allowedToReset: // We're in the clear to reset.
+ oneReset.Do(func() { ac.resetTransport(false) })
+ }
+ }
+
+ target := transport.TargetInfo{
+ Addr: addr.Addr,
+ Metadata: addr.Metadata,
+ Authority: ac.cc.authority,
+ }
+
+ onPrefaceReceipt := func() {
+ close(prefaceReceived)
+ prefaceTimer.Stop()
+
+ // TODO(deklerk): optimization; does anyone else actually use this lock? maybe we can just remove it for this scope
+ ac.mu.Lock()
+
+ prefaceMu.Lock()
+ serverPrefaceReceived = true
+ if clientPrefaceWrote {
+ ac.successfulHandshake = true
+ }
+ prefaceMu.Unlock()
+
+ ac.mu.Unlock()
+ }
+
+ connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
+ defer cancel()
+ if channelz.IsOn() {
+ copts.ChannelzParentID = ac.channelzID
+ }
+
+ newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
+
+ if err == nil {
+ prefaceMu.Lock()
+ clientPrefaceWrote = true
+ if serverPrefaceReceived || ac.dopts.reqHandshake == envconfig.RequireHandshakeOff {
+ ac.successfulHandshake = true
+ }
+ prefaceMu.Unlock()
+
+ if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
+ select {
+ case <-prefaceTimer.C:
+ // We didn't get the preface in time.
+ newTr.Close()
+ err = errors.New("timed out waiting for server handshake")
+ case <-prefaceReceived:
+ // We got the preface - huzzah! things are good.
+ case <-onCloseCalled:
+ // The transport has already closed - noop.
+ close(allowedToReset)
+ return nil
+ }
+ } else if ac.dopts.reqHandshake == envconfig.RequireHandshakeHybrid {
+ go func() {
+ select {
+ case <-prefaceTimer.C:
+ // We didn't get the preface in time.
+ newTr.Close()
+ case <-prefaceReceived:
+ // We got the preface just in the nick of time - huzzah!
+ case <-onCloseCalled:
+ // The transport has already closed - noop.
+ }
+ }()
+ }
+ }
+
+ if err != nil {
+ // newTr is either nil, or closed.
+ ac.cc.blockingpicker.updateConnectionError(err)
+ ac.mu.Lock()
+ if ac.state == connectivity.Shutdown {
+ // ac.tearDown(...) has been invoked.
+ ac.mu.Unlock()
+
+ // We don't want to reset during this close because we prefer to kick out of this function and let the loop
+ // in resetTransport take care of reconnecting.
+ close(skipReset)
+
+ return errConnClosing
+ }
+ ac.mu.Unlock()
+ grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
+
+ // We don't want to reset during this close because we prefer to kick out of this function and let the loop
+ // in resetTransport take care of reconnecting.
+ close(skipReset)
+
+ return err
+ }
+
+ // Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
+ ac.mu.Lock()
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ close(skipReset)
+ newTr.Close()
+ return nil
+ }
+ ac.transport = newTr
+ ac.mu.Unlock()
+
+ healthCheckConfig := ac.cc.healthCheckConfig()
+ // LB channel health checking is only enabled when all the four requirements below are met:
+ // 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
+ // 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
+ // 3. a service config with non-empty healthCheckConfig field is provided,
+ // 4. the current load balancer allows it.
+ if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
+ if internal.HealthCheckFunc != nil {
+ go ac.startHealthCheck(hcCtx, newTr, addr, healthCheckConfig.ServiceName)
+ close(allowedToReset)
+ return nil
+ }
+ // TODO: add a link to the health check doc in the error message.
+ grpclog.Error("the client side LB channel health check function has not been set.")
+ }
+
+ // No LB channel health check case
+ ac.mu.Lock()
+
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+
+ // unblock onGoAway/onClose callback.
+ close(skipReset)
+ return errConnClosing
+ }
+
+ ac.updateConnectivityState(connectivity.Ready)
+ ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
+ ac.curAddr = addr
+
+ ac.mu.Unlock()
+
+ // Ok, _now_ we will finally let the transport reset if it encounters a closable error. Without this, the reader
+ // goroutine failing races with all the code in this method that sets the connection to "ready".
+ close(allowedToReset)
+ return nil
+}
+
+func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.ClientTransport, addr resolver.Address, serviceName string) {
+ // Set up the health check helper functions
+ newStream := func() (interface{}, error) {
+ return ac.newClientStream(ctx, &StreamDesc{ServerStreams: true}, "/grpc.health.v1.Health/Watch", newTr)
+ }
+ firstReady := true
+ reportHealth := func(ok bool) {
+ ac.mu.Lock()
+ defer ac.mu.Unlock()
+ if ac.transport != newTr {
+ return
+ }
+ if ok {
+ if firstReady {
+ firstReady = false
+ ac.curAddr = addr
+ }
+ if ac.state != connectivity.Ready {
+ ac.updateConnectivityState(connectivity.Ready)
+ ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
+ }
+ } else {
+ if ac.state != connectivity.TransientFailure {
+ ac.updateConnectivityState(connectivity.TransientFailure)
+ ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
+ }
+ }
+ }
+
+ err := internal.HealthCheckFunc(ctx, newStream, reportHealth, serviceName)
+ if err != nil {
+ if status.Code(err) == codes.Unimplemented {
+ if channelz.IsOn() {
+ channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
+ Desc: "Subchannel health check is unimplemented at server side, thus health check is disabled",
+ Severity: channelz.CtError,
+ })
+ }
+ grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
+ } else {
+ grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
+ }
+ }
+}
+
+// nextAddr increments the addrIdx if there are more addresses to try. If
+// there are no more addrs to try it will re-resolve, set addrIdx to 0, and
+// increment the backoffIdx.
+//
+// nextAddr must be called without ac.mu being held.
+func (ac *addrConn) nextAddr() error {
+ ac.mu.Lock()
+
+ // If a handshake has been observed, we want the next usage to start at
+ // index 0 immediately.
+ if ac.successfulHandshake {
+ ac.successfulHandshake = false
+ ac.backoffDeadline = time.Time{}
+ ac.connectDeadline = time.Time{}
+ ac.addrIdx = 0
+ ac.backoffIdx = 0
+ ac.mu.Unlock()
+ return nil
+ }
+
+ if ac.addrIdx < len(ac.addrs)-1 {
+ ac.addrIdx++
+ ac.mu.Unlock()
+ return nil
+ }
+
+ ac.addrIdx = 0
+ ac.backoffIdx++
+
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ return errConnClosing
+ }
+ ac.cc.resolveNow(resolver.ResolveNowOption{})
+ backoffDeadline := ac.backoffDeadline
+ b := ac.resetBackoff
+ ac.mu.Unlock()
+ timer := time.NewTimer(backoffDeadline.Sub(time.Now()))
+ select {
+ case <-timer.C:
+ case <-b:
+ timer.Stop()
+ case <-ac.ctx.Done():
+ timer.Stop()
+ return ac.ctx.Err()
+ }
+ return nil
+}
+
+func (ac *addrConn) resetConnectBackoff() {
+ ac.mu.Lock()
+ close(ac.resetBackoff)
+ ac.backoffIdx = 0
+ ac.resetBackoff = make(chan struct{})
+ ac.mu.Unlock()
+}
+
+// getReadyTransport returns the transport if ac's state is READY.
+// Otherwise it returns nil, false.
+// If ac's state is IDLE, it will trigger ac to connect.
+func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
+ ac.mu.Lock()
+ if ac.state == connectivity.Ready && ac.transport != nil {
+ t := ac.transport
+ ac.mu.Unlock()
+ return t, true
+ }
+ var idle bool
+ if ac.state == connectivity.Idle {
+ idle = true
+ }
+ ac.mu.Unlock()
+ // Trigger idle ac to connect.
+ if idle {
+ ac.connect()
+ }
+ return nil, false
+}
+
+// tearDown starts to tear down the addrConn.
+// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
+// some edge cases (e.g., the caller opens and closes many addrConn's in a
+// tight loop.
+// tearDown doesn't remove ac from ac.cc.conns.
+func (ac *addrConn) tearDown(err error) {
+ ac.mu.Lock()
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ return
+ }
+ curTr := ac.transport
+ ac.transport = nil
+ // We have to set the state to Shutdown before anything else to prevent races
+ // between setting the state and logic that waits on context cancelation / etc.
+ ac.updateConnectivityState(connectivity.Shutdown)
+ ac.cancel()
+ ac.tearDownErr = err
+ ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
+ ac.curAddr = resolver.Address{}
+ if err == errConnDrain && curTr != nil {
+ // GracefulClose(...) may be executed multiple times when
+ // i) receiving multiple GoAway frames from the server; or
+ // ii) there are concurrent name resolver/Balancer triggered
+ // address removal and GoAway.
+ // We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
+ ac.mu.Unlock()
+ curTr.GracefulClose()
+ ac.mu.Lock()
+ }
+ if channelz.IsOn() {
+ channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
+ Desc: "Subchannel Deleted",
+ Severity: channelz.CtINFO,
+ Parent: &channelz.TraceEventDesc{
+ Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
+ Severity: channelz.CtINFO,
+ },
+ })
+ // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
+ // the entity beng deleted, and thus prevent it from being deleted right away.
+ channelz.RemoveEntry(ac.channelzID)
+ }
+ ac.mu.Unlock()
+}
+
+func (ac *addrConn) getState() connectivity.State {
+ ac.mu.Lock()
+ defer ac.mu.Unlock()
+ return ac.state
+}
+
+func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
+ ac.mu.Lock()
+ addr := ac.curAddr.Addr
+ ac.mu.Unlock()
+ return &channelz.ChannelInternalMetric{
+ State: ac.getState(),
+ Target: addr,
+ CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted),
+ CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded),
+ CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed),
+ LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
+ }
+}
+
+func (ac *addrConn) incrCallsStarted() {
+ atomic.AddInt64(&ac.czData.callsStarted, 1)
+ atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
+}
+
+func (ac *addrConn) incrCallsSucceeded() {
+ atomic.AddInt64(&ac.czData.callsSucceeded, 1)
+}
+
+func (ac *addrConn) incrCallsFailed() {
+ atomic.AddInt64(&ac.czData.callsFailed, 1)
+}
+
+type retryThrottler struct {
+ max float64
+ thresh float64
+ ratio float64
+
+ mu sync.Mutex
+ tokens float64 // TODO(dfawley): replace with atomic and remove lock.
+}
+
+// throttle subtracts a retry token from the pool and returns whether a retry
+// should be throttled (disallowed) based upon the retry throttling policy in
+// the service config.
+func (rt *retryThrottler) throttle() bool {
+ if rt == nil {
+ return false
+ }
+ rt.mu.Lock()
+ defer rt.mu.Unlock()
+ rt.tokens--
+ if rt.tokens < 0 {
+ rt.tokens = 0
+ }
+ return rt.tokens <= rt.thresh
+}
+
+func (rt *retryThrottler) successfulRPC() {
+ if rt == nil {
+ return
+ }
+ rt.mu.Lock()
+ defer rt.mu.Unlock()
+ rt.tokens += rt.ratio
+ if rt.tokens > rt.max {
+ rt.tokens = rt.max
+ }
+}
+
+type channelzChannel struct {
+ cc *ClientConn
+}
+
+func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
+ return c.cc.channelzMetric()
+}
+
+// ErrClientConnTimeout indicates that the ClientConn cannot establish the
+// underlying connections within the specified timeout.
+//
+// Deprecated: This error is never returned by grpc and should not be
+// referenced by users.
+var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")