Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use new versioning fields #7119

Merged
merged 5 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,134 changes: 1,077 additions & 1,057 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

236 changes: 128 additions & 108 deletions api/taskqueue/v1/message.pb.go

Large diffs are not rendered by default.

19 changes: 16 additions & 3 deletions common/testing/testvars/test_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/worker_versioning"
"google.golang.org/protobuf/types/known/durationpb"
)

Expand Down Expand Up @@ -276,9 +277,9 @@ func (tv *TestVars) WithDeploymentNameNumber(n int) *TestVars {
return tv.cloneSetN("deployment_name", n)
}

func (tv *TestVars) DeploymentTransition() *workflowpb.DeploymentTransition {
return &workflowpb.DeploymentTransition{
Deployment: tv.Deployment(),
func (tv *TestVars) DeploymentVersionTransition() *workflowpb.DeploymentVersionTransition {
return &workflowpb.DeploymentVersionTransition{
DeploymentVersion: worker_versioning.DeploymentVersionFromDeployment(tv.Deployment()),
}
}

Expand Down Expand Up @@ -417,3 +418,15 @@ func (tv *TestVars) Any() Any {
func (tv *TestVars) Global() Global {
return newGlobal()
}

func (tv *TestVars) WorkerDeploymentOptions(versioned bool) *deploymentpb.WorkerDeploymentOptions {
m := enumspb.WORKFLOW_VERSIONING_MODE_UNVERSIONED
if versioned {
m = enumspb.WORKFLOW_VERSIONING_MODE_VERSIONING_BEHAVIORS
}
return &deploymentpb.WorkerDeploymentOptions{
Version: tv.BuildID(),
Name: tv.DeploymentSeries(),
Comment on lines +428 to +429
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can now use: tv.DeploymentName() and tv.DeploymentVersion

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... I'm actually removing them in https://github.com/temporalio/temporal/pull/7126/files#diff-969e474fa5d1098f4d0735292efd98defd6af7cc7cf4506613dc5e07991f4c46R263. The idea is we just use old BuildID and DeploymentSeries testvars in all places. Until we refactor the code and get rid of old names altogether. The problem with having two testvars (say DeploymentSeries and DeploymentName) is that test helper functions don't know to use which one and that defeats the purpose of testvars.

Copy link
Member

@Shivs11 Shivs11 Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test helper functions don't know to use which one and that defeats the purpose of testvars

can we not just write newer tests with the updated test-vars names and then when removing the old versioning names altogether, remove the old test-vars objects too?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do that, but that would require creating a clone of versioning_3_tests.go (which is not a bad idea, but not what I did, I just repurposed the tests to use new proto fields.)

WorkflowVersioningMode: m,
}
}
87 changes: 84 additions & 3 deletions common/worker_versioning/worker_versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,16 @@ func BuildIdIfUsingVersioning(stamp *commonpb.WorkerVersionStamp) string {
}

// DeploymentFromCapabilities returns the deployment if it is using versioning V3, otherwise nil.
func DeploymentFromCapabilities(capabilities *commonpb.WorkerVersionCapabilities) *deploymentpb.Deployment {
// It returns the deployment from the `options` if present, otherwise, from `capabilities`,
func DeploymentFromCapabilities(capabilities *commonpb.WorkerVersionCapabilities, options *deploymentpb.WorkerDeploymentOptions) *deploymentpb.Deployment {
if options.GetWorkflowVersioningMode() != enumspb.WORKFLOW_VERSIONING_MODE_UNVERSIONED &&
options.GetName() != "" &&
options.GetVersion() != "" {
return &deploymentpb.Deployment{
SeriesName: options.GetName(),
BuildId: options.GetVersion(),
}
}
if capabilities.GetUseVersioning() && capabilities.GetDeploymentSeriesName() != "" && capabilities.GetBuildId() != "" {
return &deploymentpb.Deployment{
SeriesName: capabilities.GetDeploymentSeriesName(),
Expand All @@ -169,6 +178,23 @@ func DeploymentFromCapabilities(capabilities *commonpb.WorkerVersionCapabilities
return nil
}

// DeploymentOrVersion Temporary helper function to return a Deployment based on passed Deployment
// or WorkerDeploymentVersion objects, if `v` is not nil, it'll take precedence.
func DeploymentOrVersion(d *deploymentpb.Deployment, v *deploymentpb.WorkerDeploymentVersion) *deploymentpb.Deployment {
if v != nil {
return DeploymentIfValid(DeploymentFromDeploymentVersion(v))
}
return DeploymentIfValid(d)
}

// DeploymentIfValid returns the deployment back if is both of its fields have value.
func DeploymentIfValid(d *deploymentpb.Deployment) *deploymentpb.Deployment {
if d.GetSeriesName() != "" && d.GetBuildId() != "" {
return d
}
return nil
}

// DeploymentToString is intended to be used for logs and metrics only. Theoretically, it can map
// different deployments to the string.
// DO NOT USE IN SERVER LOGIC.
Expand All @@ -195,7 +221,10 @@ func MakeDirectiveForWorkflowTask(
deployment *deploymentpb.Deployment,
) *taskqueuespb.TaskVersionDirective {
if behavior != enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED {
return &taskqueuespb.TaskVersionDirective{Behavior: behavior, Deployment: deployment}
return &taskqueuespb.TaskVersionDirective{
Behavior: behavior,
DeploymentVersion: DeploymentVersionFromDeployment(deployment),
}
}
if id := BuildIdIfUsingVersioning(stamp); id != "" && assignedBuildId == "" {
// TODO: old versioning only [cleanup-old-wv]
Expand All @@ -211,6 +240,30 @@ func MakeDirectiveForWorkflowTask(
return nil
}

// DeploymentVersionFromDeployment Temporary helper function to convert Deployment to
// WorkerDeploymentVersion proto until we update code to use the new proto in all places.
func DeploymentVersionFromDeployment(deployment *deploymentpb.Deployment) *deploymentpb.WorkerDeploymentVersion {
if deployment == nil {
return nil
}
return &deploymentpb.WorkerDeploymentVersion{
Version: deployment.GetBuildId(),
DeploymentName: deployment.GetSeriesName(),
}
}

// DeploymentFromDeploymentVersion Temporary helper function to convert WorkerDeploymentVersion to
// Deployment proto until we update code to use the new proto in all places.
func DeploymentFromDeploymentVersion(dv *deploymentpb.WorkerDeploymentVersion) *deploymentpb.Deployment {
if dv == nil {
return nil
}
return &deploymentpb.Deployment{
BuildId: dv.GetVersion(),
SeriesName: dv.GetDeploymentName(),
}
}

func MakeUseAssignmentRulesDirective() *taskqueuespb.TaskVersionDirective {
return &taskqueuespb.TaskVersionDirective{BuildId: &taskqueuespb.TaskVersionDirective_UseAssignmentRules{UseAssignmentRules: &emptypb.Empty{}}}
}
Expand Down Expand Up @@ -252,6 +305,21 @@ func ValidateDeployment(deployment *deploymentpb.Deployment) error {
return nil
}

// ValidateDeploymentVersion returns error if the deployment version is nil or it has empty version
// or deployment name.
func ValidateDeploymentVersion(version *deploymentpb.WorkerDeploymentVersion) error {
if version == nil {
return serviceerror.NewInvalidArgument("deployment cannot be nil")
}
if version.GetDeploymentName() == "" {
return serviceerror.NewInvalidArgument("deployment name name cannot be empty")
}
if version.GetVersion() == "" {
return serviceerror.NewInvalidArgument("version cannot be empty")
}
return nil
}

func ValidateVersioningOverride(override *workflowpb.VersioningOverride) error {
if override == nil {
return nil
Expand All @@ -260,13 +328,18 @@ func ValidateVersioningOverride(override *workflowpb.VersioningOverride) error {
case enumspb.VERSIONING_BEHAVIOR_PINNED:
if override.GetDeployment() != nil {
return ValidateDeployment(override.GetDeployment())
} else if override.GetPinnedVersion() != nil {
return ValidateDeploymentVersion(override.GetPinnedVersion())
} else {
return serviceerror.NewInvalidArgument("must provide deployment if behavior is 'PINNED'")
}
case enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE:
if override.GetDeployment() != nil {
return serviceerror.NewInvalidArgument("only provide deployment if behavior is 'PINNED'")
}
if override.GetPinnedVersion() != nil {
return serviceerror.NewInvalidArgument("only provide pinned version if behavior is 'PINNED'")
}
case enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED:
return serviceerror.NewInvalidArgument("override behavior is required")
default:
Expand Down Expand Up @@ -307,7 +380,7 @@ func ValidateTaskVersionDirective(
directiveBehavior, wfBehavior))
}

directiveDeployment := directive.GetDeployment()
directiveDeployment := DirectiveDeployment(directive)
if directiveDeployment == nil {
// TODO: remove this once the ScheduledDeployment field is removed from proto
directiveDeployment = scheduledDeployment
Expand All @@ -321,3 +394,11 @@ func ValidateTaskVersionDirective(
}
return nil
}

// DirectiveDeployment Temporary function until Directive proto is removed.
func DirectiveDeployment(directive *taskqueuespb.TaskVersionDirective) *deploymentpb.Deployment {
if dv := directive.GetDeploymentVersion(); dv != nil {
return DeploymentFromDeploymentVersion(dv)
}
return directive.GetDeployment()
}
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,12 @@ message ActivityInfo {

// The deployment this activity was dispatched to most recently. Present only if the activity
// was dispatched to a versioned worker.
// Deprecated. Replaced by last_deployment_version.
temporal.api.deployment.v1.Deployment last_started_deployment = 43;

// The deployment this activity was dispatched to most recently. Present only if the activity
// was dispatched to a versioned worker.
temporal.api.deployment.v1.WorkerDeploymentVersion last_deployment_version = 44;
}

// timer_map column
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ message TaskVersionDirective {
// Default (if build_id is not present) is "unversioned":
// Use the unversioned task queue, even if the task queue has versioning data.
// Absent value means the task is the non-starting task of an unversioned execution so it should remain unversioned.
// Deprecated. Use deployment_version.
oneof build_id {
// If use_assignment_rules is present, matching should use the assignment rules
// to determine the build ID.
Expand All @@ -54,7 +55,10 @@ message TaskVersionDirective {
// Workflow's effective behavior when the task is scheduled.
temporal.api.enums.v1.VersioningBehavior behavior = 3;
// Workflow's effective deployment when the task is scheduled.
// Deprecated. Use deployment_version.
temporal.api.deployment.v1.Deployment deployment = 4;
// Workflow's effective deployment version when the task is scheduled.
temporal.api.deployment.v1.WorkerDeploymentVersion deployment_version = 5;
}

message InternalTaskQueueStatus {
Expand Down
2 changes: 1 addition & 1 deletion service/history/api/recordactivitytaskstarted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func recordActivityTaskStarted(

wfBehavior := mutableState.GetEffectiveVersioningBehavior()
wfDeployment := mutableState.GetEffectiveDeployment()
pollerDeployment := worker_versioning.DeploymentFromCapabilities(request.PollRequest.WorkerVersionCapabilities)
pollerDeployment := worker_versioning.DeploymentFromCapabilities(request.PollRequest.WorkerVersionCapabilities, request.PollRequest.DeploymentOptions)
err = worker_versioning.ValidateTaskVersionDirective(request.GetVersionDirective(), wfBehavior, wfDeployment, request.ScheduledDeployment)
if err != nil {
return nil, false, err
Expand Down
2 changes: 1 addition & 1 deletion service/history/api/recordworkflowtaskstarted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func Invoke(

wfBehavior := mutableState.GetEffectiveVersioningBehavior()
wfDeployment := mutableState.GetEffectiveDeployment()
pollerDeployment := worker_versioning.DeploymentFromCapabilities(req.PollRequest.WorkerVersionCapabilities)
pollerDeployment := worker_versioning.DeploymentFromCapabilities(req.PollRequest.WorkerVersionCapabilities, req.PollRequest.DeploymentOptions)
err = worker_versioning.ValidateTaskVersionDirective(req.GetVersionDirective(), wfBehavior, wfDeployment, req.ScheduledDeployment)
if err != nil {
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions service/history/api/updateworkflowoptions/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ var (
}
pinnedOverrideOptionsA = &workflowpb.WorkflowExecutionOptions{
VersioningOverride: &workflowpb.VersioningOverride{
Behavior: enumspb.VERSIONING_BEHAVIOR_PINNED,
Deployment: &deploymentpb.Deployment{SeriesName: "X", BuildId: "A"},
Behavior: enumspb.VERSIONING_BEHAVIOR_PINNED,
PinnedVersion: &deploymentpb.WorkerDeploymentVersion{DeploymentName: "X", Version: "A"},
},
}
pinnedOverrideOptionsB = &workflowpb.WorkflowExecutionOptions{
VersioningOverride: &workflowpb.VersioningOverride{
Behavior: enumspb.VERSIONING_BEHAVIOR_PINNED,
Deployment: &deploymentpb.Deployment{SeriesName: "X", BuildId: "B"},
Behavior: enumspb.VERSIONING_BEHAVIOR_PINNED,
PinnedVersion: &deploymentpb.WorkerDeploymentVersion{DeploymentName: "X", Version: "B"},
},
}
)
Expand Down
3 changes: 2 additions & 1 deletion service/history/historybuilder/event_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"go.temporal.io/server/common"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/worker_versioning"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -170,7 +171,7 @@ func (b *EventFactory) CreateWorkflowTaskCompletedEvent(
WorkerVersion: workerVersionStamp,
SdkMetadata: sdkMetadata,
MeteringMetadata: meteringMetadata,
Deployment: deployment,
DeploymentVersion: worker_versioning.DeploymentVersionFromDeployment(deployment),
VersioningBehavior: behavior,
},
}
Expand Down
5 changes: 4 additions & 1 deletion service/history/worker_versioning_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,10 @@ func MakeDirectiveForWorkflowTask(ms workflow.MutableState) *taskqueuespb.TaskVe

func MakeDirectiveForActivityTask(mutableState workflow.MutableState, activityInfo *persistencespb.ActivityInfo) *taskqueuespb.TaskVersionDirective {
if behavior := mutableState.GetEffectiveVersioningBehavior(); behavior != enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED {
return &taskqueuespb.TaskVersionDirective{Behavior: behavior, Deployment: mutableState.GetEffectiveDeployment()}
d := mutableState.GetEffectiveDeployment()
return &taskqueuespb.TaskVersionDirective{Behavior: behavior,
DeploymentVersion: worker_versioning.DeploymentVersionFromDeployment(d),
}
}
if !activityInfo.UseCompatibleVersion && activityInfo.GetUseWorkflowBuildIdInfo() == nil {
return worker_versioning.MakeUseAssignmentRulesDirective()
Expand Down
4 changes: 2 additions & 2 deletions service/history/workflow/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ func GetPendingActivityInfo(
now := shardContext.GetTimeSource().Now().UTC()

p := &workflowpb.PendingActivityInfo{
ActivityId: ai.ActivityId,
LastDeployment: ai.LastStartedDeployment,
ActivityId: ai.ActivityId,
LastDeploymentVersion: ai.LastDeploymentVersion,
}
if ai.GetUseWorkflowBuildIdInfo() != nil {
p.AssignedBuildId = &workflowpb.PendingActivityInfo_UseWorkflowBuildId{UseWorkflowBuildId: &emptypb.Empty{}}
Expand Down
2 changes: 1 addition & 1 deletion service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ type (

ShouldResetActivityTimerTaskMask(current, incoming *persistencespb.ActivityInfo) bool
// GetEffectiveDeployment returns the effective deployment in the following order:
// 1. DeploymentTransition.Deployment: this is returned when the wf is transitioning to a
// 1. DeploymentVersionTransition.Deployment: this is returned when the wf is transitioning to a
// new deployment
// 2. VersioningOverride.Deployment: this is returned when user has set a PINNED override
// at wf start time, or later via UpdateWorkflowExecutionOptions.
Expand Down
34 changes: 27 additions & 7 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3159,7 +3159,7 @@ func (ms *MutableStateImpl) AddActivityTaskStartedEvent(
}
}

ai.LastStartedDeployment = deployment
ai.LastDeploymentVersion = worker_versioning.DeploymentVersionFromDeployment(deployment)

if !ai.HasRetryPolicy {
event := ms.hBuilder.AddActivityTaskStartedEvent(
Expand Down Expand Up @@ -4411,10 +4411,20 @@ func (ms *MutableStateImpl) ApplyWorkflowExecutionOptionsUpdatedEvent(event *his
override := event.GetWorkflowExecutionOptionsUpdatedEventAttributes().GetVersioningOverride()
previousEffectiveDeployment := ms.GetEffectiveDeployment()
previousEffectiveVersioningBehavior := ms.GetEffectiveVersioningBehavior()
if ms.GetExecutionInfo().GetVersioningInfo() == nil {
ms.GetExecutionInfo().VersioningInfo = &workflowpb.WorkflowExecutionVersioningInfo{}
if override != nil {
if ms.GetExecutionInfo().GetVersioningInfo() == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we wanna set ms.GetExecutionInfo().VersioningInfo = &workflowpb.WorkflowExecutionVersioningInfo{} regardless of override being nil or not?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, the idea was if override is nil it means we have an override already and want to unset it, hence ms.GetExecutionInfo().VersioningInfo has already a value. But I don't think it's wise to make that assumption. I improved the else block in line 4423.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice, this looks much better - thanks for addressing

ms.GetExecutionInfo().VersioningInfo = &workflowpb.WorkflowExecutionVersioningInfo{}
}
ms.GetExecutionInfo().VersioningInfo.VersioningOverride = &workflowpb.VersioningOverride{
Behavior: override.GetBehavior(),
// We read from both old and new fields but write in the new fields only.
//nolint:staticcheck // SA1019 deprecated Deployment will clean up later
PinnedVersion: worker_versioning.DeploymentVersionFromDeployment(worker_versioning.DeploymentOrVersion(override.GetDeployment(), override.GetPinnedVersion())),
}
} else if ms.GetExecutionInfo().GetVersioningInfo() != nil {
// TODO (shahab): this behavior has changed in main branch. Update it when merging to main.
ms.GetExecutionInfo().VersioningInfo.VersioningOverride = nil
}
ms.GetExecutionInfo().VersioningInfo.VersioningOverride = override

if !proto.Equal(ms.GetEffectiveDeployment(), previousEffectiveDeployment) ||
ms.GetEffectiveVersioningBehavior() != previousEffectiveVersioningBehavior {
Expand Down Expand Up @@ -4442,6 +4452,7 @@ func (ms *MutableStateImpl) ApplyWorkflowExecutionOptionsUpdatedEvent(event *his
// and it will start the same transition in the workflow. So removing the transition would not make a difference
// and would in fact add some extra work for the server.
ms.executionInfo.GetVersioningInfo().DeploymentTransition = nil
ms.executionInfo.GetVersioningInfo().VersionTransition = nil

// If effective deployment or behavior change, we need to reschedule any pending tasks, because History will reject
// the task's start request if the task is being started by a poller that is not from the workflow's effective
Expand Down Expand Up @@ -7312,7 +7323,7 @@ func (ms *MutableStateImpl) initVersionedTransitionInDB() {
}

// GetEffectiveDeployment returns the effective deployment in the following order:
// 1. DeploymentTransition.Deployment: this is returned when the wf is transitioning to a
// 1. DeploymentVersionTransition.Deployment: this is returned when the wf is transitioning to a
// new deployment
// 2. VersioningOverride.Deployment: this is returned when user has set a PINNED override
// at wf start time, or later via UpdateWorkflowExecutionOptions.
Expand All @@ -7327,6 +7338,12 @@ func (ms *MutableStateImpl) GetEffectiveDeployment() *deploymentpb.Deployment {
}

func (ms *MutableStateImpl) GetDeploymentTransition() *workflowpb.DeploymentTransition {
vi := ms.GetExecutionInfo().GetVersioningInfo()
if t := vi.GetVersionTransition(); t != nil {
return &workflowpb.DeploymentTransition{
Deployment: worker_versioning.DeploymentFromDeploymentVersion(t.GetDeploymentVersion()),
}
}
return ms.GetExecutionInfo().GetVersioningInfo().GetDeploymentTransition()
}

Expand Down Expand Up @@ -7361,8 +7378,11 @@ func (ms *MutableStateImpl) StartDeploymentTransition(deployment *deploymentpb.D
ms.GetExecutionInfo().VersioningInfo = versioningInfo
}

versioningInfo.DeploymentTransition = &workflowpb.DeploymentTransition{
Deployment: deployment,
// Only store transition in VersionTransition but read from both VersionTransition and DeploymentVersionTransition.
//nolint:staticcheck // SA1019 deprecated DeploymentTransition will clean up later
versioningInfo.DeploymentTransition = nil
versioningInfo.VersionTransition = &workflowpb.DeploymentVersionTransition{
DeploymentVersion: worker_versioning.DeploymentVersionFromDeployment(deployment),
}

// Because deployment is changed, we clear sticky queue to make sure the next wf task does not
Expand Down
Loading
Loading