Skip to content

Commit

Permalink
Merge pull request #161 from MbolotSuse/resource-schema-improved-cache
Browse files Browse the repository at this point in the history
Resource schema improved cache
  • Loading branch information
MbolotSuse authored Mar 14, 2024
2 parents ca29f47 + b761846 commit 870824d
Show file tree
Hide file tree
Showing 10 changed files with 751 additions and 302 deletions.
51 changes: 51 additions & 0 deletions pkg/debounce/refresher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package debounce

import (
"context"
"sync"
"time"

"github.com/sirupsen/logrus"
)

// Refreshable represents an object which can be refreshed. This should be protected by a mutex for concurrent operation.
type Refreshable interface {
Refresh() error
}

// DebounceableRefresher is used to debounce multiple attempts to refresh a refreshable type.
type DebounceableRefresher struct {
sync.Mutex
// Refreshable is any type that can be refreshed. The refresh method should by protected by a mutex internally.
Refreshable Refreshable
current context.CancelFunc
}

// RefreshAfter requests a refresh after a certain time has passed. Subsequent calls to this method will
// delay the requested refresh by the new duration. Note that this is a total override of the previous calls - calling
// RefreshAfter(time.Second * 2) and then immediately calling RefreshAfter(time.Microsecond * 1) will run a refresh
// in one microsecond
func (d *DebounceableRefresher) RefreshAfter(duration time.Duration) {
d.Lock()
defer d.Unlock()
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
if d.current != nil {
d.current()
}
d.current = cancel
go func() {
timer := time.NewTimer(duration)
defer timer.Stop()
select {
case <-ctx.Done():
// this indicates that the context was cancelled. Do nothing.
case <-timer.C:
// note this can cause multiple refreshes to happen concurrently
err := d.Refreshable.Refresh()
if err != nil {
logrus.Errorf("failed to refresh with error: %v", err)
}
}
}()
}
47 changes: 47 additions & 0 deletions pkg/debounce/refresher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package debounce

import (
"fmt"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
)

type refreshable struct {
wasRefreshed atomic.Bool
retErr error
}

func (r *refreshable) Refresh() error {
r.wasRefreshed.Store(true)
return r.retErr
}

func TestRefreshAfter(t *testing.T) {
ref := refreshable{}
debounce := DebounceableRefresher{
Refreshable: &ref,
}
debounce.RefreshAfter(time.Millisecond * 2)
debounce.RefreshAfter(time.Microsecond * 2)
time.Sleep(time.Millisecond * 1)
// test that the second refresh call overrode the first - Micro < Milli so this should have ran
require.True(t, ref.wasRefreshed.Load())
ref.wasRefreshed.Store(false)
time.Sleep(time.Millisecond * 2)
// test that the call was debounced - though we called this twice only one refresh should be called
require.False(t, ref.wasRefreshed.Load())

ref = refreshable{
retErr: fmt.Errorf("Some error"),
}
debounce = DebounceableRefresher{
Refreshable: &ref,
}
debounce.RefreshAfter(time.Microsecond * 2)
// test the error case
time.Sleep(time.Millisecond * 1)
require.True(t, ref.wasRefreshed.Load())
}
127 changes: 36 additions & 91 deletions pkg/schema/definitions/handler.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
package definitions

import (
"errors"
"fmt"
"net/http"
"sync"
"time"

"github.com/rancher/apiserver/pkg/apierror"
"github.com/rancher/apiserver/pkg/types"
"github.com/rancher/steve/pkg/schema/converter"
"github.com/rancher/wrangler/v2/pkg/schemas/validation"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/runtime/schema"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/discovery"
"k8s.io/kube-openapi/pkg/util/proto"
)
Expand All @@ -28,15 +25,11 @@ var (
}
)

// schemaDefinitionHandler is a byID handler for a specific schema, which provides field definitions for all schemas.
// SchemaDefinitionHandler is a byID handler for a specific schema, which provides field definitions for all schemas.
// Does not implement any method allowing a caller to list definitions for all schemas.
type schemaDefinitionHandler struct {
type SchemaDefinitionHandler struct {
sync.RWMutex

// lastRefresh is the last time that the handler retrieved models from kubernetes.
lastRefresh time.Time
// refreshStale is the duration between lastRefresh and the next refresh of models.
refreshStale time.Duration
// client is the discovery client used to get the groups/resources/fields from kubernetes.
client discovery.DiscoveryInterface
// models are the cached models from the last response from kubernetes.
Expand All @@ -46,24 +39,39 @@ type schemaDefinitionHandler struct {
schemaToModel map[string]string
}

// Refresh writeLocks and updates the cache with new schemaDefinitions. Will result in a call to kubernetes to retrieve
// the openAPI schemas.
func (s *SchemaDefinitionHandler) Refresh() error {
openapi, err := s.client.OpenAPISchema()
if err != nil {
return fmt.Errorf("unable to fetch openapi definition: %w", err)
}
models, err := proto.NewOpenAPIData(openapi)
if err != nil {
return fmt.Errorf("unable to parse openapi definition into models: %w", err)
}
groups, err := s.client.ServerGroups()
if err != nil {
return fmt.Errorf("unable to retrieve groups: %w", err)
}
s.Lock()
defer s.Unlock()
nameIndex := s.indexSchemaNames(models, groups)
s.schemaToModel = nameIndex
s.models = &models
return nil
}

// byIDHandler is the Handler method for a request to get the schema definition for a specifc schema. Will use the
// cached models found during the last refresh as part of this process.
func (s *schemaDefinitionHandler) byIDHandler(request *types.APIRequest) (types.APIObject, error) {
func (s *SchemaDefinitionHandler) byIDHandler(request *types.APIRequest) (types.APIObject, error) {
// pseudo-access check, designed to make sure that users have access to the schema for the definition that they
// are accessing.
requestSchema := request.Schemas.LookupSchema(request.Name)
if requestSchema == nil {
return types.APIObject{}, apierror.NewAPIError(validation.NotFound, "no such schema")
}

if s.needsRefresh() {
err := s.refresh()
if err != nil {
logrus.Errorf("error refreshing schemas %s", err.Error())
return types.APIObject{}, apierror.NewAPIError(internalServerErrorCode, "error refreshing schemas")
}
}

// lock only in read-mode so that we don't read while refresh writes. Only use a read-lock - using a write lock
// would make this endpoint only usable by one caller at a time
s.RLock()
Expand Down Expand Up @@ -100,72 +108,13 @@ func (s *schemaDefinitionHandler) byIDHandler(request *types.APIRequest) (types.
}, nil
}

// needsRefresh readLocks and checks if the cache needs to be refreshed.
func (s *schemaDefinitionHandler) needsRefresh() bool {
s.RLock()
defer s.RUnlock()
if s.lastRefresh.IsZero() {
return true
}
return s.lastRefresh.Add(s.refreshStale).Before(time.Now())
}

// refresh writeLocks and updates the cache with new schemaDefinitions. Will result in a call to kubernetes to retrieve
// the openAPI schemas.
func (s *schemaDefinitionHandler) refresh() error {
s.Lock()
defer s.Unlock()
openapi, err := s.client.OpenAPISchema()
if err != nil {
return fmt.Errorf("unable to fetch openapi definition: %w", err)
}
models, err := proto.NewOpenAPIData(openapi)
if err != nil {
return fmt.Errorf("unable to parse openapi definition into models: %w", err)
}
s.models = &models
nameIndex, err := s.indexSchemaNames(models)
// indexSchemaNames may successfully refresh some definitions, but still return an error
// in these cases, store what we could find, but still return up an error
if nameIndex != nil {
s.schemaToModel = nameIndex
s.lastRefresh = time.Now()
}
if err != nil {
return fmt.Errorf("unable to index schema name to model name: %w", err)
}
return nil
}

// indexSchemaNames returns a map of schemaID to the modelName for a given schema. Will use the preferred version of a
// resource if possible. May return a map and an error if it was able to index some schemas but not others.
func (s *schemaDefinitionHandler) indexSchemaNames(models proto.Models) (map[string]string, error) {
_, resourceLists, err := s.client.ServerGroupsAndResources()
// this may occasionally fail to discover certain groups, but we still can refresh the others in those cases
if _, ok := err.(*discovery.ErrGroupDiscoveryFailed); err != nil && !ok {
return nil, fmt.Errorf("unable to retrieve groups and resources: %w", err)
}
preferredResourceVersions := map[schema.GroupKind]string{}
for _, resourceList := range resourceLists {
if resourceList == nil {
continue
}
groupVersion, gvErr := schema.ParseGroupVersion(resourceList.GroupVersion)
// we may fail to parse the GV of one group, but can still parse out the others
if gvErr != nil {
err = errors.Join(err, fmt.Errorf("unable to parse group version %s: %w", resourceList.GroupVersion, gvErr))
continue
}
for _, resource := range resourceList.APIResources {
gk := schema.GroupKind{
Group: groupVersion.Group,
Kind: resource.Kind,
}
// per the resource docs, if the resource.Version is empty, the preferred version for
// this resource is the version of the APIResourceList it is in
if resource.Version == "" || resource.Version == groupVersion.Version {
preferredResourceVersions[gk] = groupVersion.Version
}
// resource if possible. Can return an error if unable to find groups.
func (s *SchemaDefinitionHandler) indexSchemaNames(models proto.Models, groups *metav1.APIGroupList) map[string]string {
preferredResourceVersions := map[string]string{}
if groups != nil {
for _, group := range groups.Groups {
preferredResourceVersions[group.Name] = group.PreferredVersion.Version
}
}
schemaToModel := map[string]string{}
Expand All @@ -181,17 +130,13 @@ func (s *schemaDefinitionHandler) indexSchemaNames(models proto.Models) (map[str
// we can safely continue
continue
}
gk := schema.GroupKind{
Group: gvk.Group,
Kind: gvk.Kind,
}
prefVersion, ok := preferredResourceVersions[gk]
prefVersion := preferredResourceVersions[gvk.Group]
// if we don't have a known preferred version for this group or we are the preferred version
// add this as the model name for the schema
if !ok || prefVersion == gvk.Version {
if prefVersion == "" || prefVersion == gvk.Version {
schemaID := converter.GVKToSchemaID(*gvk)
schemaToModel[schemaID] = modelName
}
}
return schemaToModel, err
return schemaToModel
}
Loading

0 comments on commit 870824d

Please sign in to comment.