diff --git a/src/Constants.cpp b/src/Constants.cpp index 450be60..ce826ea 100644 --- a/src/Constants.cpp +++ b/src/Constants.cpp @@ -65,4 +65,16 @@ const char* INDEX_STATUS_READY = "ready"; const char* INDEX_STATUS_BUILDING = "building"; const char* INDEX_STATUS_ERROR = "error"; +const char* MT_GUAGE_ACTIVE_CONNECTIONS = "dl_active_connections"; +const char* MT_GUAGE_ACTIVE_CURSORS = "dl_active_cursors"; +const char* MT_HIST_MESSAGE_SZ = "dl_message_size_bytes"; +const char* MT_TIME_QUERY_LATENCY_US = "dl_query_latency_useconds"; +const char* MT_HIST_KEYS_PER_DOCUMENT = "dl_keys_per_doc"; +const char* MT_HIST_DOCUMENT_SZ = "dl_doc_size_bytes"; +const char* MT_HIST_DOCS_PER_INSERT = "dl_docs_per_insert"; +const char* MT_TIME_INSERT_LATENCY_US = "dl_insert_latency_useconds"; +const char* MT_HIST_INSERT_SZ = "dl_insert_size_bytes"; +const char* MT_HIST_TR_PER_REQUEST = "dl_tr_per_request"; +const char* MT_RATE_IDX_REBUILD = "dl_index_rebuild_rate"; + } // namespace DocLayerConstants \ No newline at end of file diff --git a/src/Constants.h b/src/Constants.h index e39ccad..e0c87c7 100644 --- a/src/Constants.h +++ b/src/Constants.h @@ -77,6 +77,19 @@ extern const char* INDEX_STATUS_READY; extern const char* INDEX_STATUS_BUILDING; extern const char* INDEX_STATUS_ERROR; +// Metrics +extern const char* MT_GUAGE_ACTIVE_CONNECTIONS; +extern const char* MT_GUAGE_ACTIVE_CURSORS; +extern const char* MT_HIST_MESSAGE_SZ; +extern const char* MT_TIME_QUERY_LATENCY_US; +extern const char* MT_HIST_KEYS_PER_DOCUMENT; +extern const char* MT_HIST_DOCUMENT_SZ; +extern const char* MT_HIST_DOCS_PER_INSERT; +extern const char* MT_TIME_INSERT_LATENCY_US; +extern const char* MT_HIST_INSERT_SZ; +extern const char* MT_HIST_TR_PER_REQUEST; +extern const char* MT_RATE_IDX_REBUILD; + } // namespace DocLayerConstants #endif // FDB_DOC_LAYER_CONSTANTS_H diff --git a/src/Cursor.actor.cpp b/src/Cursor.actor.cpp index e475aac..7498807 100644 --- a/src/Cursor.actor.cpp +++ b/src/Cursor.actor.cpp @@ -19,6 +19,7 @@ */ #include "Cursor.h" +#include "DocLayer.h" #include "Knobs.h" int32_t Cursor::prune(std::map>& cursors) { @@ -33,7 +34,7 @@ int32_t Cursor::prune(std::map>& cursors) { ++it; } - for (auto i : to_be_pruned) { + for (const auto& i : to_be_pruned) { (void)pluck(i); pruned++; } @@ -45,13 +46,14 @@ void Cursor::pluck(Reference cursor) { if (cursor) { cursor->siblings->erase(cursor->id); cursor->checkpoint->stop(); + DocumentLayer::metricReporter->captureGauge(DocLayerConstants::MT_GUAGE_ACTIVE_CURSORS, + cursor->siblings->size()); } } Reference Cursor::add(std::map>& siblings, Reference cursor) { cursor->siblings = &siblings; - - // FIXME: limit the number of allowed cursors? - - return siblings[cursor->id] = cursor; + siblings[cursor->id] = cursor; + DocumentLayer::metricReporter->captureGauge(DocLayerConstants::MT_GUAGE_ACTIVE_CURSORS, siblings.size()); + return cursor; } diff --git a/src/DocLayer.actor.cpp b/src/DocLayer.actor.cpp index a2d8dea..86016f0 100644 --- a/src/DocLayer.actor.cpp +++ b/src/DocLayer.actor.cpp @@ -182,7 +182,8 @@ ACTOR Future extServerConnection(Reference docLayer, state PromiseStream>> msg_size_inuse; state Future onError = ec->bc->onClosed() || popDisposedMessages(bc, msg_size_inuse.getFuture()); - DocumentLayer::metricReporter->captureGauge("activeConnections", ++docLayer->nrConnections); + DocumentLayer::metricReporter->captureGauge(DocLayerConstants::MT_GUAGE_ACTIVE_CONNECTIONS, + ++docLayer->nrConnections); try { ec->startHousekeeping(); @@ -205,8 +206,8 @@ ACTOR Future extServerConnection(Reference docLayer, Void _ = wait(ec->bc->onBytesAvailable(header->messageLength)); auto messageBytes = ec->bc->peekExact(header->messageLength); - DocumentLayer::metricReporter->captureHistogram("messageLength", header->messageLength); - DocumentLayer::metricReporter->captureMeter("messageRate", 1); + DocumentLayer::metricReporter->captureHistogram(DocLayerConstants::MT_HIST_MESSAGE_SZ, + header->messageLength); /* We don't use hdr in this call because the second peek may have triggered a copy that the first did not, but it's nice @@ -222,7 +223,8 @@ ACTOR Future extServerConnection(Reference docLayer, } } } catch (Error& e) { - DocumentLayer::metricReporter->captureGauge("activeConnections", --docLayer->nrConnections); + DocumentLayer::metricReporter->captureGauge(DocLayerConstants::MT_GUAGE_ACTIVE_CONNECTIONS, + --docLayer->nrConnections); return Void(); } } diff --git a/src/ExtMsg.actor.cpp b/src/ExtMsg.actor.cpp index bf0b069..62d2b12 100644 --- a/src/ExtMsg.actor.cpp +++ b/src/ExtMsg.actor.cpp @@ -490,7 +490,6 @@ ACTOR static Future doRun(Reference query, Reference x; state uint64_t startTime = timer_int(); - DocumentLayer::metricReporter->captureMeter("queryRate", 1); if (query->isCmd) { // It's a command x = runCommand(ec, query, replyStream); @@ -513,7 +512,8 @@ ACTOR static Future doRun(Reference query, ReferencecaptureTime("queryLatency_us", (timer_int() - startTime) / 1000); + DocumentLayer::metricReporter->captureTime(DocLayerConstants::MT_TIME_QUERY_LATENCY_US, + (timer_int() - startTime) / 1000); return Void(); } @@ -647,10 +647,13 @@ ACTOR Future> insertDocument(Referenceset(LiteralStringRef(""), DataValue::subObject().encode_value()); + int nrFDBKeys = 0; for (auto i = d.begin(); i.more();) { auto e = i.next(); - insertElementRecursive(e, dcx); + nrFDBKeys += insertElementRecursive(e, dcx); } + DocumentLayer::metricReporter->captureHistogram(DocLayerConstants::MT_HIST_KEYS_PER_DOCUMENT, nrFDBKeys); + DocumentLayer::metricReporter->captureHistogram(DocLayerConstants::MT_HIST_DOCUMENT_SZ, d.objsize()); if (idObj.present()) insertElementRecursive(DocLayerConstants::ID_FIELD, idObj.get(), dcx); @@ -835,6 +838,7 @@ ACTOR Future doInsertCmd(Namespace ns, std::list* documents, Reference ec) { state Reference tr = ec->getOperationTransaction(); + state uint64_t startTime = timer_int(); if (ns.second == DocLayerConstants::SYSTEM_INDEXES) { if (verboseLogging) @@ -847,11 +851,15 @@ ACTOR Future doInsertCmd(Namespace ns, const char* collnsStr = firstDoc.getField(DocLayerConstants::NS_FIELD).String().c_str(); const auto collns = getDBCollectionPair(collnsStr, std::make_pair("msg", "Bad coll name in index insert")); WriteCmdResult result = wait(attemptIndexInsertion(firstDoc.getOwned(), ec, tr, collns)); + + DocumentLayer::metricReporter->captureTime(DocLayerConstants::MT_TIME_INSERT_LATENCY_US, + (timer_int() - startTime) / 1000); return result; } std::vector> inserts; std::set ids; + int insertSize = 0; for (const auto& d : *documents) { const bson::BSONObj& obj = d; Optional encodedIds = extractEncodedIds(obj); @@ -861,10 +869,16 @@ ACTOR Future doInsertCmd(Namespace ns, } } inserts.push_back(Reference(new ExtInsert(obj, encodedIds))); + insertSize += obj.objsize(); } + DocumentLayer::metricReporter->captureHistogram(DocLayerConstants::MT_HIST_INSERT_SZ, insertSize); + DocumentLayer::metricReporter->captureHistogram(DocLayerConstants::MT_HIST_DOCS_PER_INSERT, documents->size()); Reference plan = ec->isolatedWrapOperationPlan(ref(new InsertPlan(inserts, ec->mm, ns))); int64_t i = wait(executeUntilCompletionTransactionally(plan, tr)); + + DocumentLayer::metricReporter->captureTime(DocLayerConstants::MT_TIME_INSERT_LATENCY_US, + (timer_int() - startTime) / 1000); return WriteCmdResult(i); } @@ -1273,9 +1287,10 @@ std::string ExtMsgKillCursors::toString() { Future doKillCursorsRun(Reference msg, Reference ec) { int64_t* ptr = msg->cursorIDs; - int32_t numberOfCursorIDs = - msg->numberOfCursorIDs; // FIXME: I'm not quite sure what the contract around the memory owned by - // BufferedConnection is. So do this copy for now to be conservative. + + // FIXME: I'm not quite sure what the contract around the memory owned by + // BufferedConnection is. So do this copy for now to be conservative. + int32_t numberOfCursorIDs = msg->numberOfCursorIDs; while (numberOfCursorIDs--) { Cursor::pluck(ec->cursors[*ptr++]); diff --git a/src/ExtUtil.actor.cpp b/src/ExtUtil.actor.cpp index ef26a4a..b68812a 100644 --- a/src/ExtUtil.actor.cpp +++ b/src/ExtUtil.actor.cpp @@ -73,86 +73,106 @@ std::string encodeMaybeDotted(std::string fieldname) { return path; } -void insertElementRecursive(std::string fn, bson::BSONObj const& obj, Reference cx) { +int insertElementRecursive(std::string fn, bson::BSONObj const& obj, Reference cx) { if (fn[0] == '$') throw fieldname_with_dollar(); std::string kp = encodeMaybeDotted(fn); + + int nrFDBKeys = 1; cx->set(kp, DataValue::subObject().encode_value()); + auto scx = cx->getSubContext(kp); for (auto i = obj.begin(); i.more();) { auto e = i.next(); - insertElementRecursive(e, scx); + nrFDBKeys += insertElementRecursive(e, scx); } + + return nrFDBKeys; } -void insertElementRecursive(std::string fn, bson::BSONArray const& arr, Reference cx) { +int insertElementRecursive(std::string fn, bson::BSONArray const& arr, Reference cx) { if (fn[0] == '$') throw fieldname_with_dollar(); + std::string kp = encodeMaybeDotted(fn); + int nrFDBKeys = 1; + cx->set(kp, DataValue::arrayOfLength(arr.nFields()).encode_value()); + auto scx = cx->getSubContext(kp); for (auto i = arr.begin(); i.more();) { bson::BSONElement e = i.next(); - insertElementRecursive(e, scx); + nrFDBKeys += insertElementRecursive(e, scx); } + + return nrFDBKeys; } -void insertElementRecursive(std::string fn, bson::BSONElement const& elem, Reference cx) { +int insertElementRecursive(std::string fn, bson::BSONElement const& elem, Reference cx) { if (fn[0] == '$') throw fieldname_with_dollar(); + std::string kp = encodeMaybeDotted(fn); if (!elem.isABSONObj()) { cx->set(kp, DataValue(elem).encode_value()); - } else { - if (elem.type() == bson::BSONType::Array) { - insertElementRecursive(fn, bson::BSONArray(elem.Obj()), cx); - } else { - insertElementRecursive(fn, elem.Obj(), cx); - } + return 1; } + + if (elem.type() == bson::BSONType::Array) + return insertElementRecursive(fn, bson::BSONArray(elem.Obj()), cx); + + return insertElementRecursive(fn, elem.Obj(), cx); } -void insertElementRecursive(bson::BSONElement const& elem, Reference cx) { +int insertElementRecursive(bson::BSONElement const& elem, Reference cx) { std::string fn = elem.fieldName(); if (std::all_of(fn.begin(), fn.end(), ::isdigit)) { const char* c_fn = fn.c_str(); - insertElementRecursive(atoi(c_fn), elem, cx); - } else { - insertElementRecursive(fn, elem, cx); + return insertElementRecursive(atoi(c_fn), elem, cx); } + + return insertElementRecursive(fn, elem, cx); } -void insertElementRecursive(int fn, bson::BSONElement const& elem, Reference cx) { +int insertElementRecursive(int fn, bson::BSONElement const& elem, Reference cx) { std::string kp = DataValue(fn).encode_key_part(); if (!elem.isABSONObj()) { cx->set(kp, DataValue(elem).encode_value()); - } else { - if (elem.type() == bson::BSONType::Array) { - insertElementRecursive(fn, bson::BSONArray(elem.Obj()), cx); - } else { - insertElementRecursive(fn, elem.Obj(), cx); - } + return 1; } + + if (elem.type() == bson::BSONType::Array) + return insertElementRecursive(fn, bson::BSONArray(elem.Obj()), cx); + + return insertElementRecursive(fn, elem.Obj(), cx); } -void insertElementRecursive(int fn, bson::BSONObj const& obj, Reference cx) { +int insertElementRecursive(int fn, bson::BSONObj const& obj, Reference cx) { std::string kp = DataValue(fn).encode_key_part(); + int nrFDBKeys = 1; + cx->set(kp, DataValue::subObject().encode_value()); + auto scx = cx->getSubContext(kp); for (auto i = obj.begin(); i.more();) { auto e = i.next(); - insertElementRecursive(e, scx); + nrFDBKeys += insertElementRecursive(e, scx); } + return nrFDBKeys; } -void insertElementRecursive(int fn, bson::BSONArray const& arr, Reference cx) { +int insertElementRecursive(int fn, bson::BSONArray const& arr, Reference cx) { std::string kp = DataValue(fn).encode_key_part(); + int nrFDBKeys = 1; + cx->set(kp, DataValue::arrayOfLength(arr.nFields()).encode_value()); + auto scx = cx->getSubContext(kp); for (auto i = arr.begin(); i.more();) { bson::BSONElement e = i.next(); - insertElementRecursive(e, scx); + nrFDBKeys += insertElementRecursive(e, scx); } + return nrFDBKeys; } ACTOR Future ensureValidObject(Reference cx, diff --git a/src/ExtUtil.actor.h b/src/ExtUtil.actor.h index 51a1307..951ca7e 100644 --- a/src/ExtUtil.actor.h +++ b/src/ExtUtil.actor.h @@ -80,19 +80,19 @@ std::string encodeMaybeDotted(std::string fieldname); /** * The usual way of inserting an element */ -void insertElementRecursive(const bson::BSONElement& elem, Reference cx); +int insertElementRecursive(const bson::BSONElement& elem, Reference cx); /** * An overload that is used only in bizarre cases involving upserting things with compound ids. */ -void insertElementRecursive(std::string fn, bson::BSONObj const& obj, Reference cx); +int insertElementRecursive(std::string fn, bson::BSONObj const& obj, Reference cx); /** * Utility overloads used by some of the array update operators */ -void insertElementRecursive(int fn, bson::BSONElement const& elem, Reference cx); -void insertElementRecursive(int fn, bson::BSONObj const& obj, Reference cx); -void insertElementRecursive(int fn, bson::BSONArray const& arr, Reference cx); +int insertElementRecursive(int fn, bson::BSONElement const& elem, Reference cx); +int insertElementRecursive(int fn, bson::BSONObj const& obj, Reference cx); +int insertElementRecursive(int fn, bson::BSONArray const& arr, Reference cx); Future ensureValidObject(const Reference& cx, const std::string& objectRoot, diff --git a/src/QLExpression.actor.cpp b/src/QLExpression.actor.cpp index 909cb06..348f80c 100644 --- a/src/QLExpression.actor.cpp +++ b/src/QLExpression.actor.cpp @@ -35,7 +35,7 @@ ACTOR Future, DataValue>>> getArrayAnce futures.push_back(cx->get(dk.bytes())); } - if (futures.size()) { + if (!futures.empty()) { std::vector> results = wait(getAll(futures)); for (int i = 0; i < (checkLast ? results.size() - 1 : results.size()); ++i) { diff --git a/src/QLPlan.actor.cpp b/src/QLPlan.actor.cpp index 16d8610..6f80090 100644 --- a/src/QLPlan.actor.cpp +++ b/src/QLPlan.actor.cpp @@ -698,7 +698,7 @@ ACTOR static Future doNonIsolatedRO(PlanCheckpoint* outerCheckpoint, dtr = self->newTransaction(); state double startt = now(); state Reference innerCheckpoint(new PlanCheckpoint); - state int64_t nTransactions = 0; + state int nTransactions = 1; state int64_t nResults = 0; state FlowLock* outerLock = outerCheckpoint->getDocumentFinishedLock(); try { @@ -749,8 +749,7 @@ ACTOR static Future doNonIsolatedRO(PlanCheckpoint* outerCheckpoint, ++nTransactions; } } catch (Error& e) { - // printf("NonIsolatedRO: %d transactions, %d results, %0.1f sec, %s\n", nTransactions, nResults, now() - - // startt, e.what()); + DocumentLayer::metricReporter->captureHistogram(DocLayerConstants::MT_HIST_TR_PER_REQUEST, nTransactions); innerCheckpoint->stop(); output.sendError(e); throw; @@ -769,12 +768,10 @@ ACTOR static Future doNonIsolatedRW(PlanCheckpoint* outerCheckpoint, state Reference innerCheckpoint(new PlanCheckpoint); state FlowLock* outerLock = outerCheckpoint->getDocumentFinishedLock(); state int oCount = 0; + state int nTransactions = 1; try { state uint64_t metadataVersion = wait(cx->bindCollectionContext(dtr)->getMetadataVersion()); loop { - // printf("Trying nonIsolatedRW with %d outputs and checkpoint '%s'-'%s'\n", oCount, - // printable(innerCheckpoint->getBounds(0).begin).c_str(), - // printable(innerCheckpoint->getBounds(0).end).c_str()); state FutureStream> docs = subPlan->execute(innerCheckpoint.getPtr(), dtr); state FlowLock* innerLock = innerCheckpoint->getDocumentFinishedLock(); state bool first = true; @@ -846,8 +843,8 @@ ACTOR static Future doNonIsolatedRW(PlanCheckpoint* outerCheckpoint, // except that code is structured in a way makes it hard to use any other transaction. dtr->tr = self->newTransaction()->tr; - innerCheckpoint = next_checkpoint; // Since commit succeeded, we can do the next part next instead of - // redoing this part + // Since commit succeeded, we can do the next part next instead of redoing this part + innerCheckpoint = next_checkpoint; while (!bufferedDocs.empty()) { Void _ = wait(outerLock->take(1)); @@ -878,8 +875,10 @@ ACTOR static Future doNonIsolatedRW(PlanCheckpoint* outerCheckpoint, throw metadata_changed_nonisolated(); } } + nTransactions++; } } catch (Error& e) { + DocumentLayer::metricReporter->captureHistogram(DocLayerConstants::MT_HIST_TR_PER_REQUEST, nTransactions); innerCheckpoint->stop(); output.sendError(e); throw; @@ -1164,7 +1163,7 @@ ACTOR static Future findAndModify(PlanCheckpoint* outerCheckpoint, dtr = NonIsolatedPlan::newTransaction(database); state double startt = now(); state Reference innerCheckpoint(new PlanCheckpoint); - state int64_t nTransactions = 0; + state int nTransactions = 1; state int64_t nResults = 0; state FlowLock* outerLock = outerCheckpoint->getDocumentFinishedLock(); state Reference firstDoc; @@ -1257,14 +1256,12 @@ ACTOR static Future findAndModify(PlanCheckpoint* outerCheckpoint, Void _ = wait(outerLock->take()); - // fprintf(stderr, "any: %d projectNew %d upsertOp %d\n", any, projectNew, (bool)upsertOp); - if (any || (projectNew && upsertOp)) output.send(ref( new ScanReturnedContext(ref(new BsonContext(proj, false)), firstDoc->scanId(), firstDoc->scanKey()))); throw end_of_stream(); } catch (Error& e) { - // fprintf(stderr, "findAndModify error: %s\n", e.what()); + DocumentLayer::metricReporter->captureHistogram(DocLayerConstants::MT_HIST_TR_PER_REQUEST, nTransactions); innerCheckpoint->stop(); output.sendError(e); throw; @@ -1696,21 +1693,31 @@ ACTOR static Future scanAndBuildIndex(PlanCheckpoint* checkpoint, } Void _ = wait(indexDoc->commitChanges()); } + state int nrDocs = 0; try { loop choose { when(state Reference doc = waitNext(input)) { futures.push_back(std::make_pair(doc, buildIndexEntry(doc, index))); + nrDocs++; } when(Void _ = wait(futures.empty() ? Never() : futures.front().second)) { output.send(futures.front().first); futures.pop_front(); } + when(Void _ = wait(delay(0.1))) { + DocumentLayer::metricReporter->captureMeter(DocLayerConstants::MT_RATE_IDX_REBUILD, nrDocs); + nrDocs = 0; + } } } catch (Error& e) { if (e.code() != error_code_end_of_stream) throw; } + if (nrDocs) { + DocumentLayer::metricReporter->captureMeter(DocLayerConstants::MT_RATE_IDX_REBUILD, nrDocs); + } + while (!futures.empty()) { Void _ = wait(futures.front().second); output.send(futures.front().first); diff --git a/test/correctness/smoke/test_update.py b/test/correctness/smoke/test_update.py index 696da4f..80970f2 100644 --- a/test/correctness/smoke/test_update.py +++ b/test/correctness/smoke/test_update.py @@ -24,20 +24,22 @@ from collections import OrderedDict import pytest + @pytest.mark.xfail def test_update_array_containing_none_value(fixture_collection): collection = fixture_collection - collection.insert({'A': [1, None]}) + collection.insert_one({'A': [1, None]}) docCount = collection.find( {'A': {'$in': [ None]}}).count() assert docCount == 1, "Expect 1 documents before the update but got {}".format(docCount) - collection.update({'A': {'$size': 2}} , OrderedDict([('$pop', {'A': -1})])) + collection.update_one({'A': {'$size': 2}} , OrderedDict([('$pop', {'A': -1})])) docCount = collection.find( {'A': {'$in': [ None]}}).count() assert docCount == 1, "Expect 1 documents after the update but got {}".format(docCount) + def test_addToSet_with_none_value(fixture_collection): collection = fixture_collection