Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch task queue user data persistence updates #7039

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
25 changes: 25 additions & 0 deletions common/clock/event_time_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ package clock
import (
"sync"
"time"

"go.temporal.io/server/common/util"
)

type (
Expand Down Expand Up @@ -135,6 +137,23 @@ func (ts *EventTimeSource) Advance(d time.Duration) {
ts.fireTimers()
}

// AdvanceNext advances to the next timer.
func (ts *EventTimeSource) AdvanceNext() {
ts.mu.Lock()
defer ts.mu.Unlock()

if len(ts.timers) == 0 {
return
}
// just do linear search, this is efficient enough for now
tmin := ts.timers[0].deadline
for _, t := range ts.timers[1:] {
tmin = util.MinTime(tmin, t.deadline)
}
ts.now = tmin
ts.fireTimers()
}

// NumTimers returns the number of outstanding timers.
func (ts *EventTimeSource) NumTimers() int {
ts.mu.Lock()
Expand All @@ -143,6 +162,12 @@ func (ts *EventTimeSource) NumTimers() int {
return len(ts.timers)
}

// Sleep is a convenience function for waiting on a new timer.
func (ts *EventTimeSource) Sleep(d time.Duration) {
t, _ := ts.NewTimer(d)
<-t
}

// fireTimers fires all timers that are ready.
func (ts *EventTimeSource) fireTimers() {
n := 0
Expand Down
84 changes: 46 additions & 38 deletions common/persistence/cassandra/matching_task_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,53 +522,61 @@ func (d *MatchingTaskStore) UpdateTaskQueueUserData(
) error {
batch := d.Session.NewBatch(gocql.UnloggedBatch).WithContext(ctx)

if request.Version == 0 {
batch.Query(templateInsertTaskQueueUserDataQuery,
request.NamespaceID,
request.TaskQueue,
request.UserData.Data,
request.UserData.EncodingType.String(),
)
} else {
batch.Query(templateUpdateTaskQueueUserDataQuery,
request.UserData.Data,
request.UserData.EncodingType.String(),
request.Version+1,
request.NamespaceID,
request.TaskQueue,
request.Version,
)
}
for _, buildId := range request.BuildIdsAdded {
batch.Query(templateInsertBuildIdTaskQueueMappingQuery, request.NamespaceID, buildId, request.TaskQueue)
}
for _, buildId := range request.BuildIdsRemoved {
batch.Query(templateDeleteBuildIdTaskQueueMappingQuery, request.NamespaceID, buildId, request.TaskQueue)
for taskQueue, update := range request.Updates {
if update.Version == 0 {
batch.Query(templateInsertTaskQueueUserDataQuery,
request.NamespaceID,
taskQueue,
update.UserData.Data,
update.UserData.EncodingType.String(),
)
} else {
batch.Query(templateUpdateTaskQueueUserDataQuery,
update.UserData.Data,
update.UserData.EncodingType.String(),
update.Version+1,
request.NamespaceID,
taskQueue,
update.Version,
)
}
for _, buildId := range update.BuildIdsAdded {
batch.Query(templateInsertBuildIdTaskQueueMappingQuery, request.NamespaceID, buildId, taskQueue)
}
for _, buildId := range update.BuildIdsRemoved {
batch.Query(templateDeleteBuildIdTaskQueueMappingQuery, request.NamespaceID, buildId, taskQueue)
}
}

previous := make(map[string]interface{})
previous := make(map[string]any)
applied, iter, err := d.Session.MapExecuteBatchCAS(batch, previous)
stephanos marked this conversation as resolved.
Show resolved Hide resolved

if err != nil {
return gocql.ConvertError("UpdateTaskQueueUserData", err)
}

// We only care about the conflict in the first query
err = iter.Close()
if err != nil {
return gocql.ConvertError("UpdateTaskQueueUserData", err)
}
defer iter.Close()
ychebotarev marked this conversation as resolved.
Show resolved Hide resolved

if !applied {
var columns []string
for k, v := range previous {
columns = append(columns, fmt.Sprintf("%s=%v", k, v))
}

return &p.ConditionFailedError{
Msg: fmt.Sprintf("Failed to update task queue. name: %v, version: %v, columns: (%v)",
request.TaskQueue, request.Version, strings.Join(columns, ",")),
// No error, but not applied. That means we had a conflict.
// Iterate through results to identify first conflicting row.
for {
name, nameErr := getTypedFieldFromRow[string]("task_queue_name", previous)
previousVersion, verErr := getTypedFieldFromRow[int64]("version", previous)
update, hasUpdate := request.Updates[name]
if nameErr == nil && verErr == nil && hasUpdate && update.Version != previousVersion {
if update.Conflicting != nil {
*update.Conflicting = true
}
return &p.ConditionFailedError{
Msg: fmt.Sprintf("Failed to update task queues: task queue %q version %d != %d",
name, update.Version, previousVersion),
}
}
clear(previous)
if !iter.MapScan(previous) {
break
}
}
return &p.ConditionFailedError{Msg: "Failed to update task queues: unknown conflict"}
Comment on lines +559 to +579
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible that I missed it (it's a big PR), but this is untested, isn't it? If so, that makes me a little nervous.

}

return nil
Expand Down
16 changes: 13 additions & 3 deletions common/persistence/data_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,13 +523,23 @@ type (
UserData *persistencespb.VersionedTaskQueueUserData
}

// UpdateTaskQueueUserDataRequest is the input type for the UpdateTaskQueueUserData API
// UpdateTaskQueueUserDataRequest is the input type for the UpdateTaskQueueUserData API.
// This updates user data for multiple task queues in one namespace.
UpdateTaskQueueUserDataRequest struct {
NamespaceID string
TaskQueue string
NamespaceID string
Updates map[string]*SingleTaskQueueUserDataUpdate // key is task queue name
stephanos marked this conversation as resolved.
Show resolved Hide resolved
}

SingleTaskQueueUserDataUpdate struct {
UserData *persistencespb.VersionedTaskQueueUserData
BuildIdsAdded []string
BuildIdsRemoved []string
// If Conflicting is non-nil, and this single update fails due to a version conflict,
// then it will be set to true. Conflicting updates should not be retried.
// Note that even if Conflicting is not set to true, the update may still be
// conflicting, because persistence implementations may only be able to identify the
// first conflict in a set.
Conflicting *bool
}

ListTaskQueueUserDataEntriesRequest struct {
Expand Down
10 changes: 7 additions & 3 deletions common/persistence/persistence_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,12 +289,16 @@ type (

InternalUpdateTaskQueueUserDataRequest struct {
NamespaceID string
TaskQueue string
Version int64
UserData *commonpb.DataBlob
Updates map[string]*InternalSingleTaskQueueUserDataUpdate // key is task queue name
}

InternalSingleTaskQueueUserDataUpdate struct {
Version int64
UserData *commonpb.DataBlob
// Used to build an index of build_id to task_queues
BuildIdsAdded []string
BuildIdsRemoved []string
Conflicting *bool
}

InternalTaskQueueUserDataEntry struct {
Expand Down
57 changes: 31 additions & 26 deletions common/persistence/sql/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,38 +481,43 @@ func (m *sqlTaskManager) UpdateTaskQueueUserData(ctx context.Context, request *p
return serviceerror.NewInternal(fmt.Sprintf("failed to parse namespace ID as UUID: %v", err))
}
err = m.txExecute(ctx, "UpdateTaskQueueUserData", func(tx sqlplugin.Tx) error {
err := tx.UpdateTaskQueueUserData(ctx, &sqlplugin.UpdateTaskQueueDataRequest{
NamespaceID: namespaceID,
TaskQueueName: request.TaskQueue,
Data: request.UserData.Data,
DataEncoding: request.UserData.EncodingType.String(),
Version: request.Version,
})
if m.Db.IsDupEntryError(err) {
return &persistence.ConditionFailedError{Msg: err.Error()}
}
if err != nil {
return err
}
if len(request.BuildIdsAdded) > 0 {
err = tx.AddToBuildIdToTaskQueueMapping(ctx, sqlplugin.AddToBuildIdToTaskQueueMapping{
for taskQueue, update := range request.Updates {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be up to 100 separate Update writes, right? I assume self-hosted users won't usually see a lot of parallel updates?

err := tx.UpdateTaskQueueUserData(ctx, &sqlplugin.UpdateTaskQueueDataRequest{
NamespaceID: namespaceID,
TaskQueueName: request.TaskQueue,
BuildIds: request.BuildIdsAdded,
TaskQueueName: taskQueue,
Data: update.UserData.Data,
DataEncoding: update.UserData.EncodingType.String(),
Version: update.Version,
})
if err != nil {
return err
if m.Db.IsDupEntryError(err) {
err = &persistence.ConditionFailedError{Msg: err.Error()}
}
if persistence.IsConflictErr(err) && update.Conflicting != nil {
*update.Conflicting = true
}
}
if len(request.BuildIdsRemoved) > 0 {
err = tx.RemoveFromBuildIdToTaskQueueMapping(ctx, sqlplugin.RemoveFromBuildIdToTaskQueueMapping{
NamespaceID: namespaceID,
TaskQueueName: request.TaskQueue,
BuildIds: request.BuildIdsRemoved,
})
if err != nil {
return err
}
if len(update.BuildIdsAdded) > 0 {
err = tx.AddToBuildIdToTaskQueueMapping(ctx, sqlplugin.AddToBuildIdToTaskQueueMapping{
NamespaceID: namespaceID,
TaskQueueName: taskQueue,
BuildIds: update.BuildIdsAdded,
})
if err != nil {
return err
}
}
if len(update.BuildIdsRemoved) > 0 {
err = tx.RemoveFromBuildIdToTaskQueueMapping(ctx, sqlplugin.RemoveFromBuildIdToTaskQueueMapping{
NamespaceID: namespaceID,
TaskQueueName: taskQueue,
BuildIds: update.BuildIdsRemoved,
})
if err != nil {
return err
}
}
}
return nil
})
Expand Down
25 changes: 15 additions & 10 deletions common/persistence/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,17 +256,22 @@ func (m *taskManagerImpl) GetTaskQueueUserData(ctx context.Context, request *Get

// UpdateTaskQueueUserData implements TaskManager
func (m *taskManagerImpl) UpdateTaskQueueUserData(ctx context.Context, request *UpdateTaskQueueUserDataRequest) error {
userData, err := m.serializer.TaskQueueUserDataToBlob(request.UserData.Data, enumspb.ENCODING_TYPE_PROTO3)
if err != nil {
return err
}
internalRequest := &InternalUpdateTaskQueueUserDataRequest{
NamespaceID: request.NamespaceID,
TaskQueue: request.TaskQueue,
Version: request.UserData.Version,
UserData: userData,
BuildIdsAdded: request.BuildIdsAdded,
BuildIdsRemoved: request.BuildIdsRemoved,
NamespaceID: request.NamespaceID,
Updates: make(map[string]*InternalSingleTaskQueueUserDataUpdate, len(request.Updates)),
}
for taskQueue, update := range request.Updates {
userData, err := m.serializer.TaskQueueUserDataToBlob(update.UserData.Data, enumspb.ENCODING_TYPE_PROTO3)
if err != nil {
return err
ychebotarev marked this conversation as resolved.
Show resolved Hide resolved
}
internalRequest.Updates[taskQueue] = &InternalSingleTaskQueueUserDataUpdate{
Version: update.UserData.Version,
UserData: userData,
BuildIdsAdded: update.BuildIdsAdded,
BuildIdsRemoved: update.BuildIdsRemoved,
Conflicting: update.Conflicting,
}
}
return m.taskStore.UpdateTaskQueueUserData(ctx, internalRequest)
}
Expand Down
13 changes: 13 additions & 0 deletions common/persistence/tests/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,19 @@ func TestCassandraTaskQueueTaskSuite(t *testing.T) {
suite.Run(t, s)
}

func TestCassandraTaskQueueUserDataSuite(t *testing.T) {
testData, tearDown := setUpCassandraTest(t)
defer tearDown()

taskQueueStore, err := testData.Factory.NewTaskStore()
if err != nil {
t.Fatalf("unable to create Cassandra DB: %v", err)
}

s := NewTaskQueueUserDataSuite(t, taskQueueStore, testData.Logger)
suite.Run(t, s)
}

func TestCassandraHistoryV2Persistence(t *testing.T) {
s := new(persistencetests.HistoryV2PersistenceSuite)
s.TestBase = persistencetests.NewTestBaseWithCassandra(&persistencetests.TestBaseOptions{})
Expand Down
13 changes: 13 additions & 0 deletions common/persistence/tests/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,19 @@ func TestMySQLTaskQueueTaskSuite(t *testing.T) {
suite.Run(t, s)
}

func TestMySQLTaskQueueUserDataSuite(t *testing.T) {
testData, tearDown := setUpMySQLTest(t)
defer tearDown()

taskQueueStore, err := testData.Factory.NewTaskStore()
if err != nil {
t.Fatalf("unable to create MySQL DB: %v", err)
}

s := NewTaskQueueUserDataSuite(t, taskQueueStore, testData.Logger)
suite.Run(t, s)
}

func TestMySQLVisibilityPersistenceSuite(t *testing.T) {
s := &VisibilityPersistenceSuite{
TestBase: persistencetests.NewTestBaseWithSQL(persistencetests.GetMySQLTestClusterOption()),
Expand Down
13 changes: 13 additions & 0 deletions common/persistence/tests/postgresql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,19 @@ func (p *PostgreSQLSuite) TestPostgreSQLTaskQueueTaskSuite() {
suite.Run(p.T(), s)
}

func (p *PostgreSQLSuite) TestPostgreSQLTaskQueueUserDataSuite() {
testData, tearDown := setUpPostgreSQLTest(p.T(), p.pluginName)
defer tearDown()

taskQueueStore, err := testData.Factory.NewTaskStore()
if err != nil {
p.T().Fatalf("unable to create PostgreSQL DB: %v", err)
}

s := NewTaskQueueUserDataSuite(p.T(), taskQueueStore, testData.Logger)
suite.Run(p.T(), s)
}

func (p *PostgreSQLSuite) TestPostgreSQLVisibilityPersistenceSuite() {
s := &VisibilityPersistenceSuite{
TestBase: persistencetests.NewTestBaseWithSQL(persistencetests.GetPostgreSQLTestClusterOption()),
Expand Down
Loading
Loading