Skip to content

Commit

Permalink
xds: Initial implementation of a client using the v2 API (grpc#3144)
Browse files Browse the repository at this point in the history
This object will be used by a higher level xdsClient object, which will
provide the watch API used by the xds resolver and balancer
implementations.
  • Loading branch information
easwars authored Nov 12, 2019
1 parent 460b1d2 commit dc9615b
Show file tree
Hide file tree
Showing 10 changed files with 2,095 additions and 3 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module google.golang.org/grpc
go 1.11

require (
github.com/envoyproxy/go-control-plane v0.9.0
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473
github.com/envoyproxy/protoc-gen-validate v0.1.0
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/mock v1.1.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/envoyproxy/go-control-plane v0.9.0 h1:67WMNTvGrl7V1dWdKCeTwxDr7nio9clKoTlLhwIPnT4=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473 h1:4cmBvAEBNJaGARUEs3/suWRyfyBfhf7I60WBZq+bv2w=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
Expand Down
156 changes: 156 additions & 0 deletions xds/internal/client/fakexds/fakexds.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package fakexds provides a very basic fake implementation of the xDS server
// for unit testing purposes.
package fakexds

import (
"context"
"net"
"testing"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

discoverypb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
)

// TODO: Make this a var or a field in the server if there is a need to use a
// value other than this default.
const defaultChannelBufferSize = 50

// Request wraps an xDS request and error.
type Request struct {
Req *discoverypb.DiscoveryRequest
Err error
}

// Response wraps an xDS response and error.
type Response struct {
Resp *discoverypb.DiscoveryResponse
Err error
}

// Server is a very basic implementation of a fake xDS server. It provides a
// request and response channel for the user to control the requests that are
// expected and the responses that needs to be sent out.
type Server struct {
// RequestChan is a buffered channel on which the fake server writes the
// received requests onto.
RequestChan chan *Request
// ResponseChan is a buffered channel from which the fake server reads the
// responses that it must send out to the client.
ResponseChan chan *Response
}

// StartClientAndServer starts a fakexds.Server and creates a ClientConn
// talking to it. The returned cleanup function should be invoked by the caller
// once the test is done.
func StartClientAndServer(t *testing.T) (*Server, *grpc.ClientConn, func()) {
t.Helper()

var lis net.Listener
var err error
lis, err = net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("net.Listen() failed: %v", err)
}

server := grpc.NewServer()
fs := &Server{
RequestChan: make(chan *Request, defaultChannelBufferSize),
ResponseChan: make(chan *Response, defaultChannelBufferSize),
}
adsgrpc.RegisterAggregatedDiscoveryServiceServer(server, fs)
go server.Serve(lis)
t.Logf("Starting fake xDS server at %v...", lis.Addr().String())
defer func() {
if err != nil {
server.Stop()
lis.Close()
}
}()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

var cc *grpc.ClientConn
cc, err = grpc.DialContext(ctx, lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
t.Fatalf("grpc.DialContext(%s) failed: %v", lis.Addr().String(), err)
}
t.Log("Started xDS gRPC client...")

return fs, cc, func() {
server.Stop()
lis.Close()
}
}

// StreamAggregatedResources is the fake implementation to handle an ADS
// stream.
func (fs *Server) StreamAggregatedResources(s adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
errCh := make(chan error, 2)
go func() {
for {
req, err := s.Recv()
if err != nil {
errCh <- err
return
}
fs.RequestChan <- &Request{req, err}
}
}()
go func() {
var retErr error
defer func() {
errCh <- retErr
}()

for {
select {
case r := <-fs.ResponseChan:
if r.Err != nil {
retErr = r.Err
return
}
if err := s.Send(r.Resp); err != nil {
retErr = err
return
}
case <-s.Context().Done():
retErr = s.Context().Err()
return
}
}
}()

if err := <-errCh; err != nil {
return err
}
return nil
}

// DeltaAggregatedResources helps implement the ADS service.
func (fs *Server) DeltaAggregatedResources(adsgrpc.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error {
return status.Error(codes.Unimplemented, "")
}
125 changes: 125 additions & 0 deletions xds/internal/client/lds.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package client

import (
"fmt"

"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/grpclog"

xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
httppb "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/http_connection_manager/v2"
)

// newLDSRequest generates an LDS request proto for the provided target, to be
// sent out on the wire.
func (v2c *v2Client) newLDSRequest(target []string) *xdspb.DiscoveryRequest {
return &xdspb.DiscoveryRequest{
Node: v2c.nodeProto,
TypeUrl: listenerURL,
ResourceNames: target,
}
}

// sendLDS sends an LDS request for provided target on the provided stream.
func (v2c *v2Client) sendLDS(stream adsStream, target []string) bool {
if err := stream.Send(v2c.newLDSRequest(target)); err != nil {
grpclog.Warningf("xds: LDS request for resource %v failed: %v", target, err)
return false
}
return true
}

// handleLDSResponse processes an LDS response received from the xDS server. On
// receipt of a good response, it also invokes the registered watcher callback.
func (v2c *v2Client) handleLDSResponse(resp *xdspb.DiscoveryResponse) error {
v2c.mu.Lock()
defer v2c.mu.Unlock()

wi := v2c.watchMap[ldsResource]
if wi == nil {
return fmt.Errorf("xds: no LDS watcher found when handling LDS response: %+v", resp)
}

routeName := ""
for _, r := range resp.GetResources() {
var resource ptypes.DynamicAny
if err := ptypes.UnmarshalAny(r, &resource); err != nil {
return fmt.Errorf("xds: failed to unmarshal resource in LDS response: %v", err)
}
lis, ok := resource.Message.(*xdspb.Listener)
if !ok {
return fmt.Errorf("xds: unexpected resource type: %T in LDS response", resource.Message)
}
if lis.GetName() != wi.target[0] {
// We ignore listeners we are not watching for because LDS is
// special in the sense that there is only one resource we are
// interested in, and this resource does not change over the
// lifetime of the v2Client. So, we don't have to cache other
// listeners which we are not interested in.
continue
}
var err error
routeName, err = getRouteConfigNameFromListener(lis)
if err != nil {
return err
}
}

var err error
if routeName == "" {
err = fmt.Errorf("xds: LDS target %s not found in received response %+v", wi.target, resp)
}
wi.expiryTimer.Stop()
wi.callback.(ldsCallback)(ldsUpdate{routeName: routeName}, err)
return nil
}

// getRouteConfigNameFromListener checks if the provided Listener proto meets
// the expected criteria. If so, it returns a non-empty routeConfigName.
func getRouteConfigNameFromListener(lis *xdspb.Listener) (string, error) {
if lis.GetApiListener() == nil {
return "", fmt.Errorf("xds: no api_listener field in LDS response %+v", lis)
}
var apiAny ptypes.DynamicAny
if err := ptypes.UnmarshalAny(lis.GetApiListener().GetApiListener(), &apiAny); err != nil {
return "", fmt.Errorf("xds: failed to unmarshal api_listner in LDS response: %v", err)
}
apiLis, ok := apiAny.Message.(*httppb.HttpConnectionManager)
if !ok {
return "", fmt.Errorf("xds: unexpected api_listener type: %T in LDS response", apiAny.Message)
}
switch apiLis.RouteSpecifier.(type) {
case *httppb.HttpConnectionManager_Rds:
name := apiLis.GetRds().GetRouteConfigName()
if name == "" {
return "", fmt.Errorf("xds: empty route_config_name in LDS response: %+v", lis)
}
return name, nil
case *httppb.HttpConnectionManager_RouteConfig:
// TODO: Add support for specifying the RouteConfiguration inline
// in the LDS response.
return "", fmt.Errorf("xds: LDS response contains RDS config inline. Not supported for now: %+v", apiLis)
case nil:
return "", fmt.Errorf("xds: no RouteSpecifier in received LDS response: %+v", apiLis)
default:
return "", fmt.Errorf("xds: unsupported type %T for RouteSpecifier in received LDS response", apiLis.RouteSpecifier)
}
}
Loading

0 comments on commit dc9615b

Please sign in to comment.