diff --git a/README.md b/README.md index c034e4c3..e91b143f 100644 --- a/README.md +++ b/README.md @@ -108,7 +108,7 @@ is empty. #### `filter` Filter results by a designated field. Filter keys use dot notation to denote -the subfield of an object to filter on. The filter value is matched as a +the subfield of an object to filter on. The filter value is normally matched as a substring. Example, filtering by object name: @@ -117,6 +117,23 @@ Example, filtering by object name: /v1/{type}?filter=metadata.name=foo ``` +if a target value is surrounded by single-quotes, it succeeds only on an exact match: + +Example, filtering by object name: + +``` +/v1/{type}?filter=metadata.name='match-this-exactly' +``` +``` + +A target value can be delimited by double-quotes, but this will succeed on a partial match: + +Example, filtering by object name: + +``` +/v1/{type}?filter=metadata.name="can-be-a-substri" +``` + One filter can list multiple possible fields to match, these are ORed together: ``` diff --git a/go.mod b/go.mod index 36c7be58..0e2d70f4 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/adrg/xdg v0.5.0 github.com/golang/protobuf v1.5.4 github.com/google/gnostic-models v0.6.8 + github.com/google/go-cmp v0.6.0 github.com/gorilla/mux v1.8.1 github.com/gorilla/websocket v1.5.1 github.com/pborman/uuid v1.2.1 @@ -22,7 +23,7 @@ require ( github.com/rancher/apiserver v0.0.0-20241009200134-5a4ecca7b988 github.com/rancher/dynamiclistener v0.6.1-rc.2 github.com/rancher/kubernetes-provider-detector v0.1.5 - github.com/rancher/lasso v0.0.0-20241202185148-04649f379358 + github.com/rancher/lasso v0.0.0-20250116001102-9e2b68739ccc github.com/rancher/norman v0.0.0-20241001183610-78a520c160ab github.com/rancher/remotedialer v0.3.2 github.com/rancher/wrangler/v3 v3.0.1-rc.2 @@ -78,7 +79,6 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/google/cel-go v0.20.1 // indirect 
- github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect diff --git a/go.sum b/go.sum index 3b37583b..325642c2 100644 --- a/go.sum +++ b/go.sum @@ -230,8 +230,8 @@ github.com/rancher/dynamiclistener v0.6.1-rc.2 h1:PTKNKcYXZjc/lo40EivRcXuEbCXwjp github.com/rancher/dynamiclistener v0.6.1-rc.2/go.mod h1:0KhUMHy3VcGMGavTY3i1/Mr8rVM02wFqNlUzjc+Cplg= github.com/rancher/kubernetes-provider-detector v0.1.5 h1:hWRAsWuJOemzGjz/XrbTlM7QmfO4OedvFE3QwXiH60I= github.com/rancher/kubernetes-provider-detector v0.1.5/go.mod h1:ypuJS7kP7rUiAn330xG46mj+Nhvym05GM8NqMVekpH0= -github.com/rancher/lasso v0.0.0-20241202185148-04649f379358 h1:pJwgJXPt4fi0ysXsJcl28rvxhx/Z/9SNCDwFOEyeGu0= -github.com/rancher/lasso v0.0.0-20241202185148-04649f379358/go.mod h1:IxgTBO55lziYhTEETyVKiT8/B5Rg92qYiRmcIIYoPgI= +github.com/rancher/lasso v0.0.0-20250116001102-9e2b68739ccc h1:5Hb++qaO2TJJCr4z2Ks0GrR5wXVxmr9sdiAFr7iItSg= +github.com/rancher/lasso v0.0.0-20250116001102-9e2b68739ccc/go.mod h1:IxgTBO55lziYhTEETyVKiT8/B5Rg92qYiRmcIIYoPgI= github.com/rancher/norman v0.0.0-20241001183610-78a520c160ab h1:ihK6See3y/JilqZlc0CG7NXPN+ue5nY9U7xUZUA8M7I= github.com/rancher/norman v0.0.0-20241001183610-78a520c160ab/go.mod h1:qX/OG/4wY27xSAcSdRilUBxBumV6Ey2CWpAeaKnBQDs= github.com/rancher/remotedialer v0.3.2 h1:kstZbRwPS5gPWpGg8VjEHT2poHtArs+Fc317YM8JCzU= diff --git a/pkg/resources/virtual/virtual_test.go b/pkg/resources/virtual/virtual_test.go index f9f05d27..3dee8ad7 100644 --- a/pkg/resources/virtual/virtual_test.go +++ b/pkg/resources/virtual/virtual_test.go @@ -1,17 +1,17 @@ package virtual_test import ( - "github.com/rancher/steve/pkg/resources/virtual" - "k8s.io/apimachinery/pkg/runtime/schema" "strings" "testing" + "github.com/rancher/steve/pkg/resources/virtual" "github.com/rancher/steve/pkg/resources/virtual/common" "github.com/rancher/steve/pkg/summarycache" 
"github.com/rancher/wrangler/v3/pkg/summary" "github.com/stretchr/testify/require" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" ) func TestTransformChain(t *testing.T) { diff --git a/pkg/server/cli/clicontext.go b/pkg/server/cli/clicontext.go index 56c644d2..d0ddbd60 100644 --- a/pkg/server/cli/clicontext.go +++ b/pkg/server/cli/clicontext.go @@ -19,6 +19,8 @@ type Config struct { HTTPListenPort int UIPath string + SQLCache bool + WebhookConfig authcli.WebhookConfig } @@ -57,6 +59,10 @@ func (c *Config) ToServer(ctx context.Context, sqlCache bool) (*server.Server, e func Flags(config *Config) []cli.Flag { flags := []cli.Flag{ + cli.BoolFlag{ + Name: "sqlcache", + Destination: &config.SQLCache, + }, cli.StringFlag{ Name: "kubeconfig", EnvVar: "KUBECONFIG", diff --git a/pkg/sqlcache/informer/factory/informer_factory.go b/pkg/sqlcache/informer/factory/informer_factory.go index a8f3d266..99932627 100644 --- a/pkg/sqlcache/informer/factory/informer_factory.go +++ b/pkg/sqlcache/informer/factory/informer_factory.go @@ -54,6 +54,7 @@ type DBClient interface { type Cache struct { informer.ByOptionsLister + informer.Watcher } type connector interface { @@ -152,7 +153,7 @@ func (f *CacheFactory) CacheFor(fields [][]string, transform cache.TransformFunc } // At this point the informer is ready, return it - return Cache{ByOptionsLister: gi.informer}, nil + return Cache{ByOptionsLister: gi.informer, Watcher: gi.informer}, nil } // Reset closes the stopCh which stops any running informers, assigns a new stopCh, resets the GVK-informer cache, and resets diff --git a/pkg/sqlcache/informer/indexer.go b/pkg/sqlcache/informer/indexer.go index 7ed4451b..b07dd845 100644 --- a/pkg/sqlcache/informer/indexer.go +++ b/pkg/sqlcache/informer/indexer.go @@ -68,7 +68,7 @@ type Store interface { GetByKey(key string) (item any, exists bool, err error) GetName() string RegisterAfterUpsert(f func(key 
string, obj any, tx db.TXClient) error) - RegisterAfterDelete(f func(key string, tx db.TXClient) error) + RegisterAfterDelete(f func(key string, obj any, tx db.TXClient) error) GetShouldEncrypt() bool GetType() reflect.Type } diff --git a/pkg/sqlcache/informer/informer.go b/pkg/sqlcache/informer/informer.go index a74c7029..92237eeb 100644 --- a/pkg/sqlcache/informer/informer.go +++ b/pkg/sqlcache/informer/informer.go @@ -23,13 +23,19 @@ import ( type Informer struct { cache.SharedIndexInformer ByOptionsLister + + listeners *listeners } type ByOptionsLister interface { ListByOptions(ctx context.Context, lo ListOptions, partitions []partition.Partition, namespace string) (*unstructured.UnstructuredList, int, string, error) } -// this is set to a var so that it can be overridden by test code for mocking purposes +type Watcher interface { + Watch(ctx context.Context, listener Listener) int +} + +// this is set to a var so that it can be overriden by test code for mocking purposes var newInformer = cache.NewSharedIndexInformer // NewInformer returns a new SQLite-backed Informer for the type specified by schema in unstructured.Unstructured form @@ -70,12 +76,26 @@ func NewInformer(client dynamic.ResourceInterface, fields [][]string, transform return nil, err } + listeners := newlisteners() + sii.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + listeners.Notify(loi.resourceVersionCache.getLatest()) + }, + UpdateFunc: func(obj any, newObj any) { + listeners.Notify(loi.resourceVersionCache.getLatest()) + }, + DeleteFunc: func(obj any) { + listeners.Notify(loi.resourceVersionCache.getLatest()) + }, + }) + // HACK: replace the default informer's indexer with the SQL based one UnsafeSet(sii, "indexer", loi) return &Informer{ SharedIndexInformer: sii, ByOptionsLister: loi, + listeners: listeners, }, nil } @@ -86,7 +106,17 @@ func NewInformer(client dynamic.ResourceInterface, fields [][]string, transform // - a continue token, if there are more pages 
after the returned one // - an error instead of all of the above if anything went wrong func (i *Informer) ListByOptions(ctx context.Context, lo ListOptions, partitions []partition.Partition, namespace string) (*unstructured.UnstructuredList, int, string, error) { - return i.ByOptionsLister.ListByOptions(ctx, lo, partitions, namespace) + list, total, continueToken, err := i.ByOptionsLister.ListByOptions(ctx, lo, partitions, namespace) + return list, total, continueToken, err +} + +func (i *Informer) Watch(ctx context.Context, listener Listener) int { + revision := i.listeners.AddListener(listener) + go func() { + <-ctx.Done() + i.listeners.RemoveListener(listener) + }() + return revision } func informerNameFromGVK(gvk schema.GroupVersionKind) string { diff --git a/pkg/sqlcache/informer/listoption_indexer.go b/pkg/sqlcache/informer/listoption_indexer.go index 93ce5f08..02e4fcac 100644 --- a/pkg/sqlcache/informer/listoption_indexer.go +++ b/pkg/sqlcache/informer/listoption_indexer.go @@ -10,8 +10,10 @@ import ( "sort" "strconv" "strings" + "sync" "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/client-go/tools/cache" @@ -35,6 +37,8 @@ type ListOptionIndexer struct { deleteFieldStmt *sql.Stmt upsertLabelsStmt *sql.Stmt deleteLabelsStmt *sql.Stmt + + resourceVersionCache *resourceVersionCache } var ( @@ -94,14 +98,17 @@ func NewListOptionIndexer(fields [][]string, s Store, namespaced bool) (*ListOpt } l := &ListOptionIndexer{ - Indexer: i, - namespaced: namespaced, - indexedFields: indexedFields, + Indexer: i, + namespaced: namespaced, + indexedFields: indexedFields, + resourceVersionCache: newResourceVersionCache(1000), } l.RegisterAfterUpsert(l.addIndexFields) l.RegisterAfterUpsert(l.addLabels) + l.RegisterAfterUpsert(l.updateResourceVersionCache) l.RegisterAfterDelete(l.deleteIndexFields) l.RegisterAfterDelete(l.deleteLabels) + l.RegisterAfterDelete(l.updateResourceVersionCache) columnDefs 
:= make([]string, len(indexedFields)) for index, field := range indexedFields { column := fmt.Sprintf(`"%s" TEXT`, field) @@ -216,6 +223,15 @@ func (l *ListOptionIndexer) addIndexFields(key string, obj any, tx db.TXClient) return nil } +func (l *ListOptionIndexer) updateResourceVersionCache(key string, obj any, tx db.TXClient) error { + objMeta, err := meta.Accessor(obj) + if err != nil { + return err + } + l.resourceVersionCache.add(objMeta.GetResourceVersion()) + return nil +} + // labels are stored in tables that shadow the underlying object table for each GVK func (l *ListOptionIndexer) addLabels(key string, obj any, tx db.TXClient) error { k8sObj, ok := obj.(*unstructured.Unstructured) @@ -232,7 +248,7 @@ func (l *ListOptionIndexer) addLabels(key string, obj any, tx db.TXClient) error return nil } -func (l *ListOptionIndexer) deleteIndexFields(key string, tx db.TXClient) error { +func (l *ListOptionIndexer) deleteIndexFields(key string, _ any, tx db.TXClient) error { args := []any{key} err := tx.StmtExec(tx.Stmt(l.deleteFieldStmt), args...) @@ -242,7 +258,7 @@ func (l *ListOptionIndexer) deleteIndexFields(key string, tx db.TXClient) error return nil } -func (l *ListOptionIndexer) deleteLabels(key string, tx db.TXClient) error { +func (l *ListOptionIndexer) deleteLabels(key string, _ any, tx db.TXClient) error { err := tx.StmtExec(tx.Stmt(l.deleteLabelsStmt), key) if err != nil { return &db.QueryError{QueryString: l.deleteLabelsQuery, Err: err} @@ -250,6 +266,10 @@ func (l *ListOptionIndexer) deleteLabels(key string, tx db.TXClient) error { return nil } +func (l *ListOptionIndexer) GetLastResourceVersion() string { + return l.resourceVersionCache.getLatest() +} + // ListByOptions returns objects according to the specified list options and partitions. 
// Specifically: // - an unstructured list of resources belonging to any of the specified partitions @@ -257,6 +277,12 @@ func (l *ListOptionIndexer) deleteLabels(key string, tx db.TXClient) error { // - a continue token, if there are more pages after the returned one // - an error instead of all of the above if anything went wrong func (l *ListOptionIndexer) ListByOptions(ctx context.Context, lo ListOptions, partitions []partition.Partition, namespace string) (*unstructured.UnstructuredList, int, string, error) { + isMaybeStale := lo.Revision != "" && !l.resourceVersionCache.contains(lo.Revision) + if isMaybeStale { + // TODO: The meat of the logic would be here + return nil, 0, "", fmt.Errorf("might be stale, try again maybe") + } + queryInfo, err := l.constructQuery(lo, partitions, namespace, db.Sanitize(l.GetName())) if err != nil { return nil, 0, "", err @@ -851,3 +877,50 @@ func toUnstructuredList(items []any) *unstructured.UnstructuredList { } return result } + +type resourceVersionCache struct { + lock sync.RWMutex + resourceVersions []string + items map[string]struct{} + nextIndex int +} + +func newResourceVersionCache(size int) *resourceVersionCache { + return &resourceVersionCache{ + resourceVersions: make([]string, size), + nextIndex: 0, + items: make(map[string]struct{}), + } +} + +func (c *resourceVersionCache) add(resourceVersion string) { + c.lock.Lock() + defer c.lock.Unlock() + + oldResourceVersion := c.resourceVersions[c.nextIndex] + delete(c.items, oldResourceVersion) + + c.resourceVersions[c.nextIndex] = resourceVersion + c.items[resourceVersion] = struct{}{} + c.nextIndex = (c.nextIndex + 1) % len(c.resourceVersions) +} + +func (c *resourceVersionCache) contains(target string) bool { + c.lock.RLock() + defer c.lock.RUnlock() + + _, found := c.items[target] + return found +} + +func (c *resourceVersionCache) getLatest() string { + c.lock.RLock() + defer c.lock.RUnlock() + + index := c.nextIndex - 1 + if index < 0 { + index = 
len(c.resourceVersions) - 1 + } + + return c.resourceVersions[index] +} diff --git a/pkg/sqlcache/informer/listoptions.go b/pkg/sqlcache/informer/listoptions.go index 71bf6f6e..6c79a550 100644 --- a/pkg/sqlcache/informer/listoptions.go +++ b/pkg/sqlcache/informer/listoptions.go @@ -30,6 +30,7 @@ type ListOptions struct { Filters []OrFilter Sort Sort Pagination Pagination + Revision string } // Filter represents a field to filter by. diff --git a/pkg/sqlcache/informer/watcher.go b/pkg/sqlcache/informer/watcher.go new file mode 100644 index 00000000..375e51fa --- /dev/null +++ b/pkg/sqlcache/informer/watcher.go @@ -0,0 +1,53 @@ +package informer + +import "sync" + +type listeners struct { + lock sync.Mutex + listeners map[Listener]struct{} + // count is incremented everytime Notify is called + count int +} + +func newlisteners() *listeners { + return &listeners{ + listeners: make(map[Listener]struct{}), + } +} + +func (w *listeners) Notify(revision string) { + w.lock.Lock() + defer w.lock.Unlock() + + w.count += 1 + + for listener := range w.listeners { + listener.Notify(revision) + } +} + +func (w *listeners) AddListener(listener Listener) int { + w.lock.Lock() + defer w.lock.Unlock() + + w.listeners[listener] = struct{}{} + return w.count +} + +func (w *listeners) RemoveListener(listener Listener) { + w.lock.Lock() + defer w.lock.Unlock() + + delete(w.listeners, listener) +} + +func (w *listeners) Count() int { + w.lock.Lock() + defer w.lock.Unlock() + + return w.count +} + +type Listener interface { + Notify(revision string) +} diff --git a/pkg/sqlcache/store/store.go b/pkg/sqlcache/store/store.go index a228ee86..313bee0a 100644 --- a/pkg/sqlcache/store/store.go +++ b/pkg/sqlcache/store/store.go @@ -54,7 +54,7 @@ type Store struct { listKeysStmt *sql.Stmt afterUpsert []func(key string, obj any, tx db.TXClient) error - afterDelete []func(key string, tx db.TXClient) error + afterDelete []func(key string, obj any, tx db.TXClient) error } // Test that Store implements 
cache.Indexer @@ -80,7 +80,7 @@ func NewStore(example any, keyFunc cache.KeyFunc, c DBClient, shouldEncrypt bool keyFunc: keyFunc, shouldEncrypt: shouldEncrypt, afterUpsert: []func(key string, obj any, tx db.TXClient) error{}, - afterDelete: []func(key string, tx db.TXClient) error{}, + afterDelete: []func(key string, obj any, tx db.TXClient) error{}, } // once multiple informerfactories are needed, this can accept the case where table already exists error is received @@ -137,7 +137,7 @@ func (s *Store) upsert(key string, obj any) error { } // deleteByKey deletes the object associated with key, if it exists in this Store -func (s *Store) deleteByKey(key string) error { +func (s *Store) deleteByKey(key string, obj any) error { tx, err := s.BeginTx(context.Background(), true) if err != nil { return err @@ -148,7 +148,7 @@ func (s *Store) deleteByKey(key string) error { return &db.QueryError{QueryString: s.deleteQuery, Err: err} } - err = s.runAfterDelete(key, tx) + err = s.runAfterDelete(key, obj, tx) if err != nil { return err } @@ -202,7 +202,7 @@ func (s *Store) Delete(obj any) error { if err != nil { return err } - err = s.deleteByKey(key) + err = s.deleteByKey(key, obj) if err != nil { log.Errorf("Error in Store.Delete for type %v: %v", s.name, err) return err @@ -288,7 +288,7 @@ func (s *Store) replaceByKey(objects map[string]any) error { if err != nil { return err } - err = s.runAfterDelete(key, txC) + err = s.runAfterDelete(key, nil, txC) if err != nil { return err } @@ -345,15 +345,15 @@ func (s *Store) runAfterUpsert(key string, obj any, txC db.TXClient) error { } // RegisterAfterDelete registers a func to be called after each deletion -func (s *Store) RegisterAfterDelete(f func(key string, txC db.TXClient) error) { +func (s *Store) RegisterAfterDelete(f func(key string, obj any, txC db.TXClient) error) { s.afterDelete = append(s.afterDelete, f) } // keep // runAfterDelete executes functions registered to run after upsert -func (s *Store) runAfterDelete(key 
string, txC db.TXClient) error { +func (s *Store) runAfterDelete(key string, obj any, txC db.TXClient) error { for _, f := range s.afterDelete { - err := f(key, txC) + err := f(key, obj, txC) if err != nil { return err } diff --git a/pkg/stores/proxy/proxy_store.go b/pkg/stores/proxy/proxy_store.go index 7faf6aee..fbb818a6 100644 --- a/pkg/stores/proxy/proxy_store.go +++ b/pkg/stores/proxy/proxy_store.go @@ -9,7 +9,6 @@ import ( "io/ioutil" "net/http" "os" - "regexp" "strconv" "github.com/pkg/errors" @@ -46,7 +45,6 @@ const ( ) var ( - lowerChars = regexp.MustCompile("[a-z]+") paramScheme = runtime.NewScheme() paramCodec = runtime.NewParameterCodec(paramScheme) ) diff --git a/pkg/stores/sqlpartition/listprocessor/processor.go b/pkg/stores/sqlpartition/listprocessor/processor.go index ee2f49b3..50a1e304 100644 --- a/pkg/stores/sqlpartition/listprocessor/processor.go +++ b/pkg/stores/sqlpartition/listprocessor/processor.go @@ -12,8 +12,10 @@ import ( "github.com/rancher/apiserver/pkg/types" "github.com/rancher/steve/pkg/sqlcache/informer" "github.com/rancher/steve/pkg/sqlcache/partition" + "github.com/rancher/steve/pkg/stores/sqlpartition/queryparser" "github.com/rancher/wrangler/v3/pkg/schemas/validation" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/selection" ) const ( @@ -33,6 +35,7 @@ const ( ) var opReg = regexp.MustCompile(`[!]?=`) +var labelsRegex = regexp.MustCompile(`^(metadata)\.(labels)\[(.+)\]$`) // ListOptions represents the query parameters that may be included in a list request. 
type ListOptions struct { @@ -53,6 +56,71 @@ type Cache interface { ListByOptions(ctx context.Context, lo informer.ListOptions, partitions []partition.Partition, namespace string) (*unstructured.UnstructuredList, int, string, error) } +func k8sOpToRancherOp(k8sOp selection.Operator) (informer.Op, error) { + h := map[selection.Operator]informer.Op{ + selection.Equals: informer.Eq, + selection.DoubleEquals: informer.Eq, + selection.NotEquals: informer.NotEq, + selection.In: informer.In, + selection.NotIn: informer.NotIn, + selection.Exists: informer.Exists, + selection.DoesNotExist: informer.NotExists, + selection.LessThan: informer.Lt, + selection.GreaterThan: informer.Gt, + } + v, ok := h[k8sOp] + if ok { + return v, nil + } + return "", fmt.Errorf("unknown k8sOp: %s", k8sOp) +} + +// Determine if the value field is surrounded by a pair of single- or double-quotes +// This is a difference we implement from the kubernetes CLI: if the target value of a (not) equal +// test is single-quoted, we use the full string. Otherwise we do a substring match +// (which is implemented as 'SELECT ... some-field ... LIKE "%VALUE%" ...' 
in the query) +// +// The caller also needs to know if it should strip delimiting quotes, so this returns two bools +func isQuotedStringTarget(values []string) (isQuoted bool, isSingleQuoted bool) { + if len(values) != 1 || len(values[0]) == 0 { + return false, false + } + s1 := values[0][0:1] + if !strings.Contains(`"'`, s1) { + return false, false + } + if !strings.HasSuffix(values[0], s1) { + return false, false + } + return true, s1[0] == '\'' +} + +// k8sRequirementToOrFilter - convert one k8s Requirement to a list of Filter's: + +func k8sRequirementToOrFilter(requirement queryparser.Requirement) (informer.Filter, error) { + values := requirement.Values() + queryFields := splitQuery(requirement.Key()) + op, err := k8sOpToRancherOp(requirement.Operator()) + if err != nil { + return informer.Filter{}, err + } + usePartialMatch := true + isQuoted, isSingleQuoted := isQuotedStringTarget(values) + if isQuoted { + // Strip off the quotes + values[0] = values[0][1 : len(values[0])-1] + if isSingleQuoted { + usePartialMatch = false + } + } + return informer.Filter{ + Field: queryFields, + Matches: values, + Op: op, + Partial: usePartialMatch, + }, nil +} + // ParseQuery parses the query params of a request and returns a ListOptions. 
func ParseQuery(apiOp *types.APIRequest, namespaceCache Cache) (informer.ListOptions, error) { opts := informer.ListOptions{} @@ -62,24 +130,22 @@ func ParseQuery(apiOp *types.APIRequest, namespaceCache Cache) (informer.ListOpt q := apiOp.Request.URL.Query() cont := q.Get(continueParam) opts.Resume = cont + opts.Revision = q.Get(revisionParam) filterParams := q[filterParam] filterOpts := []informer.OrFilter{} for _, filters := range filterParams { - orFilters := strings.Split(filters, orOp) + requirements, err := queryparser.ParseToRequirements(filters) + if err != nil { + return informer.ListOptions{}, err + } orFilter := informer.OrFilter{} - for _, filter := range orFilters { - var op informer.Op - if strings.Contains(filter, "!=") { - op = "!=" + for _, requirement := range requirements { + filter, err := k8sRequirementToOrFilter(requirement) + if err != nil { + return opts, err } - filter := opReg.Split(filter, -1) - if len(filter) != 2 { - continue - } - usePartialMatch := !(strings.HasPrefix(filter[1], `'`) && strings.HasSuffix(filter[1], `'`)) - value := strings.TrimSuffix(strings.TrimPrefix(filter[1], "'"), "'") - orFilter.Filters = append(orFilter.Filters, informer.Filter{Field: strings.Split(filter[0], "."), Matches: []string{value}, Op: op, Partial: usePartialMatch}) + orFilter.Filters = append(orFilter.Filters, filter) } filterOpts = append(filterOpts, orFilter) } @@ -90,20 +156,24 @@ func ParseQuery(apiOp *types.APIRequest, namespaceCache Cache) (informer.ListOpt if sortKeys != "" { sortParts := strings.SplitN(sortKeys, ",", 2) primaryField := sortParts[0] - if primaryField != "" && primaryField[0] == '-' { - sortOpts.Orders = append(sortOpts.Orders, informer.DESC) - primaryField = primaryField[1:] - } if primaryField != "" { + if primaryField[0] == '-' { + sortOpts.Orders = append(sortOpts.Orders, informer.DESC) + primaryField = primaryField[1:] + } else { + sortOpts.Orders = append(sortOpts.Orders, informer.ASC) + } sortOpts.Fields = 
append(sortOpts.Fields, strings.Split(primaryField, ".")) } if len(sortParts) > 1 { secondaryField := sortParts[1] - if secondaryField != "" && secondaryField[0] == '-' { - sortOpts.Orders = append(sortOpts.Orders, informer.DESC) - secondaryField = secondaryField[1:] - } if secondaryField != "" { + if secondaryField[0] == '-' { + sortOpts.Orders = append(sortOpts.Orders, informer.DESC) + secondaryField = secondaryField[1:] + } else { + sortOpts.Orders = append(sortOpts.Orders, informer.ASC) + } sortOpts.Fields = append(sortOpts.Fields, strings.Split(secondaryField, ".")) } } @@ -122,7 +192,7 @@ func ParseQuery(apiOp *types.APIRequest, namespaceCache Cache) (informer.ListOpt } opts.Pagination = pagination - var op informer.Op + op := informer.Eq projectsOrNamespaces := q.Get(projectsOrNamespacesVar) if projectsOrNamespaces == "" { projectsOrNamespaces = q.Get(projectsOrNamespacesVar + notOp) @@ -136,7 +206,7 @@ func ParseQuery(apiOp *types.APIRequest, namespaceCache Cache) (informer.ListOpt return opts, err } if projOrNSFilters == nil { - return opts, apierror.NewAPIError(validation.NotFound, fmt.Sprintf("could not find any namespacess named [%s] or namespaces belonging to project named [%s]", projectsOrNamespaces, projectsOrNamespaces)) + return opts, apierror.NewAPIError(validation.NotFound, fmt.Sprintf("could not find any namespaces named [%s] or namespaces belonging to project named [%s]", projectsOrNamespaces, projectsOrNamespaces)) } if op == informer.NotEq { for _, filter := range projOrNSFilters { @@ -162,6 +232,21 @@ func getLimit(apiOp *types.APIRequest) int { return limit } +// splitQuery takes a single-string metadata-labels filter and converts it into an array of 3 accessor strings, +// where the first two strings are always "metadata" and "labels", and the third is the label name. 
+This is more complex than doing something like `strings.Split("metadata.labels.fieldName", ".")` +because the fieldName can be more complex - in particular it can contain "."s and needs to be +bracketed, as in `metadata.labels[rancher.io/cattle.and.beef]`. +The `labelsRegex` looks for the bracketed form.
If projectsornamespaces is not empty" + @@ -61,7 +59,7 @@ func TestParseQuery(t *testing.T) { { Field: []string{"metadata", "namespace"}, Matches: []string{"ns1"}, - Op: "", + Op: informer.Eq, Partial: false, }, }, @@ -121,7 +119,7 @@ func TestParseQuery(t *testing.T) { { Field: []string{"metadata", "namespace"}, Matches: []string{"ns1"}, - Op: "", + Op: informer.Eq, Partial: false, }, }, @@ -171,7 +169,7 @@ func TestParseQuery(t *testing.T) { { Field: []string{"metadata", "namespace"}, Matches: []string{"ns1"}, - Op: "", + Op: informer.Eq, Partial: false, }, }, @@ -223,7 +221,7 @@ func TestParseQuery(t *testing.T) { { Field: []string{"a"}, Matches: []string{"c"}, - Op: "", + Op: informer.Eq, Partial: true, }, }, @@ -233,9 +231,6 @@ func TestParseQuery(t *testing.T) { Page: 1, }, }, - setupNSCache: func() Cache { - return nil - }, }) tests = append(tests, testCase{ description: "ParseQuery() with filter param set, with value in single quotes, should include filter with partial set to false in list options.", @@ -252,7 +247,7 @@ func TestParseQuery(t *testing.T) { { Field: []string{"a"}, Matches: []string{"c"}, - Op: "", + Op: informer.Eq, Partial: false, }, }, @@ -262,8 +257,57 @@ func TestParseQuery(t *testing.T) { Page: 1, }, }, - setupNSCache: func() Cache { - return nil + }) + tests = append(tests, testCase{ + description: "ParseQuery() with filter param set, with value in double quotes, should include filter with partial set to true in list options.", + req: &types.APIRequest{ + Request: &http.Request{ + URL: &url.URL{RawQuery: `filter=a1="c1"`}, + }, + }, + expectedLO: informer.ListOptions{ + ChunkSize: defaultLimit, + Filters: []informer.OrFilter{ + { + Filters: []informer.Filter{ + { + Field: []string{"a1"}, + Matches: []string{"c1"}, + Op: informer.Eq, + Partial: true, + }, + }, + }, + }, + Pagination: informer.Pagination{ + Page: 1, + }, + }, + }) + tests = append(tests, testCase{ + description: "ParseQuery() with a labels filter param should create a 
labels-specific filter.", + req: &types.APIRequest{ + Request: &http.Request{ + URL: &url.URL{RawQuery: "filter=metadata.labels[grover.example.com/fish]=heads"}, + }, + }, + expectedLO: informer.ListOptions{ + ChunkSize: defaultLimit, + Filters: []informer.OrFilter{ + { + Filters: []informer.Filter{ + { + Field: []string{"metadata", "labels", "grover.example.com/fish"}, + Matches: []string{"heads"}, + Op: informer.Eq, + Partial: true, + }, + }, + }, + }, + Pagination: informer.Pagination{ + Page: 1, + }, }, }) tests = append(tests, testCase{ @@ -281,7 +325,7 @@ func TestParseQuery(t *testing.T) { { Field: []string{"a"}, Matches: []string{"c"}, - Op: "", + Op: informer.Eq, Partial: true, }, }, @@ -291,7 +335,7 @@ func TestParseQuery(t *testing.T) { { Field: []string{"b"}, Matches: []string{"d"}, - Op: "", + Op: informer.Eq, Partial: true, }, }, @@ -301,16 +345,12 @@ func TestParseQuery(t *testing.T) { Page: 1, }, }, - setupNSCache: func() Cache { - return nil - }, }) tests = append(tests, testCase{ - description: "ParseQuery() with a filter param with a comma separate value, should include a single or filter with" + - " multiple filters.", + description: "ParseQuery() with multiple filter params, should include multiple or filters.", req: &types.APIRequest{ Request: &http.Request{ - URL: &url.URL{RawQuery: "filter=a=c,b=d"}, + URL: &url.URL{RawQuery: "filter=a=c&filter=b=d"}, }, }, expectedLO: informer.ListOptions{ @@ -321,13 +361,17 @@ func TestParseQuery(t *testing.T) { { Field: []string{"a"}, Matches: []string{"c"}, - Op: "", + Op: informer.Eq, Partial: true, }, + }, + }, + { + Filters: []informer.Filter{ { Field: []string{"b"}, Matches: []string{"d"}, - Op: "", + Op: informer.Eq, Partial: true, }, }, @@ -337,8 +381,297 @@ func TestParseQuery(t *testing.T) { Page: 1, }, }, - setupNSCache: func() Cache { - return nil + }) + tests = append(tests, testCase{ + description: "ParseQuery() should handle comma-separated standard and labels filters.", + req: 
&types.APIRequest{ + Request: &http.Request{ + URL: &url.URL{RawQuery: "filter=beer='pabst',metadata.labels[beer2.io/ale]='schlitz'"}, + }, + }, + expectedLO: informer.ListOptions{ + ChunkSize: defaultLimit, + Filters: []informer.OrFilter{ + { + Filters: []informer.Filter{ + { + Field: []string{"beer"}, + Matches: []string{"pabst"}, + Op: informer.Eq, + Partial: false, + }, + { + Field: []string{"metadata", "labels", "beer2.io/ale"}, + Matches: []string{"schlitz"}, + Op: informer.Eq, + Partial: false, + }, + }, + }, + }, + Pagination: informer.Pagination{ + Page: 1, + }, + }, + }) + tests = append(tests, testCase{ + description: "ParseQuery() should handle simple dot-separated label filters.", + req: &types.APIRequest{ + Request: &http.Request{ + URL: &url.URL{RawQuery: "filter=beer='natty-bo',metadata.labels.beer3=rainier"}, + }, + }, + expectedLO: informer.ListOptions{ + ChunkSize: defaultLimit, + Filters: []informer.OrFilter{ + { + Filters: []informer.Filter{ + { + Field: []string{"beer"}, + Matches: []string{"natty-bo"}, + Op: informer.Eq, + Partial: false, + }, + { + Field: []string{"metadata", "labels", "beer3"}, + Matches: []string{"rainier"}, + Op: informer.Eq, + Partial: true, + }, + }, + }, + }, + Pagination: informer.Pagination{ + Page: 1, + }, + }, + }) + tests = append(tests, testCase{ + description: "ParseQuery() should handle 'in' and 'IN' with one arg", + req: &types.APIRequest{ + Request: &http.Request{ + URL: &url.URL{RawQuery: "filter=a1In in (x1),a2In IN (x2)"}, + }, + }, + expectedLO: informer.ListOptions{ + ChunkSize: defaultLimit, + Filters: []informer.OrFilter{ + { + Filters: []informer.Filter{ + { + Field: []string{"a1In"}, + Matches: []string{"x1"}, + Op: informer.In, + Partial: true, + }, + { + Field: []string{"a2In"}, + Matches: []string{"x2"}, + Op: informer.In, + Partial: true, + }, + }, + }, + }, + Pagination: informer.Pagination{ + Page: 1, + }, + }, + }) + tests = append(tests, testCase{ + description: "ParseQuery() should handle 
'in' with multiple args", + req: &types.APIRequest{ + Request: &http.Request{ + URL: &url.URL{RawQuery: "filter=a2In in (x2a, x2b)"}, + }, + }, + expectedLO: informer.ListOptions{ + ChunkSize: defaultLimit, + Filters: []informer.OrFilter{ + { + Filters: []informer.Filter{ + { + Field: []string{"a2In"}, + Matches: []string{"x2a", "x2b"}, + Op: informer.In, + Partial: true, + }, + }, + }, + }, + Pagination: informer.Pagination{ + Page: 1, + }, + }, + }) + tests = append(tests, testCase{ + description: "ParseQuery() should handle 'notin' and 'NOTIN' with one arg", + req: &types.APIRequest{ + Request: &http.Request{ + URL: &url.URL{RawQuery: "filter=a1NotIn notin (x1),a2NotIn NOTIN (x2)"}, + }, + }, + expectedLO: informer.ListOptions{ + ChunkSize: defaultLimit, + Filters: []informer.OrFilter{ + { + Filters: []informer.Filter{ + { + Field: []string{"a1NotIn"}, + Matches: []string{"x1"}, + Op: informer.NotIn, + Partial: true, + }, + { + Field: []string{"a2NotIn"}, + Matches: []string{"x2"}, + Op: informer.NotIn, + Partial: true, + }, + }, + }, + }, + Pagination: informer.Pagination{ + Page: 1, + }, + }, + }) + tests = append(tests, testCase{ + description: "ParseQuery() should handle 'in' with multiple args", + req: &types.APIRequest{ + Request: &http.Request{ + URL: &url.URL{RawQuery: "filter=a3NotIn in (x3a, x3b)"}, + }, + }, + expectedLO: informer.ListOptions{ + ChunkSize: defaultLimit, + Filters: []informer.OrFilter{ + { + Filters: []informer.Filter{ + { + Field: []string{"a3NotIn"}, + Matches: []string{"x3a", "x3b"}, + Op: informer.In, + Partial: true, + }, + }, + }, + }, + Pagination: informer.Pagination{ + Page: 1, + }, + }, + }) + tests = append(tests, testCase{ + description: "ParseQuery() should handle 'in' and 'notin' in mixed case", + req: &types.APIRequest{ + Request: &http.Request{ + URL: &url.URL{RawQuery: "filter=a4In iN (x4a),a4NotIn nOtIn (x4b)"}, + }, + }, + expectedLO: informer.ListOptions{ + ChunkSize: defaultLimit, + Filters: []informer.OrFilter{ + 
{ + Filters: []informer.Filter{ + { + Field: []string{"a4In"}, + Matches: []string{"x4a"}, + Op: informer.In, + Partial: true, + }, + { + Field: []string{"a4NotIn"}, + Matches: []string{"x4b"}, + Op: informer.NotIn, + Partial: true, + }, + }, + }, + }, + Pagination: informer.Pagination{ + Page: 1, + }, + }, + }) + tests = append(tests, testCase{ + description: "ParseQuery() should complain on non-label exists tests", + req: &types.APIRequest{ + Request: &http.Request{ + URL: &url.URL{RawQuery: "filter=a5In1,!a5In2, ! a5In3"}, + }, + }, + errExpected: true, + errorText: "unable to parse requirement: existence tests are valid only for labels; not valid for field 'a5In1'", + }) + tests = append(tests, testCase{ + description: "ParseQuery() should allow label exists tests", + req: &types.APIRequest{ + Request: &http.Request{ + URL: &url.URL{RawQuery: "filter=metadata.labels.a5In1,!metadata.labels.a5In2, ! metadata.labels.a5In3"}, + }, + }, + expectedLO: informer.ListOptions{ + ChunkSize: defaultLimit, + Filters: []informer.OrFilter{ + { + Filters: []informer.Filter{ + { + Field: []string{"metadata", "labels", "a5In1"}, + Op: informer.Exists, + Matches: []string{}, + Partial: true, + }, + { + Field: []string{"metadata", "labels", "a5In2"}, + Op: informer.NotExists, + Matches: []string{}, + Partial: true, + }, + { + Field: []string{"metadata", "labels", "a5In3"}, + Op: informer.NotExists, + Matches: []string{}, + Partial: true, + }, + }, + }, + }, + Pagination: informer.Pagination{ + Page: 1, + }, + }, + }) + tests = append(tests, testCase{ + description: "ParseQuery() should handle numeric comparisons", + req: &types.APIRequest{ + Request: &http.Request{ + URL: &url.URL{RawQuery: "filter=a<1,b>2"}, + }, + }, + expectedLO: informer.ListOptions{ + ChunkSize: defaultLimit, + Filters: []informer.OrFilter{ + { + Filters: []informer.Filter{ + { + Field: []string{"a"}, + Op: informer.Lt, + Matches: []string{"1"}, + Partial: true, + }, + { + Field: []string{"b"}, + Op: 
informer.Gt, + Matches: []string{"2"}, + Partial: true, + }, + }, + }, + }, + Pagination: informer.Pagination{ + Page: 1, + }, }, }) tests = append(tests, testCase{ @@ -361,9 +694,6 @@ func TestParseQuery(t *testing.T) { Page: 1, }, }, - setupNSCache: func() Cache { - return nil - }, }) tests = append(tests, testCase{ description: "ParseQuery() with no errors returned should returned no errors. If one sort param is given primary field " + @@ -384,9 +714,6 @@ func TestParseQuery(t *testing.T) { Page: 1, }, }, - setupNSCache: func() Cache { - return nil - }, }) tests = append(tests, testCase{ description: "ParseQuery() with no errors returned should returned no errors. If two sort params are given, sort " + @@ -412,9 +739,6 @@ func TestParseQuery(t *testing.T) { Page: 1, }, }, - setupNSCache: func() Cache { - return nil - }, }) tests = append(tests, testCase{ description: "ParseQuery() with no errors returned should returned no errors. If continue params is given, resume" + @@ -432,9 +756,6 @@ func TestParseQuery(t *testing.T) { Page: 1, }, }, - setupNSCache: func() Cache { - return nil - }, }) tests = append(tests, testCase{ description: "ParseQuery() with no errors returned should returned no errors. If continue param is given, resume" + @@ -452,9 +773,6 @@ func TestParseQuery(t *testing.T) { Page: 1, }, }, - setupNSCache: func() Cache { - return nil - }, }) tests = append(tests, testCase{ description: "ParseQuery() with no errors returned should returned no errors. If limit param is given, chunksize" + @@ -471,9 +789,6 @@ func TestParseQuery(t *testing.T) { Page: 1, }, }, - setupNSCache: func() Cache { - return nil - }, }) tests = append(tests, testCase{ description: "ParseQuery() with no errors returned should returned no errors. 
If page param is given, page" + @@ -490,9 +805,6 @@ func TestParseQuery(t *testing.T) { Page: 3, }, }, - setupNSCache: func() Cache { - return nil - }, }) tests = append(tests, testCase{ description: "ParseQuery() with no errors returned should returned no errors. If pagesize param is given, pageSize" + @@ -510,18 +822,24 @@ func TestParseQuery(t *testing.T) { Page: 1, }, }, - setupNSCache: func() Cache { - return nil - }, }) t.Parallel() for _, test := range tests { t.Run(test.description, func(t *testing.T) { - test.nsc = test.setupNSCache() + if test.setupNSCache == nil { + test.nsc = nil + } else { + test.nsc = test.setupNSCache() + } lo, err := ParseQuery(test.req, test.nsc) if test.errExpected { assert.NotNil(t, err) + if test.errorText != "" { + assert.Contains(t, err.Error(), test.errorText) + } return + } else { + assert.Nil(t, err) } assert.Equal(t, test.expectedLO, lo) }) diff --git a/pkg/stores/sqlpartition/partition_mocks_test.go b/pkg/stores/sqlpartition/partition_mocks_test.go index 687b9006..8de44515 100644 --- a/pkg/stores/sqlpartition/partition_mocks_test.go +++ b/pkg/stores/sqlpartition/partition_mocks_test.go @@ -16,7 +16,6 @@ import ( partition "github.com/rancher/steve/pkg/sqlcache/partition" gomock "go.uber.org/mock/gomock" unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - watch "k8s.io/apimachinery/pkg/watch" ) // MockPartitioner is a mock of Partitioner interface. @@ -143,10 +142,10 @@ } // ListByPartitions mocks base method.
-func (m *MockUnstructuredStore) ListByPartitions(arg0 *types.APIRequest, arg1 *types.APISchema, arg2 []partition.Partition) ([]unstructured.Unstructured, int, string, error) { +func (m *MockUnstructuredStore) ListByPartitions(arg0 *types.APIRequest, arg1 *types.APISchema, arg2 []partition.Partition) (*unstructured.UnstructuredList, int, string, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ListByPartitions", arg0, arg1, arg2) - ret0, _ := ret[0].([]unstructured.Unstructured) + ret0, _ := ret[0].(*unstructured.UnstructuredList) ret1, _ := ret[1].(int) ret2, _ := ret[2].(string) ret3, _ := ret[3].(error) @@ -176,10 +175,10 @@ func (mr *MockUnstructuredStoreMockRecorder) Update(arg0, arg1, arg2, arg3 any) } // WatchByPartitions mocks base method. -func (m *MockUnstructuredStore) WatchByPartitions(arg0 *types.APIRequest, arg1 *types.APISchema, arg2 types.WatchRequest, arg3 []partition.Partition) (chan watch.Event, error) { +func (m *MockUnstructuredStore) WatchByPartitions(arg0 *types.APIRequest, arg1 *types.APISchema, arg2 types.WatchRequest, arg3 []partition.Partition) (chan struct{}, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "WatchByPartitions", arg0, arg1, arg2, arg3) - ret0, _ := ret[0].(chan watch.Event) + ret0, _ := ret[0].(chan struct{}) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/pkg/stores/sqlpartition/partitioner.go b/pkg/stores/sqlpartition/partitioner.go index b3b74f9b..f546f899 100644 --- a/pkg/stores/sqlpartition/partitioner.go +++ b/pkg/stores/sqlpartition/partitioner.go @@ -12,7 +12,6 @@ import ( "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/watch" ) var ( @@ -29,8 +28,8 @@ type UnstructuredStore interface { Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (*unstructured.Unstructured, []types.Warning, error) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) 
(*unstructured.Unstructured, []types.Warning, error) - ListByPartitions(apiOp *types.APIRequest, schema *types.APISchema, partitions []partition.Partition) ([]unstructured.Unstructured, int, string, error) - WatchByPartitions(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest, partitions []partition.Partition) (chan watch.Event, error) + ListByPartitions(apiOp *types.APIRequest, schema *types.APISchema, partitions []partition.Partition) (*unstructured.UnstructuredList, int, string, error) + WatchByPartitions(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest, partitions []partition.Partition) (chan struct{}, error) } // rbacPartitioner is an implementation of the sqlpartition.Partitioner interface. diff --git a/pkg/stores/sqlpartition/queryparser/labels.go b/pkg/stores/sqlpartition/queryparser/labels.go new file mode 100644 index 00000000..0d80a8c3 --- /dev/null +++ b/pkg/stores/sqlpartition/queryparser/labels.go @@ -0,0 +1,166 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/ + +/* +This file is derived from +https://github.com/kubernetes/apimachinery/blob/90df4d1d2d40ea9b3a522bec6e3577237358de00/pkg/labels/labels.go + - FormatLabels was dropped + - validateLabelKey calls from ConvertSelectorToLabelsMap were dropped +*/ + +package queryparser + +import ( + "fmt" + "sort" + "strings" + + "k8s.io/apimachinery/pkg/util/validation/field" +) + +// Labels allows you to present labels independently from their storage. +type Labels interface { + // Has returns whether the provided label exists. + Has(label string) (exists bool) + + // Get returns the value for the provided label. + Get(label string) (value string) +} + +// Set is a map of label:value. It implements Labels. +type Set map[string]string + +// String returns all labels listed as a human readable string. +// Conveniently, exactly the format that ParseSelector takes. +func (ls Set) String() string { + selector := make([]string, 0, len(ls)) + for key, value := range ls { + selector = append(selector, key+"="+value) + } + // Sort for determinism. + sort.StringSlice(selector).Sort() + return strings.Join(selector, ",") +} + +// Has returns whether the provided label exists in the map. +func (ls Set) Has(label string) bool { + _, exists := ls[label] + return exists +} + +// Get returns the value in the map for the provided label. +func (ls Set) Get(label string) string { + return ls[label] +} + +// AsSelector converts labels into a selectors. It does not +// perform any validation, which means the server will reject +// the request if the Set contains invalid values. +func (ls Set) AsSelector() Selector { + return SelectorFromSet(ls) +} + +// AsValidatedSelector converts labels into a selectors. +// The Set is validated client-side, which allows to catch errors early. 
+func (ls Set) AsValidatedSelector() (Selector, error) { + return ValidatedSelectorFromSet(ls) +} + +// AsSelectorPreValidated converts labels into a selector, but +// assumes that labels are already validated and thus doesn't +// perform any validation. +// According to our measurements this is significantly faster +// in codepaths that matter at high scale. +// Note: this method copies the Set; if the Set is immutable, consider wrapping it with ValidatedSetSelector +// instead, which does not copy. +func (ls Set) AsSelectorPreValidated() Selector { + return SelectorFromValidatedSet(ls) +} + +// Conflicts takes 2 maps and returns true if there a key match between +// the maps but the value doesn't match, and returns false in other cases +func Conflicts(labels1, labels2 Set) bool { + small := labels1 + big := labels2 + if len(labels2) < len(labels1) { + small = labels2 + big = labels1 + } + + for k, v := range small { + if val, match := big[k]; match { + if val != v { + return true + } + } + } + + return false +} + +// Merge combines given maps, and does not check for any conflicts +// between the maps. 
In case of conflicts, second map (labels2) wins +func Merge(labels1, labels2 Set) Set { + mergedMap := Set{} + + for k, v := range labels1 { + mergedMap[k] = v + } + for k, v := range labels2 { + mergedMap[k] = v + } + return mergedMap +} + +// Equals returns true if the given maps are equal +func Equals(labels1, labels2 Set) bool { + if len(labels1) != len(labels2) { + return false + } + + for k, v := range labels1 { + value, ok := labels2[k] + if !ok { + return false + } + if value != v { + return false + } + } + return true +} + +// ConvertSelectorToLabelsMap converts selector string to labels map +// and validates keys and values +func ConvertSelectorToLabelsMap(selector string, opts ...field.PathOption) (Set, error) { + labelsMap := Set{} + + if len(selector) == 0 { + return labelsMap, nil + } + + labels := strings.Split(selector, ",") + for _, label := range labels { + l := strings.Split(label, "=") + if len(l) != 2 { + return labelsMap, fmt.Errorf("invalid selector: %s", l) + } + key := strings.TrimSpace(l[0]) + value := strings.TrimSpace(l[1]) + labelsMap[key] = value + } + return labelsMap, nil +} diff --git a/pkg/stores/sqlpartition/queryparser/selector.go b/pkg/stores/sqlpartition/queryparser/selector.go new file mode 100644 index 00000000..cc8821a1 --- /dev/null +++ b/pkg/stores/sqlpartition/queryparser/selector.go @@ -0,0 +1,965 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +/* +This file is derived from +https://github.com/kubernetes/apimachinery/blob/90df4d1d2d40ea9b3a522bec6e3577237358de00/pkg/labels/selector.go +*/ + +/** +Main changes: + +1. The upstream `selector.go` file does parsing and applying to the objects being test. +We only care about the parser, so the selection part is dropped. + +2. I dropped label value validation in the parser + +3. Multiple values are returned as an array rather than a `k8s.io/utils/sets.String` object +to avoid having to pull in that dependency as well (and it isn't needed because we convert +the array into a sql statement. So the set gives us no benefit apart from removing duplicate target values). + +4. We added the `QuotedStringToken` constant to distinguish exact matches from substring matches. +This needed the `SingleQuoteToken` and `DoubleQuoteToken` variants for the lexer. + +5. Our filter language ignores case for `in` and `notin`. These must be lower-case in kubectl filter expressions. + +6. The `Lexer.Lex` function names the return parameters in its header but has no argument-less + return statement, so I dropped the names. + +7. We allow `lt` and `gt` as aliases for `<` and `>`. +*/ + +package queryparser + +import ( + "fmt" + "regexp" + "slices" + "sort" + "strconv" + "strings" + + "k8s.io/apimachinery/pkg/selection" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/validation/field" +) + +var ( + unaryOperators = []string{ + string(selection.Exists), string(selection.DoesNotExist), + } + binaryOperators = []string{ + string(selection.In), string(selection.NotIn), + string(selection.Equals), string(selection.DoubleEquals), string(selection.NotEquals), + string(selection.GreaterThan), string(selection.LessThan), + } + validRequirementOperators = append(binaryOperators, unaryOperators...) + labelSelectorRegex *regexp.Regexp +) + +func init() { + labelSelectorRegex = regexp.MustCompile(`^metadata.labels[[.]`) +} + +// Requirements is AND of all requirements. 
+type Requirements []Requirement + +func (r Requirements) String() string { + var sb strings.Builder + + for i, requirement := range r { + if i > 0 { + sb.WriteString(", ") + } + sb.WriteString(requirement.String()) + } + + return sb.String() +} + +// Selector represents a label selector. +type Selector interface { + + // String returns a human readable string that represents this selector. + String() string + + // Requirements converts this interface into Requirements to expose + // more detailed selection information. + // If there are querying parameters, it will return converted requirements and selectable=true. + // If this selector doesn't want to select anything, it will return selectable=false. + Requirements() (requirements Requirements, selectable bool) + + // Make a deep copy of the selector. + DeepCopySelector() Selector +} + +type internalSelector []Requirement + +func (s internalSelector) DeepCopy() internalSelector { + if s == nil { + return nil + } + result := make([]Requirement, len(s)) + for i := range s { + s[i].DeepCopyInto(&result[i]) + } + return result +} + +func (s internalSelector) DeepCopySelector() Selector { + return s.DeepCopy() +} + +// Requirement contains values, a key, and an operator that relates the key and values. +// The zero value of Requirement is invalid. +// Requirement implements both set based match and exact match +// Requirement should be initialized via NewRequirement constructor for creating a valid Requirement. +// +k8s:deepcopy-gen=true +type Requirement struct { + key string + operator selection.Operator + // In huge majority of cases we have at most one value here. + // It is generally faster to operate on a single-element slice + // than on a single-element map, so we have a slice here. + strValues []string +} + +// NewRequirement is the constructor for a Requirement. +// If any of these rules is violated, an error is returned: +// 1. 
The operator can only be In, NotIn, Equals, DoubleEquals, Gt, Lt, NotEquals, Exists, or DoesNotExist. +// 2. If the operator is In or NotIn, the values set must be non-empty. +// 3. If the operator is Equals, DoubleEquals, or NotEquals, the values set must contain one value. +// 4. If the operator is Exists or DoesNotExist, the value set must be empty. +// 5. If the operator is Gt or Lt, the values set must contain only one value, which will be interpreted as an integer. +// 6. The key is invalid due to its length, or sequence of characters. See validateLabelKey for more details. +// +// The empty string is a valid value in the input values set. +// Returned error, if not nil, is guaranteed to be an aggregated field.ErrorList +func NewRequirement(key string, op selection.Operator, vals []string, opts ...field.PathOption) (*Requirement, error) { + var allErrs field.ErrorList + path := field.ToPath(opts...) + + valuePath := path.Child("values") + switch op { + case selection.In, selection.NotIn: + if len(vals) == 0 { + allErrs = append(allErrs, field.Invalid(valuePath, vals, "for 'in', 'notin' operators, values set can't be empty")) + } + case selection.Equals, selection.DoubleEquals, selection.NotEquals: + if len(vals) != 1 { + allErrs = append(allErrs, field.Invalid(valuePath, vals, "exact-match compatibility requires one single value")) + } + case selection.Exists, selection.DoesNotExist: + if len(vals) != 0 { + allErrs = append(allErrs, field.Invalid(valuePath, vals, "values set must be empty for exists and does not exist")) + } + case selection.GreaterThan, selection.LessThan: + if len(vals) != 1 { + allErrs = append(allErrs, field.Invalid(valuePath, vals, "for 'Gt', 'Lt' operators, exactly one value is required")) + } + for i := range vals { + if _, err := strconv.ParseInt(vals[i], 10, 64); err != nil { + allErrs = append(allErrs, field.Invalid(valuePath.Index(i), vals[i], "for 'Gt', 'Lt' operators, the value must be an integer")) + } + } + default: + allErrs = 
append(allErrs, field.NotSupported(path.Child("operator"), op, validRequirementOperators)) + } + return &Requirement{key: key, operator: op, strValues: vals}, allErrs.ToAggregate() +} + +func (r *Requirement) hasValue(value string) bool { + for i := range r.strValues { + if r.strValues[i] == value { + return true + } + } + return false +} + +// Key returns requirement key +func (r *Requirement) Key() string { + return r.key +} + +// Operator returns requirement operator +func (r *Requirement) Operator() selection.Operator { + return r.operator +} + +// Values returns requirement values +func (r *Requirement) Values() []string { + ret := sets.String{} + for i := range r.strValues { + ret.Insert(r.strValues[i]) + } + return ret.List() +} + +// Equal checks the equality of requirement. +func (r Requirement) Equal(x Requirement) bool { + if r.key != x.key { + return false + } + if r.operator != x.operator { + return false + } + return slices.Equal(r.strValues, x.strValues) +} + +// Empty returns true if the internalSelector doesn't restrict selection space +func (s internalSelector) Empty() bool { + if s == nil { + return true + } + return len(s) == 0 +} + +// String returns a human-readable string that represents this +// Requirement. If called on an invalid Requirement, an error is +// returned. See NewRequirement for creating a valid Requirement. +func (r *Requirement) String() string { + var sb strings.Builder + sb.Grow( + // length of r.key + len(r.key) + + // length of 'r.operator' + 2 spaces for the worst case ('in' and 'notin') + len(string(r.operator)) + 2 + + // length of 'r.strValues' slice times. 
Heuristically 5 chars per word + +5*len(r.strValues)) + if r.operator == selection.DoesNotExist { + sb.WriteString("!") + } + sb.WriteString(r.key) + + switch r.operator { + case selection.Equals: + sb.WriteString("=") + case selection.DoubleEquals: + sb.WriteString("==") + case selection.NotEquals: + sb.WriteString("!=") + case selection.In: + sb.WriteString(" in ") + case selection.NotIn: + sb.WriteString(" notin ") + case selection.GreaterThan: + sb.WriteString(">") + case selection.LessThan: + sb.WriteString("<") + case selection.Exists, selection.DoesNotExist: + return sb.String() + } + + switch r.operator { + case selection.In, selection.NotIn: + sb.WriteString("(") + } + if len(r.strValues) == 1 { + sb.WriteString(r.strValues[0]) + } else { // only > 1 since == 0 prohibited by NewRequirement + // normalizes value order on output, without mutating the in-memory selector representation + // also avoids normalization when it is not required, and ensures we do not mutate shared data + sb.WriteString(strings.Join(safeSort(r.strValues), ",")) + } + + switch r.operator { + case selection.In, selection.NotIn: + sb.WriteString(")") + } + return sb.String() +} + +// safeSort sorts input strings without modification +func safeSort(in []string) []string { + if sort.StringsAreSorted(in) { + return in + } + out := make([]string, len(in)) + copy(out, in) + sort.Strings(out) + return out +} + +// Add adds requirements to the selector. It copies the current selector returning a new one +func (s internalSelector) Add(reqs ...Requirement) Selector { + ret := make(internalSelector, 0, len(s)+len(reqs)) + ret = append(ret, s...) + ret = append(ret, reqs...) + return ret +} + +func (s internalSelector) Requirements() (Requirements, bool) { return Requirements(s), true } + +// String returns a comma-separated string of all +// the internalSelector Requirements' human-readable strings. 
+func (s internalSelector) String() string { + var reqs []string + for ix := range s { + reqs = append(reqs, s[ix].String()) + } + return strings.Join(reqs, ",") +} + +// RequiresExactMatch introspects whether a given selector requires a single specific field +// to be set, and if so returns the value it requires. +func (s internalSelector) RequiresExactMatch(label string) (value string, found bool) { + for ix := range s { + if s[ix].key == label { + switch s[ix].operator { + case selection.Equals, selection.DoubleEquals, selection.In: + if len(s[ix].strValues) == 1 { + return s[ix].strValues[0], true + } + } + return "", false + } + } + return "", false +} + +// Token represents constant definition for lexer token +type Token int + +const ( + // ErrorToken represents scan error + ErrorToken Token = iota + // EndOfStringToken represents end of string + EndOfStringToken + // ClosedParToken represents close parenthesis + ClosedParToken + // CommaToken represents the comma + CommaToken + // DoesNotExistToken represents logic not + DoesNotExistToken + // DoubleEqualsToken represents double equals + DoubleEqualsToken + // EqualsToken represents equal + EqualsToken + // GreaterThanToken represents greater than + GreaterThanToken + // IdentifierToken represents identifier, e.g. 
keys and values + IdentifierToken + QuotedStringToken + // InToken represents in + InToken + // LessThanToken represents less than + LessThanToken + // NotEqualsToken represents not equal + NotEqualsToken + // NotInToken represents not in + NotInToken + // OpenParToken represents open parenthesis + OpenParToken + SingleQuoteToken + DoubleQuoteToken +) + +// string2token contains the mapping between lexer Token and token literal +// (except IdentifierToken, EndOfStringToken and ErrorToken since it makes no sense) +var string2token = map[string]Token{ + ")": ClosedParToken, + ",": CommaToken, + "!": DoesNotExistToken, + "==": DoubleEqualsToken, + "=": EqualsToken, + ">": GreaterThanToken, + "in": InToken, + "<": LessThanToken, + "!=": NotEqualsToken, + "notin": NotInToken, + "(": OpenParToken, + "'": SingleQuoteToken, + "\"": DoubleQuoteToken, +} + +// ScannedItem contains the Token and the literal produced by the lexer. +type ScannedItem struct { + tok Token + literal string +} + +// isWhitespace returns true if the rune is a space, tab, or newline. +func isWhitespace(ch byte) bool { + return ch == ' ' || ch == '\t' || ch == '\r' || ch == '\n' +} + +// isSpecialSymbol detects if the character ch can be an operator +func isSpecialSymbol(ch byte) bool { + switch ch { + case '=', '!', '(', ')', ',', '>', '<': + return true + } + return false +} + +func isQuoteDelimiter(ch byte) bool { + return ch == '"' || ch == '\'' +} + +// Lexer represents the Lexer struct for label selector. 
+// It contains necessary informationt to tokenize the input string +type Lexer struct { + // s stores the string to be tokenized + s string + // pos is the position currently tokenized + pos int +} + +// read returns the character currently lexed +// increment the position and check the buffer overflow +func (l *Lexer) read() (b byte) { + b = 0 + if l.pos < len(l.s) { + b = l.s[l.pos] + l.pos++ + } + return b +} + +// unread 'undoes' the last read character +func (l *Lexer) unread() { + l.pos-- +} + +// scanIDOrKeyword scans string to recognize literal token (for example 'in'), an identifier, or a quoted string. +func (l *Lexer) scanIDOrKeyword() (tok Token, lit string) { + var buffer []byte +IdentifierLoop: + for { + switch ch := l.read(); { + case ch == 0: + break IdentifierLoop + case isSpecialSymbol(ch) || isWhitespace(ch): + l.unread() + break IdentifierLoop + default: + buffer = append(buffer, ch) + } + } + s := string(buffer) + if val, ok := string2token[strings.ToLower(s)]; ok { // is a literal token? + return val, s + } + return IdentifierToken, s // otherwise is an identifier +} + +func (l *Lexer) scanString(delimiter byte) (tok Token, lit string) { + buffer := []byte{delimiter} + escapeNext := false +StringLoop: + for { + switch ch := l.read(); { + case ch == 0: + return ErrorToken, fmt.Sprintf("unclosed string, looking for %c, got eof in [%s]", delimiter, string(buffer)) + case escapeNext: + buffer = append(buffer, ch) + escapeNext = false + case ch == '\\': + escapeNext = true + case ch == delimiter: + buffer = append(buffer, ch) + break StringLoop + default: + buffer = append(buffer, ch) + } + } + return QuotedStringToken, string(buffer) +} + +// scanSpecialSymbol scans string starting with special symbol. +// special symbol identify non literal operators. 
"!=", "==", "=" +func (l *Lexer) scanSpecialSymbol() (Token, string) { + lastScannedItem := ScannedItem{} + var buffer []byte +SpecialSymbolLoop: + for { + switch ch := l.read(); { + case ch == 0: + break SpecialSymbolLoop + case isSpecialSymbol(ch): + buffer = append(buffer, ch) + if token, ok := string2token[string(buffer)]; ok { + lastScannedItem = ScannedItem{tok: token, literal: string(buffer)} + } else if lastScannedItem.tok != 0 { + l.unread() + break SpecialSymbolLoop + } + default: + l.unread() + break SpecialSymbolLoop + } + } + if lastScannedItem.tok == 0 { + return ErrorToken, fmt.Sprintf("error expected: keyword found '%s'", buffer) + } + return lastScannedItem.tok, lastScannedItem.literal +} + +// skipWhiteSpaces consumes all blank characters +// returning the first non blank character +func (l *Lexer) skipWhiteSpaces(ch byte) byte { + for { + if !isWhitespace(ch) { + return ch + } + ch = l.read() + } +} + +// Lex returns a pair of Token and the literal +// literal is meaningful only for IdentifierToken and QuotedStringToken +func (l *Lexer) Lex() (Token, string) { + switch ch := l.skipWhiteSpaces(l.read()); { + case ch == 0: + return EndOfStringToken, "" + case isSpecialSymbol(ch): + l.unread() + return l.scanSpecialSymbol() + case isQuoteDelimiter(ch): + return l.scanString(ch) + default: + l.unread() + return l.scanIDOrKeyword() + } +} + +// Parser data structure contains the label selector parser data structure +type Parser struct { + l *Lexer + scannedItems []ScannedItem + position int + path *field.Path +} + +// ParserContext represents context during parsing: +// some literal for example 'in' and 'notin' can be +// recognized as operator for example 'x in (a)' but +// it can be recognized as value for example 'value in (in)' +type ParserContext int + +const ( + // KeyAndOperator represents key and operator + KeyAndOperator ParserContext = iota + // Values represents values + Values +) + +// lookahead func returns the current token and string. 
No increment of current position +func (p *Parser) lookahead(context ParserContext) (Token, string) { + tok, lit := p.scannedItems[p.position].tok, p.scannedItems[p.position].literal + if context == Values { + switch tok { + case InToken, NotInToken: + tok = IdentifierToken + } + } + return tok, lit +} + +// consume returns current token and string. Increments the position +func (p *Parser) consume(context ParserContext) (Token, string) { + p.position++ + tok, lit := p.scannedItems[p.position-1].tok, p.scannedItems[p.position-1].literal + if context == Values { + switch tok { + case InToken, NotInToken: + tok = IdentifierToken + } + } + return tok, lit +} + +// scan runs through the input string and stores the ScannedItem in an array +// Parser can now lookahead and consume the tokens +func (p *Parser) scan() { + for { + token, literal := p.l.Lex() + p.scannedItems = append(p.scannedItems, ScannedItem{token, literal}) + if token == EndOfStringToken { + break + } + } +} + +// parse runs the left recursive descending algorithm +// on input string. It returns a list of Requirement objects. 
+func (p *Parser) parse() (internalSelector, error) { + p.scan() // init scannedItems + + var requirements internalSelector + for { + tok, lit := p.lookahead(Values) + switch tok { + case IdentifierToken, DoesNotExistToken: + r, err := p.parseRequirement() + if err != nil { + return nil, fmt.Errorf("unable to parse requirement: %v", err) + } + requirements = append(requirements, *r) + t, l := p.consume(Values) + switch t { + case EndOfStringToken: + return requirements, nil + case CommaToken: + t2, l2 := p.lookahead(Values) + if t2 != IdentifierToken && t2 != DoesNotExistToken { + return nil, fmt.Errorf("found '%s', expected: identifier after ','", l2) + } + default: + return nil, fmt.Errorf("found '%s', expected: ',' or 'end of string'", l) + } + case EndOfStringToken: + return requirements, nil + default: + return nil, fmt.Errorf("found '%s', expected: !, identifier, or 'end of string'", lit) + } + } +} + +func (p *Parser) parseRequirement() (*Requirement, error) { + key, operator, err := p.parseKeyAndInferOperator() + if err != nil { + return nil, err + } + if operator == selection.Exists || operator == selection.DoesNotExist { // operator found lookahead set checked + if !labelSelectorRegex.MatchString(key) { + return nil, fmt.Errorf("existence tests are valid only for labels; not valid for field '%s'", key) + } + return NewRequirement(key, operator, []string{}, field.WithPath(p.path)) + } + operator, err = p.parseOperator() + if err != nil { + return nil, err + } + var values sets.String + switch operator { + case selection.In, selection.NotIn: + values, err = p.parseValues() + case selection.Equals, selection.DoubleEquals, selection.NotEquals, selection.GreaterThan, selection.LessThan: + values, err = p.parseExactValue() + } + if err != nil { + return nil, err + } + return NewRequirement(key, operator, values.List(), field.WithPath(p.path)) + +} + +// parseKeyAndInferOperator parses literals. 
+// in case of no operator '!, in, notin, ==, =, !=' are found +// the 'exists' operator is inferred +func (p *Parser) parseKeyAndInferOperator() (string, selection.Operator, error) { + var operator selection.Operator + tok, literal := p.consume(Values) + if tok == DoesNotExistToken { + operator = selection.DoesNotExist + tok, literal = p.consume(Values) + } + if tok != IdentifierToken { + err := fmt.Errorf("found '%s', expected: identifier", literal) + return "", "", err + } + if t, _ := p.lookahead(Values); t == EndOfStringToken || t == CommaToken { + if operator != selection.DoesNotExist { + operator = selection.Exists + } + } + return literal, operator, nil +} + +// parseOperator returns operator and eventually matchType +// matchType can be exact +func (p *Parser) parseOperator() (op selection.Operator, err error) { + tok, lit := p.consume(KeyAndOperator) + switch tok { + // DoesNotExistToken shouldn't be here because it's a unary operator, not a binary operator + case InToken: + op = selection.In + case EqualsToken: + op = selection.Equals + case DoubleEqualsToken: + op = selection.DoubleEquals + case GreaterThanToken: + op = selection.GreaterThan + case LessThanToken: + op = selection.LessThan + case NotInToken: + op = selection.NotIn + case NotEqualsToken: + op = selection.NotEquals + default: + if lit == "lt" { + op = selection.LessThan + } else if lit == "gt" { + op = selection.GreaterThan + } else { + return "", fmt.Errorf("found '%s', expected: %v", lit, strings.Join(binaryOperators, ", ")) + } + } + return op, nil +} + +// parseValues parses the values for set based matching (x,y,z) +func (p *Parser) parseValues() (sets.String, error) { + tok, lit := p.consume(Values) + if tok != OpenParToken { + return nil, fmt.Errorf("found '%s' expected: '('", lit) + } + tok, lit = p.lookahead(Values) + switch tok { + case IdentifierToken, CommaToken: + s, err := p.parseIdentifiersList() // handles general cases + if err != nil { + return s, err + } + if tok, _ = 
p.consume(Values); tok != ClosedParToken {
+		return nil, fmt.Errorf("found '%s', expected: ')'", lit)
+	}
+	return s, nil
+	case ClosedParToken: // handles "()"
+		p.consume(Values)
+		return sets.NewString(""), nil
+	default:
+		return nil, fmt.Errorf("found '%s', expected: ',', ')' or identifier", lit)
+	}
+}
+
+// parseIdentifiersList parses a (possibly empty) list
+// of comma separated (possibly empty) identifiers
+func (p *Parser) parseIdentifiersList() (sets.String, error) {
+	s := sets.NewString()
+	for {
+		tok, lit := p.consume(Values)
+		switch tok {
+		case IdentifierToken:
+			s.Insert(lit)
+			tok2, lit2 := p.lookahead(Values)
+			switch tok2 {
+			case CommaToken:
+				continue
+			case ClosedParToken:
+				return s, nil
+			default:
+				return nil, fmt.Errorf("found '%s', expected: ',' or ')'", lit2)
+			}
+		case CommaToken: // handled here since we can have "(,"
+			if s.Len() == 0 {
+				s.Insert("") // to handle (,
+			}
+			tok2, _ := p.lookahead(Values)
+			if tok2 == ClosedParToken {
+				s.Insert("") // to handle ,) Double "" removed by StringSet
+				return s, nil
+			}
+			if tok2 == CommaToken {
+				p.consume(Values)
+				s.Insert("") // to handle ,, Double "" removed by StringSet
+			}
+		default: // it can be operator
+			return s, fmt.Errorf("found '%s', expected: ',', or identifier", lit)
+		}
+	}
+}
+
+// parseExactValue parses the only value for exact match style
+func (p *Parser) parseExactValue() (sets.String, error) {
+	s := sets.NewString()
+	tok, _ := p.lookahead(Values)
+	if tok == EndOfStringToken || tok == CommaToken {
+		s.Insert("")
+		return s, nil
+	}
+	tok, lit := p.consume(Values)
+	if tok == IdentifierToken || tok == QuotedStringToken {
+		s.Insert(lit)
+		return s, nil
+	}
+	return nil, fmt.Errorf("found '%s', expected: identifier", lit)
+}
+
+// Parse takes a string representing a selector and returns a selector
+// object, or an error. This parsing function differs from ParseSelector
+// as they parse different selectors with different syntaxes.
+// The input will cause an error if it does not follow this form:
+//
+//	<selector-syntax>         ::= <requirement> | <requirement> "," <selector-syntax>
+//	<requirement>             ::= [!] KEY [ <set-based-restriction> | <exact-match-restriction> ]
+//	<set-based-restriction>   ::= "" | <inclusion-exclusion> <value-set>
+//	<inclusion-exclusion>     ::= <inclusion> | <exclusion>
+//	<exclusion>               ::= "notin"
+//	<inclusion>               ::= "in"
+//	<value-set>               ::= "(" <values> ")"
+//	<values>                  ::= VALUE | VALUE "," <values>
+//	<exact-match-restriction> ::= ["="|"=="|"!="] VALUE
+//
+// KEY is a sequence of one or more characters following [ DNS_SUBDOMAIN "/" ] DNS_LABEL. Max length is 63 characters.
+// VALUE is a sequence of zero or more characters "([A-Za-z0-9_-\.])". Max length is 63 characters.
+// Delimiter is white space: (' ', '\t')
+// Example of valid syntax:
+//
+//	"x in (foo,,baz),y,z notin ()"
+//
+// Note:
+//  1. Inclusion - " in " - denotes that the KEY exists and is equal to any of the
+//     VALUEs in its requirement
+//  2. Exclusion - " notin " - denotes that the KEY is not equal to any
+//     of the VALUEs in its requirement or does not exist
+//  3. The empty string is a valid VALUE
+//  4. A requirement with just a KEY - as in "y" above - denotes that
+//     the KEY exists and can be any VALUE.
+//  5. A requirement with just !KEY requires that the KEY not exist.
+func Parse(selector string, opts ...field.PathOption) (Selector, error) {
+	pathThing := field.ToPath(opts...)
+	parsedSelector, err := parse(selector, pathThing)
+	if err == nil {
+		return parsedSelector, nil
+	}
+	return nil, err
+}
+
+// parse parses the string representation of the selector and returns the internalSelector struct.
+// The callers of this method can then decide how to return the internalSelector struct to their
+// callers. This function has two callers now, one returns a Selector interface and the other
+// returns a list of requirements.
+func parse(selector string, path *field.Path) (internalSelector, error) {
+	p := &Parser{l: &Lexer{s: selector, pos: 0}, path: path}
+	items, err := p.parse()
+	if err != nil {
+		return nil, err
+	}
+	return internalSelector(items), err
+}
+
+// SelectorFromSet returns a Selector which will match exactly the given Set. A
+// nil and empty Sets are considered equivalent to Everything().
+// It does not perform any validation, which means the server will reject +// the request if the Set contains invalid values. +func SelectorFromSet(ls Set) Selector { + return SelectorFromValidatedSet(ls) +} + +// ValidatedSelectorFromSet returns a Selector which will match exactly the given Set. A +// nil and empty Sets are considered equivalent to Everything(). +// The Set is validated client-side, which allows to catch errors early. +func ValidatedSelectorFromSet(ls Set) (Selector, error) { + if ls == nil || len(ls) == 0 { + return internalSelector{}, nil + } + requirements := make([]Requirement, 0, len(ls)) + for label, value := range ls { + r, err := NewRequirement(label, selection.Equals, []string{value}) + if err != nil { + return nil, err + } + requirements = append(requirements, *r) + } + return internalSelector(requirements), nil +} + +// SelectorFromValidatedSet returns a Selector which will match exactly the given Set. +// A nil and empty Sets are considered equivalent to Everything(). +// It assumes that Set is already validated and doesn't do any validation. +// Note: this method copies the Set; if the Set is immutable, consider wrapping it with ValidatedSetSelector +// instead, which does not copy. +func SelectorFromValidatedSet(ls Set) Selector { + if ls == nil || len(ls) == 0 { + return internalSelector{} + } + requirements := make([]Requirement, 0, len(ls)) + for label, value := range ls { + requirements = append(requirements, Requirement{key: label, operator: selection.Equals, strValues: []string{value}}) + } + return internalSelector(requirements) +} + +// ParseToRequirements takes a string representing a selector and returns a list of +// requirements. This function is suitable for those callers that perform additional +// processing on selector requirements. +// See the documentation for Parse() function for more details. +// TODO: Consider exporting the internalSelector type instead. 
+func ParseToRequirements(selector string, opts ...field.PathOption) ([]Requirement, error) { + return parse(selector, field.ToPath(opts...)) +} + +// ValidatedSetSelector wraps a Set, allowing it to implement the Selector interface. Unlike +// Set.AsSelectorPreValidated (which copies the input Set), this type simply wraps the underlying +// Set. As a result, it is substantially more efficient. A nil and empty Sets are considered +// equivalent to Everything(). +// +// Callers MUST ensure the underlying Set is not mutated, and that it is already validated. If these +// constraints are not met, Set.AsValidatedSelector should be preferred +// +// None of the Selector methods mutate the underlying Set, but Add() and Requirements() convert to +// the less optimized version. +type ValidatedSetSelector Set + +func (s ValidatedSetSelector) Matches(labels Labels) bool { + for k, v := range s { + if !labels.Has(k) || v != labels.Get(k) { + return false + } + } + return true +} + +func (s ValidatedSetSelector) Empty() bool { + return len(s) == 0 +} + +func (s ValidatedSetSelector) String() string { + keys := make([]string, 0, len(s)) + for k := range s { + keys = append(keys, k) + } + // Ensure deterministic output + sort.Strings(keys) + b := strings.Builder{} + for i, key := range keys { + v := s[key] + b.Grow(len(key) + 2 + len(v)) + if i != 0 { + b.WriteString(",") + } + b.WriteString(key) + b.WriteString("=") + b.WriteString(v) + } + return b.String() +} + +func (s ValidatedSetSelector) Requirements() (requirements Requirements, selectable bool) { + return s.toFullSelector().Requirements() +} + +func (s ValidatedSetSelector) DeepCopySelector() Selector { + res := make(ValidatedSetSelector, len(s)) + for k, v := range s { + res[k] = v + } + return res +} + +func (s ValidatedSetSelector) RequiresExactMatch(label string) (value string, found bool) { + v, f := s[label] + return v, f +} + +func (s ValidatedSetSelector) toFullSelector() Selector { + return 
SelectorFromValidatedSet(Set(s)) +} + +var _ Selector = ValidatedSetSelector{} diff --git a/pkg/stores/sqlpartition/queryparser/selector_test.go b/pkg/stores/sqlpartition/queryparser/selector_test.go new file mode 100644 index 00000000..05af7548 --- /dev/null +++ b/pkg/stores/sqlpartition/queryparser/selector_test.go @@ -0,0 +1,454 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +This file is derived from +https://github.com/kubernetes/apimachinery/blob/master/pkg/labels/selector_test.go +*/ + +package queryparser + +import ( + "fmt" + "reflect" + "regexp" + "strings" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "k8s.io/apimachinery/pkg/selection" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/validation/field" +) + +var ( + ignoreDetail = cmpopts.IgnoreFields(field.Error{}, "Detail") +) + +func TestSelectorParse(t *testing.T) { + testGoodStrings := []string{ + "x=a,y=b,z=c", + "", + "x!=a,y=b", + "x=", + "x= ", + "x=,z= ", + "x= ,z= ", + "x>1", + "x>1,z<5", + "x gt 1,z lt 5", + `x == "abc"`, + `y == 'def'`, + "metadata.labels.im-here", + "!metadata.labels.im-not-here", + "metadata.labels[im.here]", + "!metadata.labels[im.not.here]", + } + testBadStrings := []string{ + "!no-label-absence-test", + "no-label-presence-test", + "x=a||y=b", + "x==a==b", + "!x=a", + "x", GreaterThanToken}, + {"<", LessThanToken}, + //Note that Lex returns the longest valid token found + 
{"!", DoesNotExistToken}, + {"!=", NotEqualsToken}, + {"(", OpenParToken}, + {")", ClosedParToken}, + //Non-"special" characters are considered part of an identifier + {"~", IdentifierToken}, + {"||", IdentifierToken}, + {`"unclosed dq string`, ErrorToken}, + {`'unclosed sq string`, ErrorToken}, + {`'unclosed sq string on an escape \`, ErrorToken}, + } + for _, v := range testcases { + l := &Lexer{s: v.s, pos: 0} + token, lit := l.Lex() + if token != v.t { + t.Errorf("Got %d it should be %d for '%s'", token, v.t, v.s) + } + if v.t != ErrorToken && lit != v.s { + t.Errorf("Got '%s' it should be '%s'", lit, v.s) + } + } +} +func TestQuotedStringLexer(t *testing.T) { + testcases := []struct { + s string + t Token + }{ + {`"abc"`, QuotedStringToken}, + {`'def'`, QuotedStringToken}, + {`"abc, bs:\\, dq:\", sq:\', x:\x"`, QuotedStringToken}, + {`'def, bs:\\, dq:\", sq:\', x:\x'`, QuotedStringToken}, + } + rx := regexp.MustCompile(`\\(.)`) + for _, v := range testcases { + l := &Lexer{s: v.s, pos: 0} + token, lit := l.Lex() + if token != v.t { + t.Errorf("Got %d it should be %d for '%s'", token, v.t, v.s) + } + if v.t != ErrorToken { + //expectedLit := v.s[1 : len(v.s)-1] + expectedLit := v.s + expectedLit = rx.ReplaceAllString(expectedLit, "$1") + if lit != expectedLit { + t.Errorf("Got '%s' it should be '%s'", lit, expectedLit) + } + } + } +} + +func min(l, r int) (m int) { + m = r + if l < r { + m = l + } + return m +} + +func TestLexerSequence(t *testing.T) { + testcases := []struct { + s string + t []Token + }{ + {"key in ( value )", []Token{IdentifierToken, InToken, OpenParToken, IdentifierToken, ClosedParToken}}, + {"key notin ( value )", []Token{IdentifierToken, NotInToken, OpenParToken, IdentifierToken, ClosedParToken}}, + {"key in ( value1, value2 )", []Token{IdentifierToken, InToken, OpenParToken, IdentifierToken, CommaToken, IdentifierToken, ClosedParToken}}, + {"key", []Token{IdentifierToken}}, + {"!key", []Token{DoesNotExistToken, IdentifierToken}}, + {"()", 
[]Token{OpenParToken, ClosedParToken}}, + {"x in (),y", []Token{IdentifierToken, InToken, OpenParToken, ClosedParToken, CommaToken, IdentifierToken}}, + {"== != (), = notin", []Token{DoubleEqualsToken, NotEqualsToken, OpenParToken, ClosedParToken, CommaToken, EqualsToken, NotInToken}}, + {"key>2", []Token{IdentifierToken, GreaterThanToken, IdentifierToken}}, + {"key<1", []Token{IdentifierToken, LessThanToken, IdentifierToken}}, + {"key gt 3", []Token{IdentifierToken, IdentifierToken, IdentifierToken}}, + {"key lt 4", []Token{IdentifierToken, IdentifierToken, IdentifierToken}}, + {"key=value", []Token{IdentifierToken, EqualsToken, IdentifierToken}}, + {"key == value", []Token{IdentifierToken, DoubleEqualsToken, IdentifierToken}}, + {`"abc"`, []Token{QuotedStringToken}}, + {"'def'", []Token{QuotedStringToken}}, + } + for _, v := range testcases { + var tokens []Token + l := &Lexer{s: v.s, pos: 0} + for { + token, _ := l.Lex() + if token == EndOfStringToken { + break + } + tokens = append(tokens, token) + } + if len(tokens) != len(v.t) { + t.Errorf("Bad number of tokens for '%s': got %d, wanted %d (got %v)", v.s, len(tokens), len(v.t), tokens) + } + for i := 0; i < min(len(tokens), len(v.t)); i++ { + if tokens[i] != v.t[i] { + t.Errorf("Test '%s': Mismatching in token type found '%v' it should be '%v'", v.s, tokens[i], v.t[i]) + } + } + } +} +func TestParserLookahead(t *testing.T) { + testcases := []struct { + s string + t []Token + }{ + {"key in ( value )", []Token{IdentifierToken, InToken, OpenParToken, IdentifierToken, ClosedParToken, EndOfStringToken}}, + {"key notin ( value )", []Token{IdentifierToken, NotInToken, OpenParToken, IdentifierToken, ClosedParToken, EndOfStringToken}}, + {"key in ( value1, value2 )", []Token{IdentifierToken, InToken, OpenParToken, IdentifierToken, CommaToken, IdentifierToken, ClosedParToken, EndOfStringToken}}, + {"key", []Token{IdentifierToken, EndOfStringToken}}, + {"!key", []Token{DoesNotExistToken, IdentifierToken, 
EndOfStringToken}}, + {"()", []Token{OpenParToken, ClosedParToken, EndOfStringToken}}, + {"", []Token{EndOfStringToken}}, + {"x in (),y", []Token{IdentifierToken, InToken, OpenParToken, ClosedParToken, CommaToken, IdentifierToken, EndOfStringToken}}, + {"== != (), = notin", []Token{DoubleEqualsToken, NotEqualsToken, OpenParToken, ClosedParToken, CommaToken, EqualsToken, NotInToken, EndOfStringToken}}, + {"key>2", []Token{IdentifierToken, GreaterThanToken, IdentifierToken, EndOfStringToken}}, + {"key<1", []Token{IdentifierToken, LessThanToken, IdentifierToken, EndOfStringToken}}, + {"key gt 3", []Token{IdentifierToken, GreaterThanToken, IdentifierToken, EndOfStringToken}}, + {"key lt 4", []Token{IdentifierToken, LessThanToken, IdentifierToken, EndOfStringToken}}, + {`key == "dq string"`, []Token{IdentifierToken, DoubleEqualsToken, QuotedStringToken, EndOfStringToken}}, + {`key = 'sq string'`, []Token{IdentifierToken, EqualsToken, QuotedStringToken, EndOfStringToken}}, + } + for _, v := range testcases { + p := &Parser{l: &Lexer{s: v.s, pos: 0}, position: 0} + p.scan() + if len(p.scannedItems) != len(v.t) { + t.Errorf("Expected %d items found %d", len(v.t), len(p.scannedItems)) + } + for { + token, lit := p.lookahead(KeyAndOperator) + + token2, lit2 := p.consume(KeyAndOperator) + if token == EndOfStringToken { + break + } + if token != token2 || lit != lit2 { + t.Errorf("Bad values") + } + } + } +} + +func TestParseOperator(t *testing.T) { + testcases := []struct { + token string + expectedError error + }{ + {"in", nil}, + {"=", nil}, + {"==", nil}, + {">", nil}, + {"<", nil}, + {"lt", nil}, + {"gt", nil}, + {"notin", nil}, + {"!=", nil}, + {"!", fmt.Errorf("found '%s', expected: %v", selection.DoesNotExist, strings.Join(binaryOperators, ", "))}, + {"exists", fmt.Errorf("found '%s', expected: %v", selection.Exists, strings.Join(binaryOperators, ", "))}, + {"(", fmt.Errorf("found '%s', expected: %v", "(", strings.Join(binaryOperators, ", "))}, + } + for _, testcase := 
range testcases { + p := &Parser{l: &Lexer{s: testcase.token, pos: 0}, position: 0} + p.scan() + + _, err := p.parseOperator() + if ok := reflect.DeepEqual(testcase.expectedError, err); !ok { + t.Errorf("\nexpect err [%v], \nactual err [%v]", testcase.expectedError, err) + } + } +} + +// Some error fields are commented out here because this fork no longer +// enforces k8s label expression lexical and length restrictions +func TestRequirementConstructor(t *testing.T) { + requirementConstructorTests := []struct { + Key string + Op selection.Operator + Vals sets.String + WantErr field.ErrorList + }{ + { + Key: "x1", + Op: selection.In, + WantErr: field.ErrorList{ + &field.Error{ + Type: field.ErrorTypeInvalid, + Field: "values", + BadValue: []string{}, + }, + }, + }, + { + Key: "x2", + Op: selection.NotIn, + Vals: sets.NewString(), + WantErr: field.ErrorList{ + &field.Error{ + Type: field.ErrorTypeInvalid, + Field: "values", + BadValue: []string{}, + }, + }, + }, + { + Key: "x3", + Op: selection.In, + Vals: sets.NewString("foo"), + }, + { + Key: "x4", + Op: selection.NotIn, + Vals: sets.NewString("foo"), + }, + { + Key: "x5", + Op: selection.Equals, + Vals: sets.NewString("foo", "bar"), + WantErr: field.ErrorList{ + &field.Error{ + Type: field.ErrorTypeInvalid, + Field: "values", + BadValue: []string{"bar", "foo"}, + }, + }, + }, + { + Key: "x6", + Op: selection.Exists, + }, + { + Key: "x7", + Op: selection.DoesNotExist, + }, + { + Key: "x8", + Op: selection.Exists, + Vals: sets.NewString("foo"), + WantErr: field.ErrorList{ + &field.Error{ + Type: field.ErrorTypeInvalid, + Field: "values", + BadValue: []string{"foo"}, + }, + }, + }, + { + Key: "x9", + Op: selection.In, + Vals: sets.NewString("bar"), + }, + { + Key: "x10", + Op: selection.In, + Vals: sets.NewString("bar"), + }, + { + Key: "x11", + Op: selection.GreaterThan, + Vals: sets.NewString("1"), + }, + { + Key: "x12", + Op: selection.LessThan, + Vals: sets.NewString("6"), + }, + { + Key: "x13", + Op: 
selection.GreaterThan, + WantErr: field.ErrorList{ + &field.Error{ + Type: field.ErrorTypeInvalid, + Field: "values", + BadValue: []string{}, + }, + }, + }, + { + Key: "x14", + Op: selection.GreaterThan, + Vals: sets.NewString("bar"), + WantErr: field.ErrorList{ + &field.Error{ + Type: field.ErrorTypeInvalid, + Field: "values[0]", + BadValue: "bar", + }, + }, + }, + { + Key: "x15", + Op: selection.LessThan, + Vals: sets.NewString("bar"), + WantErr: field.ErrorList{ + &field.Error{ + Type: field.ErrorTypeInvalid, + Field: "values[0]", + BadValue: "bar", + }, + }, + }, + { + Key: strings.Repeat("a", 254), //breaks DNS rule that len(key) <= 253 + Op: selection.Exists, + //WantErr: field.ErrorList{ + // &field.Error{ + // Type: field.ErrorTypeInvalid, + // Field: "key", + // BadValue: strings.Repeat("a", 254), + // }, + //}, + }, + { + Key: "x16", + Op: selection.Equals, + Vals: sets.NewString(strings.Repeat("a", 254)), + //WantErr: field.ErrorList{ + // &field.Error{ + // Type: field.ErrorTypeInvalid, + // Field: "values[0][x16]", + // BadValue: strings.Repeat("a", 254), + // }, + //}, + }, + { + Key: "x17", + Op: selection.Equals, + Vals: sets.NewString("a b"), + //WantErr: field.ErrorList{ + // &field.Error{ + // Type: field.ErrorTypeInvalid, + // Field: "values[0][x17]", + // BadValue: "a b", + // }, + //}, + }, + { + Key: "x18", + Op: "unsupportedOp", + WantErr: field.ErrorList{ + &field.Error{ + Type: field.ErrorTypeNotSupported, + Field: "operator", + BadValue: selection.Operator("unsupportedOp"), + }, + }, + }, + } + for _, rc := range requirementConstructorTests { + _, err := NewRequirement(rc.Key, rc.Op, rc.Vals.List()) + if diff := cmp.Diff(rc.WantErr.ToAggregate(), err, ignoreDetail); diff != "" { + t.Errorf("NewRequirement test %v returned unexpected error (-want,+got):\n%s", rc.Key, diff) + } + } +} diff --git a/pkg/stores/sqlpartition/queryparser/zz_generated.deepcopy.go b/pkg/stores/sqlpartition/queryparser/zz_generated.deepcopy.go new file mode 100644 
index 00000000..7e062c6a --- /dev/null +++ b/pkg/stores/sqlpartition/queryparser/zz_generated.deepcopy.go @@ -0,0 +1,48 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +This file is derived from +https://github.com/kubernetes/apimachinery/blob/master/pkg/labels/zz_generated.deepcopy.go +*/ + +// Code generated by deepcopy-gen. DO NOT EDIT. + +package queryparser + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Requirement) DeepCopyInto(out *Requirement) { + *out = *in + if in.strValues != nil { + in, out := &in.strValues, &out.strValues + *out = make([]string, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Requirement. 
+func (in *Requirement) DeepCopy() *Requirement { + if in == nil { + return nil + } + out := new(Requirement) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/stores/sqlpartition/store.go b/pkg/stores/sqlpartition/store.go index f4ebb325..7e284b0b 100644 --- a/pkg/stores/sqlpartition/store.go +++ b/pkg/stores/sqlpartition/store.go @@ -5,6 +5,7 @@ package sqlpartition import ( "context" + "fmt" "github.com/rancher/apiserver/pkg/types" "github.com/rancher/steve/pkg/accesscontrol" @@ -92,13 +93,13 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP result.Count = total - for _, item := range list { + for _, item := range list.Items { item := item.DeepCopy() // the sql cache automatically adds the ID through a transformFunc. Because of this, we have a different set of reserved fields for the SQL cache result.Objects = append(result.Objects, partition.ToAPI(schema, item, nil, s.sqlReservedFields)) } - result.Revision = "" + result.Revision = list.GetResourceVersion() result.Continue = continueToken return result, nil } @@ -134,6 +135,8 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types store := s.Partitioner.Store() + fmt.Println("HITHERE from watch") + response := make(chan types.APIEvent) c, err := store.WatchByPartitions(apiOp, schema, wr, partitions) if err != nil { @@ -143,8 +146,13 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types go func() { defer close(response) - for i := range c { - response <- partition.ToAPIEvent(nil, schema, i) + for revision := range c { + fmt.Println("Revision from watch", revision) + response <- types.APIEvent{ + Name: "resource.changes", + ResourceType: schema.ID, + Revision: revision, + } } }() diff --git a/pkg/stores/sqlproxy/proxy_store.go b/pkg/stores/sqlproxy/proxy_store.go index 878405a0..8b6e4440 100644 --- a/pkg/stores/sqlproxy/proxy_store.go +++ b/pkg/stores/sqlproxy/proxy_store.go @@ -9,21 +9,18 @@ import ( "io" 
"io/ioutil" "net/http" - "os" "strconv" "strings" "sync" + "time" "github.com/pkg/errors" "github.com/sirupsen/logrus" - "golang.org/x/sync/errgroup" - "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" apitypes "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" @@ -57,8 +54,9 @@ const ( ) var ( - paramScheme = runtime.NewScheme() - paramCodec = runtime.NewParameterCodec(paramScheme) + paramScheme = runtime.NewScheme() + paramCodec = runtime.NewParameterCodec(paramScheme) + // Please keep the gvkKey entries in alphabetical order, on a field-by-field basis typeSpecificIndexedFields = map[string][][]string{ gvkKey("", "v1", "ConfigMap"): { {"metadata", "labels[harvesterhci.io/cloud-init-template]"}}, @@ -306,11 +304,13 @@ func (s *Store) initializeNamespaceCache() error { // get any type-specific fields that steve is interested in fields = append(fields, getFieldForGVK(gvk)...) 
- // get the type-specifc transform func + // get the type-specific transform func transformFunc := s.transformBuilder.GetTransformFunc(gvk) // get the ns informer - nsInformer, err := s.cacheFactory.CacheFor(fields, transformFunc, &tablelistconvert.Client{ResourceInterface: client}, attributes.GVK(&nsSchema), false, true) + client2 := &tablelistconvert.Client{ResourceInterface: client} + attrs := attributes.GVK(&nsSchema) + nsInformer, err := s.cacheFactory.CacheFor(fields, transformFunc, client2, attrs, false, true) if err != nil { return err } @@ -442,146 +442,6 @@ func returnErr(err error, c chan watch.Event) { } } -func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInterface, schema *types.APISchema, w types.WatchRequest, result chan watch.Event) { - rev := w.Revision - if rev == "-1" || rev == "0" { - rev = "" - } - - timeout := int64(60 * 30) - timeoutSetting := os.Getenv(watchTimeoutEnv) - if timeoutSetting != "" { - userSetTimeout, err := strconv.Atoi(timeoutSetting) - if err != nil { - logrus.Debugf("could not parse %s environment variable, error: %v", watchTimeoutEnv, err) - } else { - timeout = int64(userSetTimeout) - } - } - k8sClient, _ := metricsStore.Wrap(client, nil) - watcher, err := k8sClient.Watch(apiOp, metav1.ListOptions{ - Watch: true, - TimeoutSeconds: &timeout, - ResourceVersion: rev, - LabelSelector: w.Selector, - }) - if err != nil { - returnErr(errors.Wrapf(err, "stopping watch for %s: %v", schema.ID, err), result) - return - } - defer watcher.Stop() - logrus.Debugf("opening watcher for %s", schema.ID) - - eg, ctx := errgroup.WithContext(apiOp.Context()) - - go func() { - <-ctx.Done() - watcher.Stop() - }() - - if s.notifier != nil { - eg.Go(func() error { - for rel := range s.notifier.OnInboundRelationshipChange(ctx, schema, apiOp.Namespace) { - obj, _, err := s.byID(apiOp, schema, rel.Namespace, rel.Name) - if err == nil { - rowToObject(obj) - result <- watch.Event{Type: watch.Modified, Object: obj} - } else { 
- returnErr(errors.Wrapf(err, "notifier watch error: %v", err), result) - } - } - return fmt.Errorf("closed") - }) - } - - eg.Go(func() error { - for event := range watcher.ResultChan() { - if event.Type == watch.Error { - if status, ok := event.Object.(*metav1.Status); ok { - returnErr(fmt.Errorf("event watch error: %s", status.Message), result) - } else { - logrus.Debugf("event watch error: could not decode event object %T", event.Object) - } - continue - } - if unstr, ok := event.Object.(*unstructured.Unstructured); ok { - rowToObject(unstr) - } - result <- event - } - return fmt.Errorf("closed") - }) - - _ = eg.Wait() - return -} - -// WatchNames returns a channel of events filtered by an allowed set of names. -// In plain kubernetes, if a user has permission to 'list' or 'watch' a defined set of resource names, -// performing the list or watch will result in a Forbidden error, because the user does not have permission -// to list *all* resources. -// With this filter, the request can be performed successfully, and only the allowed resources will -// be returned in watch. 
-func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, names sets.Set[string]) (chan watch.Event, error) { - buffer := &WarningBuffer{} - adminClient, err := s.clientGetter.TableAdminClientForWatch(apiOp, schema, apiOp.Namespace, buffer) - if err != nil { - return nil, err - } - c, err := s.watch(apiOp, schema, w, adminClient) - if err != nil { - return nil, err - } - - result := make(chan watch.Event) - go func() { - defer close(result) - for item := range c { - if item.Type == watch.Error { - if status, ok := item.Object.(*metav1.Status); ok { - logrus.Debugf("WatchNames received error: %s", status.Message) - } else { - logrus.Debugf("WatchNames received error: %v", item) - } - result <- item - continue - } - - m, err := meta.Accessor(item.Object) - if err != nil { - logrus.Debugf("WatchNames cannot process unexpected object: %s", err) - continue - } - - if names.Has(m.GetName()) { - result <- item - } - } - }() - - return result, nil -} - -// Watch returns a channel of events for a list or resource. -func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan watch.Event, error) { - buffer := &WarningBuffer{} - client, err := s.clientGetter.TableClientForWatch(apiOp, schema, apiOp.Namespace, buffer) - if err != nil { - return nil, err - } - return s.watch(apiOp, schema, w, client) -} - -func (s *Store) watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, client dynamic.ResourceInterface) (chan watch.Event, error) { - result := make(chan watch.Event) - go func() { - s.listAndWatch(apiOp, client, schema, w, result) - logrus.Debugf("closing watcher for %s", schema.ID) - close(result) - }() - return result, nil -} - // Create creates a single object in the store. 
func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject) (*unstructured.Unstructured, []types.Warning, error) { var ( @@ -734,7 +594,7 @@ func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id stri // - the total number of resources (returned list might be a subset depending on pagination options in apiOp) // - a continue token, if there are more pages after the returned one // - an error instead of all of the above if anything went wrong -func (s *Store) ListByPartitions(apiOp *types.APIRequest, schema *types.APISchema, partitions []partition.Partition) ([]unstructured.Unstructured, int, string, error) { +func (s *Store) ListByPartitions(apiOp *types.APIRequest, schema *types.APISchema, partitions []partition.Partition) (*unstructured.UnstructuredList, int, string, error) { opts, err := listprocessor.ParseQuery(apiOp, s.namespaceCache) if err != nil { return nil, 0, "", err @@ -745,12 +605,15 @@ func (s *Store) ListByPartitions(apiOp *types.APIRequest, schema *types.APISchem if err != nil { return nil, 0, "", err } + gvk := attributes.GVK(schema) fields := getFieldsFromSchema(schema) fields = append(fields, getFieldForGVK(gvk)...) 
transformFunc := s.transformBuilder.GetTransformFunc(gvk) - - inf, err := s.cacheFactory.CacheFor(fields, transformFunc, &tablelistconvert.Client{ResourceInterface: client}, attributes.GVK(schema), attributes.Namespaced(schema), controllerschema.IsListWatchable(schema)) + client2 := &tablelistconvert.Client{ResourceInterface: client} + attrs2 := attributes.GVK(schema) + ns2 := attributes.Namespaced(schema) + inf, err := s.cacheFactory.CacheFor(fields, transformFunc, client2, attrs2, ns2, controllerschema.IsListWatchable(schema)) if err != nil { return nil, 0, "", err } @@ -762,54 +625,60 @@ func (s *Store) ListByPartitions(apiOp *types.APIRequest, schema *types.APISchem } return nil, 0, "", err } + if total > 0 { + resourceVersion := inf.ByOptionsLister.(*informer.Informer).ByOptionsLister.(*informer.ListOptionIndexer).GetLastResourceVersion() + list.SetResourceVersion(resourceVersion) + } else { + list.SetResourceVersion("") + } - return list.Items, total, continueToken, nil + return list, total, continueToken, nil } // WatchByPartitions returns a channel of events for a list or resource belonging to any of the specified partitions -func (s *Store) WatchByPartitions(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest, partitions []partition.Partition) (chan watch.Event, error) { - ctx, cancel := context.WithCancel(apiOp.Context()) - apiOp = apiOp.Clone().WithContext(ctx) - - eg := errgroup.Group{} - - result := make(chan watch.Event) - - for _, partition := range partitions { - p := partition - eg.Go(func() error { - defer cancel() - c, err := s.watchByPartition(p, apiOp, schema, wr) +func (s *Store) WatchByPartitions(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest, partitions []partition.Partition) (chan string, error) { + ctx := apiOp.Context() - if err != nil { - return err - } - for i := range c { - result <- i - } - return nil - }) + revision := 0 + if wr.Revision != "" { + parsedRevision, err := 
strconv.ParseInt(wr.Revision, 10, 64)
+		if err != nil {
+			return nil, fmt.Errorf("invalid revision %q: %w", wr.Revision, err)
+		}
+		revision = int(parsedRevision)
 	}
+	_ = revision
 
-	go func() {
-		defer close(result)
-		<-ctx.Done()
-		eg.Wait()
-		cancel()
-	}()
+	// NOTE(review): cloning the request with its own context predates this rewrite and may no longer be needed now that the per-partition errgroup fan-out is gone — confirm before removing.
+	apiOp = apiOp.Clone().WithContext(ctx)
 
-	return result, nil
-}
+	// warnings from inside the informer are discarded
+	buffer := WarningBuffer{}
+	client, err := s.clientGetter.TableAdminClient(apiOp, schema, "", &buffer)
+	if err != nil {
+		return nil, err
+	}
 
-// watchByPartition returns a channel of events for a list or resource belonging to a specified partition
-func (s *Store) watchByPartition(partition partition.Partition, apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan watch.Event, error) {
-	if partition.Passthrough {
-		return s.Watch(apiOp, schema, wr)
+	gvk := attributes.GVK(schema)
+	fields := getFieldsFromSchema(schema)
+	fields = append(fields, getFieldForGVK(gvk)...)
+ transformFunc := s.transformBuilder.GetTransformFunc(gvk) + client2 := &tablelistconvert.Client{ResourceInterface: client} + attrs2 := attributes.GVK(schema) + ns2 := attributes.Namespaced(schema) + inf, err := s.cacheFactory.CacheFor(fields, transformFunc, client2, attrs2, ns2, controllerschema.IsListWatchable(schema)) + if err != nil { + return nil, err } - apiOp.Namespace = partition.Namespace - if partition.All { - return s.Watch(apiOp, schema, wr) + debounceListener := newDebounceListener(5 * time.Second) + _ = inf.Watch(ctx, debounceListener) + resourceVersion := inf.ByOptionsLister.(*informer.Informer).ByOptionsLister.(*informer.ListOptionIndexer).GetLastResourceVersion() + if wr.Revision != resourceVersion { + debounceListener.NotifyNow(resourceVersion) } - return s.WatchNames(apiOp, schema, wr, partition.Names) + + go debounceListener.Run(ctx) + + return debounceListener.ch, nil } diff --git a/pkg/stores/sqlproxy/watchers.go b/pkg/stores/sqlproxy/watchers.go new file mode 100644 index 00000000..5e28fca5 --- /dev/null +++ b/pkg/stores/sqlproxy/watchers.go @@ -0,0 +1,60 @@ +package sqlproxy + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/rancher/steve/pkg/sqlcache/informer" +) + +var _ informer.Listener = (*debounceListener)(nil) + +type debounceListener struct { + lock sync.Mutex + lastRevision string + + debounceRate time.Duration + ch chan string +} + +func newDebounceListener(debounceRate time.Duration) *debounceListener { + listener := &debounceListener{ + debounceRate: debounceRate, + ch: make(chan string, 100), + } + return listener +} + +func (d *debounceListener) Run(ctx context.Context) { + ticker := time.NewTicker(d.debounceRate) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + close(d.ch) + return + case <-ticker.C: + d.lock.Lock() + if d.lastRevision != "" { + d.ch <- d.lastRevision + d.lastRevision = "" + } + d.lock.Unlock() + } + } +} + +func (d *debounceListener) NotifyNow(revision string) { + d.lock.Lock() 
+ defer d.lock.Unlock() + d.ch <- revision +} + +func (d *debounceListener) Notify(revision string) { + fmt.Println("Notify(", revision, ")") + d.lock.Lock() + defer d.lock.Unlock() + d.lastRevision = revision +}