Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concord client request cmf changes #2639

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ CONCORD_BFT_RECONFIGURATION_CMF_PATHS?=${CONCORD_BFT_TARGET_SOURCE_PATH}/build/r
CONCORD_BFT_BFTENGINE_CMF_PATHS?=${CONCORD_BFT_TARGET_SOURCE_PATH}/build/bftengine/cmf
CONCORD_BFT_CCRON_CMF_PATHS?=${CONCORD_BFT_TARGET_SOURCE_PATH}/build/ccron/cmf
CONCORD_BFT_SKVBC_CMF_PATHS?=${CONCORD_BFT_TARGET_SOURCE_PATH}/build/tests/simpleKVBC/cmf
CONCORD_BFT_CONCORD_CLIENT_CMF_PATHS?=${CONCORD_BFT_TARGET_SOURCE_PATH}/build/client/concordclient/cmf
CONCORD_BFT_CLIENT_PROTO_PATH?=${CONCORD_BFT_TARGET_SOURCE_PATH}/build/client/proto
CONCORD_BFT_THIN_REPLICA_PROTO_PATH?=${CONCORD_BFT_TARGET_SOURCE_PATH}/build/thin-replica-server/proto
CONCORD_BFT_KVBC_PROTO_PATH?=${CONCORD_BFT_TARGET_SOURCE_PATH}/build/kvbc/proto
Expand Down Expand Up @@ -197,6 +198,7 @@ tidy-check: gen_cmake ## Run clang-tidy
make -C ${CONCORD_BFT_BFTENGINE_CMF_PATHS} &> /dev/null && \
make -C ${CONCORD_BFT_CCRON_CMF_PATHS} &> /dev/null && \
make -C ${CONCORD_BFT_SKVBC_CMF_PATHS} &> /dev/null && \
make -C ${CONCORD_BFT_CONCORD_CLIENT_CMF_PATHS} &> /dev/null && \
make -C ${CONCORD_BFT_CLIENT_PROTO_PATH} &> /dev/null && \
make -C ${CONCORD_BFT_THIN_REPLICA_PROTO_PATH} &> /dev/null && \
make -C ${CONCORD_BFT_KVBC_PROTO_PATH} &> /dev/null && \
Expand Down
2 changes: 2 additions & 0 deletions bftengine/include/bftengine/SharedTypes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ enum class OperationResult : uint32_t {
INTERNAL_ERROR
};

enum class RequestType : uint32_t { RAW_MESSAGE, ANY_MESSAGE };

} // namespace bftEngine
3 changes: 3 additions & 0 deletions client/bftclient/include/bftclient/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "bftclient/base_types.h"
#include "bftclient/quorums.h"
#include "secret_data.h"
#include "SharedTypes.hpp"

using namespace std::chrono_literals;

Expand Down Expand Up @@ -81,6 +82,8 @@ struct RequestConfig {
std::string span_context = "";
bool key_exchange = false;
bool reconfiguration = false;
bftEngine::RequestType request_type = bftEngine::RequestType::RAW_MESSAGE;
std::string client_service_id = "";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

client_service_id can be moved to ClientConfig since it is not request specific but specific to a client

};

// The configuration for a single write request.
Expand Down
1 change: 1 addition & 0 deletions client/client_pool/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ target_link_libraries(concord_client_pool PUBLIC
bftclient
bftclient_new
corebft
concord_client_message
)

install (TARGETS concord_client_pool DESTINATION lib${LIB_SUFFIX})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ class ConcordClientPool {
uint64_t seq_num,
std::string correlation_id = {},
const std::string& span_context = std::string(),
const bftEngine::RequestType request_type = bftEngine::RequestType::RAW_MESSAGE,
const std::string& client_service_id = std::string(),
const bftEngine::RequestCallBack& callback = {});

// This method is responsible to get write requests with the new client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "client_pool_config.hpp"
#include "communication/StatusInfo.h"
#include "external_client_exception.hpp"
#include "concord_client_message.cmf.hpp"

namespace concord {

Expand Down Expand Up @@ -104,6 +105,12 @@ class ConcordClient {

std::string messageSignature(bft::client::Msg&);

void prepareConcordClientRequest(bft::client::Msg& request,
bftEngine::RequestType request_type,
const std::string& client_service_id);

void prepareConcordClientResponse(bft::client::Msg& response);

private:
void CreateClient(std::shared_ptr<concordMetrics::Aggregator> aggregator);

Expand Down
18 changes: 16 additions & 2 deletions client/client_pool/src/concord_client_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ SubmitResult ConcordClientPool::SendRequest(std::vector<uint8_t> &&request,
uint64_t seq_num,
std::string correlation_id,
const std::string &span_context,
const bftEngine::RequestType request_type,
const std::string &client_service_id,
const bftEngine::RequestCallBack &callback) {
if (callback && timeout_ms.count() == 0) {
callback(bftEngine::SendResult{static_cast<uint32_t>(OperationResult::INVALID_REQUEST)});
Expand All @@ -56,6 +58,8 @@ SubmitResult ConcordClientPool::SendRequest(std::vector<uint8_t> &&request,

while (!clients_.empty() && serving_candidates != 0) {
auto client = clients_.front();
client->prepareConcordClientRequest(request, request_type, client_service_id);
LOG_DEBUG(logger_, "In ConcordClientPool::SendRequest completed packing concord client request to cmf format");
client_id = client->getClientId();
if (is_overloaded_) {
is_overloaded_ = false;
Expand Down Expand Up @@ -224,6 +228,8 @@ SubmitResult ConcordClientPool::SendRequest(const bft::client::WriteConfig &conf
config.request.sequence_number,
config.request.correlation_id,
config.request.span_context,
config.request.request_type,
config.request.client_service_id,
callback);
}

Expand All @@ -245,6 +251,8 @@ SubmitResult ConcordClientPool::SendRequest(const bft::client::ReadConfig &confi
config.request.sequence_number,
config.request.correlation_id,
config.request.span_context,
config.request.request_type,
config.request.client_service_id,
callback);
}

Expand Down Expand Up @@ -487,10 +495,16 @@ void SingleRequestProcessingJob::execute() {
OperationResult operation_result = processing_client_->getRequestExecutionResult();
reply_size = res.matched_data.size();
if (callback_) {
if (operation_result == OperationResult::SUCCESS)
logging::Logger logger_(logging::getLogger("com.vmware.SingleRequestProcessingJob"));
if (operation_result == OperationResult::SUCCESS) {
processing_client_->prepareConcordClientResponse(res.matched_data);
LOG_DEBUG(logger_,
"In SingleRequestProcessingJob::execute completed extracting concord client response from cmf packing");
reply_size = res.matched_data.size();
callback_(res);
else
} else {
callback_(static_cast<uint32_t>(operation_result));
}
}
external_client::ConcordClient::PendingReplies replies;
replies.push_back(ClientReply{static_cast<uint32_t>(request_.size()),
Expand Down
23 changes: 22 additions & 1 deletion client/client_pool/src/external_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ std::pair<int32_t, ConcordClient::PendingReplies> ConcordClient::SendPendingRequ
try {
LOG_INFO(logger_, "Batch processing started" << KVLOG(client_id_, batch_cid));
auto received_replies_map = new_client_->sendBatch(request_queue, batch_cid);
for (const auto& received_reply_entry : received_replies_map) {
for (auto& received_reply_entry : received_replies_map) {
bandatarunkumar marked this conversation as resolved.
Show resolved Hide resolved
const auto received_reply_seq_num = received_reply_entry.first;
const auto& pending_seq_num_to_cid_entry = seq_num_to_cid.find(received_reply_seq_num);
if (pending_seq_num_to_cid_entry == seq_num_to_cid.end()) {
Expand All @@ -195,6 +195,9 @@ std::pair<int32_t, ConcordClient::PendingReplies> ConcordClient::SendPendingRequ
}
auto cid = pending_seq_num_to_cid_entry->second;
cid_response_map_[cid] = std::chrono::steady_clock::now();
prepareConcordClientResponse(received_reply_entry.second.matched_data);
LOG_DEBUG(logger_,
"In ConcordClient::SendPendingRequests completed extracting concord client response from cmf packing");
auto data_size = received_reply_entry.second.matched_data.size();
for (auto& pending_reply : pending_replies_) {
if (pending_reply.cid != cid) continue;
Expand Down Expand Up @@ -392,6 +395,24 @@ OperationResult ConcordClient::getRequestExecutionResult() { return clientReques

std::string ConcordClient::messageSignature(bft::client::Msg& message) { return new_client_->signMessage(message); }

void ConcordClient::prepareConcordClientRequest(bft::client::Msg& request,
bftEngine::RequestType request_type,
const std::string& client_service_id) {
concord::client::message::ConcordClientRequest concord_request;
concord_request.request_type = static_cast<decltype(concord_request.request_type)>(request_type);
concord_request.client_service_id = static_cast<decltype(concord_request.client_service_id)>(client_service_id);
concord_request.application_request = std::vector<uint8_t>(request.begin(), request.end());
request.clear();
concord::client::message::serialize(request, concord_request);
}

void ConcordClient::prepareConcordClientResponse(bft::client::Msg& response) {
concord::client::message::ConcordClientResponse concord_response;
concord::client::message::deserialize(response, concord_response);
response.clear();
response.assign(concord_response.application_response.begin(), concord_response.application_response.end());
}

void ConcordClient::stopClientComm() { new_client_->stop(); }

} // namespace concord::external_client
26 changes: 16 additions & 10 deletions client/clientservice/src/request_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@

#include "client/clientservice/request_service.hpp"
#include "client/concordclient/concord_client.hpp"
#include "concord_client_request.pb.h"
#include "client/thin-replica-client/trace_contexts.hpp"

using namespace client::thin_replica_client;
using namespace vmware::concord::client::concord_client_request::v1;

namespace concord::client::clientservice {

Expand Down Expand Up @@ -75,12 +73,9 @@ void RequestServiceCallData::sendToConcordClient() {
bool is_any_request_type = false;
bft::client::Msg msg;
if (request_.has_typed_request()) {
ConcordClientRequest concord_request;
concord_request.set_client_service_id(client_->getSubscriptionId());
concord_request.mutable_application_request()->CopyFrom(request_.typed_request());
size_t request_size = concord_request.ByteSizeLong();
size_t request_size = request_.typed_request().ByteSizeLong();
std::string request(request_size, '\0');
concord_request.SerializeToArray(request.data(), request_size);
request_.typed_request().SerializeToArray(request.data(), request_size);
msg = bft::client::Msg(request.begin(), request.end());
is_any_request_type = true;
} else {
Expand All @@ -95,6 +90,12 @@ void RequestServiceCallData::sendToConcordClient() {
req_config.pre_execute = request_.pre_execute();
req_config.timeout = timeout;
req_config.correlation_id = request_.correlation_id();
if (request_.has_typed_request()) {
req_config.request_type = bftEngine::RequestType::ANY_MESSAGE;
} else {
req_config.request_type = bftEngine::RequestType::RAW_MESSAGE;
}
req_config.client_service_id = client_->getSubscriptionId();

auto callback = [this, req_config, is_any_request_type](concord::client::concordclient::SendResult&& send_result) {
grpc::Status status;
Expand Down Expand Up @@ -163,15 +164,20 @@ void RequestServiceCallData::sendToConcordClient() {

// Check if the application response is of Any Type then set it to Any response.
if (is_any_request_type) {
ConcordClientResponse concord_response;
if (!concord_response.ParseFromArray(data.c_str(), data.size())) {
google::protobuf::Any* app_response = this->response_.mutable_typed_response();
if (!app_response->ParseFromArray(data.c_str(), data.size())) {
status = grpc::Status(grpc::StatusCode::INTERNAL, "Internal error in parsing typed response");
this->populateResult(status);
return;
}
this->response_.mutable_typed_response()->CopyFrom(concord_response.application_response());
LOG_DEBUG(logger_,
"In RequestServiceCallData::sendToConcordClient callback function done parsing application ANY Data "
"and updated the response");
} else {
this->response_.set_raw_response(std::move(data));
LOG_DEBUG(logger_,
"In RequestServiceCallData::sendToConcordClient callback function done parsing application RAW Data "
"and updated the response");
}

this->populateResult(grpc::Status::OK);
Expand Down
4 changes: 4 additions & 0 deletions client/concordclient/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
add_library(concordclient "src/concord_client.cpp")
target_include_directories(concordclient PUBLIC include)
# TODO: Mark libraries as PRIVATE once the interface is selfcontained

add_subdirectory("cmf")

target_link_libraries(concordclient PUBLIC
thin_replica_client_lib
concord_client_message
concord_client_pool
concordclient-event-api
util
Expand Down
4 changes: 4 additions & 0 deletions client/concordclient/cmf/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
cmf_generate_cpp(header cpp concord::client::message concord_client_message.cmf)
add_library(concord_client_message ${cpp})
set_target_properties(concord_client_message PROPERTIES LINKER_LANGUAGE CXX)
target_include_directories(concord_client_message PUBLIC ${CMAKE_CURRENT_BINARY_DIR})
19 changes: 19 additions & 0 deletions client/concordclient/cmf/concord_client_message.cmf
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# ConcordClientRequest is used to send application request along
# with clientservice specific information to the blockchain network.

Msg ConcordClientRequest 1 {
# request_type is type of input request, which can be of RAW or ANY request
uint32 request_type
# Client service ID or thin replica client's subscription ID
# used for filtering events for this client.
string client_service_id
# Required application request which gets evaluated by the execution engine.
bytes application_request
}

Msg ConcordClientResponse 2 {
# Required application response which is returned by the execution engine.
uint32 type
bytes application_response
}

This file was deleted.

2 changes: 0 additions & 2 deletions client/proto/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@ protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS ${CMAKE_CURRENT_BINARY_DIR}
request/v1/request.proto
event/v1/event.proto
state_snapshot/v1/state_snapshot.proto
../concordclient/proto/concord_client_request/v1/concord_client_request.proto
)
grpc_generate_cpp(GRPC_SRCS GRPC_HDRS ${CMAKE_CURRENT_BINARY_DIR}
request/v1/request.proto
event/v1/event.proto
state_snapshot/v1/state_snapshot.proto
../concordclient/proto/concord_client_request/v1/concord_client_request.proto
)

add_library(clientservice-proto STATIC ${PROTO_SRCS} ${GRPC_SRCS})
Expand Down