Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use new versioning fields #7119

Merged
merged 5 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,134 changes: 1,077 additions & 1,057 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

236 changes: 128 additions & 108 deletions api/taskqueue/v1/message.pb.go

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions client/frontend/client_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions client/frontend/metric_client_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 30 additions & 0 deletions client/frontend/retryable_client_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions common/rpc/interceptor/logtags/workflow_service_server_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 16 additions & 3 deletions common/testing/testvars/test_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/worker_versioning"
"google.golang.org/protobuf/types/known/durationpb"
)

Expand Down Expand Up @@ -255,9 +256,9 @@ func (tv *TestVars) Deployment() *deploymentpb.Deployment {
}
}

func (tv *TestVars) DeploymentTransition() *workflowpb.DeploymentTransition {
return &workflowpb.DeploymentTransition{
Deployment: tv.Deployment(),
func (tv *TestVars) DeploymentVersionTransition() *workflowpb.DeploymentVersionTransition {
return &workflowpb.DeploymentVersionTransition{
DeploymentVersion: worker_versioning.DeploymentVersionFromDeployment(tv.Deployment()),
}
}

Expand Down Expand Up @@ -396,3 +397,15 @@ func (tv *TestVars) Any() Any {
func (tv *TestVars) Global() Global {
return newGlobal()
}

func (tv *TestVars) WorkerDeploymentOptions(versioned bool) *deploymentpb.WorkerDeploymentOptions {
m := enumspb.WORKFLOW_VERSIONING_MODE_UNVERSIONED
if versioned {
m = enumspb.WORKFLOW_VERSIONING_MODE_VERSIONING_BEHAVIORS
}
return &deploymentpb.WorkerDeploymentOptions{
Version: tv.BuildID(),
Name: tv.DeploymentSeries(),
Comment on lines +428 to +429
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can now use: tv.DeploymentName() and tv.DeploymentVersion

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... I'm actually removing them in https://github.com/temporalio/temporal/pull/7126/files#diff-969e474fa5d1098f4d0735292efd98defd6af7cc7cf4506613dc5e07991f4c46R263. The idea is we just use old BuildID and DeploymentSeries testvars in all places. Until we refactor the code and get rid of old names altogether. The problem with having two testvars (say DeploymentSeries and DeploymentName) is that test helper functions don't know to use which one and that defeats the purpose of testvars.

Copy link
Member

@Shivs11 Shivs11 Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test helper functions don't know to use which one and that defeats the purpose of testvars

can we not just write newer tests with the updated test-vars names and then when removing the old versioning names altogether, remove the old test-vars objects too?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do that, but that would require creating a clone of versioning_3_tests.go (which is not a bad idea, but not what I did, I just repurposed the tests to use new proto fields.)

WorkflowVersioningMode: m,
}
}
87 changes: 84 additions & 3 deletions common/worker_versioning/worker_versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,16 @@ func BuildIdIfUsingVersioning(stamp *commonpb.WorkerVersionStamp) string {
}

// DeploymentFromCapabilities returns the deployment if it is using versioning V3, otherwise nil.
func DeploymentFromCapabilities(capabilities *commonpb.WorkerVersionCapabilities) *deploymentpb.Deployment {
// It returns the deployment from the `options` if present, otherwise, from `capabilities`,
func DeploymentFromCapabilities(capabilities *commonpb.WorkerVersionCapabilities, options *deploymentpb.WorkerDeploymentOptions) *deploymentpb.Deployment {
if options.GetWorkflowVersioningMode() != enumspb.WORKFLOW_VERSIONING_MODE_UNVERSIONED &&
options.GetName() != "" &&
options.GetVersion() != "" {
return &deploymentpb.Deployment{
SeriesName: options.GetName(),
BuildId: options.GetVersion(),
}
}
if capabilities.GetUseVersioning() && capabilities.GetDeploymentSeriesName() != "" && capabilities.GetBuildId() != "" {
return &deploymentpb.Deployment{
SeriesName: capabilities.GetDeploymentSeriesName(),
Expand All @@ -169,6 +178,23 @@ func DeploymentFromCapabilities(capabilities *commonpb.WorkerVersionCapabilities
return nil
}

// DeploymentOrVersion Temporary helper function to return a Deployment based on passed Deployment
// or WorkerDeploymentVersion objects, if `v` is not nil, it'll take precedence.
func DeploymentOrVersion(d *deploymentpb.Deployment, v *deploymentpb.WorkerDeploymentVersion) *deploymentpb.Deployment {
if v != nil {
return DeploymentIfValid(DeploymentFromDeploymentVersion(v))
}
return DeploymentIfValid(d)
}

// DeploymentIfValid returns the deployment back if is both of its fields have value.
func DeploymentIfValid(d *deploymentpb.Deployment) *deploymentpb.Deployment {
if d.GetSeriesName() != "" && d.GetBuildId() != "" {
return d
}
return nil
}

// DeploymentToString is intended to be used for logs and metrics only. Theoretically, it can map
// different deployments to the string.
// DO NOT USE IN SERVER LOGIC.
Expand All @@ -195,7 +221,10 @@ func MakeDirectiveForWorkflowTask(
deployment *deploymentpb.Deployment,
) *taskqueuespb.TaskVersionDirective {
if behavior != enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED {
return &taskqueuespb.TaskVersionDirective{Behavior: behavior, Deployment: deployment}
return &taskqueuespb.TaskVersionDirective{
Behavior: behavior,
DeploymentVersion: DeploymentVersionFromDeployment(deployment),
}
}
if id := BuildIdIfUsingVersioning(stamp); id != "" && assignedBuildId == "" {
// TODO: old versioning only [cleanup-old-wv]
Expand All @@ -211,6 +240,30 @@ func MakeDirectiveForWorkflowTask(
return nil
}

// DeploymentVersionFromDeployment Temporary helper function to convert Deployment to
// WorkerDeploymentVersion proto until we update code to use the new proto in all places.
func DeploymentVersionFromDeployment(deployment *deploymentpb.Deployment) *deploymentpb.WorkerDeploymentVersion {
if deployment == nil {
return nil
}
return &deploymentpb.WorkerDeploymentVersion{
Version: deployment.GetBuildId(),
DeploymentName: deployment.GetSeriesName(),
}
}

// DeploymentFromDeploymentVersion Temporary helper function to convert WorkerDeploymentVersion to
// Deployment proto until we update code to use the new proto in all places.
func DeploymentFromDeploymentVersion(dv *deploymentpb.WorkerDeploymentVersion) *deploymentpb.Deployment {
if dv == nil {
return nil
}
return &deploymentpb.Deployment{
BuildId: dv.GetVersion(),
SeriesName: dv.GetDeploymentName(),
}
}

func MakeUseAssignmentRulesDirective() *taskqueuespb.TaskVersionDirective {
return &taskqueuespb.TaskVersionDirective{BuildId: &taskqueuespb.TaskVersionDirective_UseAssignmentRules{UseAssignmentRules: &emptypb.Empty{}}}
}
Expand Down Expand Up @@ -252,6 +305,21 @@ func ValidateDeployment(deployment *deploymentpb.Deployment) error {
return nil
}

// ValidateDeploymentVersion returns error if the deployment version is nil or it has empty version
// or deployment name.
func ValidateDeploymentVersion(version *deploymentpb.WorkerDeploymentVersion) error {
if version == nil {
return serviceerror.NewInvalidArgument("deployment cannot be nil")
}
if version.GetDeploymentName() == "" {
return serviceerror.NewInvalidArgument("deployment name name cannot be empty")
}
if version.GetVersion() == "" {
return serviceerror.NewInvalidArgument("version cannot be empty")
}
return nil
}

func ValidateVersioningOverride(override *workflowpb.VersioningOverride) error {
if override == nil {
return nil
Expand All @@ -260,13 +328,18 @@ func ValidateVersioningOverride(override *workflowpb.VersioningOverride) error {
case enumspb.VERSIONING_BEHAVIOR_PINNED:
if override.GetDeployment() != nil {
return ValidateDeployment(override.GetDeployment())
} else if override.GetPinnedVersion() != nil {
return ValidateDeploymentVersion(override.GetPinnedVersion())
} else {
return serviceerror.NewInvalidArgument("must provide deployment if behavior is 'PINNED'")
}
case enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE:
if override.GetDeployment() != nil {
return serviceerror.NewInvalidArgument("only provide deployment if behavior is 'PINNED'")
}
if override.GetPinnedVersion() != nil {
return serviceerror.NewInvalidArgument("only provide pinned version if behavior is 'PINNED'")
}
case enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED:
return serviceerror.NewInvalidArgument("override behavior is required")
default:
Expand Down Expand Up @@ -307,7 +380,7 @@ func ValidateTaskVersionDirective(
directiveBehavior, wfBehavior))
}

directiveDeployment := directive.GetDeployment()
directiveDeployment := DirectiveDeployment(directive)
if directiveDeployment == nil {
// TODO: remove this once the ScheduledDeployment field is removed from proto
directiveDeployment = scheduledDeployment
Expand All @@ -321,3 +394,11 @@ func ValidateTaskVersionDirective(
}
return nil
}

// DirectiveDeployment Temporary function until Directive proto is removed.
func DirectiveDeployment(directive *taskqueuespb.TaskVersionDirective) *deploymentpb.Deployment {
if dv := directive.GetDeploymentVersion(); dv != nil {
return DeploymentFromDeploymentVersion(dv)
}
return directive.GetDeployment()
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ retract (
v1.26.0 // Published accidentally.
)

replace go.temporal.io/api => go.temporal.io/api v1.43.2-0.20250116180204-00f5e1d8a293
replace go.temporal.io/api => go.temporal.io/api v1.43.2-0.20250117224308-f070b00264f6

require (
cloud.google.com/go/storage v1.41.0
Expand Down Expand Up @@ -58,7 +58,7 @@ require (
go.opentelemetry.io/otel/sdk v1.31.0
go.opentelemetry.io/otel/sdk/metric v1.31.0
go.opentelemetry.io/otel/trace v1.31.0
go.temporal.io/api v1.43.2-0.20250114194029-61f369a5f511
go.temporal.io/api v1.43.2-0.20250117001709-252998cf2351
go.temporal.io/sdk v1.31.0
go.temporal.io/version v0.3.0
go.uber.org/automaxprocs v1.5.3
Expand Down
Loading
Loading