Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dumb watch mechanism for Vai #277

Draft
wants to merge 58 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
efb13f3
Add more fields to index when sql-caching is on.
ericpromislow Aug 30, 2024
fb44ba2
Restore the gvkKey helper, add event fields.
ericpromislow Sep 5, 2024
be7dc58
More fixes to the fields to index:
ericpromislow Sep 11, 2024
e6fb4a2
Start on the virtual-field work.
ericpromislow Sep 11, 2024
af2634c
Map `Event.type` to `Event._type` for indexing.
ericpromislow Sep 11, 2024
af40269
Add a unit test for field replacement for Event.type
ericpromislow Sep 11, 2024
80c1a11
Add label processing.
ericpromislow Oct 10, 2024
fba846f
WIP - moving module imports around
ericpromislow Oct 21, 2024
aea2382
Don't test for transformation of event objects in the common module.
ericpromislow Oct 21, 2024
7840d5b
Parse metadata.label queries differently.
ericpromislow Nov 5, 2024
0c507e7
Improve a variable name that turned out to not be temporary.
ericpromislow Nov 6, 2024
ca0a8a3
No need to specifically cache certain labels, as all are now cached.
ericpromislow Nov 7, 2024
d87534b
Add a test to verify simple label (m.labels.foo=blah) queries work.
ericpromislow Nov 8, 2024
163004b
'addLabelFields' never returns an error.
ericpromislow Nov 8, 2024
41f61a3
Delete superseded function.
ericpromislow Nov 8, 2024
ed724a5
Was calling 'addLabelFields' one time too many.
ericpromislow Nov 8, 2024
1724d49
Start using k8s ParseToRequirements
ericpromislow Nov 13, 2024
33975d3
Pull in the k8s parser.
ericpromislow Nov 13, 2024
473e7cf
Successfully test for quotation marks.
ericpromislow Nov 13, 2024
cc994ee
Add quoted strings to the lexer.
ericpromislow Nov 13, 2024
575a6ab
Move to a forked k8s label lexer to include non-label tests.
ericpromislow Nov 13, 2024
3bccd0e
Improve and test the way quoted strings in the query are detected.
ericpromislow Nov 14, 2024
afbb1f9
Reinstate the original Apache license in the derived code.
ericpromislow Nov 14, 2024
a556cc2
Ignore case for operators.
ericpromislow Nov 15, 2024
a7f9758
Test IN multiple-target-values
ericpromislow Nov 15, 2024
0bd0121
Test the not-in operator.
ericpromislow Nov 15, 2024
e357f22
Ignore case for operators.
ericpromislow Nov 15, 2024
ad42186
Added tests for parsing EXISTS and NOT-EXISTS queries.
ericpromislow Nov 16, 2024
c141a83
Parse less-than and greater-than ops
ericpromislow Dec 3, 2024
740af5a
Lasso's `CacheFor` now takes a `watchable` argument.
ericpromislow Dec 3, 2024
1bdc060
Support 'gt' and 'lt' as synonyms for '<' and '>'.
ericpromislow Dec 3, 2024
02a4e07
typo fix
ericpromislow Dec 4, 2024
f083ea6
Have the filter parser allow exist tests only on labels.
ericpromislow Dec 4, 2024
925e0ff
Specify hard-wired fields to index alphabetically.
ericpromislow Dec 7, 2024
24ba823
Remove unused variable.
ericpromislow Dec 9, 2024
956652e
Parser: 'metadata.labels[FIELD]' is valid
ericpromislow Dec 9, 2024
333a42a
Pull in new gvk fields from main (and keep in alpha order).
ericpromislow Jan 6, 2025
2bd58a4
Fixed a couple of drops done during the last rebase.
ericpromislow Jan 6, 2025
227b7f0
Add a reminder to keep the entries in alpha order.
ericpromislow Jan 6, 2025
83212b0
Update lasso ref
ericpromislow Jan 6, 2025
023a123
Test TransformLabels
ericpromislow Jan 6, 2025
235e6ee
Remove TransformLabels
ericpromislow Jan 6, 2025
ca40b34
Remove unused/unneeded code.
ericpromislow Jan 6, 2025
62f0a6f
Describe diffs between our label-selector parser and upstream's.
ericpromislow Jan 7, 2025
cdf6b35
Use the merged lasso 46333 work.
ericpromislow Jan 16, 2025
11a47b1
Clean go.sum of old lasso entries.
ericpromislow Jan 16, 2025
cc7e553
Drop unused field.
ericpromislow Jan 16, 2025
13f73f2
Tighten up the code.
ericpromislow Jan 16, 2025
21c93d2
Specify which commit the label selector parser is based on.
ericpromislow Jan 16, 2025
7dd7067
Allow both single-quoted and double-quoted value matching, doc differ…
ericpromislow Jan 16, 2025
e4bdba9
Checkpoint
tomleb Sep 5, 2024
cd77406
Send schema ID for watch event
tomleb Sep 9, 2024
2cf1f45
Compare revision
tomleb Sep 16, 2024
d33adf3
Checkpoint
tomleb Jan 16, 2025
cd14249
Set resourceVersion in List calls and don't always notify immediately
tomleb Jan 17, 2025
bb52573
Fix empty list panic
tomleb Jan 17, 2025
9b8c1c4
Checkpoint rebase on top of Steve
tomleb Jan 20, 2025
ec21271
Rebase on top of Eric's work
tomleb Jan 20, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ is empty.
#### `filter`

Filter results by a designated field. Filter keys use dot notation to denote
the subfield of an object to filter on. The filter value is matched as a
the subfield of an object to filter on. The filter value is normally matched as a
substring.

Example, filtering by object name:
Expand All @@ -117,6 +117,23 @@ Example, filtering by object name:
/v1/{type}?filter=metadata.name=foo
```

if a target value is surrounded by single-quotes, it succeeds only on an exact match:

Example, filtering by object name:

```
/v1/{type}?filter=metadata.name='match-this-exactly'
```
```

A target value can be delimited by double-quotes, but this will succeed on a partial match:

Example, filtering by object name:

```
/v1/{type}?filter=metadata.name="can-be-a-substri"
```

One filter can list multiple possible fields to match, these are ORed together:

```
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/adrg/xdg v0.5.0
github.com/golang/protobuf v1.5.4
github.com/google/gnostic-models v0.6.8
github.com/google/go-cmp v0.6.0
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.1
github.com/pborman/uuid v1.2.1
Expand All @@ -22,7 +23,7 @@ require (
github.com/rancher/apiserver v0.0.0-20241009200134-5a4ecca7b988
github.com/rancher/dynamiclistener v0.6.1-rc.2
github.com/rancher/kubernetes-provider-detector v0.1.5
github.com/rancher/lasso v0.0.0-20241202185148-04649f379358
github.com/rancher/lasso v0.0.0-20250116001102-9e2b68739ccc
github.com/rancher/norman v0.0.0-20241001183610-78a520c160ab
github.com/rancher/remotedialer v0.3.2
github.com/rancher/wrangler/v3 v3.0.1-rc.2
Expand Down Expand Up @@ -78,7 +79,6 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/cel-go v0.20.1 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ github.com/rancher/dynamiclistener v0.6.1-rc.2 h1:PTKNKcYXZjc/lo40EivRcXuEbCXwjp
github.com/rancher/dynamiclistener v0.6.1-rc.2/go.mod h1:0KhUMHy3VcGMGavTY3i1/Mr8rVM02wFqNlUzjc+Cplg=
github.com/rancher/kubernetes-provider-detector v0.1.5 h1:hWRAsWuJOemzGjz/XrbTlM7QmfO4OedvFE3QwXiH60I=
github.com/rancher/kubernetes-provider-detector v0.1.5/go.mod h1:ypuJS7kP7rUiAn330xG46mj+Nhvym05GM8NqMVekpH0=
github.com/rancher/lasso v0.0.0-20241202185148-04649f379358 h1:pJwgJXPt4fi0ysXsJcl28rvxhx/Z/9SNCDwFOEyeGu0=
github.com/rancher/lasso v0.0.0-20241202185148-04649f379358/go.mod h1:IxgTBO55lziYhTEETyVKiT8/B5Rg92qYiRmcIIYoPgI=
github.com/rancher/lasso v0.0.0-20250116001102-9e2b68739ccc h1:5Hb++qaO2TJJCr4z2Ks0GrR5wXVxmr9sdiAFr7iItSg=
github.com/rancher/lasso v0.0.0-20250116001102-9e2b68739ccc/go.mod h1:IxgTBO55lziYhTEETyVKiT8/B5Rg92qYiRmcIIYoPgI=
github.com/rancher/norman v0.0.0-20241001183610-78a520c160ab h1:ihK6See3y/JilqZlc0CG7NXPN+ue5nY9U7xUZUA8M7I=
github.com/rancher/norman v0.0.0-20241001183610-78a520c160ab/go.mod h1:qX/OG/4wY27xSAcSdRilUBxBumV6Ey2CWpAeaKnBQDs=
github.com/rancher/remotedialer v0.3.2 h1:kstZbRwPS5gPWpGg8VjEHT2poHtArs+Fc317YM8JCzU=
Expand Down
4 changes: 2 additions & 2 deletions pkg/resources/virtual/virtual_test.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package virtual_test

import (
"github.com/rancher/steve/pkg/resources/virtual"
"k8s.io/apimachinery/pkg/runtime/schema"
"strings"
"testing"

"github.com/rancher/steve/pkg/resources/virtual"
"github.com/rancher/steve/pkg/resources/virtual/common"
"github.com/rancher/steve/pkg/summarycache"
"github.com/rancher/wrangler/v3/pkg/summary"
"github.com/stretchr/testify/require"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
)

func TestTransformChain(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions pkg/server/cli/clicontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type Config struct {
HTTPListenPort int
UIPath string

SQLCache bool

WebhookConfig authcli.WebhookConfig
}

Expand Down Expand Up @@ -57,6 +59,10 @@ func (c *Config) ToServer(ctx context.Context, sqlCache bool) (*server.Server, e

func Flags(config *Config) []cli.Flag {
flags := []cli.Flag{
cli.BoolFlag{
Name: "sqlcache",
Destination: &config.SQLCache,
},
cli.StringFlag{
Name: "kubeconfig",
EnvVar: "KUBECONFIG",
Expand Down
3 changes: 2 additions & 1 deletion pkg/sqlcache/informer/factory/informer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type DBClient interface {

type Cache struct {
informer.ByOptionsLister
informer.Watcher
}

type connector interface {
Expand Down Expand Up @@ -152,7 +153,7 @@ func (f *CacheFactory) CacheFor(fields [][]string, transform cache.TransformFunc
}

// At this point the informer is ready, return it
return Cache{ByOptionsLister: gi.informer}, nil
return Cache{ByOptionsLister: gi.informer, Watcher: gi.informer}, nil
}

// Reset closes the stopCh which stops any running informers, assigns a new stopCh, resets the GVK-informer cache, and resets
Expand Down
2 changes: 1 addition & 1 deletion pkg/sqlcache/informer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type Store interface {
GetByKey(key string) (item any, exists bool, err error)
GetName() string
RegisterAfterUpsert(f func(key string, obj any, tx db.TXClient) error)
RegisterAfterDelete(f func(key string, tx db.TXClient) error)
RegisterAfterDelete(f func(key string, obj any, tx db.TXClient) error)
GetShouldEncrypt() bool
GetType() reflect.Type
}
Expand Down
34 changes: 32 additions & 2 deletions pkg/sqlcache/informer/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,19 @@
type Informer struct {
cache.SharedIndexInformer
ByOptionsLister

listeners *listeners
}

type ByOptionsLister interface {
ListByOptions(ctx context.Context, lo ListOptions, partitions []partition.Partition, namespace string) (*unstructured.UnstructuredList, int, string, error)
}

// this is set to a var so that it can be overridden by test code for mocking purposes
type Watcher interface {
Watch(ctx context.Context, listener Listener) int
}

// this is set to a var so that it can be overriden by test code for mocking purposes

Check failure on line 38 in pkg/sqlcache/informer/informer.go

View workflow job for this annotation

GitHub Actions / ci

`overriden` is a misspelling of `overridden` (misspell)
var newInformer = cache.NewSharedIndexInformer

// NewInformer returns a new SQLite-backed Informer for the type specified by schema in unstructured.Unstructured form
Expand Down Expand Up @@ -70,12 +76,26 @@
return nil, err
}

listeners := newlisteners()
sii.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
listeners.Notify(loi.resourceVersionCache.getLatest())
},
UpdateFunc: func(obj any, newObj any) {
listeners.Notify(loi.resourceVersionCache.getLatest())
},
DeleteFunc: func(obj any) {
listeners.Notify(loi.resourceVersionCache.getLatest())
},
})

// HACK: replace the default informer's indexer with the SQL based one
UnsafeSet(sii, "indexer", loi)

return &Informer{
SharedIndexInformer: sii,
ByOptionsLister: loi,
listeners: listeners,
}, nil
}

Expand All @@ -86,7 +106,17 @@
// - a continue token, if there are more pages after the returned one
// - an error instead of all of the above if anything went wrong
func (i *Informer) ListByOptions(ctx context.Context, lo ListOptions, partitions []partition.Partition, namespace string) (*unstructured.UnstructuredList, int, string, error) {
return i.ByOptionsLister.ListByOptions(ctx, lo, partitions, namespace)
list, total, continueToken, err := i.ByOptionsLister.ListByOptions(ctx, lo, partitions, namespace)
return list, total, continueToken, err
}

func (i *Informer) Watch(ctx context.Context, listener Listener) int {
revision := i.listeners.AddListener(listener)
go func() {
<-ctx.Done()
i.listeners.RemoveListener(listener)
}()
return revision
}

func informerNameFromGVK(gvk schema.GroupVersionKind) string {
Expand Down
83 changes: 78 additions & 5 deletions pkg/sqlcache/informer/listoption_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"sort"
"strconv"
"strings"
"sync"

"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/tools/cache"

Expand All @@ -35,6 +37,8 @@ type ListOptionIndexer struct {
deleteFieldStmt *sql.Stmt
upsertLabelsStmt *sql.Stmt
deleteLabelsStmt *sql.Stmt

resourceVersionCache *resourceVersionCache
}

var (
Expand Down Expand Up @@ -94,14 +98,17 @@ func NewListOptionIndexer(fields [][]string, s Store, namespaced bool) (*ListOpt
}

l := &ListOptionIndexer{
Indexer: i,
namespaced: namespaced,
indexedFields: indexedFields,
Indexer: i,
namespaced: namespaced,
indexedFields: indexedFields,
resourceVersionCache: newResourceVersionCache(1000),
}
l.RegisterAfterUpsert(l.addIndexFields)
l.RegisterAfterUpsert(l.addLabels)
l.RegisterAfterUpsert(l.updateResourceVersionCache)
l.RegisterAfterDelete(l.deleteIndexFields)
l.RegisterAfterDelete(l.deleteLabels)
l.RegisterAfterDelete(l.updateResourceVersionCache)
columnDefs := make([]string, len(indexedFields))
for index, field := range indexedFields {
column := fmt.Sprintf(`"%s" TEXT`, field)
Expand Down Expand Up @@ -216,6 +223,15 @@ func (l *ListOptionIndexer) addIndexFields(key string, obj any, tx db.TXClient)
return nil
}

func (l *ListOptionIndexer) updateResourceVersionCache(key string, obj any, tx db.TXClient) error {
objMeta, err := meta.Accessor(obj)
if err != nil {
return err
}
l.resourceVersionCache.add(objMeta.GetResourceVersion())
return nil
}

// labels are stored in tables that shadow the underlying object table for each GVK
func (l *ListOptionIndexer) addLabels(key string, obj any, tx db.TXClient) error {
k8sObj, ok := obj.(*unstructured.Unstructured)
Expand All @@ -232,7 +248,7 @@ func (l *ListOptionIndexer) addLabels(key string, obj any, tx db.TXClient) error
return nil
}

func (l *ListOptionIndexer) deleteIndexFields(key string, tx db.TXClient) error {
func (l *ListOptionIndexer) deleteIndexFields(key string, _ any, tx db.TXClient) error {
args := []any{key}

err := tx.StmtExec(tx.Stmt(l.deleteFieldStmt), args...)
Expand All @@ -242,21 +258,31 @@ func (l *ListOptionIndexer) deleteIndexFields(key string, tx db.TXClient) error
return nil
}

func (l *ListOptionIndexer) deleteLabels(key string, tx db.TXClient) error {
func (l *ListOptionIndexer) deleteLabels(key string, _ any, tx db.TXClient) error {
err := tx.StmtExec(tx.Stmt(l.deleteLabelsStmt), key)
if err != nil {
return &db.QueryError{QueryString: l.deleteLabelsQuery, Err: err}
}
return nil
}

func (l *ListOptionIndexer) GetLastResourceVersion() string {
return l.resourceVersionCache.getLatest()
}

// ListByOptions returns objects according to the specified list options and partitions.
// Specifically:
// - an unstructured list of resources belonging to any of the specified partitions
// - the total number of resources (returned list might be a subset depending on pagination options in lo)
// - a continue token, if there are more pages after the returned one
// - an error instead of all of the above if anything went wrong
func (l *ListOptionIndexer) ListByOptions(ctx context.Context, lo ListOptions, partitions []partition.Partition, namespace string) (*unstructured.UnstructuredList, int, string, error) {
isMaybeStale := lo.Revision != "" && !l.resourceVersionCache.contains(lo.Revision)
if isMaybeStale {
// TODO: The meat of the logic would be here
return nil, 0, "", fmt.Errorf("might be stale, try again maybe")
}

queryInfo, err := l.constructQuery(lo, partitions, namespace, db.Sanitize(l.GetName()))
if err != nil {
return nil, 0, "", err
Expand Down Expand Up @@ -851,3 +877,50 @@ func toUnstructuredList(items []any) *unstructured.UnstructuredList {
}
return result
}

type resourceVersionCache struct {
lock sync.RWMutex
resourceVersions []string
items map[string]struct{}
nextIndex int
}

func newResourceVersionCache(size int) *resourceVersionCache {
return &resourceVersionCache{
resourceVersions: make([]string, size),
nextIndex: 0,
items: make(map[string]struct{}),
}
}

func (c *resourceVersionCache) add(resourceVersion string) {
c.lock.Lock()
defer c.lock.Unlock()

oldResourceVersion := c.resourceVersions[c.nextIndex]
delete(c.items, oldResourceVersion)

c.resourceVersions[c.nextIndex] = resourceVersion
c.items[resourceVersion] = struct{}{}
c.nextIndex = (c.nextIndex + 1) % len(c.resourceVersions)
}

func (c *resourceVersionCache) contains(target string) bool {
c.lock.RLock()
defer c.lock.RUnlock()

_, found := c.items[target]
return found
}

func (c *resourceVersionCache) getLatest() string {
c.lock.RLock()
defer c.lock.RUnlock()

index := c.nextIndex - 1
if index < 0 {
index = len(c.resourceVersions) - 1
}

return c.resourceVersions[index]
}
1 change: 1 addition & 0 deletions pkg/sqlcache/informer/listoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type ListOptions struct {
Filters []OrFilter
Sort Sort
Pagination Pagination
Revision string
}

// Filter represents a field to filter by.
Expand Down
Loading
Loading