Skip to content

Commit

Permalink
feat: reporter uses sources-server (#357)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrii-codefresh authored Dec 20, 2024
1 parent 21ccd78 commit 4dbe2c4
Show file tree
Hide file tree
Showing 12 changed files with 191 additions and 13 deletions.
2 changes: 1 addition & 1 deletion changelog/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
### Features
- feat(event-reporter): multisourced apps support improvements: reporting syncOperationRevisions, detecting correct resource sourceIdx, reporting correct git commit info
- feat(event-reporter): using sources-server for getting application version
10 changes: 10 additions & 0 deletions cmd/event-reporter-server/commands/event_reporter_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"math"
"time"

"github.com/argoproj/argo-cd/v2/pkg/sources_server_client"

"github.com/argoproj/argo-cd/v2/event_reporter/reporter"

"github.com/argoproj/argo-cd/v2/event_reporter"
Expand Down Expand Up @@ -104,6 +106,8 @@ func NewCommand() *cobra.Command {
rateLimiterBucketSize int
rateLimiterDuration time.Duration
rateLimiterLearningMode bool
useSourcesServer bool
sourcesServerBaseURL string
)
command := &cobra.Command{
Use: cliName,
Expand Down Expand Up @@ -192,6 +196,10 @@ func NewCommand() *cobra.Command {
Capacity: rateLimiterBucketSize,
LearningMode: rateLimiterLearningMode,
},
UseSourcesServer: useSourcesServer,
SourcesServerConfig: &sources_server_client.SourcesServerConfig{
BaseURL: sourcesServerBaseURL,
},
}

log.Infof("Starting event reporter server with grpc transport %v", useGrpc)
Expand Down Expand Up @@ -245,6 +253,8 @@ func NewCommand() *cobra.Command {
command.Flags().IntVar(&rateLimiterBucketSize, "rate-limiter-bucket-size", env.ParseNumFromEnv("RATE_LIMITER_BUCKET_SIZE", math.MaxInt, 0, math.MaxInt), "The maximum amount of requests allowed per window.")
command.Flags().DurationVar(&rateLimiterDuration, "rate-limiter-period", env.ParseDurationFromEnv("RATE_LIMITER_DURATION", 24*time.Hour, 0, math.MaxInt64), "The rate limit window size.")
command.Flags().BoolVar(&rateLimiterLearningMode, "rate-limiter-learning-mode", env.ParseBoolFromEnv("RATE_LIMITER_LEARNING_MODE_ENABLED", false), "The rate limit enabled in learning mode ( not blocking sending to queue but logging it )")
command.Flags().BoolVar(&useSourcesServer, "use-sources-server", env.ParseBoolFromEnv("SOURCES_SERVER_ENABLED", false), "Use sources-server instead of repo-server fork")
command.Flags().StringVar(&sourcesServerBaseURL, "sources-server-base-url", env.StringFromEnv("SOURCES_SERVER_BASE_URL", common.DefaultSourcesServerAddr), "Sources-server base URL")
cacheSrc = servercache.AddCacheFlagsToCmd(command, cacheutil.Options{
OnClientCreated: func(client *redis.Client) {
redisClient = client
Expand Down
3 changes: 2 additions & 1 deletion common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ const (
// DefaultDexServerAddr is the HTTP address of the Dex OIDC server, which we run a reverse proxy against
DefaultDexServerAddr = "argocd-dex-server:5556"
// DefaultRedisAddr is the default redis address
DefaultRedisAddr = "argocd-redis:6379"
DefaultRedisAddr = "argocd-redis:6379"
DefaultSourcesServerAddr = "sources-server:8090"
)

// Kubernetes ConfigMap and Secret resource names which hold Argo CD settings
Expand Down
6 changes: 4 additions & 2 deletions event_reporter/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strings"
"time"

"github.com/argoproj/argo-cd/v2/pkg/sources_server_client"

"github.com/argoproj/argo-cd/v2/util/db"

appclient "github.com/argoproj/argo-cd/v2/event_reporter/application"
Expand Down Expand Up @@ -45,15 +47,15 @@ type eventReporterController struct {
metricsServer *metrics.MetricsServer
}

func NewEventReporterController(appInformer cache.SharedIndexInformer, cache *servercache.Cache, settingsMgr *settings.SettingsManager, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, featureManager *reporter.FeatureManager, rateLimiterOpts *reporter.RateLimiterOpts, db db.ArgoDB) EventReporterController {
func NewEventReporterController(appInformer cache.SharedIndexInformer, cache *servercache.Cache, settingsMgr *settings.SettingsManager, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, featureManager *reporter.FeatureManager, rateLimiterOpts *reporter.RateLimiterOpts, db db.ArgoDB, useSourcesServer bool, sourcesServerConfig *sources_server_client.SourcesServerConfig) EventReporterController {
appBroadcaster := reporter.NewBroadcaster(featureManager, metricsServer, rateLimiterOpts)
_, err := appInformer.AddEventHandler(appBroadcaster)
if err != nil {
log.Error(err)
}
return &eventReporterController{
appBroadcaster: appBroadcaster,
applicationEventReporter: reporter.NewApplicationEventReporter(cache, applicationServiceClient, appLister, codefreshConfig, metricsServer, db),
applicationEventReporter: reporter.NewApplicationEventReporter(cache, applicationServiceClient, appLister, codefreshConfig, metricsServer, db, useSourcesServer, sourcesServerConfig),
cache: cache,
settingsMgr: settingsMgr,
applicationServiceClient: applicationServiceClient,
Expand Down
6 changes: 6 additions & 0 deletions event_reporter/reporter/app_revision_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func TestGetRevisionsDetails(t *testing.T) {
&metrics.MetricsServer{},
fakeArgoDb(),
"0.0.1",
false,
nil,
}

result, _ := reporter.getRevisionsDetails(context.Background(), &app, []string{expectedRevision})
Expand Down Expand Up @@ -122,6 +124,8 @@ func TestGetRevisionsDetails(t *testing.T) {
&metrics.MetricsServer{},
fakeArgoDb(),
"0.0.1",
false,
nil,
}

result, _ := reporter.getRevisionsDetails(context.Background(), &app, []string{expectedRevision1, expectedRevision2})
Expand Down Expand Up @@ -165,6 +169,8 @@ func TestGetRevisionsDetails(t *testing.T) {
&metrics.MetricsServer{},
fakeArgoDb(),
"0.0.1",
false,
nil,
}

result, _ := reporter.getRevisionsDetails(context.Background(), &app, []string{expectedRevision})
Expand Down
38 changes: 33 additions & 5 deletions event_reporter/reporter/application_event_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"strings"
"time"

"github.com/argoproj/argo-cd/v2/pkg/sources_server_client"

"github.com/argoproj/argo-cd/v2/util/db"

"github.com/argoproj/argo-cd/v2/event_reporter/utils"
Expand Down Expand Up @@ -46,6 +48,8 @@ type applicationEventReporter struct {
metricsServer *metrics.MetricsServer
db db.ArgoDB
runtimeVersion string
useSourcesServer bool
sourcesServerClient sources_server_client.SourceServerClientInteface
}

type ApplicationEventReporter interface {
Expand All @@ -59,7 +63,7 @@ type ApplicationEventReporter interface {
ShouldSendApplicationEvent(ae *appv1.ApplicationWatchEvent) (shouldSend bool, syncStatusChanged bool)
}

func NewApplicationEventReporter(cache *servercache.Cache, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, db db.ArgoDB) ApplicationEventReporter {
func NewApplicationEventReporter(cache *servercache.Cache, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, db db.ArgoDB, useSourcesServer bool, sourcesServerConfig *sources_server_client.SourcesServerConfig) ApplicationEventReporter {
return &applicationEventReporter{
cache: cache,
applicationServiceClient: applicationServiceClient,
Expand All @@ -68,6 +72,8 @@ func NewApplicationEventReporter(cache *servercache.Cache, applicationServiceCli
metricsServer: metricsServer,
db: db,
runtimeVersion: codefreshConfig.RuntimeVersion,
useSourcesServer: useSourcesServer,
sourcesServerClient: sources_server_client.NewSourceServerClient(sourcesServerConfig),
}
}

Expand Down Expand Up @@ -265,7 +271,20 @@ func (s *applicationEventReporter) resolveApplicationVersions(ctx context.Contex
}

syncManifests, _ := s.getDesiredManifests(ctx, logCtx, a, nil, &sourcePositions, syncResultRevisions)
return syncManifests.GetApplicationVersions()

var applicationVersions *apiclient.ApplicationVersions
if s.useSourcesServer {
log.Infof("cfGetAppVersion. Getting version from sourcesserver")
if len(*syncResultRevisions) == 0 {
return nil
}
appVers := s.sourcesServerClient.GetAppVersion(a, &(*syncResultRevisions)[0])
applicationVersions = utils.SourcesAppVersionsToRepo(appVers, logCtx)
} else {
applicationVersions = syncManifests.GetApplicationVersions()
}

return applicationVersions
}

syncResultRevision := utils.GetOperationSyncResultRevision(a)
Expand All @@ -275,7 +294,16 @@ func (s *applicationEventReporter) resolveApplicationVersions(ctx context.Contex
}

syncManifests, _ := s.getDesiredManifests(ctx, logCtx, a, syncResultRevision, nil, nil)
return syncManifests.GetApplicationVersions()

var applicationVersions *apiclient.ApplicationVersions
if s.useSourcesServer {
log.Infof("cfGetAppVersion. Getting version from sourcesserver")
appVers := s.sourcesServerClient.GetAppVersion(a, syncResultRevision)
applicationVersions = utils.SourcesAppVersionsToRepo(appVers, logCtx)
} else {
applicationVersions = syncManifests.GetApplicationVersions()
}
return applicationVersions
}

func (s *applicationEventReporter) getAppForResourceReporting(
Expand Down Expand Up @@ -309,8 +337,8 @@ func (s *applicationEventReporter) processResource(
appEventProcessingStartedAt string,
desiredManifests *apiclient.ManifestResponse,
manifestGenErr bool,
originalApplication *appv1.Application, // passed onlu if resource is app
applicationVersions *apiclient.ApplicationVersions, // passed onlu if resource is app
originalApplication *appv1.Application, // passed only if resource is app
applicationVersions *apiclient.ApplicationVersions, // passed only if resource is app
reportedEntityParentApp *ReportedEntityParentApp,
argoTrackingMetadata *ArgoTrackingMetadata,
) error {
Expand Down
2 changes: 2 additions & 0 deletions event_reporter/reporter/application_event_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ func fakeReporter(customAppServiceClient appclient.ApplicationClient) *applicati
metricsServ,
fakeArgoDb(),
"0.0.1",
false,
nil,
}
}

Expand Down
6 changes: 5 additions & 1 deletion event_reporter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"strings"
"time"

"github.com/argoproj/argo-cd/v2/pkg/sources_server_client"

appclient "github.com/argoproj/argo-cd/v2/event_reporter/application"
"github.com/argoproj/argo-cd/v2/event_reporter/reporter"

Expand Down Expand Up @@ -95,6 +97,8 @@ type EventReporterServerOpts struct {
RootPath string
CodefreshConfig *codefresh.CodefreshConfig
RateLimiterOpts *reporter.RateLimiterOpts
UseSourcesServer bool
SourcesServerConfig *sources_server_client.SourcesServerConfig
}

type handlerSwitcher struct {
Expand Down Expand Up @@ -153,7 +157,7 @@ func (a *EventReporterServer) Init(ctx context.Context) {
}

func (a *EventReporterServer) RunController(ctx context.Context) {
controller := event_reporter.NewEventReporterController(a.appInformer, a.Cache, a.settingsMgr, a.ApplicationServiceClient, a.appLister, a.CodefreshConfig, a.serviceSet.MetricsServer, a.featureManager, a.RateLimiterOpts, a.db)
controller := event_reporter.NewEventReporterController(a.appInformer, a.Cache, a.settingsMgr, a.ApplicationServiceClient, a.appLister, a.CodefreshConfig, a.serviceSet.MetricsServer, a.featureManager, a.RateLimiterOpts, a.db, a.UseSourcesServer, a.SourcesServerConfig)
go controller.Run(ctx)
}

Expand Down
17 changes: 17 additions & 0 deletions event_reporter/utils/app_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package utils
import (
"encoding/json"

log "github.com/sirupsen/logrus"

"github.com/argoproj/argo-cd/v2/pkg/apiclient/events"
"github.com/argoproj/argo-cd/v2/pkg/sources_server_client"
"github.com/argoproj/argo-cd/v2/reposerver/apiclient"
)

Expand All @@ -16,3 +19,17 @@ func RepoAppVersionsToEvent(applicationVersions *apiclient.ApplicationVersions)
}
return applicationVersionsEvents, nil
}

func SourcesAppVersionsToRepo(applicationVersions *sources_server_client.AppVersionResult, logCtx *log.Entry) *apiclient.ApplicationVersions {
if applicationVersions == nil {
return nil
}
applicationVersionsRepo := &apiclient.ApplicationVersions{}
applicationVersionsData, _ := json.Marshal(applicationVersions)
err := json.Unmarshal(applicationVersionsData, applicationVersionsRepo)
if err != nil {
logCtx.Errorf("can't unmarshal app version: %v", err)
return nil
}
return applicationVersionsRepo
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dlclark/regexp2 v1.11.2
github.com/dlclark/regexp2 v1.11.4
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/evanphx/json-patch/v5 v5.8.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -848,8 +848,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0=
github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
github.com/dlclark/regexp2 v1.11.2 h1:/u628IuisSTwri5/UKloiIsH8+qF2Pu7xEQX+yIKg68=
github.com/dlclark/regexp2 v1.11.2/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8=
github.com/dlclark/regexp2 v1.11.4 h1:rPYF9/LECdNymJufQKmri9gV604RvvABwgOA8un7yAo=
github.com/dlclark/regexp2 v1.11.4/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8=
github.com/dnaeon/go-vcr v1.1.0 h1:ReYa/UBrRyQdant9B4fNHGoCNKw6qh6P0fsdGmZpR7c=
github.com/dnaeon/go-vcr v1.1.0/go.mod h1:M7tiix8f0r6mKKJ3Yq/kqU1OYf3MnfmBWVbPx/yU9ko=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
Expand Down
108 changes: 108 additions & 0 deletions pkg/sources_server_client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package sources_server_client

import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
netUrl "net/url"

log "github.com/sirupsen/logrus"

"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
)

type VersionPayload struct {
App v1alpha1.Application `json:"app"`
Revision string `json:"revision"`
}

type DependenciesMap struct {
Lock string `json:"helm/Chart.lock"`
Deps string `json:"helm/dependencies"`
Requirements string `json:"helm/requirements.yaml"`
}

type AppVersionResult struct {
AppVersion string `json:"appVersion"`
Dependencies DependenciesMap `json:"dependencies"`
}

type SourcesServerConfig struct {
BaseURL string
}

type sourceServerClient struct {
clientConfig *SourcesServerConfig
}

type SourceServerClientInteface interface {
GetAppVersion(app *v1alpha1.Application, revision *string) *AppVersionResult
}

func (c *sourceServerClient) sendRequest(method, url string, payload interface{}) ([]byte, error) {
var requestBody []byte
var err error
if payload != nil {
requestBody, err = json.Marshal(payload)
if err != nil {
return nil, fmt.Errorf("error marshalling payload: %w", err)
}
}

fullURL, err := netUrl.JoinPath(c.clientConfig.BaseURL, url)
if err != nil {
return nil, fmt.Errorf("error joining path: %w", err)
}

req, err := http.NewRequest(method, fullURL, bytes.NewBuffer(requestBody))
if err != nil {
return nil, fmt.Errorf("error creating request: %w", err)
}

req.Header.Set("Content-Type", "application/json")

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("error making request: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode < 200 || resp.StatusCode >= 300 {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("server responded with status %d: %s", resp.StatusCode, string(body))
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("error reading response body: %w", err)
}

return body, nil
}

func (c *sourceServerClient) GetAppVersion(app *v1alpha1.Application, revision *string) *AppVersionResult {
log.Infof("cfGetAppVersion. Sending request to sources-server for %s", app.Name)
appVersionResult, err := c.sendRequest("POST", "/getAppVersion", VersionPayload{App: *app, Revision: *revision})
if err != nil {
log.Errorf("error getting app version: %v", err)
return nil
}

var versionStruct AppVersionResult
err = json.Unmarshal(appVersionResult, &versionStruct)
if err != nil {
log.Errorf("error unmarshaling app version: %v", err)
return nil
}

return &versionStruct
}

func NewSourceServerClient(clientConfig *SourcesServerConfig) SourceServerClientInteface {
return &sourceServerClient{
clientConfig: clientConfig,
}
}

0 comments on commit 4dbe2c4

Please sign in to comment.