diff --git a/pkg/debounce/refresher.go b/pkg/debounce/refresher.go new file mode 100644 index 00000000..fd7843c4 --- /dev/null +++ b/pkg/debounce/refresher.go @@ -0,0 +1,51 @@ +package debounce + +import ( + "context" + "sync" + "time" + + "github.com/sirupsen/logrus" +) + +// Refreshable represents an object which can be refreshed. This should be protected by a mutex for concurrent operation. +type Refreshable interface { + Refresh() error +} + +// DebounceableRefresher is used to debounce multiple attempts to refresh a refreshable type. +type DebounceableRefresher struct { + sync.Mutex + // Refreshable is any type that can be refreshed. The refresh method should by protected by a mutex internally. + Refreshable Refreshable + current context.CancelFunc +} + +// RefreshAfter requests a refresh after a certain time has passed. Subsequent calls to this method will +// delay the requested refresh by the new duration. Note that this is a total override of the previous calls - calling +// RefreshAfter(time.Second * 2) and then immediately calling RefreshAfter(time.Microsecond * 1) will run a refresh +// in one microsecond +func (d *DebounceableRefresher) RefreshAfter(duration time.Duration) { + d.Lock() + defer d.Unlock() + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + if d.current != nil { + d.current() + } + d.current = cancel + go func() { + timer := time.NewTimer(duration) + defer timer.Stop() + select { + case <-ctx.Done(): + // this indicates that the context was cancelled. Do nothing. + case <-timer.C: + // note this can cause multiple refreshes to happen concurrently + err := d.Refreshable.Refresh() + if err != nil { + logrus.Errorf("failed to refresh with error: %v", err) + } + } + }() +} diff --git a/pkg/debounce/refresher_test.go b/pkg/debounce/refresher_test.go new file mode 100644 index 00000000..50ad896e --- /dev/null +++ b/pkg/debounce/refresher_test.go @@ -0,0 +1,47 @@ +package debounce + +import ( + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type refreshable struct { + wasRefreshed atomic.Bool + retErr error +} + +func (r *refreshable) Refresh() error { + r.wasRefreshed.Store(true) + return r.retErr +} + +func TestRefreshAfter(t *testing.T) { + ref := refreshable{} + debounce := DebounceableRefresher{ + Refreshable: &ref, + } + debounce.RefreshAfter(time.Millisecond * 2) + debounce.RefreshAfter(time.Microsecond * 2) + time.Sleep(time.Millisecond * 1) + // test that the second refresh call overrode the first - Micro < Milli so this should have ran + require.True(t, ref.wasRefreshed.Load()) + ref.wasRefreshed.Store(false) + time.Sleep(time.Millisecond * 2) + // test that the call was debounced - though we called this twice only one refresh should be called + require.False(t, ref.wasRefreshed.Load()) + + ref = refreshable{ + retErr: fmt.Errorf("Some error"), + } + debounce = DebounceableRefresher{ + Refreshable: &ref, + } + debounce.RefreshAfter(time.Microsecond * 2) + // test the error case + time.Sleep(time.Millisecond * 1) + require.True(t, ref.wasRefreshed.Load()) +} diff --git a/pkg/schema/definitions/handler.go b/pkg/schema/definitions/handler.go index 094cb7ea..c54da3f9 100644 --- a/pkg/schema/definitions/handler.go +++ b/pkg/schema/definitions/handler.go @@ -1,18 +1,15 @@ package definitions import ( - "errors" "fmt" "net/http" "sync" - "time" "github.com/rancher/apiserver/pkg/apierror" "github.com/rancher/apiserver/pkg/types" "github.com/rancher/steve/pkg/schema/converter" "github.com/rancher/wrangler/v2/pkg/schemas/validation" - "github.com/sirupsen/logrus" - "k8s.io/apimachinery/pkg/runtime/schema" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/discovery" "k8s.io/kube-openapi/pkg/util/proto" ) @@ -28,15 +25,11 @@ var ( } ) -// schemaDefinitionHandler is a byID handler for a specific schema, which provides field definitions for all schemas. +// SchemaDefinitionHandler is a byID handler for a specific schema, which provides field definitions for all schemas. // Does not implement any method allowing a caller to list definitions for all schemas. -type schemaDefinitionHandler struct { +type SchemaDefinitionHandler struct { sync.RWMutex - // lastRefresh is the last time that the handler retrieved models from kubernetes. - lastRefresh time.Time - // refreshStale is the duration between lastRefresh and the next refresh of models. - refreshStale time.Duration // client is the discovery client used to get the groups/resources/fields from kubernetes. client discovery.DiscoveryInterface // models are the cached models from the last response from kubernetes. @@ -46,9 +39,32 @@ type schemaDefinitionHandler struct { schemaToModel map[string]string } +// Refresh writeLocks and updates the cache with new schemaDefinitions. Will result in a call to kubernetes to retrieve +// the openAPI schemas. +func (s *SchemaDefinitionHandler) Refresh() error { + openapi, err := s.client.OpenAPISchema() + if err != nil { + return fmt.Errorf("unable to fetch openapi definition: %w", err) + } + models, err := proto.NewOpenAPIData(openapi) + if err != nil { + return fmt.Errorf("unable to parse openapi definition into models: %w", err) + } + groups, err := s.client.ServerGroups() + if err != nil { + return fmt.Errorf("unable to retrieve groups: %w", err) + } + s.Lock() + defer s.Unlock() + nameIndex := s.indexSchemaNames(models, groups) + s.schemaToModel = nameIndex + s.models = &models + return nil +} + // byIDHandler is the Handler method for a request to get the schema definition for a specifc schema. Will use the // cached models found during the last refresh as part of this process. -func (s *schemaDefinitionHandler) byIDHandler(request *types.APIRequest) (types.APIObject, error) { +func (s *SchemaDefinitionHandler) byIDHandler(request *types.APIRequest) (types.APIObject, error) { // pseudo-access check, designed to make sure that users have access to the schema for the definition that they // are accessing. requestSchema := request.Schemas.LookupSchema(request.Name) @@ -56,14 +72,6 @@ func (s *schemaDefinitionHandler) byIDHandler(request *types.APIRequest) (types. return types.APIObject{}, apierror.NewAPIError(validation.NotFound, "no such schema") } - if s.needsRefresh() { - err := s.refresh() - if err != nil { - logrus.Errorf("error refreshing schemas %s", err.Error()) - return types.APIObject{}, apierror.NewAPIError(internalServerErrorCode, "error refreshing schemas") - } - } - // lock only in read-mode so that we don't read while refresh writes. Only use a read-lock - using a write lock // would make this endpoint only usable by one caller at a time s.RLock() @@ -100,72 +108,13 @@ func (s *schemaDefinitionHandler) byIDHandler(request *types.APIRequest) (types. }, nil } -// needsRefresh readLocks and checks if the cache needs to be refreshed. -func (s *schemaDefinitionHandler) needsRefresh() bool { - s.RLock() - defer s.RUnlock() - if s.lastRefresh.IsZero() { - return true - } - return s.lastRefresh.Add(s.refreshStale).Before(time.Now()) -} - -// refresh writeLocks and updates the cache with new schemaDefinitions. Will result in a call to kubernetes to retrieve -// the openAPI schemas. -func (s *schemaDefinitionHandler) refresh() error { - s.Lock() - defer s.Unlock() - openapi, err := s.client.OpenAPISchema() - if err != nil { - return fmt.Errorf("unable to fetch openapi definition: %w", err) - } - models, err := proto.NewOpenAPIData(openapi) - if err != nil { - return fmt.Errorf("unable to parse openapi definition into models: %w", err) - } - s.models = &models - nameIndex, err := s.indexSchemaNames(models) - // indexSchemaNames may successfully refresh some definitions, but still return an error - // in these cases, store what we could find, but still return up an error - if nameIndex != nil { - s.schemaToModel = nameIndex - s.lastRefresh = time.Now() - } - if err != nil { - return fmt.Errorf("unable to index schema name to model name: %w", err) - } - return nil -} - // indexSchemaNames returns a map of schemaID to the modelName for a given schema. Will use the preferred version of a -// resource if possible. May return a map and an error if it was able to index some schemas but not others. -func (s *schemaDefinitionHandler) indexSchemaNames(models proto.Models) (map[string]string, error) { - _, resourceLists, err := s.client.ServerGroupsAndResources() - // this may occasionally fail to discover certain groups, but we still can refresh the others in those cases - if _, ok := err.(*discovery.ErrGroupDiscoveryFailed); err != nil && !ok { - return nil, fmt.Errorf("unable to retrieve groups and resources: %w", err) - } - preferredResourceVersions := map[schema.GroupKind]string{} - for _, resourceList := range resourceLists { - if resourceList == nil { - continue - } - groupVersion, gvErr := schema.ParseGroupVersion(resourceList.GroupVersion) - // we may fail to parse the GV of one group, but can still parse out the others - if gvErr != nil { - err = errors.Join(err, fmt.Errorf("unable to parse group version %s: %w", resourceList.GroupVersion, gvErr)) - continue - } - for _, resource := range resourceList.APIResources { - gk := schema.GroupKind{ - Group: groupVersion.Group, - Kind: resource.Kind, - } - // per the resource docs, if the resource.Version is empty, the preferred version for - // this resource is the version of the APIResourceList it is in - if resource.Version == "" || resource.Version == groupVersion.Version { - preferredResourceVersions[gk] = groupVersion.Version - } +// resource if possible. Can return an error if unable to find groups. +func (s *SchemaDefinitionHandler) indexSchemaNames(models proto.Models, groups *metav1.APIGroupList) map[string]string { + preferredResourceVersions := map[string]string{} + if groups != nil { + for _, group := range groups.Groups { + preferredResourceVersions[group.Name] = group.PreferredVersion.Version } } schemaToModel := map[string]string{} @@ -181,17 +130,13 @@ func (s *schemaDefinitionHandler) indexSchemaNames(models proto.Models) (map[str // we can safely continue continue } - gk := schema.GroupKind{ - Group: gvk.Group, - Kind: gvk.Kind, - } - prefVersion, ok := preferredResourceVersions[gk] + prefVersion := preferredResourceVersions[gvk.Group] // if we don't have a known preferred version for this group or we are the preferred version // add this as the model name for the schema - if !ok || prefVersion == gvk.Version { + if prefVersion == "" || prefVersion == gvk.Version { schemaID := converter.GVKToSchemaID(*gvk) schemaToModel[schemaID] = modelName } } - return schemaToModel, err + return schemaToModel } diff --git a/pkg/schema/definitions/handler_test.go b/pkg/schema/definitions/handler_test.go index 72b8d7dd..f7a96b7d 100644 --- a/pkg/schema/definitions/handler_test.go +++ b/pkg/schema/definitions/handler_test.go @@ -3,7 +3,6 @@ package definitions import ( "fmt" "testing" - "time" openapi_v2 "github.com/google/gnostic-models/openapiv2" "github.com/rancher/apiserver/pkg/apierror" @@ -11,85 +10,105 @@ import ( wschemas "github.com/rancher/wrangler/v2/pkg/schemas" "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/version" "k8s.io/client-go/discovery" "k8s.io/client-go/openapi" restclient "k8s.io/client-go/rest" + "k8s.io/kube-openapi/pkg/util/proto" ) -var globalRoleObject = types.APIObject{ - ID: "management.cattle.io.globalrole", - Type: "schemaDefinition", - Object: schemaDefinition{ - DefinitionType: "io.cattle.management.v2.GlobalRole", - Definitions: map[string]definition{ - "io.cattle.management.v2.GlobalRole": { - ResourceFields: map[string]definitionField{ - "apiVersion": { - Type: "string", - Description: "The APIVersion of this resource", - }, - "kind": { - Type: "string", - Description: "The kind", - }, - "metadata": { - Type: "io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta", - Description: "The metadata", - }, - "spec": { - Type: "io.cattle.management.v2.GlobalRole.spec", Description: "The spec for the project", - }, - }, - Type: "io.cattle.management.v2.GlobalRole", - Description: "A Global Role V2 provides Global Permissions in Rancher", - }, - "io.cattle.management.v2.GlobalRole.spec": { - ResourceFields: map[string]definitionField{ - "clusterName": { - Type: "string", - Description: "The name of the cluster", - Required: true, - }, - "displayName": { - Type: "string", - Description: "The UI readable name", - Required: true, - }, - "newField": { - Type: "string", - Description: "A new field not present in v1", - }, - "notRequired": { - Type: "boolean", - Description: "Some field that isn't required", - }, - }, - Type: "io.cattle.management.v2.GlobalRole.spec", - Description: "The spec for the project", - }, - "io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta": { - ResourceFields: map[string]definitionField{ - "annotations": { - Type: "map", - SubType: "string", - Description: "annotations of the resource", - }, - "name": { - Type: "string", - SubType: "", - Description: "name of the resource", - }, - }, - Type: "io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta", - Description: "Object Metadata", +func TestRefresh(t *testing.T) { + defaultDocument, err := openapi_v2.ParseDocument([]byte(openapi_raw)) + require.NoError(t, err) + defaultModels, err := proto.NewOpenAPIData(defaultDocument) + require.NoError(t, err) + defaultSchemaToModel := map[string]string{ + "management.cattle.io.globalrole": "io.cattle.management.v1.GlobalRole", + "noversion.cattle.io.resource": "io.cattle.noversion.v2.Resource", + "missinggroup.cattle.io.resource": "io.cattle.missinggroup.v2.Resource", + } + tests := []struct { + name string + openapiError error + serverGroupsErr error + useBadOpenApiDoc bool + nilGroups bool + wantModels *proto.Models + wantSchemaToModel map[string]string + wantError bool + }{ + { + name: "success", + wantModels: &defaultModels, + wantSchemaToModel: defaultSchemaToModel, + }, + { + name: "error - openapi doc unavailable", + openapiError: fmt.Errorf("server unavailable"), + wantError: true, + }, + { + name: "error - unable to parse openapi doc", + useBadOpenApiDoc: true, + wantError: true, + }, + { + name: "error - unable to retrieve groups and resources", + serverGroupsErr: fmt.Errorf("server not available"), + wantError: true, + }, + { + name: "no groups or error from server", + nilGroups: true, + wantModels: &defaultModels, + wantSchemaToModel: map[string]string{ + "management.cattle.io.globalrole": "io.cattle.management.v2.GlobalRole", + "noversion.cattle.io.resource": "io.cattle.noversion.v2.Resource", + "missinggroup.cattle.io.resource": "io.cattle.missinggroup.v2.Resource", }, }, - }, + } + for _, test := range tests { + test := test + t.Run(test.name, func(t *testing.T) { + t.Parallel() + client, err := buildDefaultDiscovery() + client.DocumentErr = test.openapiError + client.GroupsErr = test.serverGroupsErr + if test.useBadOpenApiDoc { + schema := client.Document.Definitions.AdditionalProperties[0] + schema.Value.Type = &openapi_v2.TypeItem{ + Value: []string{"multiple", "entries"}, + } + } + if test.nilGroups { + client.Groups = nil + } + require.Nil(t, err) + handler := SchemaDefinitionHandler{ + client: client, + } + err = handler.Refresh() + if test.wantError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + require.Equal(t, test.wantModels, handler.models) + require.Equal(t, test.wantSchemaToModel, handler.schemaToModel) + }) + + } } -func TestByID(t *testing.T) { +func Test_byID(t *testing.T) { + defaultDocument, err := openapi_v2.ParseDocument([]byte(openapi_raw)) + require.NoError(t, err) + defaultModels, err := proto.NewOpenAPIData(defaultDocument) + require.NoError(t, err) + defaultSchemaToModel := map[string]string{ + "management.cattle.io.globalrole": "io.cattle.management.v2.GlobalRole", + } schemas := types.EmptyAPISchemas() addBaseSchema := func(names ...string) { for _, name := range names { @@ -107,121 +126,134 @@ func TestByID(t *testing.T) { return &input } - addBaseSchema("management.cattle.io.globalrole", "management.cattle.io.missingfrommodel") + addBaseSchema("management.cattle.io.globalrole", "management.cattle.io.missingfrommodel", "management.cattle.io.notakind") tests := []struct { - name string - schemaName string - needsRefresh bool - openapiError error - serverGroupsResourcesErr error - useBadOpenApiDoc bool - unparseableGV bool - wantObject *types.APIObject - wantError bool - wantErrorCode *int + name string + schemaName string + models *proto.Models + schemaToModel map[string]string + wantObject *types.APIObject + wantError bool + wantErrorCode *int }{ { - name: "global role definition", - schemaName: "management.cattle.io.globalrole", - needsRefresh: true, - wantObject: &globalRoleObject, + name: "global role definition", + schemaName: "management.cattle.io.globalrole", + models: &defaultModels, + schemaToModel: defaultSchemaToModel, + wantObject: &types.APIObject{ + ID: "management.cattle.io.globalrole", + Type: "schemaDefinition", + Object: schemaDefinition{ + DefinitionType: "io.cattle.management.v2.GlobalRole", + Definitions: map[string]definition{ + "io.cattle.management.v2.GlobalRole": { + ResourceFields: map[string]definitionField{ + "apiVersion": { + Type: "string", + Description: "The APIVersion of this resource", + }, + "kind": { + Type: "string", + Description: "The kind", + }, + "metadata": { + Type: "io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta", + Description: "The metadata", + }, + "spec": { + Type: "io.cattle.management.v2.GlobalRole.spec", Description: "The spec for the project", + }, + }, + Type: "io.cattle.management.v2.GlobalRole", + Description: "A Global Role V2 provides Global Permissions in Rancher", + }, + "io.cattle.management.v2.GlobalRole.spec": { + ResourceFields: map[string]definitionField{ + "clusterName": { + Type: "string", + Description: "The name of the cluster", + Required: true, + }, + "displayName": { + Type: "string", + Description: "The UI readable name", + Required: true, + }, + "newField": { + Type: "string", + Description: "A new field not present in v1", + }, + "notRequired": { + Type: "boolean", + Description: "Some field that isn't required", + }, + }, + Type: "io.cattle.management.v2.GlobalRole.spec", + Description: "The spec for the project", + }, + "io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta": { + ResourceFields: map[string]definitionField{ + "annotations": { + Type: "map", + SubType: "string", + Description: "annotations of the resource", + }, + "name": { + Type: "string", + SubType: "", + Description: "name of the resource", + }, + }, + Type: "io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta", + Description: "Object Metadata", + }, + }, + }, + }, }, { name: "missing definition", schemaName: "management.cattle.io.cluster", - needsRefresh: true, + models: &defaultModels, + schemaToModel: defaultSchemaToModel, wantError: true, wantErrorCode: intPtr(404), }, { name: "not refreshed", schemaName: "management.cattle.io.globalrole", - needsRefresh: false, wantError: true, wantErrorCode: intPtr(503), }, { - name: "missing from model", + name: "has schema, missing from model", schemaName: "management.cattle.io.missingfrommodel", - needsRefresh: true, + models: &defaultModels, + schemaToModel: defaultSchemaToModel, wantError: true, wantErrorCode: intPtr(503), }, { - name: "refresh error - openapi doc unavailable", - schemaName: "management.cattle.io.globalrole", - needsRefresh: true, - openapiError: fmt.Errorf("server unavailable"), - wantError: true, - wantErrorCode: intPtr(500), - }, - { - name: "refresh error - unable to parse openapi doc", - schemaName: "management.cattle.io.globalrole", - needsRefresh: true, - useBadOpenApiDoc: true, - wantError: true, - wantErrorCode: intPtr(500), - }, - { - name: "refresh error - unable to retrieve groups and resources", - schemaName: "management.cattle.io.globalrole", - needsRefresh: true, - serverGroupsResourcesErr: fmt.Errorf("server not available"), - wantError: true, - wantErrorCode: intPtr(500), - }, - { - name: "refresh error - unable to retrieve all groups and resources", - schemaName: "management.cattle.io.globalrole", - needsRefresh: true, - serverGroupsResourcesErr: &discovery.ErrGroupDiscoveryFailed{ - Groups: map[schema.GroupVersion]error{ - { - Group: "other.cattle.io", - Version: "v1", - }: fmt.Errorf("some group error"), - }, + name: "has schema, model is not a kind", + schemaName: "management.cattle.io.notakind", + models: &defaultModels, + schemaToModel: map[string]string{ + "management.cattle.io.notakind": "io.management.cattle.NotAKind", }, wantError: true, wantErrorCode: intPtr(500), }, - { - name: "refresh error - unparesable gv", - schemaName: "management.cattle.io.globalrole", - needsRefresh: true, - unparseableGV: true, - wantError: true, - wantErrorCode: intPtr(500), - }, } for _, test := range tests { test := test t.Run(test.name, func(t *testing.T) { t.Parallel() - client, err := buildDefaultDiscovery() - client.DocumentErr = test.openapiError - client.GroupResourcesErr = test.serverGroupsResourcesErr - if test.useBadOpenApiDoc { - schema := client.Document.Definitions.AdditionalProperties[0] - schema.Value.Type = &openapi_v2.TypeItem{ - Value: []string{"multiple", "entries"}, - } - } - if test.unparseableGV { - client.Resources = append(client.Resources, &metav1.APIResourceList{ - GroupVersion: "not/parse/able", - }) - } - require.Nil(t, err) - handler := schemaDefinitionHandler{ - client: client, - } - if !test.needsRefresh { - handler.lastRefresh = time.Now() - handler.refreshStale = time.Minute * 1 + handler := SchemaDefinitionHandler{ + models: test.models, + schemaToModel: test.schemaToModel, } request := types.APIRequest{ Schemas: schemas, @@ -248,66 +280,64 @@ func buildDefaultDiscovery() (*fakeDiscovery, error) { if err != nil { return nil, fmt.Errorf("unable to parse openapi document %w", err) } - groups := []*metav1.APIGroup{ + groups := []metav1.APIGroup{ { Name: "management.cattle.io", PreferredVersion: metav1.GroupVersionForDiscovery{ - Version: "v2", + GroupVersion: "management.cattle.io/v2", + Version: "v1", }, - }, - } - resources := []*metav1.APIResourceList{ - { - GroupVersion: schema.GroupVersion{ - Group: "management.cattle.io", - Version: "v2", - }.String(), - APIResources: []metav1.APIResource{ + Versions: []metav1.GroupVersionForDiscovery{ { - Group: "management.cattle.io", - Kind: "GlobalRole", - Version: "v2", + GroupVersion: "management.cattle.io/v1", + Version: "v1", + }, + { + GroupVersion: "management.cattle.io/v2", + Version: "v2", }, }, }, { - GroupVersion: schema.GroupVersion{ - Group: "management.cattle.io", - Version: "v1", - }.String(), - APIResources: []metav1.APIResource{ + Name: "noversion.cattle.io", + Versions: []metav1.GroupVersionForDiscovery{ + { + GroupVersion: "noversion.cattle.io/v1", + Version: "v1", + }, { - Group: "management.cattle.io", - Kind: "GlobalRole", - Version: "v2", + GroupVersion: "noversion.cattle.io/v2", + Version: "v2", }, }, }, - nil, } return &fakeDiscovery{ - Groups: groups, - Resources: resources, - Document: document, + Groups: &metav1.APIGroupList{ + Groups: groups, + }, + Document: document, }, nil } type fakeDiscovery struct { - Groups []*metav1.APIGroup - Resources []*metav1.APIResourceList - Document *openapi_v2.Document - GroupResourcesErr error - DocumentErr error + Groups *metav1.APIGroupList + Document *openapi_v2.Document + GroupsErr error + DocumentErr error } -// ServerGroupsAndResources is the only method we actually need for the test - just returns what is on the struct -func (f *fakeDiscovery) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) { - return f.Groups, f.Resources, f.GroupResourcesErr +// ServerGroups is the only method that needs to be mocked +func (f *fakeDiscovery) ServerGroups() (*metav1.APIGroupList, error) { + return f.Groups, f.GroupsErr } // The rest of these methods are just here to conform to discovery.DiscoveryInterface -func (f *fakeDiscovery) RESTClient() restclient.Interface { return nil } -func (f *fakeDiscovery) ServerGroups() (*metav1.APIGroupList, error) { return nil, nil } +func (f *fakeDiscovery) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) { + return nil, nil, nil +} + +func (f *fakeDiscovery) RESTClient() restclient.Interface { return nil } func (f *fakeDiscovery) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) { return nil, nil } diff --git a/pkg/schema/definitions/openapi_test.go b/pkg/schema/definitions/openapi_test.go index 40487349..1ab4fac3 100644 --- a/pkg/schema/definitions/openapi_test.go +++ b/pkg/schema/definitions/openapi_test.go @@ -82,7 +82,129 @@ definitions: - group: "management.cattle.io" version: "v2" kind: "GlobalRole" - io.management.cattle.NotAKind: + io.cattle.noversion.v2.Resource: + description: "A No Version V2 resource is for a group with no preferred version" + type: "object" + properties: + apiVersion: + description: "The APIVersion of this resource" + type: "string" + kind: + description: "The kind" + type: "string" + metadata: + description: "The metadata" + $ref: "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta" + spec: + description: "The spec for the resource" + type: "object" + required: + - "name" + properties: + name: + description: "The name of the resource" + type: "string" + notRequired: + description: "Some field that isn't required" + type: "boolean" + newField: + description: "A new field not present in v1" + type: "string" + x-kubernetes-group-version-kind: + - group: "noversion.cattle.io" + version: "v2" + kind: "Resource" + io.cattle.noversion.v1.Resource: + description: "A No Version V1 resource is for a group with no preferred version" + type: "object" + properties: + apiVersion: + description: "The APIVersion of this resource" + type: "string" + kind: + description: "The kind" + type: "string" + metadata: + description: "The metadata" + $ref: "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta" + spec: + description: "The spec for the resource" + type: "object" + required: + - "name" + properties: + name: + description: "The name of the resource" + type: "string" + notRequired: + description: "Some field that isn't required" + type: "boolean" + x-kubernetes-group-version-kind: + - group: "noversion.cattle.io" + version: "v1" + kind: "Resource" + io.cattle.missinggroup.v2.Resource: + description: "A Missing Group V2 resource is for a group not listed by server groups" + type: "object" + properties: + apiVersion: + description: "The APIVersion of this resource" + type: "string" + kind: + description: "The kind" + type: "string" + metadata: + description: "The metadata" + $ref: "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta" + spec: + description: "The spec for the resource" + type: "object" + required: + - "name" + properties: + name: + description: "The name of the resource" + type: "string" + notRequired: + description: "Some field that isn't required" + type: "boolean" + newField: + description: "A new field not present in v1" + type: "string" + x-kubernetes-group-version-kind: + - group: "missinggroup.cattle.io" + version: "v2" + kind: "Resource" + io.cattle.missinggroup.v1.Resource: + description: "A Missing Group V1 resource is for a group not listed by server groups" + type: "object" + properties: + apiVersion: + description: "The APIVersion of this resource" + type: "string" + kind: + description: "The kind" + type: "string" + metadata: + description: "The metadata" + $ref: "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta" + spec: + description: "The spec for the resource" + type: "object" + required: + - "name" + properties: + name: + description: "The name of the resource" + type: "string" + notRequired: + description: "Some field that isn't required" + type: "boolean" + x-kubernetes-group-version-kind: + - group: "missinggroup.cattle.io" + version: "v1" + kind: "Resource" + io.cattle.management.NotAKind: type: "string" description: "Some string which isn't a kind" io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta: diff --git a/pkg/schema/definitions/refresh.go b/pkg/schema/definitions/refresh.go new file mode 100644 index 00000000..394873cb --- /dev/null +++ b/pkg/schema/definitions/refresh.go @@ -0,0 +1,48 @@ +package definitions + +import ( + "context" + "time" + + "github.com/rancher/steve/pkg/debounce" + apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiregv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" +) + +// refreshHandler triggers refreshes for a Debounceable refresher after a CRD/APIService has been changed +// intended to refresh the schema definitions after a CRD has been added and is hopefully available in k8s. +type refreshHandler struct { + // debounceRef is the debounceableRefresher containing the Refreshable (typically the schema definition handler) + debounceRef *debounce.DebounceableRefresher + // debounceDuration is the duration that the handler should ask the DebounceableRefresher to wait before refreshing + debounceDuration time.Duration +} + +// onChangeCRD refreshes the debounceRef after a CRD is added/changed +func (r *refreshHandler) onChangeCRD(key string, crd *apiextv1.CustomResourceDefinition) (*apiextv1.CustomResourceDefinition, error) { + r.debounceRef.RefreshAfter(r.debounceDuration) + return crd, nil +} + +// onChangeAPIService refreshes the debounceRef after an APIService is added/changed +func (r *refreshHandler) onChangeAPIService(key string, api *apiregv1.APIService) (*apiregv1.APIService, error) { + r.debounceRef.RefreshAfter(r.debounceDuration) + return api, nil +} + +// startBackgroundRefresh starts a force refresh that runs for every tick of duration. Can be stopped +// by cancelling the context +func (r *refreshHandler) startBackgroundRefresh(ctx context.Context, duration time.Duration) { + go func() { + ticker := time.NewTicker(duration) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + r.debounceRef.RefreshAfter(r.debounceDuration) + } + } + }() +} diff --git a/pkg/schema/definitions/refresh_test.go b/pkg/schema/definitions/refresh_test.go new file mode 100644 index 00000000..627aecc8 --- /dev/null +++ b/pkg/schema/definitions/refresh_test.go @@ -0,0 +1,84 @@ +package definitions + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/rancher/steve/pkg/debounce" + "github.com/stretchr/testify/require" + apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apiregv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" +) + +type refreshable struct { + wasRefreshed atomic.Bool +} + +func (r *refreshable) Refresh() error { + r.wasRefreshed.Store(true) + return nil +} + +func Test_onChangeCRD(t *testing.T) { + internalRefresh := refreshable{} + refresher := debounce.DebounceableRefresher{ + Refreshable: &internalRefresh, + } + refreshHandler := refreshHandler{ + debounceRef: &refresher, + debounceDuration: time.Microsecond * 5, + } + input := apiextv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-crd", + }, + } + output, err := refreshHandler.onChangeCRD("test-crd", &input) + require.Nil(t, err) + require.Equal(t, input, *output) + // waiting to allow the debouncer to refresh the refreshable + time.Sleep(time.Millisecond * 2) + require.True(t, internalRefresh.wasRefreshed.Load()) +} + +func Test_onChangeAPIService(t *testing.T) { + internalRefresh := refreshable{} + refresher := debounce.DebounceableRefresher{ + Refreshable: &internalRefresh, + } + refreshHandler := refreshHandler{ + debounceRef: &refresher, + debounceDuration: time.Microsecond * 5, + } + input := apiregv1.APIService{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-apiservice", + }, + } + output, err := refreshHandler.onChangeAPIService("test-apiservice", &input) + require.Nil(t, err) + require.Equal(t, input, *output) + // waiting to allow the debouncer to refresh the refreshable + time.Sleep(time.Millisecond * 2) + require.True(t, internalRefresh.wasRefreshed.Load()) + +} + +func Test_startBackgroundRefresh(t *testing.T) { + internalRefresh := refreshable{} + refresher := debounce.DebounceableRefresher{ + Refreshable: &internalRefresh, + } + refreshHandler := refreshHandler{ + debounceRef: &refresher, + debounceDuration: time.Microsecond * 5, + } + ctx, cancel := context.WithCancel(context.Background()) + refreshHandler.startBackgroundRefresh(ctx, time.Microsecond*10) + time.Sleep(time.Millisecond * 2) + require.True(t, internalRefresh.wasRefreshed.Load()) + cancel() +} diff --git a/pkg/schema/definitions/schema.go b/pkg/schema/definitions/schema.go index b6ddf44c..b9ba1ebc 100644 --- a/pkg/schema/definitions/schema.go +++ b/pkg/schema/definitions/schema.go @@ -1,37 +1,30 @@ package definitions import ( + "context" + "os" + "strconv" "time" "github.com/rancher/apiserver/pkg/types" + "github.com/rancher/steve/pkg/debounce" + apiextcontrollerv1 "github.com/rancher/wrangler/v2/pkg/generated/controllers/apiextensions.k8s.io/v1" + v1 "github.com/rancher/wrangler/v2/pkg/generated/controllers/apiregistration.k8s.io/v1" "github.com/rancher/wrangler/v2/pkg/schemas" + "github.com/sirupsen/logrus" "k8s.io/client-go/discovery" ) const ( - gvkExtensionName = "x-kubernetes-group-version-kind" - gvkExtensionGroup = "group" - gvkExtensionVersion = "version" - gvkExtensionKind = "kind" - defaultDuration = time.Second * 5 + handlerKey = "schema-definitions" + delayEnvVar = "CATTLE_CRD_REFRESH_DELAY_SECONDS" + defaultDelay = 2 + delayUnit = time.Second + refreshEnvVar = "CATTLE_BACKGROUND_REFRESH_MINUTES" + defaultRefresh = 10 + refreshUnit = time.Minute ) -// Register registers the schemaDefinition schema. -func Register(baseSchema *types.APISchemas, client discovery.DiscoveryInterface) { - handler := schemaDefinitionHandler{ - client: client, - refreshStale: defaultDuration, - } - baseSchema.MustAddSchema(types.APISchema{ - Schema: &schemas.Schema{ - ID: "schemaDefinition", - PluralName: "schemaDefinitions", - ResourceMethods: []string{"GET"}, - }, - ByIDHandler: handler.byIDHandler, - }) -} - type schemaDefinition struct { DefinitionType string `json:"definitionType"` Definitions map[string]definition `json:"definitions"` @@ -49,3 +42,55 @@ type definitionField struct { Description string `json:"description,omitempty"` Required bool `json:"required,omitempty"` } + +// Register registers the schemaDefinition schema. +func Register(ctx context.Context, + baseSchema *types.APISchemas, + client discovery.DiscoveryInterface, + crd apiextcontrollerv1.CustomResourceDefinitionController, + apiService v1.APIServiceController) { + handler := SchemaDefinitionHandler{ + client: client, + } + baseSchema.MustAddSchema(types.APISchema{ + Schema: &schemas.Schema{ + ID: "schemaDefinition", + PluralName: "schemaDefinitions", + ResourceMethods: []string{"GET"}, + }, + ByIDHandler: handler.byIDHandler, + }) + + debounce := debounce.DebounceableRefresher{ + Refreshable: &handler, + } + crdDebounce := getDurationEnvVarOrDefault(delayEnvVar, defaultDelay, delayUnit) + refHandler := refreshHandler{ + debounceRef: &debounce, + debounceDuration: crdDebounce, + } + crd.OnChange(ctx, handlerKey, refHandler.onChangeCRD) + apiService.OnChange(ctx, handlerKey, refHandler.onChangeAPIService) + refreshFrequency := getDurationEnvVarOrDefault(refreshEnvVar, defaultRefresh, refreshUnit) + // there's a delay between when a CRD is created and when it is available in the openapi/v2 endpoint + // the crd/apiservice controllers use a delay of 2 seconds to account for this, but it's possible that this isn't + // enough in certain environments, so we also use an infrequent background refresh to eventually correct any misses + refHandler.startBackgroundRefresh(ctx, refreshFrequency) +} + +// getDurationEnvVarOrDefault gets the duration value for a given envVar. If not found, it returns the provided default. +// unit is the unit of time (time.Second/time.Minute/etc.) that the returned duration should be in +func getDurationEnvVarOrDefault(envVar string, defaultVal int, unit time.Duration) time.Duration { + defaultDuration := time.Duration(defaultVal) * unit + envValue, ok := os.LookupEnv(envVar) + if !ok { + return defaultDuration + } + parsed, err := strconv.Atoi(envValue) + if err != nil { + logrus.Errorf("Env var %s was specified, but could not be converted to an int, default of %d seconds will be used", + envVar, int64(defaultDuration.Seconds())) + return defaultDuration + } + return time.Duration(parsed) * unit +} diff --git a/pkg/schema/definitions/schema_test.go b/pkg/schema/definitions/schema_test.go new file mode 100644 index 00000000..b47d6108 --- /dev/null +++ b/pkg/schema/definitions/schema_test.go @@ -0,0 +1,76 @@ +package definitions + +import ( + "context" + "os" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/rancher/apiserver/pkg/types" + "github.com/rancher/wrangler/v2/pkg/generic/fake" + "github.com/stretchr/testify/require" + apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiregv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" +) + +func TestRegister(t *testing.T) { + schemas := types.EmptyAPISchemas() + client := fakeDiscovery{} + ctrl := gomock.NewController(t) + crdController := fake.NewMockNonNamespacedControllerInterface[*apiextv1.CustomResourceDefinition, *apiextv1.CustomResourceDefinitionList](ctrl) + apisvcController := fake.NewMockNonNamespacedControllerInterface[*apiregv1.APIService, *apiregv1.APIServiceList](ctrl) + ctx, cancel := context.WithCancel(context.Background()) + crdController.EXPECT().OnChange(ctx, handlerKey, gomock.Any()) + apisvcController.EXPECT().OnChange(ctx, handlerKey, gomock.Any()) + Register(ctx, schemas, &client, crdController, apisvcController) + registeredSchema := schemas.LookupSchema("schemaDefinition") + require.NotNil(t, registeredSchema) + require.Len(t, registeredSchema.ResourceMethods, 1) + require.Equal(t, registeredSchema.ResourceMethods[0], "GET") + require.NotNil(t, registeredSchema.ByIDHandler) + // Register will spawn a background thread, so we want to stop that to not impact other tests + cancel() +} + +func Test_getDurationEnvVarOrDefault(t *testing.T) { + os.Setenv("VALID", "1") + os.Setenv("INVALID", "NOTANUMBER") + tests := []struct { + name string + envVar string + defaultValue int + unit time.Duration + wantDuration time.Duration + }{ + { + name: "not found, use default", + envVar: "NOT_FOUND", + defaultValue: 12, + unit: time.Second, + wantDuration: time.Second * 12, + }, + { + name: "found but not an int", + envVar: "INVALID", + defaultValue: 24, + unit: time.Minute, + wantDuration: time.Minute * 24, + }, + { + name: "found and valid int", + envVar: "VALID", + defaultValue: 30, + unit: time.Hour, + wantDuration: time.Hour * 1, + }, + } + for _, test := range tests { + test := test + t.Run(test.name, func(t *testing.T) { + t.Parallel() + got := getDurationEnvVarOrDefault(test.envVar, test.defaultValue, test.unit) + require.Equal(t, test.wantDuration, got) + }) + } +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 88ee2da4..2097aea2 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -142,7 +142,8 @@ func setup(ctx context.Context, server *Server) error { if err = resources.DefaultSchemas(ctx, server.BaseSchemas, ccache, server.ClientFactory, sf, server.Version); err != nil { return err } - definitions.Register(server.BaseSchemas, server.controllers.K8s.Discovery()) + definitions.Register(ctx, server.BaseSchemas, server.controllers.K8s.Discovery(), + server.controllers.CRD.CustomResourceDefinition(), server.controllers.API.APIService()) summaryCache := summarycache.New(sf, ccache) summaryCache.Start(ctx)