Skip to content

Commit

Permalink
fix: Udf code object should have mutex
Browse files Browse the repository at this point in the history
Bug: 326086363
Fix: 326086363
Change-Id: I04ad0134a599b8b275f0c54b831c2ec65ba90d68
GitOrigin-RevId: 12a0af019ab40bde10f04a4a54ffbb65ab01b5ac
  • Loading branch information
lusayaa authored and copybara-github committed Dec 11, 2024
1 parent ab0b1ea commit c9db19a
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 11 deletions.
27 changes: 16 additions & 11 deletions components/udf/udf_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,15 @@ class UdfClientImpl : public UdfClient {
absl::Status SetCodeObject(
CodeConfig code_config,
privacy_sandbox::server_common::log::PSLogContext& log_context) {
absl::MutexLock lock(&mutex_);
// Only update code if logical commit time is larger.
if (logical_commit_time_ >= code_config.logical_commit_time) {
if (code_object_metadata_.logical_commit_time >=
code_config.logical_commit_time) {
PS_VLOG(1, log_context)
<< "Not updating code object. logical_commit_time "
<< code_config.logical_commit_time
<< " too small, should be greater than " << logical_commit_time_;
<< " too small, should be greater than "
<< code_object_metadata_.logical_commit_time;
return absl::OkStatus();
}
std::shared_ptr<absl::Status> response_status =
Expand Down Expand Up @@ -262,12 +265,13 @@ class UdfClientImpl : public UdfClient {
<< "Error compiling UDF code object. " << *response_status;
return *response_status;
}
handler_name_ = std::move(code_config.udf_handler_name);
logical_commit_time_ = code_config.logical_commit_time;
version_ = code_config.version;
code_object_metadata_.handler_name =
std::move(code_config.udf_handler_name);
code_object_metadata_.logical_commit_time = code_config.logical_commit_time;
code_object_metadata_.version = code_config.version;
PS_VLOG(5, log_context)
<< "Successfully set UDF code object with handler_name "
<< handler_name_;
<< code_object_metadata_.handler_name;
return absl::OkStatus();
}

Expand All @@ -286,9 +290,10 @@ class UdfClientImpl : public UdfClient {
InvocationStrRequest<std::weak_ptr<RequestContext>> BuildInvocationRequest(
const RequestContextFactory& request_context_factory,
std::vector<std::string> input) const {
absl::ReaderMutexLock lock(&mutex_);
return {.id = kInvocationRequestId,
.version_string = absl::StrCat("v", version_),
.handler_name = handler_name_,
.version_string = absl::StrCat("v", code_object_metadata_.version),
.handler_name = code_object_metadata_.handler_name,
.tags = {{std::string(kTimeoutDurationTag),
FormatDuration(udf_timeout_)}},
.input = std::move(input),
Expand All @@ -304,9 +309,9 @@ class UdfClientImpl : public UdfClient {
.wasm = std::move(wasm)};
}

std::string handler_name_;
int64_t logical_commit_time_ = -1;
int64_t version_ = 1;
// Mutex for code_object_metadata_;
mutable absl::Mutex mutex_;
CodeObjectMetadata code_object_metadata_ ABSL_GUARDED_BY(mutex_);
const absl::Duration udf_timeout_;
const absl::Duration udf_update_timeout_;
int udf_min_log_level_;
Expand Down
6 changes: 6 additions & 0 deletions components/udf/udf_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ struct ExecutionMetadata {
std::optional<int64_t> custom_code_total_execution_time_micros;
};

struct CodeObjectMetadata {
std::string handler_name;
int64_t logical_commit_time = -1;
int64_t version = 1;
};

// Client to execute UDF
class UdfClient {
public:
Expand Down

0 comments on commit c9db19a

Please sign in to comment.