Skip to content

Commit

Permalink
Use new versioning fields (#7119)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->
Using new Deployment Options fields sent by SDK in Versioning 3
functionality. Old fields are still used when new fields are absent.
Implementation did not change, both new and old fields sent in polls and
task responses are still converted to old `Deployment` object and used
as before. Later, code will be refactored to change the `Deployment`
usages to `DeploymentVersion`.
Also added new fields to replace `Deployment` with `DeploymentVersion`
fields in internal protos where needed. Matching<->History communication
happens via these new fields, only new internal fields are written but
both new and old fields are read.

## Why?
<!-- Tell your future self why have you made these changes -->
Incorporating latest renames in Versioning APIs.

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
Existing tests changed to use new fields (or both old and new depending
on the test).

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
None.

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->
None yet.

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
No.
  • Loading branch information
ShahabT authored Jan 21, 2025
1 parent 957bc26 commit 6fbdc7b
Show file tree
Hide file tree
Showing 28 changed files with 1,602 additions and 1,388 deletions.
2,134 changes: 1,077 additions & 1,057 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

236 changes: 128 additions & 108 deletions api/taskqueue/v1/message.pb.go

Large diffs are not rendered by default.

19 changes: 16 additions & 3 deletions common/testing/testvars/test_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/worker_versioning"
"google.golang.org/protobuf/types/known/durationpb"
)

Expand Down Expand Up @@ -276,9 +277,9 @@ func (tv *TestVars) WithDeploymentNameNumber(n int) *TestVars {
return tv.cloneSetN("deployment_name", n)
}

func (tv *TestVars) DeploymentTransition() *workflowpb.DeploymentTransition {
return &workflowpb.DeploymentTransition{
Deployment: tv.Deployment(),
func (tv *TestVars) DeploymentVersionTransition() *workflowpb.DeploymentVersionTransition {
return &workflowpb.DeploymentVersionTransition{
DeploymentVersion: worker_versioning.DeploymentVersionFromDeployment(tv.Deployment()),
}
}

Expand Down Expand Up @@ -417,3 +418,15 @@ func (tv *TestVars) Any() Any {
func (tv *TestVars) Global() Global {
return newGlobal()
}

func (tv *TestVars) WorkerDeploymentOptions(versioned bool) *deploymentpb.WorkerDeploymentOptions {
m := enumspb.WORKFLOW_VERSIONING_MODE_UNVERSIONED
if versioned {
m = enumspb.WORKFLOW_VERSIONING_MODE_VERSIONING_BEHAVIORS
}
return &deploymentpb.WorkerDeploymentOptions{
Version: tv.BuildID(),
Name: tv.DeploymentSeries(),
WorkflowVersioningMode: m,
}
}
87 changes: 84 additions & 3 deletions common/worker_versioning/worker_versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,16 @@ func BuildIdIfUsingVersioning(stamp *commonpb.WorkerVersionStamp) string {
}

// DeploymentFromCapabilities returns the deployment if it is using versioning V3, otherwise nil.
func DeploymentFromCapabilities(capabilities *commonpb.WorkerVersionCapabilities) *deploymentpb.Deployment {
// It returns the deployment from the `options` if present, otherwise, from `capabilities`,
func DeploymentFromCapabilities(capabilities *commonpb.WorkerVersionCapabilities, options *deploymentpb.WorkerDeploymentOptions) *deploymentpb.Deployment {
if options.GetWorkflowVersioningMode() != enumspb.WORKFLOW_VERSIONING_MODE_UNVERSIONED &&
options.GetName() != "" &&
options.GetVersion() != "" {
return &deploymentpb.Deployment{
SeriesName: options.GetName(),
BuildId: options.GetVersion(),
}
}
if capabilities.GetUseVersioning() && capabilities.GetDeploymentSeriesName() != "" && capabilities.GetBuildId() != "" {
return &deploymentpb.Deployment{
SeriesName: capabilities.GetDeploymentSeriesName(),
Expand All @@ -169,6 +178,23 @@ func DeploymentFromCapabilities(capabilities *commonpb.WorkerVersionCapabilities
return nil
}

// DeploymentOrVersion Temporary helper function to return a Deployment based on passed Deployment
// or WorkerDeploymentVersion objects, if `v` is not nil, it'll take precedence.
func DeploymentOrVersion(d *deploymentpb.Deployment, v *deploymentpb.WorkerDeploymentVersion) *deploymentpb.Deployment {
if v != nil {
return DeploymentIfValid(DeploymentFromDeploymentVersion(v))
}
return DeploymentIfValid(d)
}

// DeploymentIfValid returns the deployment back if is both of its fields have value.
func DeploymentIfValid(d *deploymentpb.Deployment) *deploymentpb.Deployment {
if d.GetSeriesName() != "" && d.GetBuildId() != "" {
return d
}
return nil
}

// DeploymentToString is intended to be used for logs and metrics only. Theoretically, it can map
// different deployments to the string.
// DO NOT USE IN SERVER LOGIC.
Expand All @@ -195,7 +221,10 @@ func MakeDirectiveForWorkflowTask(
deployment *deploymentpb.Deployment,
) *taskqueuespb.TaskVersionDirective {
if behavior != enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED {
return &taskqueuespb.TaskVersionDirective{Behavior: behavior, Deployment: deployment}
return &taskqueuespb.TaskVersionDirective{
Behavior: behavior,
DeploymentVersion: DeploymentVersionFromDeployment(deployment),
}
}
if id := BuildIdIfUsingVersioning(stamp); id != "" && assignedBuildId == "" {
// TODO: old versioning only [cleanup-old-wv]
Expand All @@ -211,6 +240,30 @@ func MakeDirectiveForWorkflowTask(
return nil
}

// DeploymentVersionFromDeployment Temporary helper function to convert Deployment to
// WorkerDeploymentVersion proto until we update code to use the new proto in all places.
func DeploymentVersionFromDeployment(deployment *deploymentpb.Deployment) *deploymentpb.WorkerDeploymentVersion {
if deployment == nil {
return nil
}
return &deploymentpb.WorkerDeploymentVersion{
Version: deployment.GetBuildId(),
DeploymentName: deployment.GetSeriesName(),
}
}

// DeploymentFromDeploymentVersion Temporary helper function to convert WorkerDeploymentVersion to
// Deployment proto until we update code to use the new proto in all places.
func DeploymentFromDeploymentVersion(dv *deploymentpb.WorkerDeploymentVersion) *deploymentpb.Deployment {
if dv == nil {
return nil
}
return &deploymentpb.Deployment{
BuildId: dv.GetVersion(),
SeriesName: dv.GetDeploymentName(),
}
}

func MakeUseAssignmentRulesDirective() *taskqueuespb.TaskVersionDirective {
return &taskqueuespb.TaskVersionDirective{BuildId: &taskqueuespb.TaskVersionDirective_UseAssignmentRules{UseAssignmentRules: &emptypb.Empty{}}}
}
Expand Down Expand Up @@ -252,6 +305,21 @@ func ValidateDeployment(deployment *deploymentpb.Deployment) error {
return nil
}

// ValidateDeploymentVersion returns error if the deployment version is nil or it has empty version
// or deployment name.
func ValidateDeploymentVersion(version *deploymentpb.WorkerDeploymentVersion) error {
if version == nil {
return serviceerror.NewInvalidArgument("deployment cannot be nil")
}
if version.GetDeploymentName() == "" {
return serviceerror.NewInvalidArgument("deployment name name cannot be empty")
}
if version.GetVersion() == "" {
return serviceerror.NewInvalidArgument("version cannot be empty")
}
return nil
}

func ValidateVersioningOverride(override *workflowpb.VersioningOverride) error {
if override == nil {
return nil
Expand All @@ -260,13 +328,18 @@ func ValidateVersioningOverride(override *workflowpb.VersioningOverride) error {
case enumspb.VERSIONING_BEHAVIOR_PINNED:
if override.GetDeployment() != nil {
return ValidateDeployment(override.GetDeployment())
} else if override.GetPinnedVersion() != nil {
return ValidateDeploymentVersion(override.GetPinnedVersion())
} else {
return serviceerror.NewInvalidArgument("must provide deployment if behavior is 'PINNED'")
}
case enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE:
if override.GetDeployment() != nil {
return serviceerror.NewInvalidArgument("only provide deployment if behavior is 'PINNED'")
}
if override.GetPinnedVersion() != nil {
return serviceerror.NewInvalidArgument("only provide pinned version if behavior is 'PINNED'")
}
case enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED:
return serviceerror.NewInvalidArgument("override behavior is required")
default:
Expand Down Expand Up @@ -307,7 +380,7 @@ func ValidateTaskVersionDirective(
directiveBehavior, wfBehavior))
}

directiveDeployment := directive.GetDeployment()
directiveDeployment := DirectiveDeployment(directive)
if directiveDeployment == nil {
// TODO: remove this once the ScheduledDeployment field is removed from proto
directiveDeployment = scheduledDeployment
Expand All @@ -321,3 +394,11 @@ func ValidateTaskVersionDirective(
}
return nil
}

// DirectiveDeployment Temporary function until Directive proto is removed.
func DirectiveDeployment(directive *taskqueuespb.TaskVersionDirective) *deploymentpb.Deployment {
if dv := directive.GetDeploymentVersion(); dv != nil {
return DeploymentFromDeploymentVersion(dv)
}
return directive.GetDeployment()
}
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,12 @@ message ActivityInfo {

// The deployment this activity was dispatched to most recently. Present only if the activity
// was dispatched to a versioned worker.
// Deprecated. Replaced by last_deployment_version.
temporal.api.deployment.v1.Deployment last_started_deployment = 43;

// The deployment this activity was dispatched to most recently. Present only if the activity
// was dispatched to a versioned worker.
temporal.api.deployment.v1.WorkerDeploymentVersion last_deployment_version = 44;
}

// timer_map column
Expand Down
4 changes: 4 additions & 0 deletions proto/internal/temporal/server/api/taskqueue/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ message TaskVersionDirective {
// Default (if build_id is not present) is "unversioned":
// Use the unversioned task queue, even if the task queue has versioning data.
// Absent value means the task is the non-starting task of an unversioned execution so it should remain unversioned.
// Deprecated. Use deployment_version.
oneof build_id {
// If use_assignment_rules is present, matching should use the assignment rules
// to determine the build ID.
Expand All @@ -54,7 +55,10 @@ message TaskVersionDirective {
// Workflow's effective behavior when the task is scheduled.
temporal.api.enums.v1.VersioningBehavior behavior = 3;
// Workflow's effective deployment when the task is scheduled.
// Deprecated. Use deployment_version.
temporal.api.deployment.v1.Deployment deployment = 4;
// Workflow's effective deployment version when the task is scheduled.
temporal.api.deployment.v1.WorkerDeploymentVersion deployment_version = 5;
}

message InternalTaskQueueStatus {
Expand Down
2 changes: 1 addition & 1 deletion service/history/api/recordactivitytaskstarted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func recordActivityTaskStarted(

wfBehavior := mutableState.GetEffectiveVersioningBehavior()
wfDeployment := mutableState.GetEffectiveDeployment()
pollerDeployment := worker_versioning.DeploymentFromCapabilities(request.PollRequest.WorkerVersionCapabilities)
pollerDeployment := worker_versioning.DeploymentFromCapabilities(request.PollRequest.WorkerVersionCapabilities, request.PollRequest.DeploymentOptions)
err = worker_versioning.ValidateTaskVersionDirective(request.GetVersionDirective(), wfBehavior, wfDeployment, request.ScheduledDeployment)
if err != nil {
return nil, false, err
Expand Down
2 changes: 1 addition & 1 deletion service/history/api/recordworkflowtaskstarted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func Invoke(

wfBehavior := mutableState.GetEffectiveVersioningBehavior()
wfDeployment := mutableState.GetEffectiveDeployment()
pollerDeployment := worker_versioning.DeploymentFromCapabilities(req.PollRequest.WorkerVersionCapabilities)
pollerDeployment := worker_versioning.DeploymentFromCapabilities(req.PollRequest.WorkerVersionCapabilities, req.PollRequest.DeploymentOptions)
err = worker_versioning.ValidateTaskVersionDirective(req.GetVersionDirective(), wfBehavior, wfDeployment, req.ScheduledDeployment)
if err != nil {
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions service/history/api/updateworkflowoptions/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ var (
}
pinnedOverrideOptionsA = &workflowpb.WorkflowExecutionOptions{
VersioningOverride: &workflowpb.VersioningOverride{
Behavior: enumspb.VERSIONING_BEHAVIOR_PINNED,
Deployment: &deploymentpb.Deployment{SeriesName: "X", BuildId: "A"},
Behavior: enumspb.VERSIONING_BEHAVIOR_PINNED,
PinnedVersion: &deploymentpb.WorkerDeploymentVersion{DeploymentName: "X", Version: "A"},
},
}
pinnedOverrideOptionsB = &workflowpb.WorkflowExecutionOptions{
VersioningOverride: &workflowpb.VersioningOverride{
Behavior: enumspb.VERSIONING_BEHAVIOR_PINNED,
Deployment: &deploymentpb.Deployment{SeriesName: "X", BuildId: "B"},
Behavior: enumspb.VERSIONING_BEHAVIOR_PINNED,
PinnedVersion: &deploymentpb.WorkerDeploymentVersion{DeploymentName: "X", Version: "B"},
},
}
)
Expand Down
3 changes: 2 additions & 1 deletion service/history/historybuilder/event_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"go.temporal.io/server/common"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/worker_versioning"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -170,7 +171,7 @@ func (b *EventFactory) CreateWorkflowTaskCompletedEvent(
WorkerVersion: workerVersionStamp,
SdkMetadata: sdkMetadata,
MeteringMetadata: meteringMetadata,
Deployment: deployment,
DeploymentVersion: worker_versioning.DeploymentVersionFromDeployment(deployment),
VersioningBehavior: behavior,
},
}
Expand Down
5 changes: 4 additions & 1 deletion service/history/worker_versioning_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,10 @@ func MakeDirectiveForWorkflowTask(ms workflow.MutableState) *taskqueuespb.TaskVe

func MakeDirectiveForActivityTask(mutableState workflow.MutableState, activityInfo *persistencespb.ActivityInfo) *taskqueuespb.TaskVersionDirective {
if behavior := mutableState.GetEffectiveVersioningBehavior(); behavior != enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED {
return &taskqueuespb.TaskVersionDirective{Behavior: behavior, Deployment: mutableState.GetEffectiveDeployment()}
d := mutableState.GetEffectiveDeployment()
return &taskqueuespb.TaskVersionDirective{Behavior: behavior,
DeploymentVersion: worker_versioning.DeploymentVersionFromDeployment(d),
}
}
if !activityInfo.UseCompatibleVersion && activityInfo.GetUseWorkflowBuildIdInfo() == nil {
return worker_versioning.MakeUseAssignmentRulesDirective()
Expand Down
4 changes: 2 additions & 2 deletions service/history/workflow/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ func GetPendingActivityInfo(
now := shardContext.GetTimeSource().Now().UTC()

p := &workflowpb.PendingActivityInfo{
ActivityId: ai.ActivityId,
LastDeployment: ai.LastStartedDeployment,
ActivityId: ai.ActivityId,
LastDeploymentVersion: ai.LastDeploymentVersion,
}
if ai.GetUseWorkflowBuildIdInfo() != nil {
p.AssignedBuildId = &workflowpb.PendingActivityInfo_UseWorkflowBuildId{UseWorkflowBuildId: &emptypb.Empty{}}
Expand Down
2 changes: 1 addition & 1 deletion service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ type (

ShouldResetActivityTimerTaskMask(current, incoming *persistencespb.ActivityInfo) bool
// GetEffectiveDeployment returns the effective deployment in the following order:
// 1. DeploymentTransition.Deployment: this is returned when the wf is transitioning to a
// 1. DeploymentVersionTransition.Deployment: this is returned when the wf is transitioning to a
// new deployment
// 2. VersioningOverride.Deployment: this is returned when user has set a PINNED override
// at wf start time, or later via UpdateWorkflowExecutionOptions.
Expand Down
34 changes: 27 additions & 7 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3159,7 +3159,7 @@ func (ms *MutableStateImpl) AddActivityTaskStartedEvent(
}
}

ai.LastStartedDeployment = deployment
ai.LastDeploymentVersion = worker_versioning.DeploymentVersionFromDeployment(deployment)

if !ai.HasRetryPolicy {
event := ms.hBuilder.AddActivityTaskStartedEvent(
Expand Down Expand Up @@ -4411,10 +4411,20 @@ func (ms *MutableStateImpl) ApplyWorkflowExecutionOptionsUpdatedEvent(event *his
override := event.GetWorkflowExecutionOptionsUpdatedEventAttributes().GetVersioningOverride()
previousEffectiveDeployment := ms.GetEffectiveDeployment()
previousEffectiveVersioningBehavior := ms.GetEffectiveVersioningBehavior()
if ms.GetExecutionInfo().GetVersioningInfo() == nil {
ms.GetExecutionInfo().VersioningInfo = &workflowpb.WorkflowExecutionVersioningInfo{}
if override != nil {
if ms.GetExecutionInfo().GetVersioningInfo() == nil {
ms.GetExecutionInfo().VersioningInfo = &workflowpb.WorkflowExecutionVersioningInfo{}
}
ms.GetExecutionInfo().VersioningInfo.VersioningOverride = &workflowpb.VersioningOverride{
Behavior: override.GetBehavior(),
// We read from both old and new fields but write in the new fields only.
//nolint:staticcheck // SA1019 deprecated Deployment will clean up later
PinnedVersion: worker_versioning.DeploymentVersionFromDeployment(worker_versioning.DeploymentOrVersion(override.GetDeployment(), override.GetPinnedVersion())),
}
} else if ms.GetExecutionInfo().GetVersioningInfo() != nil {
// TODO (shahab): this behavior has changed in main branch. Update it when merging to main.
ms.GetExecutionInfo().VersioningInfo.VersioningOverride = nil
}
ms.GetExecutionInfo().VersioningInfo.VersioningOverride = override

if !proto.Equal(ms.GetEffectiveDeployment(), previousEffectiveDeployment) ||
ms.GetEffectiveVersioningBehavior() != previousEffectiveVersioningBehavior {
Expand Down Expand Up @@ -4442,6 +4452,7 @@ func (ms *MutableStateImpl) ApplyWorkflowExecutionOptionsUpdatedEvent(event *his
// and it will start the same transition in the workflow. So removing the transition would not make a difference
// and would in fact add some extra work for the server.
ms.executionInfo.GetVersioningInfo().DeploymentTransition = nil
ms.executionInfo.GetVersioningInfo().VersionTransition = nil

// If effective deployment or behavior change, we need to reschedule any pending tasks, because History will reject
// the task's start request if the task is being started by a poller that is not from the workflow's effective
Expand Down Expand Up @@ -7312,7 +7323,7 @@ func (ms *MutableStateImpl) initVersionedTransitionInDB() {
}

// GetEffectiveDeployment returns the effective deployment in the following order:
// 1. DeploymentTransition.Deployment: this is returned when the wf is transitioning to a
// 1. DeploymentVersionTransition.Deployment: this is returned when the wf is transitioning to a
// new deployment
// 2. VersioningOverride.Deployment: this is returned when user has set a PINNED override
// at wf start time, or later via UpdateWorkflowExecutionOptions.
Expand All @@ -7327,6 +7338,12 @@ func (ms *MutableStateImpl) GetEffectiveDeployment() *deploymentpb.Deployment {
}

func (ms *MutableStateImpl) GetDeploymentTransition() *workflowpb.DeploymentTransition {
vi := ms.GetExecutionInfo().GetVersioningInfo()
if t := vi.GetVersionTransition(); t != nil {
return &workflowpb.DeploymentTransition{
Deployment: worker_versioning.DeploymentFromDeploymentVersion(t.GetDeploymentVersion()),
}
}
return ms.GetExecutionInfo().GetVersioningInfo().GetDeploymentTransition()
}

Expand Down Expand Up @@ -7361,8 +7378,11 @@ func (ms *MutableStateImpl) StartDeploymentTransition(deployment *deploymentpb.D
ms.GetExecutionInfo().VersioningInfo = versioningInfo
}

versioningInfo.DeploymentTransition = &workflowpb.DeploymentTransition{
Deployment: deployment,
// Only store transition in VersionTransition but read from both VersionTransition and DeploymentVersionTransition.
//nolint:staticcheck // SA1019 deprecated DeploymentTransition will clean up later
versioningInfo.DeploymentTransition = nil
versioningInfo.VersionTransition = &workflowpb.DeploymentVersionTransition{
DeploymentVersion: worker_versioning.DeploymentVersionFromDeployment(deployment),
}

// Because deployment is changed, we clear sticky queue to make sure the next wf task does not
Expand Down
Loading

0 comments on commit 6fbdc7b

Please sign in to comment.