Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NOT FOR REVIEW: PoC without MS Cache #7035

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions common/cache/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
pin bool
timeSource clock.TimeSource
metricsHandler metrics.Handler
enabled bool
}

iteratorImpl struct {
Expand All @@ -78,6 +79,10 @@
refCount int
size int
}
//
//hasUpdate interface {

Check failure on line 83 in common/cache/lru.go

View workflow job for this annotation

GitHub Actions / golangci

comment-spacings: no space between comment delimiter and comment text (revive)
// UpdateRegistry(ctx context.Context) update.Registry
//}

Check failure on line 85 in common/cache/lru.go

View workflow job for this annotation

GitHub Actions / golangci

comment-spacings: no space between comment delimiter and comment text (revive)
)

// Close closes the iterator
Expand Down Expand Up @@ -154,12 +159,12 @@

// New creates a new cache with the given options
func New(maxSize int, opts *Options) Cache {
return NewWithMetrics(maxSize, opts, metrics.NoopMetricsHandler)
return NewWithMetrics(maxSize, opts, metrics.NoopMetricsHandler, false)
}

// NewWithMetrics creates a new cache that will emit capacity and ttl metrics.
// handler should be tagged with metrics.CacheTypeTag.
func NewWithMetrics(maxSize int, opts *Options, handler metrics.Handler) Cache {
func NewWithMetrics(maxSize int, opts *Options, handler metrics.Handler, enabled bool) Cache {
if opts == nil {
opts = &Options{}
}
Expand All @@ -181,6 +186,7 @@
onEvict: opts.OnEvict,
timeSource: timeSource,
metricsHandler: handler,
enabled: enabled,
}
}

Expand Down Expand Up @@ -273,6 +279,9 @@
if entry.refCount == 0 {
c.pinnedSize -= entry.Size()
metrics.CachePinnedUsage.With(c.metricsHandler).Record(float64(c.pinnedSize))
if !c.enabled {
c.tryEvictAndGetPreviousElement(entry, c.byKey[key])
}
}
// Entry size might have changed. Recalculate size and evict entries if necessary.
newEntrySize := getSize(entry.value)
Expand Down
4 changes: 3 additions & 1 deletion common/cache/lru_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestLRU(t *testing.T) {
metricsHandler := metricstest.NewCaptureHandler()
capture := metricsHandler.StartCapture()

cache := NewWithMetrics(4, nil, metricsHandler)
cache := NewWithMetrics(4, nil, metricsHandler, true)

cache.Put("A", "Foo")
assert.Equal(t, "Foo", cache.Get("A"))
Expand Down Expand Up @@ -148,6 +148,7 @@ func TestLRUWithTTL(t *testing.T) {
TimeSource: timeSource,
},
metricsHandler,
false,
)
cache.Put("A", "foo")
assert.Equal(t, "foo", cache.Get("A"))
Expand Down Expand Up @@ -249,6 +250,7 @@ func TestTTLWithPin(t *testing.T) {
TimeSource: timeSource,
},
metricsHandler,
false,
)

capture := metricsHandler.StartCapture()
Expand Down
5 changes: 5 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2626,4 +2626,9 @@ WorkerActivitiesPerSecond, MaxConcurrentActivityTaskPollers.
false,
`ActivityAPIsEnabled is a "feature enable" flag. `,
)
WorkflowCacheEnabled = NewGlobalBoolSetting(
"history.workflowCacheEnabled",
true,
``,
)
)
2 changes: 1 addition & 1 deletion common/nexus/endpoint_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func NewEndpointRegistry(
logger: logger,
readThroughCacheByID: cache.NewWithMetrics(config.readThroughCacheSize(), &cache.Options{
TTL: config.readThroughCacheTTL(),
}, metricsHandler.WithTags(metrics.CacheTypeTag(metrics.NexusEndpointRegistryReadThroughCacheTypeTagValue))),
}, metricsHandler.WithTags(metrics.CacheTypeTag(metrics.NexusEndpointRegistryReadThroughCacheTypeTagValue)), true),
}
}

Expand Down
6 changes: 4 additions & 2 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,8 @@ type Config struct {

BreakdownMetricsByTaskQueue dynamicconfig.BoolPropertyFnWithTaskQueueFilter

LogAllReqErrors dynamicconfig.BoolPropertyFnWithNamespaceFilter
LogAllReqErrors dynamicconfig.BoolPropertyFnWithNamespaceFilter
WorkflowCacheEnabled dynamicconfig.BoolPropertyFn
}

// NewConfig returns new service config with default values
Expand Down Expand Up @@ -686,7 +687,8 @@ func NewConfig(

BreakdownMetricsByTaskQueue: dynamicconfig.MetricsBreakdownByTaskQueue.Get(dc),

LogAllReqErrors: dynamicconfig.LogAllReqErrors.Get(dc),
LogAllReqErrors: dynamicconfig.LogAllReqErrors.Get(dc),
WorkflowCacheEnabled: dynamicconfig.WorkflowCacheEnabled.Get(dc),
}

return cfg
Expand Down
2 changes: 1 addition & 1 deletion service/history/events/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func newEventsCache(

taggedMetricHandler := metricsHandler.WithTags(metrics.CacheTypeTag(metrics.EventsCacheTypeTagValue))
return &CacheImpl{
Cache: cache.NewWithMetrics(maxSize, opts, taggedMetricHandler),
Cache: cache.NewWithMetrics(maxSize, opts, taggedMetricHandler, true),
executionManager: executionManager,
metricsHandler: taggedMetricHandler,
logger: logger,
Expand Down
2 changes: 1 addition & 1 deletion service/history/replication/progress_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func NewProgressCache(
TTL: config.ReplicationProgressCacheTTL(),
}
return &progressCacheImpl{
cache: cache.NewWithMetrics(maxSize, opts, handler.WithTags(metrics.CacheTypeTag(metrics.MutableStateCacheTypeTagValue))),
cache: cache.NewWithMetrics(maxSize, opts, handler.WithTags(metrics.CacheTypeTag(metrics.MutableStateCacheTypeTagValue)), true),
}
}

Expand Down
5 changes: 4 additions & 1 deletion service/history/workflow/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func NewHostLevelCache(
config.HistoryCacheNonUserContextLockTimeout(),
logger,
handler,
config.WorkflowCacheEnabled(),
)
}

Expand All @@ -143,6 +144,7 @@ func NewShardLevelCache(
config.HistoryCacheNonUserContextLockTimeout(),
logger,
handler,
config.WorkflowCacheEnabled(),
)
}

Expand All @@ -152,6 +154,7 @@ func newCache(
nonUserContextLockTimeout time.Duration,
logger log.Logger,
handler metrics.Handler,
cacheEnabled bool,
) Cache {
opts := &cache.Options{
TTL: ttl,
Expand Down Expand Up @@ -193,7 +196,7 @@ func newCache(
},
}

withMetrics := cache.NewWithMetrics(size, opts, handler.WithTags(metrics.CacheTypeTag(metrics.MutableStateCacheTypeTagValue)))
withMetrics := cache.NewWithMetrics(size, opts, handler.WithTags(metrics.CacheTypeTag(metrics.MutableStateCacheTypeTagValue)), cacheEnabled)

return &cacheImpl{
Cache: withMetrics,
Expand Down
Loading