From ac1d3b89ceb397bc05f6309cba681b89758cc8e9 Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Mon, 18 Dec 2023 10:11:55 +0100 Subject: [PATCH 01/10] perms: add MultiverseRoots to macaroon whitelist We'll want to use the new MultiverseRoots RPC as an optimized way to find out what roots need to be synced during the normal universe sync. So we need to allow access to that RPC without a macaroon as we do for the previously used AssetRoots call. --- perms/perms.go | 1 + 1 file changed, 1 insertion(+) diff --git a/perms/perms.go b/perms/perms.go index 2d7afc005..8588c4c35 100644 --- a/perms/perms.go +++ b/perms/perms.go @@ -218,6 +218,7 @@ var ( // gain a layer of DoS defense. defaultMacaroonWhitelist = map[string]struct{}{ "/universerpc.Universe/AssetRoots": {}, + "/universerpc.Universe/MultiverseRoot": {}, "/universerpc.Universe/QueryAssetRoots": {}, "/universerpc.Universe/AssetLeafKeys": {}, "/universerpc.Universe/AssetLeaves": {}, From c0dfd03c2c0adef9f3c0d7e6f9878bf7ba9e6907 Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Mon, 18 Dec 2023 10:27:02 +0100 Subject: [PATCH 02/10] universe: fix formatting, typos --- universe/syncer.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/universe/syncer.go b/universe/syncer.go index 8b5c7ca2e..f54e6a4ce 100644 --- a/universe/syncer.go +++ b/universe/syncer.go @@ -110,7 +110,7 @@ func (s *SimpleSyncer) executeSync(ctx context.Context, diffEngine DiffEngine, ) switch { // If we have been given a specific set of Universes to sync, then we'll - // only fetch roots for those universes. We wont filter out any + // only fetch roots for those universes. We won't filter out any // Universes here as we assume that the caller has already done so. case len(idsToSync) != 0: targetRoots, err = fetchRootsForIDs(ctx, idsToSync, diffEngine) @@ -130,11 +130,9 @@ func (s *SimpleSyncer) executeSync(ctx context.Context, diffEngine DiffEngine, // Examine universe IDs of returned roots and filter out // universes that we don't want to sync. - targetRoots = fn.Filter( - targetRoots, func(r Root) bool { - return uniIdSyncFilter(r.ID) - }, - ) + targetRoots = fn.Filter(targetRoots, func(r Root) bool { + return uniIdSyncFilter(r.ID) + }) // At this point, we know that global insert is disabled, and we don't // have any specific Universes to sync. We will therefore fetch roots From bc66b1ba539d908faba55b95bf9723bc919fb255 Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Mon, 18 Dec 2023 11:16:54 +0100 Subject: [PATCH 03/10] universe: add IDToMultiverseLeafDesc helper function --- universe/base.go | 12 +----------- universe/interface.go | 9 +++++++++ 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/universe/base.go b/universe/base.go index 244e1a2d0..4489f6d1b 100644 --- a/universe/base.go +++ b/universe/base.go @@ -160,17 +160,7 @@ func (a *Archive) MultiverseRoot(ctx context.Context, proofType ProofType, // Otherwise, we'll run the query to fetch the multiverse leaf for each // of the specified assets. - uniTargets := make([]MultiverseLeafDesc, len(filterByIDs)) - for idx, id := range filterByIDs { - if id.GroupKey != nil { - uniTargets[idx] = fn.NewRight[asset.ID](*id.GroupKey) - } else { - uniTargets[idx] = fn.NewLeft[asset.ID, btcec.PublicKey]( - id.AssetID, - ) - } - } - + uniTargets := fn.Map(filterByIDs, IDToMultiverseLeafDesc) multiverseLeaves, err := a.cfg.Multiverse.FetchLeaves( ctx, uniTargets, proofType, ) diff --git a/universe/interface.go b/universe/interface.go index ced57668e..f9bc864f3 100644 --- a/universe/interface.go +++ b/universe/interface.go @@ -370,6 +370,15 @@ type Root struct { // assumed) can be identified by either the asset ID or the target group key. type MultiverseLeafDesc = fn.Either[asset.ID, btcec.PublicKey] +// IDToMultiverseLeafDesc converts an ID to a multiverse leaf desc. +func IDToMultiverseLeafDesc(id Identifier) MultiverseLeafDesc { + if id.GroupKey != nil { + return fn.NewRight[asset.ID](*id.GroupKey) + } + + return fn.NewLeft[asset.ID, btcec.PublicKey](id.AssetID) +} + // MultiverseRoot is the ms-smt root for a multiverse. This root can be used to // authenticate any leaf proofs. type MultiverseRoot struct { From d87d84ffe49ad733cf780fad6c93539f15f321b1 Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Mon, 18 Dec 2023 11:21:20 +0100 Subject: [PATCH 04/10] tapdb: allow test DBs to be used in benchmarks --- tapdb/postgres.go | 4 ++-- tapdb/postgres_fixture.go | 6 +++--- tapdb/sqlite.go | 6 +++--- tapdb/test_postgres.go | 6 +++--- tapdb/test_sqlite.go | 6 +++--- 5 files changed, 14 insertions(+), 14 deletions(-) diff --git a/tapdb/postgres.go b/tapdb/postgres.go index 8dc3c01a0..ecc08c577 100644 --- a/tapdb/postgres.go +++ b/tapdb/postgres.go @@ -157,7 +157,7 @@ func (s *PostgresStore) ExecuteMigrations(target MigrationTarget) error { // NewTestPostgresDB is a helper function that creates a Postgres database for // testing. -func NewTestPostgresDB(t *testing.T) *PostgresStore { +func NewTestPostgresDB(t testing.TB) *PostgresStore { t.Helper() t.Logf("Creating new Postgres DB for testing") @@ -175,7 +175,7 @@ func NewTestPostgresDB(t *testing.T) *PostgresStore { // NewTestPostgresDBWithVersion is a helper function that creates a Postgres // database for testing and migrates it to the given version. -func NewTestPostgresDBWithVersion(t *testing.T, version uint) *PostgresStore { +func NewTestPostgresDBWithVersion(t testing.TB, version uint) *PostgresStore { t.Helper() t.Logf("Creating new Postgres DB for testing, migrating to version %d", diff --git a/tapdb/postgres_fixture.go b/tapdb/postgres_fixture.go index 7907b2af5..ae77f2c8a 100644 --- a/tapdb/postgres_fixture.go +++ b/tapdb/postgres_fixture.go @@ -35,7 +35,7 @@ type TestPgFixture struct { // NewTestPgFixture constructs a new TestPgFixture starting up a docker // container running Postgres 11. The started container will expire in after // the passed duration. -func NewTestPgFixture(t *testing.T, expiry time.Duration, +func NewTestPgFixture(t testing.TB, expiry time.Duration, autoRemove bool) *TestPgFixture { // Use a sensible default on Windows (tcp/http) and linux/osx (socket) @@ -77,7 +77,7 @@ func NewTestPgFixture(t *testing.T, expiry time.Duration, port: int(port), } databaseURL := fixture.GetDSN() - log.Infof("Connecting to Postgres fixture: %v\n", databaseURL) + log.Infof("Connecting to Postgres fixture: %v", databaseURL) // Tell docker to hard kill the container in "expiry" seconds. require.NoError(t, resource.Expire(uint(expiry.Seconds()))) @@ -122,7 +122,7 @@ func (f *TestPgFixture) GetConfig() *PostgresConfig { } // TearDown stops the underlying docker container. -func (f *TestPgFixture) TearDown(t *testing.T) { +func (f *TestPgFixture) TearDown(t testing.TB) { err := f.pool.Purge(f.resource) require.NoError(t, err, "Could not purge resource") } diff --git a/tapdb/sqlite.go b/tapdb/sqlite.go index aa3fbdb3e..7921f2c84 100644 --- a/tapdb/sqlite.go +++ b/tapdb/sqlite.go @@ -167,7 +167,7 @@ func (s *SqliteStore) ExecuteMigrations(target MigrationTarget) error { // NewTestSqliteDB is a helper function that creates an SQLite database for // testing. -func NewTestSqliteDB(t *testing.T) *SqliteStore { +func NewTestSqliteDB(t testing.TB) *SqliteStore { t.Helper() // TODO(roasbeef): if we pass :memory: for the file name, then we get @@ -180,7 +180,7 @@ func NewTestSqliteDB(t *testing.T) *SqliteStore { // NewTestSqliteDbHandleFromPath is a helper function that creates a SQLite // database handle given a database file path. -func NewTestSqliteDbHandleFromPath(t *testing.T, dbPath string) *SqliteStore { +func NewTestSqliteDbHandleFromPath(t testing.TB, dbPath string) *SqliteStore { t.Helper() sqlDB, err := NewSqliteStore(&SqliteConfig{ @@ -198,7 +198,7 @@ func NewTestSqliteDbHandleFromPath(t *testing.T, dbPath string) *SqliteStore { // NewTestSqliteDBWithVersion is a helper function that creates an SQLite // database for testing and migrates it to the given version. -func NewTestSqliteDBWithVersion(t *testing.T, version uint) *SqliteStore { +func NewTestSqliteDBWithVersion(t testing.TB, version uint) *SqliteStore { t.Helper() t.Logf("Creating new SQLite DB for testing, migrating to version %d", diff --git a/tapdb/test_postgres.go b/tapdb/test_postgres.go index 4e284e10b..ffb638f46 100644 --- a/tapdb/test_postgres.go +++ b/tapdb/test_postgres.go @@ -7,18 +7,18 @@ import ( ) // NewTestDB is a helper function that creates a Postgres database for testing. -func NewTestDB(t *testing.T) *PostgresStore { +func NewTestDB(t testing.TB) *PostgresStore { return NewTestPostgresDB(t) } // NewTestDbHandleFromPath is a helper function that creates a new handle to an // existing SQLite database for testing. -func NewTestDbHandleFromPath(t *testing.T, dbPath string) *PostgresStore { +func NewTestDbHandleFromPath(t testing.TB, dbPath string) *PostgresStore { return NewTestPostgresDB(t) } // NewTestDBWithVersion is a helper function that creates a Postgres database // for testing and migrates it to the given version. -func NewTestDBWithVersion(t *testing.T, version uint) *PostgresStore { +func NewTestDBWithVersion(t testing.TB, version uint) *PostgresStore { return NewTestPostgresDBWithVersion(t, version) } diff --git a/tapdb/test_sqlite.go b/tapdb/test_sqlite.go index 40e2b3673..96d67687f 100644 --- a/tapdb/test_sqlite.go +++ b/tapdb/test_sqlite.go @@ -7,18 +7,18 @@ import ( ) // NewTestDB is a helper function that creates an SQLite database for testing. -func NewTestDB(t *testing.T) *SqliteStore { +func NewTestDB(t testing.TB) *SqliteStore { return NewTestSqliteDB(t) } // NewTestDbHandleFromPath is a helper function that creates a new handle to an // existing SQLite database for testing. -func NewTestDbHandleFromPath(t *testing.T, dbPath string) *SqliteStore { +func NewTestDbHandleFromPath(t testing.TB, dbPath string) *SqliteStore { return NewTestSqliteDbHandleFromPath(t, dbPath) } // NewTestDBWithVersion is a helper function that creates an SQLite database for // testing and migrates it to the given version. -func NewTestDBWithVersion(t *testing.T, version uint) *SqliteStore { +func NewTestDBWithVersion(t testing.TB, version uint) *SqliteStore { return NewTestSqliteDBWithVersion(t, version) } From a241c419a185ff38bb1b3e1b09fa4b625052b5d0 Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Mon, 18 Dec 2023 11:24:10 +0100 Subject: [PATCH 05/10] tapdb: add benchmark test for multiverse --- tapdb/universe_test.go | 185 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 181 insertions(+), 4 deletions(-) diff --git a/tapdb/universe_test.go b/tapdb/universe_test.go index 592428747..2e8cf6a50 100644 --- a/tapdb/universe_test.go +++ b/tapdb/universe_test.go @@ -40,7 +40,7 @@ func withProofType(proofType universe.ProofType) universeIDOptFunc { } } -func randUniverseID(t *testing.T, forceGroup bool, +func randUniverseID(t testing.TB, forceGroup bool, optFunctions ...universeIDOptFunc) universe.Identifier { opts := defaultUniverseIdOptions() @@ -155,14 +155,14 @@ func TestUniverseEmptyTree(t *testing.T) { require.ErrorIs(t, err, universe.ErrNoUniverseRoot) } -func randLeafKey(t *testing.T) universe.LeafKey { +func randLeafKey(t testing.TB) universe.LeafKey { return universe.LeafKey{ OutPoint: test.RandOp(t), ScriptKey: fn.Ptr(asset.NewScriptKey(test.RandPubKey(t))), } } -func randProof(t *testing.T, argAsset *asset.Asset) *proof.Proof { +func randProof(t testing.TB, argAsset *asset.Asset) *proof.Proof { proofAsset := *asset.RandAsset(t, asset.Normal) if argAsset != nil { proofAsset = *argAsset @@ -187,7 +187,7 @@ func randProof(t *testing.T, argAsset *asset.Asset) *proof.Proof { } } -func randMintingLeaf(t *testing.T, assetGen asset.Genesis, +func randMintingLeaf(t testing.TB, assetGen asset.Genesis, groupKey *btcec.PublicKey) universe.Leaf { randProof := randProof(t, nil) @@ -1073,3 +1073,180 @@ func TestMultiverseRootSum(t *testing.T) { }) } } + +// BenchmarkMultiverse benchmarks database calls around the multiverse tree. +func BenchmarkMultiverse(b *testing.B) { + ctx := context.Background() + + db := NewTestDB(b) + multiverse, _ := newTestMultiverseWithDb(db.BaseDB) + + // We first insert a couple of hundred leaves into the multiverse, the + // same number of issuance and transfer proofs. + const numLeaves = 200 + issuanceIDs := make([]universe.Identifier, 0, numLeaves) + transferIDs := make([]universe.Identifier, 0, numLeaves) + transferSum := uint64(0) + for i := 0; i < numLeaves; i++ { + // Create two universe identifiers first, one for issuance and + // one for transfer. + forceGroup := test.RandBool() + proofType := withProofType(universe.ProofTypeIssuance) + issuanceID := randUniverseID(b, forceGroup, proofType) + + transferID := issuanceID + transferID.ProofType = universe.ProofTypeTransfer + + issuanceIDs = append(issuanceIDs, issuanceID) + transferIDs = append(transferIDs, issuanceID) + + // Now we insert a single proof leaf into each of the two new + // universes. + assetGen := asset.RandGenesis(b, asset.Normal) + + issuanceLeaf := leafWithKey{ + LeafKey: randLeafKey(b), + Leaf: randMintingLeaf( + b, assetGen, issuanceID.GroupKey, + ), + } + transferLeaf := leafWithKey{ + LeafKey: randLeafKey(b), + Leaf: randMintingLeaf( + b, assetGen, transferID.GroupKey, + ), + } + + _, err := multiverse.UpsertProofLeaf( + ctx, issuanceID, issuanceLeaf.LeafKey, + &issuanceLeaf.Leaf, nil, + ) + require.NoError(b, err) + _, err = multiverse.UpsertProofLeaf( + ctx, transferID, transferLeaf.LeafKey, + &transferLeaf.Leaf, nil, + ) + require.NoError(b, err) + + transferSum += transferLeaf.Leaf.Amt + } + + b.ResetTimer() + var ( + fetchIRootNode int64 + fetchTRootNode int64 + fetchAllLeaves int64 + fetchSpecificAll int64 + fetchSpecificFew50 int64 + fetchSpecificFew10 int64 + ) + for i := 0; i < b.N; i++ { + // Count the time to fetch the issuance root node. + start := time.Now() + irn, err := multiverse.MultiverseRootNode( + ctx, universe.ProofTypeIssuance, + ) + require.NoError(b, err) + fetchIRootNode += time.Since(start).Nanoseconds() + + require.True(b, irn.IsSome()) + irn.WhenSome(func(rootNode universe.MultiverseRoot) { + require.EqualValues(b, numLeaves, rootNode.NodeSum()) + }) + + // Count the time to fetch the transfer root node. + start = time.Now() + trn, err := multiverse.MultiverseRootNode( + ctx, universe.ProofTypeTransfer, + ) + require.NoError(b, err) + fetchTRootNode += time.Since(start).Nanoseconds() + + require.True(b, trn.IsSome()) + trn.WhenSome(func(rootNode universe.MultiverseRoot) { + require.EqualValues(b, transferSum, rootNode.NodeSum()) + }) + + // Count the time to fetch all leaves. + start = time.Now() + allLeaves, err := multiverse.FetchLeaves( + ctx, nil, universe.ProofTypeTransfer, + ) + require.NoError(b, err) + fetchAllLeaves += time.Since(start).Nanoseconds() + + require.Len(b, allLeaves, numLeaves) + + // Count the time to fetch the same leaves but by specifying + // the universe IDs instead. + leafDescriptors := fn.Map( + transferIDs, universe.IDToMultiverseLeafDesc, + ) + + start = time.Now() + specificLeaves, err := multiverse.FetchLeaves( + ctx, leafDescriptors, universe.ProofTypeTransfer, + ) + require.NoError(b, err) + fetchSpecificAll += time.Since(start).Nanoseconds() + + require.Len(b, specificLeaves, numLeaves) + + // Count the time to fetch 50 specific leaves. + numSpecificLeaves := 50 + leafDescriptors = fn.Map( + transferIDs[0:numSpecificLeaves], + universe.IDToMultiverseLeafDesc, + ) + + start = time.Now() + specificLeaves, err = multiverse.FetchLeaves( + ctx, leafDescriptors, universe.ProofTypeTransfer, + ) + require.NoError(b, err) + fetchSpecificFew50 += time.Since(start).Nanoseconds() + + require.Len(b, specificLeaves, numSpecificLeaves) + + // Count the time to fetch 10 specific leaves. + numSpecificLeaves = 10 + leafDescriptors = fn.Map( + transferIDs[100:numSpecificLeaves], + universe.IDToMultiverseLeafDesc, + ) + + start = time.Now() + specificLeaves, err = multiverse.FetchLeaves( + ctx, leafDescriptors, universe.ProofTypeTransfer, + ) + require.NoError(b, err) + fetchSpecificFew10 += time.Since(start).Nanoseconds() + + require.Len(b, specificLeaves, numSpecificLeaves) + } + + b.ReportMetric( + float64(fetchIRootNode)/float64(b.N), + "fetch_issuance_root_ns/op", + ) + b.ReportMetric( + float64(fetchTRootNode)/float64(b.N), + "fetch_transfer_root_ns/op", + ) + b.ReportMetric( + float64(fetchAllLeaves)/float64(b.N), + "fetch_all_leaves_ns/op", + ) + b.ReportMetric( + float64(fetchSpecificAll)/float64(b.N), + "fetch_specific_leaves_all_ns/op", + ) + b.ReportMetric( + float64(fetchSpecificFew50)/float64(b.N), + "fetch_50_specific_leaves_ns/op", + ) + b.ReportMetric( + float64(fetchSpecificFew10)/float64(b.N), + "fetch_10_specific_leaves_ns/op", + ) +} From 862210db17c1104d78cdc3d2a5e1bccf9e42f566 Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Mon, 18 Dec 2023 11:49:24 +0100 Subject: [PATCH 06/10] tapdb/sqlc: add indexes for multiverse queries This is a first optimization to make some of the queries around the multiverse faster. We create an index for all the fieds that appear either in a JOIN or WHERE clause in our multiverse queries. --- tapdb/sqlc/migrations/000015_multiverse_indexes.down.sql | 9 +++++++++ tapdb/sqlc/migrations/000015_multiverse_indexes.up.sql | 9 +++++++++ 2 files changed, 18 insertions(+) create mode 100644 tapdb/sqlc/migrations/000015_multiverse_indexes.down.sql create mode 100644 tapdb/sqlc/migrations/000015_multiverse_indexes.up.sql diff --git a/tapdb/sqlc/migrations/000015_multiverse_indexes.down.sql b/tapdb/sqlc/migrations/000015_multiverse_indexes.down.sql new file mode 100644 index 000000000..b17fe1fca --- /dev/null +++ b/tapdb/sqlc/migrations/000015_multiverse_indexes.down.sql @@ -0,0 +1,9 @@ +DROP INDEX IF EXISTS mssmt_nodes_key_idx; + +DROP INDEX IF EXISTS multiverse_roots_namespace_root_idx; +DROP INDEX IF EXISTS multiverse_roots_proof_type_idx; + +DROP INDEX IF EXISTS multiverse_leaves_multiverse_root_id_idx; + +DROP INDEX IF EXISTS multiverse_leaves_asset_id_idx; +DROP INDEX IF EXISTS multiverse_leaves_group_key_idx; diff --git a/tapdb/sqlc/migrations/000015_multiverse_indexes.up.sql b/tapdb/sqlc/migrations/000015_multiverse_indexes.up.sql new file mode 100644 index 000000000..776905e2f --- /dev/null +++ b/tapdb/sqlc/migrations/000015_multiverse_indexes.up.sql @@ -0,0 +1,9 @@ +CREATE INDEX IF NOT EXISTS mssmt_nodes_key_idx ON mssmt_nodes(key); + +CREATE INDEX IF NOT EXISTS multiverse_roots_namespace_root_idx ON multiverse_roots(namespace_root); +CREATE INDEX IF NOT EXISTS multiverse_roots_proof_type_idx ON multiverse_roots(proof_type); + +CREATE INDEX IF NOT EXISTS multiverse_leaves_multiverse_root_id_idx ON multiverse_leaves(multiverse_root_id); + +CREATE INDEX IF NOT EXISTS multiverse_leaves_asset_id_idx ON multiverse_leaves(asset_id); +CREATE INDEX IF NOT EXISTS multiverse_leaves_group_key_idx ON multiverse_leaves(group_key); From d6683a9ee4f562b520196eb7fc94b7f461d5b48a Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Mon, 18 Dec 2023 13:04:52 +0100 Subject: [PATCH 07/10] tapdb: add cache for multiverse leaves --- tapdb/multiverse.go | 220 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 177 insertions(+), 43 deletions(-) diff --git a/tapdb/multiverse.go b/tapdb/multiverse.go index fea562e0c..9c0f2ca89 100644 --- a/tapdb/multiverse.go +++ b/tapdb/multiverse.go @@ -30,6 +30,18 @@ const ( // transferMultiverseNS is the namespace used for the multiverse // issuance proofs. transferMultiverseNS = "multiverse-transfer" + + // numCachedProofs is the number of universe proofs we'll cache. + numCachedProofs = 50_000 + + // numMaxCachedPages is the maximum number of pages we'll cache for a + // given page cache. Each page is 512 items, so we'll cache 10 of them, + // up to 5,120 for a given namespace. + numMaxCachedPages = 1000 + + // numCachedMultiverseLeaves is the number of multiverse leaves we'll + // cache. + numCachedMultiverseLeaves = 10_000 ) var ( @@ -123,9 +135,6 @@ func NewProofKey(id universe.Identifier, key universe.LeafKey) ProofKey { return fn.ToArray[ProofKey](h.Sum(nil)) } -// numCachedProofs is the number of universe proofs we'll cache. -const numCachedProofs = 50_000 - // cachedProof is a single cached proof. type cachedProof []*universe.Proof @@ -140,9 +149,7 @@ type leafProofCache = *lru.Cache[ProofKey, *cachedProof] // newLeafCache creates a new leaf proof cache. func newLeafCache() leafProofCache { - return lru.NewCache[ProofKey, *cachedProof]( - numCachedProofs, - ) + return lru.NewCache[ProofKey, *cachedProof](numCachedProofs) } // treeID is used to uniquely identify a multiverse tree. @@ -383,11 +390,6 @@ func (c cachedLeafKeys) Size() (uint64, error) { return uint64(1), nil } -// numMaxCachedPages is the maximum number of pages we'll cache for a given -// page cache. Each page is 512 items, so we'll cache 10 of them, up to 5,120 -// for a given namespace. -const numMaxCachedPages = 1000 - // leafQuery is a wrapper around the existing UniverseLeafKeysQuery struct that // doesn't include a pointer so it can be safely used as a map key. type leafQuery struct { @@ -440,8 +442,8 @@ func newUniverseLeafCache() *universeLeafCache { } // fetchLeafKeys reads the cached leaf keys for the given ID. -func (u *universeLeafCache) fetchLeafKeys(q universe.UniverseLeafKeysQuery, -) []universe.LeafKey { +func (u *universeLeafCache) fetchLeafKeys( + q universe.UniverseLeafKeysQuery) []universe.LeafKey { idStr := treeID(q.Id.String()) @@ -484,7 +486,7 @@ func (u *universeLeafCache) cacheLeafKeys(q universe.UniverseLeafKeysQuery, _, _ = u.leafCache.Put(idStr, pageCache) } - // Add the to the page cache. + // Add the key to the page cache. _, _ = pageCache.Put(newLeafQuery(q), &cachedKeys) } @@ -495,6 +497,88 @@ func (u *universeLeafCache) wipeCache(id treeID) { u.leafCache.Delete(id) } +// multiverseQueryToKey converts a multiverse query to a cache key. +func multiverseQueryToKey(q QueryMultiverseLeaves) treeID { + var idBytes []byte + if len(q.GroupKey) > 0 { + idBytes = fn.ByteSlice(sha256.Sum256(q.GroupKey)) + } else { + idBytes = q.AssetID + } + return treeID(fmt.Sprintf("%s-%x", q.ProofType, idBytes)) +} + +// cachedMultiverseLeaf is used to cache the set of multiverse leaves. +type cachedMultiverseLeaf struct { + *universe.MultiverseLeaf +} + +// Size just returns 1, as we cache based on the total number of leaves. +func (c *cachedMultiverseLeaf) Size() (uint64, error) { + return uint64(1), nil +} + +// multiverseLeafCache is used to cache the set of multiverse leaves. +type multiverseLeafCache struct { + sync.Mutex + + leafCache *lru.Cache[treeID, *cachedMultiverseLeaf] + + *cacheLogger +} + +// newMultiverseLeafCache creates a new multiverse leaf cache. +func newMultiverseLeafCache() *multiverseLeafCache { + return &multiverseLeafCache{ + leafCache: lru.NewCache[treeID, *cachedMultiverseLeaf]( + numCachedMultiverseLeaves, + ), + cacheLogger: newCacheLogger("multiverse_leaves"), + } +} + +// fetchLeafKeys reads the cached leaf keys for the given ID. +func (u *multiverseLeafCache) fetchMultiverseLeaf( + q QueryMultiverseLeaves) *universe.MultiverseLeaf { + + // We only cache queries for specific leaves, so we'll return nothing + // if we don't have either an asset ID or group key. + if len(q.AssetID) == 0 && len(q.GroupKey) == 0 { + return nil + } + + cacheKey := multiverseQueryToKey(q) + + cachedLeaf, err := u.leafCache.Get(cacheKey) + if err == nil { + u.Hit() + return cachedLeaf.MultiverseLeaf + } + + u.Miss() + + return nil +} + +// cacheLeafKeys stores the given leaf keys in the cache. +func (u *multiverseLeafCache) cacheMultiverseLeaf(q QueryMultiverseLeaves, + leaf *universe.MultiverseLeaf) { + + cacheKey := multiverseQueryToKey(q) + + // Add the key to the page cache. + _, _ = u.leafCache.Put(cacheKey, &cachedMultiverseLeaf{ + MultiverseLeaf: leaf, + }) +} + +// wipeCache wipes the cache entry of multiverse leaves for a given universe ID. +func (u *multiverseLeafCache) wipeCache(id treeID) { + log.Debugf("wiping leaf keys for %x in cache", id) + + u.leafCache.Delete(id) +} + // MultiverseStore implements the persistent storage for a multiverse. // // NOTE: This implements the universe.MultiverseArchive interface. @@ -506,15 +590,18 @@ type MultiverseStore struct { proofCache *proofCache leafKeysCache *universeLeafCache + + multiverseLeafCache *multiverseLeafCache } // NewMultiverseStore creates a new multiverse DB store handle. func NewMultiverseStore(db BatchedMultiverse) *MultiverseStore { return &MultiverseStore{ - db: db, - rootNodeCache: newRootNodeCache(), - proofCache: newProofCache(), - leafKeysCache: newUniverseLeafCache(), + db: db, + rootNodeCache: newRootNodeCache(), + proofCache: newProofCache(), + leafKeysCache: newUniverseLeafCache(), + multiverseLeafCache: newMultiverseLeafCache(), } } @@ -929,6 +1016,7 @@ func (b *MultiverseStore) UpsertProofLeaf(ctx context.Context, b.rootNodeCache.wipeCache() b.proofCache.delProofsForAsset(id) b.leafKeysCache.wipeCache(idStr) + b.multiverseLeafCache.wipeCache(idStr) return issuanceProof, nil } @@ -984,6 +1072,7 @@ func (b *MultiverseStore) UpsertProofLeafBatch(ctx context.Context, for id := range idsToDelete { b.proofCache.Delete(id) b.leafKeysCache.wipeCache(id) + b.multiverseLeafCache.wipeCache(id) } return nil @@ -1023,6 +1112,7 @@ func (b *MultiverseStore) DeleteUniverse(ctx context.Context, idStr := treeID(id.String()) b.proofCache.Delete(idStr) b.leafKeysCache.wipeCache(idStr) + b.multiverseLeafCache.wipeCache(idStr) return id.String(), dbErr } @@ -1071,35 +1161,14 @@ func (b *MultiverseStore) FetchLeaves(ctx context.Context, leaves = nil for _, query := range queries { - dbLeaves, err := q.QueryMultiverseLeaves(ctx, query) + leavesForQuery, err := b.queryMultiverseLeaf( + ctx, proofType, query, q, + ) if err != nil { return err } - for _, leaf := range dbLeaves { - var id universe.Identifier - - id.ProofType = proofType - if len(leaf.AssetID) > 0 { - copy(id.AssetID[:], leaf.AssetID) - } - if len(leaf.GroupKey) > 0 { - id.GroupKey, err = schnorr.ParsePubKey( - leaf.GroupKey, - ) - if err != nil { - return err - } - } - - leaves = append(leaves, universe.MultiverseLeaf{ - ID: id, - LeafNode: mssmt.NewLeafNode( - leaf.UniverseRootHash, - uint64(leaf.UniverseRootSum), - ), - }) - } + leaves = append(leaves, leavesForQuery...) } return nil }) @@ -1109,3 +1178,68 @@ func (b *MultiverseStore) FetchLeaves(ctx context.Context, return leaves, nil } + +// queryMultiverseLeaf returns the multiverse leaves that match the given query, +// either from the cache or the backing database. +func (b *MultiverseStore) queryMultiverseLeaf(ctx context.Context, + proofType universe.ProofType, query QueryMultiverseLeaves, + q BaseMultiverseStore) ([]universe.MultiverseLeaf, error) { + + // Ask our cache first. + cachedLeaf := b.multiverseLeafCache.fetchMultiverseLeaf(query) + if cachedLeaf != nil { + // We know that the cache is only populated with a single leaf, + // so we can just return that. + return []universe.MultiverseLeaf{*cachedLeaf}, nil + } + + dbLeaves, err := q.QueryMultiverseLeaves(ctx, query) + if err != nil { + return nil, err + } + + b.multiverseLeafCache.Lock() + defer b.multiverseLeafCache.Unlock() + + // While we were waiting for the lock, the cache might have been + // populated, so we'll check that now. + cachedLeaf = b.multiverseLeafCache.fetchMultiverseLeaf(query) + if cachedLeaf != nil { + // We know that the cache is only populated with a single leaf, + // so we can just return that. + return []universe.MultiverseLeaf{*cachedLeaf}, nil + } + + var leaves []universe.MultiverseLeaf + for _, leaf := range dbLeaves { + var id universe.Identifier + + id.ProofType = proofType + if len(leaf.AssetID) > 0 { + copy(id.AssetID[:], leaf.AssetID) + } + if len(leaf.GroupKey) > 0 { + id.GroupKey, err = schnorr.ParsePubKey( + leaf.GroupKey, + ) + if err != nil { + return nil, err + } + } + + multiverseLeaf := universe.MultiverseLeaf{ + ID: id, + LeafNode: mssmt.NewLeafNode( + leaf.UniverseRootHash, + uint64(leaf.UniverseRootSum), + ), + } + b.multiverseLeafCache.cacheMultiverseLeaf( + query, &multiverseLeaf, + ) + + leaves = append(leaves, multiverseLeaf) + } + + return leaves, nil +} From 5600740b8bf37ffc1a05a849b9b4d507b434f453 Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Mon, 18 Dec 2023 14:53:45 +0100 Subject: [PATCH 08/10] tapdb+universe: return ErrNoMultiverseRoot if no root exists --- tapdb/multiverse.go | 3 ++- universe/base.go | 3 +++ universe/interface.go | 3 ++- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/tapdb/multiverse.go b/tapdb/multiverse.go index 9c0f2ca89..d54c2cdb0 100644 --- a/tapdb/multiverse.go +++ b/tapdb/multiverse.go @@ -621,7 +621,8 @@ func namespaceForProof(proofType universe.ProofType) (string, error) { } // MultiverseRootNode returns the root multiverse node for the given proof -// type. +// type. If no multiverse root exists (yet), then ErrNoMultiverseRoot is +// returned. func (b *MultiverseStore) MultiverseRootNode(ctx context.Context, proofType universe.ProofType) (fn.Option[universe.MultiverseRoot], error) { diff --git a/universe/base.go b/universe/base.go index 4489f6d1b..65af1e99d 100644 --- a/universe/base.go +++ b/universe/base.go @@ -151,6 +151,9 @@ func (a *Archive) MultiverseRoot(ctx context.Context, proofType ProofType, rootNode, err := a.cfg.Multiverse.MultiverseRootNode( ctx, proofType, ) + if errors.Is(err, ErrNoUniverseProofFound) { + return none, nil + } if err != nil { return none, err } diff --git a/universe/interface.go b/universe/interface.go index f9bc864f3..6e04d1742 100644 --- a/universe/interface.go +++ b/universe/interface.go @@ -444,7 +444,8 @@ type MultiverseArchive interface { proofType ProofType) ([]MultiverseLeaf, error) // MultiverseRootNode returns the Multiverse root node for the given - // proof type. + // proof type. If no multiverse root exists (yet), then + // ErrNoMultiverseRoot is returned. MultiverseRootNode(ctx context.Context, proofType ProofType) (fn.Option[MultiverseRoot], error) } From f51712cc5d49ec3c322696bc4bebf6e7d77cfc6c Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Mon, 18 Dec 2023 14:54:34 +0100 Subject: [PATCH 09/10] universe: add MultiverseRoot to DiffEngine --- universe/interface.go | 6 ++++++ universe_rpc_diff.go | 40 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/universe/interface.go b/universe/interface.go index 6e04d1742..52d368a65 100644 --- a/universe/interface.go +++ b/universe/interface.go @@ -660,6 +660,12 @@ type DiffEngine interface { // of diff FetchProofLeaf(ctx context.Context, id Identifier, key LeafKey) ([]*Proof, error) + + // MultiverseRoot returns the root node of the multiverse for the + // specified proof type. If the given list of universe IDs is non-empty, + // then the root will be calculated just for those universes. + MultiverseRoot(ctx context.Context, proofType ProofType, + filterByIDs []Identifier) (fn.Option[MultiverseRoot], error) } // Commitment is an on chain universe commitment. This includes the merkle diff --git a/universe_rpc_diff.go b/universe_rpc_diff.go index e1994e105..b9cbdbca1 100644 --- a/universe_rpc_diff.go +++ b/universe_rpc_diff.go @@ -5,6 +5,7 @@ import ( "context" "fmt" + "github.com/lightninglabs/taproot-assets/fn" "github.com/lightninglabs/taproot-assets/mssmt" "github.com/lightninglabs/taproot-assets/taprpc/universerpc" unirpc "github.com/lightninglabs/taproot-assets/taprpc/universerpc" @@ -210,6 +211,45 @@ func (r *RpcUniverseDiff) FetchProofLeaf(ctx context.Context, return []*universe.Proof{uniProof}, nil } +// MultiverseRoot returns the root node of the multiverse for the +// specified proof type. If the given list of universe IDs is non-empty, +// then the root will be calculated just for those universes. +func (r *RpcUniverseDiff) MultiverseRoot(ctx context.Context, + proofType universe.ProofType, + filterByIDs []universe.Identifier) (fn.Option[universe.MultiverseRoot], + error) { + + none := fn.None[universe.MultiverseRoot]() + + proofTypeRpc, err := MarshalUniProofType(proofType) + if err != nil { + return none, fmt.Errorf("unable to marshal proof type: %w", err) + } + + rpcIDs := make([]*unirpc.ID, len(filterByIDs)) + for i, id := range filterByIDs { + uniID, err := MarshalUniID(id) + if err != nil { + return none, err + } + + rpcIDs[i] = uniID + } + + root, err := r.conn.MultiverseRoot(ctx, &unirpc.MultiverseRootRequest{ + ProofType: proofTypeRpc, + SpecificIds: rpcIDs, + }) + if err != nil { + return none, err + } + + return fn.Some(universe.MultiverseRoot{ + ProofType: proofType, + Node: unmarshalMerkleSumNode(root.MultiverseRoot), + }), nil +} + // A compile time interface to ensure that RpcUniverseDiff implements the // universe.DiffEngine interface. var _ universe.DiffEngine = (*RpcUniverseDiff)(nil) From 6fd49e4a1d13f66ec24b96bc3bf39c3425c36fa7 Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Mon, 18 Dec 2023 14:54:49 +0100 Subject: [PATCH 10/10] taprpc+universe: implement sparse universe sync --- taprpc/taprpc_utils.go | 17 +++- universe/syncer.go | 184 ++++++++++++++++++++++++++++++++++------- 2 files changed, 172 insertions(+), 29 deletions(-) diff --git a/taprpc/taprpc_utils.go b/taprpc/taprpc_utils.go index 24e39d2ca..2c888f486 100644 --- a/taprpc/taprpc_utils.go +++ b/taprpc/taprpc_utils.go @@ -1,6 +1,10 @@ package taprpc -import "google.golang.org/protobuf/encoding/protojson" +import ( + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/encoding/protojson" +) var ( // ProtoJSONMarshalOpts is a struct that holds the default marshal @@ -40,3 +44,14 @@ var ( UseHexForBytes: true, } ) + +// IsUnimplemented returns true if the error is a gRPC error with the code +// Unimplemented. +func IsUnimplemented(err error) bool { + s, ok := status.FromError(err) + if !ok { + return false + } + + return s.Code() == codes.Unimplemented +} diff --git a/universe/syncer.go b/universe/syncer.go index f54e6a4ce..a25f333b2 100644 --- a/universe/syncer.go +++ b/universe/syncer.go @@ -12,6 +12,7 @@ import ( "github.com/lightninglabs/taproot-assets/fn" "github.com/lightninglabs/taproot-assets/mssmt" "github.com/lightninglabs/taproot-assets/proof" + "github.com/lightninglabs/taproot-assets/taprpc" "golang.org/x/sync/errgroup" ) @@ -113,6 +114,19 @@ func (s *SimpleSyncer) executeSync(ctx context.Context, diffEngine DiffEngine, // only fetch roots for those universes. We won't filter out any // Universes here as we assume that the caller has already done so. case len(idsToSync) != 0: + // We attempt to bisect the set of IDs we really need to sync by + // using ephemeral multiverse trees and a bisect algorithm to + // find the diffs in the root nodes. This allows us to more + // efficiently find out which roots we need to sync compared to + // querying the remote server for each root individually. + idsToSync, err = s.bisectOutdatedRoots( + ctx, idsToSync, diffEngine, + ) + if err != nil { + return nil, fmt.Errorf("unable to bisect outdated "+ + "roots: %w", err) + } + targetRoots, err = fetchRootsForIDs(ctx, idsToSync, diffEngine) if err != nil { return nil, err @@ -123,6 +137,10 @@ func (s *SimpleSyncer) executeSync(ctx context.Context, diffEngine DiffEngine, case globalInsertEnabled: log.Infof("Fetching all roots for remote Universe server...") + // Since we're also interested in learning about _new_ universes + // in this case, we can't use the bisect algorithm to find the + // diffs in the root nodes. Instead, we'll just fetch all the + // roots from the remote server. targetRoots, err = s.fetchAllRoots(ctx, diffEngine) if err != nil { return nil, err @@ -140,18 +158,26 @@ func (s *SimpleSyncer) executeSync(ctx context.Context, diffEngine DiffEngine, // configs. default: var uniIDs []Identifier - for _, uniSyncCfg := range syncConfigs.UniSyncConfigs { // Check with the filter to ensure that the universe is // applicable for syncing. If not, we would have // retrieved the corresponding root in vain. if uniIdSyncFilter(uniSyncCfg.UniverseID) { - uniIDs = append( - uniIDs, uniSyncCfg.UniverseID, - ) + uniIDs = append(uniIDs, uniSyncCfg.UniverseID) } } + // We attempt to bisect the set of IDs we really need to sync by + // using ephemeral multiverse trees and a bisect algorithm to + // find the diffs in the root nodes. This allows us to more + // efficiently find out which roots we need to sync compared to + // querying the remote server for each root individually. + uniIDs, err = s.bisectOutdatedRoots(ctx, uniIDs, diffEngine) + if err != nil { + return nil, fmt.Errorf("unable to bisect outdated "+ + "roots: %w", err) + } + // Retrieve roots for the gathered set of universes. targetRoots, err = fetchRootsForIDs(ctx, uniIDs, diffEngine) if err != nil { @@ -190,8 +216,7 @@ func fetchRootsForIDs(ctx context.Context, idsToSync []Identifier, // as a series of parallel requests backed by a worker pool. rootsToSync := make(chan Root, len(idsToSync)) err := fn.ParSlice( - ctx, idsToSync, - func(ctx context.Context, id Identifier) error { + ctx, idsToSync, func(ctx context.Context, id Identifier) error { root, err := diffEngine.RootNode(ctx, id) if err != nil { return err @@ -209,6 +234,113 @@ func fetchRootsForIDs(ctx context.Context, idsToSync []Identifier, return fn.Collect(rootsToSync), nil } +// bisectOutdatedRoots attempts to bisect the set of IDs we need to sync by +// using ephemeral multiverse trees and a bisect algorithm to find the diffs in +// the root nodes. This allows us to more efficiently find out which roots we +// need to sync compared to querying the remote server for each root +// individually. If the server doesn't yet implement the MultiverseRoot RPC, we +// simply return the original set of IDs and the "legacy" sync algorithm will be +// used. +func (s *SimpleSyncer) bisectOutdatedRoots(ctx context.Context, + idsToSync []Identifier, diffEngine DiffEngine) ([]Identifier, error) { + + issuanceIDs := make([]Identifier, 0, len(idsToSync)) + transferIDs := make([]Identifier, 0, len(idsToSync)) + for _, id := range idsToSync { + switch id.ProofType { + case ProofTypeIssuance: + issuanceIDs = append(issuanceIDs, id) + + case ProofTypeTransfer: + transferIDs = append(transferIDs, id) + + case ProofTypeUnspecified: + issuanceID := id + issuanceID.ProofType = ProofTypeIssuance + issuanceIDs = append(issuanceIDs, issuanceID) + + transferID := id + transferID.ProofType = ProofTypeTransfer + transferIDs = append(transferIDs, transferID) + } + } + + targetIDs := make([]Identifier, 0, len(idsToSync)) + + // Compare the local and remote issuance trees. + if len(issuanceIDs) > 0 { + outdated, err := s.rootsOutdated( + ctx, ProofTypeIssuance, issuanceIDs, diffEngine, + ) + if err != nil { + return nil, err + } + + if outdated { + targetIDs = append(targetIDs, issuanceIDs...) + } + } + + // Compare the local and remote transfer trees. + if len(transferIDs) > 0 { + outdated, err := s.rootsOutdated( + ctx, ProofTypeTransfer, transferIDs, diffEngine, + ) + if err != nil { + return nil, err + } + + if outdated { + targetIDs = append(targetIDs, transferIDs...) + } + } + + return targetIDs, nil +} + +// rootsOutdated returns true if the roots for the given IDs are outdated and +// need to be synced. +func (s *SimpleSyncer) rootsOutdated(ctx context.Context, proofType ProofType, + idsToSync []Identifier, diffEngine DiffEngine) (bool, error) { + + var localRootNode, remoteRootNode mssmt.Node + localTree, err := s.cfg.LocalDiffEngine.MultiverseRoot( + ctx, proofType, idsToSync, + ) + if err != nil { + return false, fmt.Errorf("unable to fetch local multiverse "+ + "root: %w", err) + } + localTree.WhenSome(func(root MultiverseRoot) { + localRootNode = root.Node + }) + + remoteTree, err := diffEngine.MultiverseRoot(ctx, proofType, idsToSync) + + // Special case for when the server doesn't yet implement the + // MultiverseRoot RPC. In this case, we simply return the original set + // of IDs and the "legacy" sync algorithm will be used. + if err != nil && taprpc.IsUnimplemented(err) { + return true, nil + } else if err != nil { + return false, fmt.Errorf("unable to fetch remote multiverse "+ + "root: %w", err) + } + + // Compare the local and remote transfer trees. If they differ, + // we need to sync all the transfer proofs. + remoteTree.WhenSome(func(root MultiverseRoot) { + remoteRootNode = root.Node + }) + + // TODO(guggero): Do an actual bi-sect here if there is no match. + // Do we need to return the left and right hashes of the tree to make + // this faster, so we can do a binary search? Then we would need to + // sort/split the IDs by their position in the tree though. + + return !mssmt.IsEqualNode(localRootNode, remoteRootNode), nil +} + // syncRoot attempts to sync the local Universe with the remote diff engine for // a specific base root. func (s *SimpleSyncer) syncRoot(ctx context.Context, remoteRoot Root, @@ -336,7 +468,7 @@ func (s *SimpleSyncer) syncRoot(ctx context.Context, remoteRoot Root, return err } - // If this is a tranfer tree, then we'll collect all the items as we + // If this is a transfer tree, then we'll collect all the items as we // need to sort them to ensure we can validate them in dep order. if !isIssuanceTree { transferLeaves := fn.Collect(transferLeafProofs) @@ -348,17 +480,13 @@ func (s *SimpleSyncer) syncRoot(ctx context.Context, remoteRoot Root, iRecord := proof.BlockHeightRecord(&iBlockHeight) jRecord := proof.BlockHeightRecord(&jBlockHeight) - _ = proof.SparseDecode( - //nolint:lll - bytes.NewReader(transferLeaves[i].Leaf.RawProof), - iRecord, - ) + _ = proof.SparseDecode(bytes.NewReader( + transferLeaves[i].Leaf.RawProof, + ), iRecord) - _ = proof.SparseDecode( - //nolint:lll - bytes.NewReader(transferLeaves[j].Leaf.RawProof), - jRecord, - ) + _ = proof.SparseDecode(bytes.NewReader( + transferLeaves[j].Leaf.RawProof, + ), jRecord) return iBlockHeight < jBlockHeight }) @@ -468,22 +596,22 @@ func (s *SimpleSyncer) SyncUniverse(ctx context.Context, host ServerAddr, // fetchAllRoots fetches all the roots from the remote Universe. This function // is used in order to isolate any logic related to the specifics of how we // fetch the data from the universe server. -func (s *SimpleSyncer) fetchAllRoots(ctx context.Context, diffEngine DiffEngine) ([]Root, error) { +func (s *SimpleSyncer) fetchAllRoots(ctx context.Context, + diffEngine DiffEngine) ([]Root, error) { + offset := int32(0) pageSize := defaultPageSize - roots := make([]Root, 0) + var roots []Root for { log.Debugf("Fetching roots in range: %v to %v", offset, offset+pageSize) - tempRoots, err := diffEngine.RootNodes( - ctx, RootNodesQuery{ - WithAmountsById: false, - SortDirection: SortAscending, - Offset: offset, - Limit: pageSize, - }, - ) + tempRoots, err := diffEngine.RootNodes(ctx, RootNodesQuery{ + WithAmountsById: false, + SortDirection: SortAscending, + Offset: offset, + Limit: pageSize, + }) if err != nil { return nil, err @@ -509,8 +637,8 @@ func (s *SimpleSyncer) fetchAllLeafKeys(ctx context.Context, // Initialize the offset to be used for the pages. offset := int32(0) pageSize := defaultPageSize - leafKeys := make([]LeafKey, 0) + var leafKeys []LeafKey for { tempRemoteKeys, err := diffEngine.UniverseLeafKeys( ctx, UniverseLeafKeysQuery{