Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support password only redis connection #2617

Merged
merged 2 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 18 additions & 25 deletions common-controller/internal/cache/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ import (

// RatelimitDataStore is a cache for rate limit policies.
type RatelimitDataStore struct {
resolveRatelimitStore map[types.NamespacedName][]dpv1alpha1.ResolveRateLimitAPIPolicy
resolveSubscriptionRatelimitStore map[types.NamespacedName]dpv1alpha3.ResolveSubscriptionRatelimitPolicy
customRatelimitStore map[types.NamespacedName]*dpv1alpha1.CustomRateLimitPolicyDef
mu sync.Mutex
aiRatelimitPolicySpecs map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec
subscriptionEnabledAIRatelimitPolicies map[types.NamespacedName]struct{}
resolveRatelimitStore map[types.NamespacedName][]dpv1alpha1.ResolveRateLimitAPIPolicy
resolveSubscriptionRatelimitStore map[types.NamespacedName]dpv1alpha3.ResolveSubscriptionRatelimitPolicy
customRatelimitStore map[types.NamespacedName]*dpv1alpha1.CustomRateLimitPolicyDef
mu sync.Mutex
aiRatelimitPolicySpecs map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec
subscriptionBasedAIRatelimitPolicySpecs map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec
}

// CreateNewOperatorDataStore creates a new RatelimitDataStore.
Expand Down Expand Up @@ -103,24 +103,16 @@ func (ods *RatelimitDataStore) AddorUpdateAIRatelimitToStore(rateLimit types.Nam
ods.aiRatelimitPolicySpecs[rateLimit] = &aiRatelimitSpec
}

// MarkAIRatelimitAsSubscriptionEnabled add an entry to specify an AI RatelimitPolicy is associated with a subscription
func (ods *RatelimitDataStore) MarkAIRatelimitAsSubscriptionEnabled(nn types.NamespacedName) {
ods.mu.Lock()
defer ods.mu.Unlock()
if ods.subscriptionEnabledAIRatelimitPolicies == nil {
ods.subscriptionEnabledAIRatelimitPolicies = make(map[types.NamespacedName]struct{})
}
ods.subscriptionEnabledAIRatelimitPolicies[nn] = struct{}{}
}

// MarkAIRatelimitAsSubscriptionDisabled deletes the entry which was added to specify an AI RatelimitPolicy is associated with a subscription
func (ods *RatelimitDataStore) MarkAIRatelimitAsSubscriptionDisabled(nn types.NamespacedName) {
// AddorUpdateSubscriptionBasedAIRatelimitToStore adds a new ratelimit to the RatelimitDataStore.
func (ods *RatelimitDataStore) AddorUpdateSubscriptionBasedAIRatelimitToStore(rateLimit types.NamespacedName,
aiRatelimitSpec dpv1alpha3.AIRateLimitPolicySpec) {
ods.mu.Lock()
defer ods.mu.Unlock()
if ods.subscriptionEnabledAIRatelimitPolicies == nil {
return
logger.Infof("Adding/Updating AI ratelimit spec to cache")
if ods.subscriptionBasedAIRatelimitPolicySpecs == nil {
ods.subscriptionBasedAIRatelimitPolicySpecs = make(map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec)
}
delete(ods.subscriptionEnabledAIRatelimitPolicies, nn)
ods.subscriptionBasedAIRatelimitPolicySpecs[rateLimit] = &aiRatelimitSpec
}

// GetResolveRatelimitPolicy get cached ratelimit
Expand Down Expand Up @@ -148,11 +140,12 @@ func (ods *RatelimitDataStore) GetAIRatelimitPolicySpecs() map[types.NamespacedN
return ods.aiRatelimitPolicySpecs
}

// GetSubscriptionEnabledAIRatelimitPolicies gets all the AIRatelimitPolicy stored in ods
func (ods *RatelimitDataStore) GetSubscriptionEnabledAIRatelimitPolicies() map[types.NamespacedName]struct{} {
return ods.subscriptionEnabledAIRatelimitPolicies
// GetSubscriptionBasedAIRatelimitPolicySpecs gets all the AIRatelimitPolicy stored in ods
func (ods *RatelimitDataStore) GetSubscriptionBasedAIRatelimitPolicySpecs() map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec {
return ods.subscriptionBasedAIRatelimitPolicySpecs
}


// DeleteResolveRatelimitPolicy delete from ratelimit cache
func (ods *RatelimitDataStore) DeleteResolveRatelimitPolicy(rateLimit types.NamespacedName) {
ods.mu.Lock()
Expand Down Expand Up @@ -182,7 +175,7 @@ func (ods *RatelimitDataStore) DeleteSubscriptionBasedAIRatelimitPolicySpec(subs
ods.mu.Lock()
defer ods.mu.Unlock()
logger.Debug("Deleting AI ratelimit from cache")
delete(ods.aiRatelimitPolicySpecs, subscription)
delete(ods.subscriptionBasedAIRatelimitPolicySpecs, subscription)
}

// NamespacedName generates namespaced name for Kubernetes objects
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

xds "github.com/wso2/apk/common-controller/internal/xds"
cpv1alpha3 "github.com/wso2/apk/common-go-libs/apis/cp/v1alpha3"
dpv1alpha3 "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha3"
)
Expand All @@ -61,11 +60,10 @@ const (
)

// NewSubscriptionController creates a new Subscription controller instance.
func NewSubscriptionController(mgr manager.Manager, subscriptionStore *cache.SubscriptionDataStore, ratelimitStore *cache.RatelimitDataStore) error {
func NewSubscriptionController(mgr manager.Manager, subscriptionStore *cache.SubscriptionDataStore) error {
r := &SubscriptionReconciler{
client: mgr.GetClient(),
ods: subscriptionStore,
rlODS: ratelimitStore,
}
ctx := context.Background()
conf := config.ReadConfigs()
Expand Down Expand Up @@ -136,20 +134,6 @@ func (subscriptionReconciler *SubscriptionReconciler) Reconcile(ctx context.Cont
}
}
} else {
if subscription.Spec.RatelimitRef.Name != "" {
nn := types.NamespacedName{
Namespace: subscription.Namespace,
Name: subscription.Spec.RatelimitRef.Name,
}
var airl dpv1alpha3.AIRateLimitPolicy
if err := subscriptionReconciler.client.Get(ctx, nn, &airl); err == nil {
subscriptionReconciler.rlODS.AddorUpdateAIRatelimitToStore(nn, airl.Spec)
subscriptionReconciler.rlODS.MarkAIRatelimitAsSubscriptionEnabled(nn)
xds.UpdateRateLimitXDSCacheForAubscriptionBasedAIRatelimitPolicies(subscriptionReconciler.rlODS.GetSubscriptionEnabledAIRatelimitPolicies(), subscriptionReconciler.rlODS.GetAIRatelimitPolicySpecs())
conf := config.ReadConfigs()
xds.UpdateRateLimiterPolicies(conf.CommonController.Server.Label)
}
}
sendSubUpdates(subscription)
utils.SendAddSubscriptionEvent(subscription)
subscriptionReconciler.ods.AddorUpdateSubscriptionToStore(subscriptionKey, subscription.Spec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package dp

import (
"context"
"strings"

"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -101,7 +102,9 @@ func (r *AIRateLimitPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Re
loggers.LoggerAPKOperator.Errorf("Error retrieving AIRatelimit")
// It could be deletion event. So lets try to delete the related entried from the ods and update xds
r.ods.DeleteAIRatelimitPolicySpec(ratelimitKey)
r.ods.DeleteSubscriptionBasedAIRatelimitPolicySpec(ratelimitKey)
xds.UpdateRateLimitXDSCacheForAIRatelimitPolicies(r.ods.GetAIRatelimitPolicySpecs())
xds.UpdateRateLimitXDSCacheForSubscriptionBasedAIRatelimitPolicies(r.ods.GetSubscriptionBasedAIRatelimitPolicySpecs())
xds.UpdateRateLimiterPolicies(conf.CommonController.Server.Label)
} else {
loggers.LoggerAPKOperator.Infof("ratelimits found")
Expand All @@ -112,9 +115,15 @@ func (r *AIRateLimitPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Re
r.ods.AddorUpdateAIRatelimitToStore(ratelimitKey, ratelimitPolicy.Spec)
xds.UpdateRateLimitXDSCacheForAIRatelimitPolicies(r.ods.GetAIRatelimitPolicySpecs())
xds.UpdateRateLimiterPolicies(conf.CommonController.Server.Label)
} else if strings.EqualFold(string(ratelimitPolicy.Spec.TargetRef.Kind), "Subscription") {
r.ods.AddorUpdateSubscriptionBasedAIRatelimitToStore(ratelimitKey, ratelimitPolicy.Spec)
xds.UpdateRateLimitXDSCacheForSubscriptionBasedAIRatelimitPolicies(r.ods.GetSubscriptionBasedAIRatelimitPolicySpecs())
xds.UpdateRateLimiterPolicies(conf.CommonController.Server.Label)
} else {
r.ods.DeleteAIRatelimitPolicySpec(ratelimitKey)
r.ods.DeleteSubscriptionBasedAIRatelimitPolicySpec(ratelimitKey)
xds.UpdateRateLimitXDSCacheForAIRatelimitPolicies(r.ods.GetAIRatelimitPolicySpecs())
xds.UpdateRateLimitXDSCacheForSubscriptionBasedAIRatelimitPolicies(r.ods.GetSubscriptionBasedAIRatelimitPolicySpecs())
xds.UpdateRateLimiterPolicies(conf.CommonController.Server.Label)
}
}
Expand Down
2 changes: 1 addition & 1 deletion common-controller/internal/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func InitOperator(metricsConfig config.Metrics) {
loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error3115, logging.MAJOR,
"Error creating Application controller, error: %v", err))
}
if err := cpcontrollers.NewSubscriptionController(mgr, subscriptionStore, ratelimitStore); err != nil {
if err := cpcontrollers.NewSubscriptionController(mgr, subscriptionStore); err != nil {
loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error3116, logging.MAJOR,
"Error creating Subscription controller, error: %v", err))
}
Expand Down
38 changes: 28 additions & 10 deletions common-controller/internal/web/notify_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,22 +96,30 @@ func initRedisClient() error {
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)

rdb = redis.NewClient(&redis.Options{
Addr: redisAddr,
Username: redisUsername,
Password: redisPassword,
options := &redis.Options{
Addr: redisAddr,
Password: redisPassword,
TLSConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
InsecureSkipVerify: true,
},
})
}
if redisUsername != "" {
options.Username = redisUsername
}
rdb = redis.NewClient(options)
} else {
rdb = redis.NewClient(&redis.Options{
Addr: redisAddr,
Username: redisUsername,
options := &redis.Options{
Addr: redisAddr,
Password: redisPassword,
})
}
// Only set Username if it's not empty
if redisUsername != "" {
options.Username = redisUsername
}
rdb = redis.NewClient(options)
}
return nil;
}
Expand Down Expand Up @@ -175,7 +183,17 @@ func storeTokenInRedis(token string, expiry int64) error {
key := generateKey(token)
err := rdb.Do(context.Background(), "set", key, expiry, "EXAT", expiry).Err()
if err != nil {
return err
loggers.LoggerAPI.Warnf("Error occured while trying to set key with expiry. Error: %+v. \n Trying to use SET and EXPIREAT command...", err)
err = rdb.Do(context.Background(), "set", key, expiry).Err()
if err != nil {
loggers.LoggerAPI.Errorf("Error occured while setting the key. Error %+v", err)
return err
}
err = rdb.Do(context.Background(), "expireat", key, expiry).Err()
if err != nil {
loggers.LoggerAPI.Errorf("Error occured while setting the expiry. Error %+v", err)
return err
}
}
publishValue := fmt.Sprintf("%s%s%d", token, tokenExpiryDivider, expiry)
err = rdb.Do(context.Background(), "publish", redisRevokedTokenChannel, publishValue).Err()
Expand Down
109 changes: 54 additions & 55 deletions common-controller/internal/xds/ratelimiter_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,72 +333,71 @@ func (r *rateLimitPolicyCache) AddCustomRateLimitPolicies(customRateLimitPolicy
}

// ProcessSubscriptionBasedAIRatelimitPolicySpecsAndUpdateCache process the specs and update the cache
func (r *rateLimitPolicyCache) ProcessSubscriptionBasedAIRatelimitPolicySpecsAndUpdateCache(subscriptionEnabledAIRatelimitPolicies map[types.NamespacedName]struct{}, aiRatelimitPolicySpecs map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec) {
func (r *rateLimitPolicyCache) ProcessSubscriptionBasedAIRatelimitPolicySpecsAndUpdateCache(aiRatelimitPolicySpecs map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec) {
aiRlDescriptors := make([]*rls_config.RateLimitDescriptor, 0)
for namespacedNameRl := range subscriptionEnabledAIRatelimitPolicies {
if airl, exists := aiRatelimitPolicySpecs[namespacedNameRl]; exists {
if airl.Override.TokenCount != nil {
// Add descriptor for RequestTokenCount
aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{
Key: DescriptorKeyForSubscriptionBasedAIRequestTokenCount,
Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl),
Descriptors: []*rls_config.RateLimitDescriptor{
{
Key: DescriptorKeyForSubscription,
RateLimit: &rls_config.RateLimitPolicy{
Unit: getRateLimitUnit(airl.Override.TokenCount.Unit),
RequestsPerUnit: uint32(airl.Override.TokenCount.RequestTokenCount),
},
for namespacedNameRl, airl := range aiRatelimitPolicySpecs {
if airl.Override.TokenCount != nil {
// Add descriptor for RequestTokenCount
aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{
Key: DescriptorKeyForSubscriptionBasedAIRequestTokenCount,
Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl),
Descriptors: []*rls_config.RateLimitDescriptor{
{
Key: DescriptorKeyForSubscription,
RateLimit: &rls_config.RateLimitPolicy{
Unit: getRateLimitUnit(airl.Override.TokenCount.Unit),
RequestsPerUnit: uint32(airl.Override.TokenCount.RequestTokenCount),
},
},
})
// Add descriptor for ResponseTokenCount
aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{
Key: DescriptorKeyForSubscriptionBasedAIResponseTokenCount,
Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl),
Descriptors: []*rls_config.RateLimitDescriptor{
{
Key: DescriptorKeyForSubscription,
RateLimit: &rls_config.RateLimitPolicy{
Unit: getRateLimitUnit(airl.Override.TokenCount.Unit),
RequestsPerUnit: uint32(airl.Override.TokenCount.ResponseTokenCount),
},
},
})
// Add descriptor for ResponseTokenCount
aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{
Key: DescriptorKeyForSubscriptionBasedAIResponseTokenCount,
Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl),
Descriptors: []*rls_config.RateLimitDescriptor{
{
Key: DescriptorKeyForSubscription,
RateLimit: &rls_config.RateLimitPolicy{
Unit: getRateLimitUnit(airl.Override.TokenCount.Unit),
RequestsPerUnit: uint32(airl.Override.TokenCount.ResponseTokenCount),
},
},
})
// Add descriptor for TotalTokenCount
aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{
Key: DescriptorKeyForSubscriptionBasedAITotalTokenCount,
Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl),
Descriptors: []*rls_config.RateLimitDescriptor{
{
Key: DescriptorKeyForSubscription,
RateLimit: &rls_config.RateLimitPolicy{
Unit: getRateLimitUnit(airl.Override.TokenCount.Unit),
RequestsPerUnit: uint32(airl.Override.TokenCount.TotalTokenCount),
},
},
})
// Add descriptor for TotalTokenCount
aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{
Key: DescriptorKeyForSubscriptionBasedAITotalTokenCount,
Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl),
Descriptors: []*rls_config.RateLimitDescriptor{
{
Key: DescriptorKeyForSubscription,
RateLimit: &rls_config.RateLimitPolicy{
Unit: getRateLimitUnit(airl.Override.TokenCount.Unit),
RequestsPerUnit: uint32(airl.Override.TokenCount.TotalTokenCount),
},
},
})
}
// Add descriptor for RequestCount
if airl.Override.RequestCount != nil {
aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{
Key: DescriptorKeyForSubscriptionBasedAIRequestCount,
Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl),
Descriptors: []*rls_config.RateLimitDescriptor{
{
Key: DescriptorKeyForSubscription,
RateLimit: &rls_config.RateLimitPolicy{
Unit: getRateLimitUnit(airl.Override.RequestCount.Unit),
RequestsPerUnit: uint32(airl.Override.RequestCount.RequestsPerUnit),
},
},
})
}
// Add descriptor for RequestCount
if airl.Override.RequestCount != nil {
aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{
Key: DescriptorKeyForSubscriptionBasedAIRequestCount,
Value: prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl),
Descriptors: []*rls_config.RateLimitDescriptor{
{
Key: DescriptorKeyForSubscription,
RateLimit: &rls_config.RateLimitPolicy{
Unit: getRateLimitUnit(airl.Override.RequestCount.Unit),
RequestsPerUnit: uint32(airl.Override.RequestCount.RequestsPerUnit),
},
},
})
}
},
})
}
}

r.subscriptionBasedAIRatelimitDescriptors = aiRlDescriptors
}

Expand Down
6 changes: 3 additions & 3 deletions common-controller/internal/xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,9 @@ func UpdateRateLimitXDSCacheForAIRatelimitPolicies(aiRatelimitPolicySpecs map[ap
rlsPolicyCache.ProcessAIRatelimitPolicySpecsAndUpdateCache(aiRatelimitPolicySpecs)
}

// UpdateRateLimitXDSCacheForAubscriptionBasedAIRatelimitPolicies updates the xDS cache of the RateLimiter for AI ratelimit policies.
func UpdateRateLimitXDSCacheForAubscriptionBasedAIRatelimitPolicies(subscriptionEnabledAIRatelimitPolicies map[apimachiner_types.NamespacedName]struct{}, aiRatelimitPolicySpecs map[apimachiner_types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec) {
rlsPolicyCache.ProcessSubscriptionBasedAIRatelimitPolicySpecsAndUpdateCache(subscriptionEnabledAIRatelimitPolicies, aiRatelimitPolicySpecs)
// UpdateRateLimitXDSCacheForSubscriptionBasedAIRatelimitPolicies updates the xDS cache of the RateLimiter for AI ratelimit policies.
func UpdateRateLimitXDSCacheForSubscriptionBasedAIRatelimitPolicies(aiRatelimitPolicySpecs map[apimachiner_types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec) {
rlsPolicyCache.ProcessSubscriptionBasedAIRatelimitPolicySpecsAndUpdateCache(aiRatelimitPolicySpecs)
}

// DeleteAPILevelRateLimitPolicies delete the ratelimit xds cache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public class EnvVarConfig {
public static final String DEFAULT_XDS_MAX_RETRIES = Integer.toString(Constants.MAX_XDS_RETRIES);
public static final String DEFAULT_XDS_RETRY_PERIOD = Integer.toString(Constants.XDS_DEFAULT_RETRY);
public static final String DEFAULT_HOSTNAME = "Unassigned";
public static final String DEFAULT_REDIS_USERNAME = "default";
public static final String DEFAULT_REDIS_USERNAME = "";
public static final String DEFAULT_REDIS_PASSWORD = "";
public static final String DEFAULT_REDIS_HOST = "redis-master";
public static final int DEFAULT_REDIS_PORT = 6379;
Expand Down
Loading
Loading