Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding more metrics. #161

Merged
merged 1 commit into from
Apr 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/Constants.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,16 @@ const char* INDEX_STATUS_READY = "ready";
const char* INDEX_STATUS_BUILDING = "building";
const char* INDEX_STATUS_ERROR = "error";

const char* MT_GUAGE_ACTIVE_CONNECTIONS = "dl_active_connections";
const char* MT_GUAGE_ACTIVE_CURSORS = "dl_active_cursors";
const char* MT_HIST_MESSAGE_SZ = "dl_message_size_bytes";
const char* MT_TIME_QUERY_LATENCY_US = "dl_query_latency_useconds";
const char* MT_HIST_KEYS_PER_DOCUMENT = "dl_keys_per_doc";
const char* MT_HIST_DOCUMENT_SZ = "dl_doc_size_bytes";
const char* MT_HIST_DOCS_PER_INSERT = "dl_docs_per_insert";
const char* MT_TIME_INSERT_LATENCY_US = "dl_insert_latency_useconds";
const char* MT_HIST_INSERT_SZ = "dl_insert_size_bytes";
const char* MT_HIST_TR_PER_REQUEST = "dl_tr_per_request";
const char* MT_RATE_IDX_REBUILD = "dl_index_rebuild_rate";

} // namespace DocLayerConstants
13 changes: 13 additions & 0 deletions src/Constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,19 @@ extern const char* INDEX_STATUS_READY;
extern const char* INDEX_STATUS_BUILDING;
extern const char* INDEX_STATUS_ERROR;

// Metrics
extern const char* MT_GUAGE_ACTIVE_CONNECTIONS;
extern const char* MT_GUAGE_ACTIVE_CURSORS;
extern const char* MT_HIST_MESSAGE_SZ;
extern const char* MT_TIME_QUERY_LATENCY_US;
extern const char* MT_HIST_KEYS_PER_DOCUMENT;
extern const char* MT_HIST_DOCUMENT_SZ;
extern const char* MT_HIST_DOCS_PER_INSERT;
extern const char* MT_TIME_INSERT_LATENCY_US;
extern const char* MT_HIST_INSERT_SZ;
extern const char* MT_HIST_TR_PER_REQUEST;
extern const char* MT_RATE_IDX_REBUILD;

} // namespace DocLayerConstants

#endif // FDB_DOC_LAYER_CONSTANTS_H
12 changes: 7 additions & 5 deletions src/Cursor.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/

#include "Cursor.h"
#include "DocLayer.h"
#include "Knobs.h"

int32_t Cursor::prune(std::map<int64_t, Reference<Cursor>>& cursors) {
Expand All @@ -33,7 +34,7 @@ int32_t Cursor::prune(std::map<int64_t, Reference<Cursor>>& cursors) {
++it;
}

for (auto i : to_be_pruned) {
for (const auto& i : to_be_pruned) {
(void)pluck(i);
pruned++;
}
Expand All @@ -45,13 +46,14 @@ void Cursor::pluck(Reference<Cursor> cursor) {
if (cursor) {
cursor->siblings->erase(cursor->id);
cursor->checkpoint->stop();
DocumentLayer::metricReporter->captureGauge(DocLayerConstants::MT_GUAGE_ACTIVE_CURSORS,
cursor->siblings->size());
}
}

Reference<Cursor> Cursor::add(std::map<int64_t, Reference<Cursor>>& siblings, Reference<Cursor> cursor) {
cursor->siblings = &siblings;

// FIXME: limit the number of allowed cursors?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why removing this comment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created an issue #160


return siblings[cursor->id] = cursor;
siblings[cursor->id] = cursor;
DocumentLayer::metricReporter->captureGauge(DocLayerConstants::MT_GUAGE_ACTIVE_CURSORS, siblings.size());
return cursor;
}
10 changes: 6 additions & 4 deletions src/DocLayer.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ ACTOR Future<Void> extServerConnection(Reference<DocumentLayer> docLayer,
state PromiseStream<std::pair<int, Future<Void>>> msg_size_inuse;
state Future<Void> onError = ec->bc->onClosed() || popDisposedMessages(bc, msg_size_inuse.getFuture());

DocumentLayer::metricReporter->captureGauge("activeConnections", ++docLayer->nrConnections);
DocumentLayer::metricReporter->captureGauge(DocLayerConstants::MT_GUAGE_ACTIVE_CONNECTIONS,
++docLayer->nrConnections);
try {
ec->startHousekeeping();

Expand All @@ -205,8 +206,8 @@ ACTOR Future<Void> extServerConnection(Reference<DocumentLayer> docLayer,
Void _ = wait(ec->bc->onBytesAvailable(header->messageLength));
auto messageBytes = ec->bc->peekExact(header->messageLength);

DocumentLayer::metricReporter->captureHistogram("messageLength", header->messageLength);
DocumentLayer::metricReporter->captureMeter("messageRate", 1);
DocumentLayer::metricReporter->captureHistogram(DocLayerConstants::MT_HIST_MESSAGE_SZ,
header->messageLength);

/* We don't use hdr in this call because the second peek may
have triggered a copy that the first did not, but it's nice
Expand All @@ -222,7 +223,8 @@ ACTOR Future<Void> extServerConnection(Reference<DocumentLayer> docLayer,
}
}
} catch (Error& e) {
DocumentLayer::metricReporter->captureGauge("activeConnections", --docLayer->nrConnections);
DocumentLayer::metricReporter->captureGauge(DocLayerConstants::MT_GUAGE_ACTIVE_CONNECTIONS,
--docLayer->nrConnections);
return Void();
}
}
Expand Down
27 changes: 21 additions & 6 deletions src/ExtMsg.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,6 @@ ACTOR static Future<Void> doRun(Reference<ExtMsgQuery> query, Reference<ExtConne
state Future<Void> x;
state uint64_t startTime = timer_int();

DocumentLayer::metricReporter->captureMeter("queryRate", 1);
if (query->isCmd) {
// It's a command
x = runCommand(ec, query, replyStream);
Expand All @@ -513,7 +512,8 @@ ACTOR static Future<Void> doRun(Reference<ExtMsgQuery> query, Reference<ExtConne
}
}

DocumentLayer::metricReporter->captureTime("queryLatency_us", (timer_int() - startTime) / 1000);
DocumentLayer::metricReporter->captureTime(DocLayerConstants::MT_TIME_QUERY_LATENCY_US,
(timer_int() - startTime) / 1000);

return Void();
}
Expand Down Expand Up @@ -647,10 +647,13 @@ ACTOR Future<Reference<IReadWriteContext>> insertDocument(Reference<CollectionCo

dcx->set(LiteralStringRef(""), DataValue::subObject().encode_value());

int nrFDBKeys = 0;
for (auto i = d.begin(); i.more();) {
auto e = i.next();
insertElementRecursive(e, dcx);
nrFDBKeys += insertElementRecursive(e, dcx);
}
DocumentLayer::metricReporter->captureHistogram(DocLayerConstants::MT_HIST_KEYS_PER_DOCUMENT, nrFDBKeys);
DocumentLayer::metricReporter->captureHistogram(DocLayerConstants::MT_HIST_DOCUMENT_SZ, d.objsize());

if (idObj.present())
insertElementRecursive(DocLayerConstants::ID_FIELD, idObj.get(), dcx);
Expand Down Expand Up @@ -835,6 +838,7 @@ ACTOR Future<WriteCmdResult> doInsertCmd(Namespace ns,
std::list<bson::BSONObj>* documents,
Reference<ExtConnection> ec) {
state Reference<DocTransaction> tr = ec->getOperationTransaction();
state uint64_t startTime = timer_int();

if (ns.second == DocLayerConstants::SYSTEM_INDEXES) {
if (verboseLogging)
Expand All @@ -847,11 +851,15 @@ ACTOR Future<WriteCmdResult> doInsertCmd(Namespace ns,
const char* collnsStr = firstDoc.getField(DocLayerConstants::NS_FIELD).String().c_str();
const auto collns = getDBCollectionPair(collnsStr, std::make_pair("msg", "Bad coll name in index insert"));
WriteCmdResult result = wait(attemptIndexInsertion(firstDoc.getOwned(), ec, tr, collns));

DocumentLayer::metricReporter->captureTime(DocLayerConstants::MT_TIME_INSERT_LATENCY_US,
(timer_int() - startTime) / 1000);
return result;
}

std::vector<Reference<IInsertOp>> inserts;
std::set<std::string> ids;
int insertSize = 0;
for (const auto& d : *documents) {
const bson::BSONObj& obj = d;
Optional<IdInfo> encodedIds = extractEncodedIds(obj);
Expand All @@ -861,10 +869,16 @@ ACTOR Future<WriteCmdResult> doInsertCmd(Namespace ns,
}
}
inserts.push_back(Reference<IInsertOp>(new ExtInsert(obj, encodedIds)));
insertSize += obj.objsize();
}
DocumentLayer::metricReporter->captureHistogram(DocLayerConstants::MT_HIST_INSERT_SZ, insertSize);
DocumentLayer::metricReporter->captureHistogram(DocLayerConstants::MT_HIST_DOCS_PER_INSERT, documents->size());

Reference<Plan> plan = ec->isolatedWrapOperationPlan(ref(new InsertPlan(inserts, ec->mm, ns)));
int64_t i = wait(executeUntilCompletionTransactionally(plan, tr));

DocumentLayer::metricReporter->captureTime(DocLayerConstants::MT_TIME_INSERT_LATENCY_US,
(timer_int() - startTime) / 1000);
return WriteCmdResult(i);
}

Expand Down Expand Up @@ -1273,9 +1287,10 @@ std::string ExtMsgKillCursors::toString() {

Future<Void> doKillCursorsRun(Reference<ExtMsgKillCursors> msg, Reference<ExtConnection> ec) {
int64_t* ptr = msg->cursorIDs;
int32_t numberOfCursorIDs =
msg->numberOfCursorIDs; // FIXME: I'm not quite sure what the contract around the memory owned by
// BufferedConnection is. So do this copy for now to be conservative.

// FIXME: I'm not quite sure what the contract around the memory owned by
// BufferedConnection is. So do this copy for now to be conservative.
int32_t numberOfCursorIDs = msg->numberOfCursorIDs;

while (numberOfCursorIDs--) {
Cursor::pluck(ec->cursors[*ptr++]);
Expand Down
72 changes: 46 additions & 26 deletions src/ExtUtil.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,86 +73,106 @@ std::string encodeMaybeDotted(std::string fieldname) {
return path;
}

void insertElementRecursive(std::string fn, bson::BSONObj const& obj, Reference<IReadWriteContext> cx) {
int insertElementRecursive(std::string fn, bson::BSONObj const& obj, Reference<IReadWriteContext> cx) {
if (fn[0] == '$')
throw fieldname_with_dollar();
std::string kp = encodeMaybeDotted(fn);

int nrFDBKeys = 1;
cx->set(kp, DataValue::subObject().encode_value());

auto scx = cx->getSubContext(kp);
for (auto i = obj.begin(); i.more();) {
auto e = i.next();
insertElementRecursive(e, scx);
nrFDBKeys += insertElementRecursive(e, scx);
}

return nrFDBKeys;
}

void insertElementRecursive(std::string fn, bson::BSONArray const& arr, Reference<IReadWriteContext> cx) {
int insertElementRecursive(std::string fn, bson::BSONArray const& arr, Reference<IReadWriteContext> cx) {
if (fn[0] == '$')
throw fieldname_with_dollar();

std::string kp = encodeMaybeDotted(fn);
int nrFDBKeys = 1;

cx->set(kp, DataValue::arrayOfLength(arr.nFields()).encode_value());

auto scx = cx->getSubContext(kp);
for (auto i = arr.begin(); i.more();) {
bson::BSONElement e = i.next();
insertElementRecursive(e, scx);
nrFDBKeys += insertElementRecursive(e, scx);
}

return nrFDBKeys;
}

void insertElementRecursive(std::string fn, bson::BSONElement const& elem, Reference<IReadWriteContext> cx) {
int insertElementRecursive(std::string fn, bson::BSONElement const& elem, Reference<IReadWriteContext> cx) {
if (fn[0] == '$')
throw fieldname_with_dollar();

std::string kp = encodeMaybeDotted(fn);
if (!elem.isABSONObj()) {
cx->set(kp, DataValue(elem).encode_value());
} else {
if (elem.type() == bson::BSONType::Array) {
insertElementRecursive(fn, bson::BSONArray(elem.Obj()), cx);
} else {
insertElementRecursive(fn, elem.Obj(), cx);
}
return 1;
}

if (elem.type() == bson::BSONType::Array)
return insertElementRecursive(fn, bson::BSONArray(elem.Obj()), cx);

return insertElementRecursive(fn, elem.Obj(), cx);
}

void insertElementRecursive(bson::BSONElement const& elem, Reference<IReadWriteContext> cx) {
int insertElementRecursive(bson::BSONElement const& elem, Reference<IReadWriteContext> cx) {
std::string fn = elem.fieldName();
if (std::all_of(fn.begin(), fn.end(), ::isdigit)) {
const char* c_fn = fn.c_str();
insertElementRecursive(atoi(c_fn), elem, cx);
} else {
insertElementRecursive(fn, elem, cx);
return insertElementRecursive(atoi(c_fn), elem, cx);
}

return insertElementRecursive(fn, elem, cx);
}

void insertElementRecursive(int fn, bson::BSONElement const& elem, Reference<IReadWriteContext> cx) {
int insertElementRecursive(int fn, bson::BSONElement const& elem, Reference<IReadWriteContext> cx) {
std::string kp = DataValue(fn).encode_key_part();
if (!elem.isABSONObj()) {
cx->set(kp, DataValue(elem).encode_value());
} else {
if (elem.type() == bson::BSONType::Array) {
insertElementRecursive(fn, bson::BSONArray(elem.Obj()), cx);
} else {
insertElementRecursive(fn, elem.Obj(), cx);
}
return 1;
}

if (elem.type() == bson::BSONType::Array)
return insertElementRecursive(fn, bson::BSONArray(elem.Obj()), cx);

return insertElementRecursive(fn, elem.Obj(), cx);
}

void insertElementRecursive(int fn, bson::BSONObj const& obj, Reference<IReadWriteContext> cx) {
int insertElementRecursive(int fn, bson::BSONObj const& obj, Reference<IReadWriteContext> cx) {
std::string kp = DataValue(fn).encode_key_part();
int nrFDBKeys = 1;

cx->set(kp, DataValue::subObject().encode_value());

auto scx = cx->getSubContext(kp);
for (auto i = obj.begin(); i.more();) {
auto e = i.next();
insertElementRecursive(e, scx);
nrFDBKeys += insertElementRecursive(e, scx);
}
return nrFDBKeys;
}

void insertElementRecursive(int fn, bson::BSONArray const& arr, Reference<IReadWriteContext> cx) {
int insertElementRecursive(int fn, bson::BSONArray const& arr, Reference<IReadWriteContext> cx) {
std::string kp = DataValue(fn).encode_key_part();
int nrFDBKeys = 1;

cx->set(kp, DataValue::arrayOfLength(arr.nFields()).encode_value());

auto scx = cx->getSubContext(kp);
for (auto i = arr.begin(); i.more();) {
bson::BSONElement e = i.next();
insertElementRecursive(e, scx);
nrFDBKeys += insertElementRecursive(e, scx);
}
return nrFDBKeys;
}

ACTOR Future<Void> ensureValidObject(Reference<IReadWriteContext> cx,
Expand Down
10 changes: 5 additions & 5 deletions src/ExtUtil.actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,19 @@ std::string encodeMaybeDotted(std::string fieldname);
/**
* The usual way of inserting an element
*/
void insertElementRecursive(const bson::BSONElement& elem, Reference<IReadWriteContext> cx);
int insertElementRecursive(const bson::BSONElement& elem, Reference<IReadWriteContext> cx);

/**
* An overload that is used only in bizarre cases involving upserting things with compound ids.
*/
void insertElementRecursive(std::string fn, bson::BSONObj const& obj, Reference<IReadWriteContext> cx);
int insertElementRecursive(std::string fn, bson::BSONObj const& obj, Reference<IReadWriteContext> cx);

/**
* Utility overloads used by some of the array update operators
*/
void insertElementRecursive(int fn, bson::BSONElement const& elem, Reference<IReadWriteContext> cx);
void insertElementRecursive(int fn, bson::BSONObj const& obj, Reference<IReadWriteContext> cx);
void insertElementRecursive(int fn, bson::BSONArray const& arr, Reference<IReadWriteContext> cx);
int insertElementRecursive(int fn, bson::BSONElement const& elem, Reference<IReadWriteContext> cx);
int insertElementRecursive(int fn, bson::BSONObj const& obj, Reference<IReadWriteContext> cx);
int insertElementRecursive(int fn, bson::BSONArray const& arr, Reference<IReadWriteContext> cx);

Future<Void> ensureValidObject(const Reference<IReadWriteContext>& cx,
const std::string& objectRoot,
Expand Down
2 changes: 1 addition & 1 deletion src/QLExpression.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ ACTOR Future<Optional<std::pair<Standalone<StringRef>, DataValue>>> getArrayAnce
futures.push_back(cx->get(dk.bytes()));
}

if (futures.size()) {
if (!futures.empty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👏

std::vector<Optional<DataValue>> results = wait(getAll(futures));

for (int i = 0; i < (checkLast ? results.size() - 1 : results.size()); ++i) {
Expand Down
Loading