sync: use ephemeral multiverse trees for sparse sync #742

Draft: wants to merge 10 commits into main
1 change: 1 addition & 0 deletions perms/perms.go
@@ -218,6 +218,7 @@ var (
// gain a layer of DoS defense.
defaultMacaroonWhitelist = map[string]struct{}{
"/universerpc.Universe/AssetRoots": {},
"/universerpc.Universe/MultiverseRoot": {},
"/universerpc.Universe/QueryAssetRoots": {},
"/universerpc.Universe/AssetLeafKeys": {},
"/universerpc.Universe/AssetLeaves": {},
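The new whitelist entry lets `/universerpc.Universe/MultiverseRoot` be called without a macaroon, matching the other read-only universe endpoints. A minimal sketch of how such a whitelist typically gates the macaroon check in a gRPC unary interceptor; the interceptor and validator below are hypothetical, only the whitelist behavior comes from the change above:

```go
package perms

import (
	"context"

	"google.golang.org/grpc"
)

// unaryAuthInterceptor is a hypothetical sketch of how a macaroon whitelist
// like the one above is typically consulted: whitelisted URIs skip the
// macaroon check entirely, everything else must pass validation.
func unaryAuthInterceptor(whitelist map[string]struct{},
	validate func(ctx context.Context) error) grpc.UnaryServerInterceptor {

	return func(ctx context.Context, req interface{},
		info *grpc.UnaryServerInfo,
		handler grpc.UnaryHandler) (interface{}, error) {

		// info.FullMethod carries URIs of the same form as the map
		// keys, e.g. "/universerpc.Universe/MultiverseRoot".
		if _, ok := whitelist[info.FullMethod]; !ok {
			if err := validate(ctx); err != nil {
				return nil, err
			}
		}

		return handler(ctx, req)
	}
}
```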
223 changes: 179 additions & 44 deletions tapdb/multiverse.go
@@ -30,6 +30,18 @@ const (
// transferMultiverseNS is the namespace used for the multiverse
// issuance proofs.
transferMultiverseNS = "multiverse-transfer"

// numCachedProofs is the number of universe proofs we'll cache.
numCachedProofs = 50_000

// numMaxCachedPages is the maximum number of pages we'll cache for a
// given page cache. Each page is 512 items, so with up to 1,000 pages
// we'll cache at most 512,000 items for a given namespace.
numMaxCachedPages = 1000

// numCachedMultiverseLeaves is the number of multiverse leaves we'll
// cache.
numCachedMultiverseLeaves = 10_000
)
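These constants size the LRU caches used throughout this file. A minimal sketch of the cache API the surrounding code appears to rely on, assuming the `github.com/lightninglabs/neutrino/cache/lru` package; entries implement a `Size()` method that is weighed against the configured capacity, and since every entry here reports a size of 1, the capacity is simply a maximum entry count:

```go
package main

import (
	"fmt"

	"github.com/lightninglabs/neutrino/cache/lru"
)

// cachedItem is a stand-in for the cached proof/leaf types in this file. Each
// entry reports a size of 1, so the cache capacity is an entry count.
type cachedItem struct{}

func (c *cachedItem) Size() (uint64, error) {
	return 1, nil
}

func main() {
	// With Size() == 1 per entry, a capacity of 50_000 means "at most
	// 50,000 entries" before the least recently used one is evicted.
	c := lru.NewCache[string, *cachedItem](50_000)

	// Put and Get mirror the calls made by the caches below.
	_, _ = c.Put("key", &cachedItem{})

	if _, err := c.Get("key"); err == nil {
		fmt.Println("cache hit")
	}
}
```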

var (
Expand Down Expand Up @@ -123,9 +135,6 @@ func NewProofKey(id universe.Identifier, key universe.LeafKey) ProofKey {
return fn.ToArray[ProofKey](h.Sum(nil))
}

// numCachedProofs is the number of universe proofs we'll cache.
const numCachedProofs = 50_000

// cachedProof is a single cached proof.
type cachedProof []*universe.Proof

@@ -140,9 +149,7 @@ type leafProofCache = *lru.Cache[ProofKey, *cachedProof]

// newLeafCache creates a new leaf proof cache.
func newLeafCache() leafProofCache {
return lru.NewCache[ProofKey, *cachedProof](
numCachedProofs,
)
return lru.NewCache[ProofKey, *cachedProof](numCachedProofs)
}

// treeID is used to uniquely identify a multiverse tree.
@@ -383,11 +390,6 @@ func (c cachedLeafKeys) Size() (uint64, error) {
return uint64(1), nil
}

// numMaxCachedPages is the maximum number of pages we'll cache for a given
// page cache. Each page is 512 items, so we'll cache 10 of them, up to 5,120
// for a given namespace.
const numMaxCachedPages = 1000

// leafQuery is a wrapper around the existing UniverseLeafKeysQuery struct that
// doesn't include a pointer so it can be safely used as a map key.
type leafQuery struct {
@@ -440,8 +442,8 @@ func newUniverseLeafCache() *universeLeafCache {
}

// fetchLeafKeys reads the cached leaf keys for the given ID.
func (u *universeLeafCache) fetchLeafKeys(q universe.UniverseLeafKeysQuery,
) []universe.LeafKey {
func (u *universeLeafCache) fetchLeafKeys(
q universe.UniverseLeafKeysQuery) []universe.LeafKey {

idStr := treeID(q.Id.String())

@@ -484,7 +486,7 @@ func (u *universeLeafCache) cacheLeafKeys(q universe.UniverseLeafKeysQuery,
_, _ = u.leafCache.Put(idStr, pageCache)
}

// Add the to the page cache.
// Add the key to the page cache.
_, _ = pageCache.Put(newLeafQuery(q), &cachedKeys)
}

@@ -495,6 +497,88 @@ func (u *universeLeafCache) wipeCache(id treeID) {
u.leafCache.Delete(id)
}

// multiverseQueryToKey converts a multiverse query to a cache key.
func multiverseQueryToKey(q QueryMultiverseLeaves) treeID {
var idBytes []byte
if len(q.GroupKey) > 0 {
idBytes = fn.ByteSlice(sha256.Sum256(q.GroupKey))
} else {
idBytes = q.AssetID
}
return treeID(fmt.Sprintf("%s-%x", q.ProofType, idBytes))
}
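To illustrate the derivation: group-key queries are hashed down to 32 bytes while asset-ID queries use the 32-byte ID directly, so every cache key has the shape `<proof_type>-<64 hex chars>`. A standalone sketch using a stand-in struct (the sqlc-generated `QueryMultiverseLeaves` type is not reproduced here):

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
)

// query mirrors only the QueryMultiverseLeaves fields the key derivation
// touches; it is an illustrative stand-in, not the generated type itself.
type query struct {
	ProofType string
	AssetID   []byte
	GroupKey  []byte
}

// queryToKey reproduces the derivation above: hash group keys down to 32
// bytes, use asset IDs as-is, and prefix with the proof type.
func queryToKey(q query) string {
	idBytes := q.AssetID
	if len(q.GroupKey) > 0 {
		h := sha256.Sum256(q.GroupKey)
		idBytes = h[:]
	}

	return fmt.Sprintf("%s-%x", q.ProofType, idBytes)
}

func main() {
	// Asset-ID query: the 32-byte ID is used directly.
	fmt.Println(queryToKey(query{
		ProofType: "issuance",
		AssetID:   bytes.Repeat([]byte{0x01}, 32),
	}))

	// Group-key query: the 33-byte key is hashed to 32 bytes first.
	fmt.Println(queryToKey(query{
		ProofType: "transfer",
		GroupKey:  bytes.Repeat([]byte{0x02}, 33),
	}))
}
```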

// cachedMultiverseLeaf is used to cache the set of multiverse leaves.
type cachedMultiverseLeaf struct {
*universe.MultiverseLeaf
}

// Size just returns 1, as we cache based on the total number of leaves.
func (c *cachedMultiverseLeaf) Size() (uint64, error) {
return uint64(1), nil
}

// multiverseLeafCache is used to cache the set of multiverse leaves.
type multiverseLeafCache struct {
sync.Mutex

leafCache *lru.Cache[treeID, *cachedMultiverseLeaf]

*cacheLogger
}

// newMultiverseLeafCache creates a new multiverse leaf cache.
func newMultiverseLeafCache() *multiverseLeafCache {
return &multiverseLeafCache{
leafCache: lru.NewCache[treeID, *cachedMultiverseLeaf](
numCachedMultiverseLeaves,
),
cacheLogger: newCacheLogger("multiverse_leaves"),
}
}

// fetchMultiverseLeaf reads the cached multiverse leaf for the given query.
func (u *multiverseLeafCache) fetchMultiverseLeaf(
q QueryMultiverseLeaves) *universe.MultiverseLeaf {

// We only cache queries for specific leaves, so we'll return nothing
// if we don't have either an asset ID or group key.
if len(q.AssetID) == 0 && len(q.GroupKey) == 0 {
return nil
}

cacheKey := multiverseQueryToKey(q)

cachedLeaf, err := u.leafCache.Get(cacheKey)
if err == nil {
u.Hit()
return cachedLeaf.MultiverseLeaf
}

u.Miss()

return nil
}

// cacheMultiverseLeaf stores the given multiverse leaf in the cache.
func (u *multiverseLeafCache) cacheMultiverseLeaf(q QueryMultiverseLeaves,
leaf *universe.MultiverseLeaf) {

cacheKey := multiverseQueryToKey(q)

// Add the leaf to the cache.
_, _ = u.leafCache.Put(cacheKey, &cachedMultiverseLeaf{
MultiverseLeaf: leaf,
})
}

// wipeCache wipes the cache entry of multiverse leaves for a given universe ID.
func (u *multiverseLeafCache) wipeCache(id treeID) {
log.Debugf("wiping leaf keys for %x in cache", id)

u.leafCache.Delete(id)
}

// MultiverseStore implements the persistent storage for a multiverse.
//
// NOTE: This implements the universe.MultiverseArchive interface.
@@ -506,15 +590,18 @@ type MultiverseStore struct {
proofCache *proofCache

leafKeysCache *universeLeafCache

multiverseLeafCache *multiverseLeafCache
}

// NewMultiverseStore creates a new multiverse DB store handle.
func NewMultiverseStore(db BatchedMultiverse) *MultiverseStore {
return &MultiverseStore{
db: db,
rootNodeCache: newRootNodeCache(),
proofCache: newProofCache(),
leafKeysCache: newUniverseLeafCache(),
db: db,
rootNodeCache: newRootNodeCache(),
proofCache: newProofCache(),
leafKeysCache: newUniverseLeafCache(),
multiverseLeafCache: newMultiverseLeafCache(),
}
}

@@ -534,7 +621,8 @@ func namespaceForProof(proofType universe.ProofType) (string, error) {
}

// MultiverseRootNode returns the root multiverse node for the given proof
// type.
// type. If no multiverse root exists (yet), then ErrNoMultiverseRoot is
// returned.
func (b *MultiverseStore) MultiverseRootNode(ctx context.Context,
proofType universe.ProofType) (fn.Option[universe.MultiverseRoot],
error) {
@@ -929,6 +1017,7 @@ func (b *MultiverseStore) UpsertProofLeaf(ctx context.Context,
b.rootNodeCache.wipeCache()
b.proofCache.delProofsForAsset(id)
b.leafKeysCache.wipeCache(idStr)
b.multiverseLeafCache.wipeCache(idStr)

return issuanceProof, nil
}
@@ -984,6 +1073,7 @@ func (b *MultiverseStore) UpsertProofLeafBatch(ctx context.Context,
for id := range idsToDelete {
b.proofCache.Delete(id)
b.leafKeysCache.wipeCache(id)
b.multiverseLeafCache.wipeCache(id)
}

return nil
@@ -1023,6 +1113,7 @@ func (b *MultiverseStore) DeleteUniverse(ctx context.Context,
idStr := treeID(id.String())
b.proofCache.Delete(idStr)
b.leafKeysCache.wipeCache(idStr)
b.multiverseLeafCache.wipeCache(idStr)

return id.String(), dbErr
}
@@ -1071,35 +1162,14 @@ func (b *MultiverseStore) FetchLeaves(ctx context.Context,
leaves = nil

for _, query := range queries {
dbLeaves, err := q.QueryMultiverseLeaves(ctx, query)
leavesForQuery, err := b.queryMultiverseLeaf(
ctx, proofType, query, q,
)
if err != nil {
return err
}

for _, leaf := range dbLeaves {
var id universe.Identifier

id.ProofType = proofType
if len(leaf.AssetID) > 0 {
copy(id.AssetID[:], leaf.AssetID)
}
if len(leaf.GroupKey) > 0 {
id.GroupKey, err = schnorr.ParsePubKey(
leaf.GroupKey,
)
if err != nil {
return err
}
}

leaves = append(leaves, universe.MultiverseLeaf{
ID: id,
LeafNode: mssmt.NewLeafNode(
leaf.UniverseRootHash,
uint64(leaf.UniverseRootSum),
),
})
}
leaves = append(leaves, leavesForQuery...)
}
return nil
})
@@ -1109,3 +1179,68 @@

return leaves, nil
}

// queryMultiverseLeaf returns the multiverse leaves that match the given query,
// either from the cache or the backing database.
func (b *MultiverseStore) queryMultiverseLeaf(ctx context.Context,
proofType universe.ProofType, query QueryMultiverseLeaves,
q BaseMultiverseStore) ([]universe.MultiverseLeaf, error) {

// Ask our cache first.
cachedLeaf := b.multiverseLeafCache.fetchMultiverseLeaf(query)
if cachedLeaf != nil {
// We know that the cache is only populated with a single leaf,
// so we can just return that.
return []universe.MultiverseLeaf{*cachedLeaf}, nil
}

dbLeaves, err := q.QueryMultiverseLeaves(ctx, query)
if err != nil {
return nil, err
}

b.multiverseLeafCache.Lock()
defer b.multiverseLeafCache.Unlock()

// While we were waiting for the lock, the cache might have been
// populated, so we'll check that now.
cachedLeaf = b.multiverseLeafCache.fetchMultiverseLeaf(query)
if cachedLeaf != nil {
// We know that the cache is only populated with a single leaf,
// so we can just return that.
return []universe.MultiverseLeaf{*cachedLeaf}, nil
}

var leaves []universe.MultiverseLeaf
for _, leaf := range dbLeaves {
var id universe.Identifier

id.ProofType = proofType
if len(leaf.AssetID) > 0 {
copy(id.AssetID[:], leaf.AssetID)
}
if len(leaf.GroupKey) > 0 {
id.GroupKey, err = schnorr.ParsePubKey(
leaf.GroupKey,
)
if err != nil {
return nil, err
}
}

multiverseLeaf := universe.MultiverseLeaf{
ID: id,
LeafNode: mssmt.NewLeafNode(
leaf.UniverseRootHash,
uint64(leaf.UniverseRootSum),
),
}
b.multiverseLeafCache.cacheMultiverseLeaf(
query, &multiverseLeaf,
)

leaves = append(leaves, multiverseLeaf)
}

return leaves, nil
}
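The method above is a double-checked cache fill: a lock-free fast path, the expensive database query outside the lock, then a second lookup under the lock before populating, so the work of a concurrent filler is neither duplicated nor clobbered. A minimal standalone sketch of the same pattern, with hypothetical names:

```go
package main

import "sync"

// store is a minimal stand-in for a cache-plus-database lookup.
type store struct {
	mu    sync.Mutex
	cache map[string]string
}

// lookup follows the same double-checked fill as queryMultiverseLeaf above:
// fast-path read, slow query outside the lock, then a second check under the
// lock before populating, in case a concurrent call already filled the entry.
func (s *store) lookup(key string, queryDB func(string) string) string {
	s.mu.Lock()
	if v, ok := s.cache[key]; ok {
		s.mu.Unlock()
		return v
	}
	s.mu.Unlock()

	// Run the expensive query outside the lock so other lookups aren't
	// blocked behind the database round trip.
	v := queryDB(key)

	s.mu.Lock()
	defer s.mu.Unlock()

	// Re-check: another goroutine may have populated the entry while we
	// were querying.
	if cached, ok := s.cache[key]; ok {
		return cached
	}
	s.cache[key] = v

	return v
}

func main() {
	s := &store{cache: make(map[string]string)}
	_ = s.lookup("k", func(string) string { return "v" })
}
```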
4 changes: 2 additions & 2 deletions tapdb/postgres.go
@@ -157,7 +157,7 @@ func (s *PostgresStore) ExecuteMigrations(target MigrationTarget) error {

// NewTestPostgresDB is a helper function that creates a Postgres database for
// testing.
func NewTestPostgresDB(t *testing.T) *PostgresStore {
func NewTestPostgresDB(t testing.TB) *PostgresStore {
t.Helper()

t.Logf("Creating new Postgres DB for testing")
@@ -175,7 +175,7 @@ func NewTestPostgresDB(t *testing.T) *PostgresStore {

// NewTestPostgresDBWithVersion is a helper function that creates a Postgres
// database for testing and migrates it to the given version.
func NewTestPostgresDBWithVersion(t *testing.T, version uint) *PostgresStore {
func NewTestPostgresDBWithVersion(t testing.TB, version uint) *PostgresStore {
t.Helper()

t.Logf("Creating new Postgres DB for testing, migrating to version %d",
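Widening `*testing.T` to `testing.TB` lets these helpers be called from benchmarks as well as tests, since both `*testing.T` and `*testing.B` satisfy `testing.TB`. A hypothetical benchmark showing the usage this enables:

```go
package tapdb

import "testing"

// BenchmarkMultiverseLeaves is a hypothetical benchmark made possible by the
// testing.TB change: *testing.B satisfies testing.TB, so the Postgres test
// helpers can now be used outside plain tests.
func BenchmarkMultiverseLeaves(b *testing.B) {
	db := NewTestPostgresDB(b)
	_ = db

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Exercise the query under test here.
	}
}
```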
6 changes: 3 additions & 3 deletions tapdb/postgres_fixture.go
@@ -35,7 +35,7 @@ type TestPgFixture struct {
// NewTestPgFixture constructs a new TestPgFixture starting up a docker
// container running Postgres 11. The started container will expire after
// the passed duration.
func NewTestPgFixture(t *testing.T, expiry time.Duration,
func NewTestPgFixture(t testing.TB, expiry time.Duration,
autoRemove bool) *TestPgFixture {

// Use a sensible default on Windows (tcp/http) and linux/osx (socket)
@@ -77,7 +77,7 @@ func NewTestPgFixture(t *testing.T, expiry time.Duration,
port: int(port),
}
databaseURL := fixture.GetDSN()
log.Infof("Connecting to Postgres fixture: %v\n", databaseURL)
log.Infof("Connecting to Postgres fixture: %v", databaseURL)

// Tell docker to hard kill the container in "expiry" seconds.
require.NoError(t, resource.Expire(uint(expiry.Seconds())))
@@ -122,7 +122,7 @@ func (f *TestPgFixture) GetConfig() *PostgresConfig {
}

// TearDown stops the underlying docker container.
func (f *TestPgFixture) TearDown(t *testing.T) {
func (f *TestPgFixture) TearDown(t testing.TB) {
err := f.pool.Purge(f.resource)
require.NoError(t, err, "Could not purge resource")
}
9 changes: 9 additions & 0 deletions tapdb/sqlc/migrations/000015_multiverse_indexes.down.sql
@@ -0,0 +1,9 @@
DROP INDEX IF EXISTS mssmt_nodes_key_idx;

DROP INDEX IF EXISTS multiverse_roots_namespace_root_idx;
DROP INDEX IF EXISTS multiverse_roots_proof_type_idx;

DROP INDEX IF EXISTS multiverse_leaves_multiverse_root_id_idx;

DROP INDEX IF EXISTS multiverse_leaves_asset_id_idx;
DROP INDEX IF EXISTS multiverse_leaves_group_key_idx;
9 changes: 9 additions & 0 deletions tapdb/sqlc/migrations/000015_multiverse_indexes.up.sql
@@ -0,0 +1,9 @@
CREATE INDEX IF NOT EXISTS mssmt_nodes_key_idx ON mssmt_nodes(key);

CREATE INDEX IF NOT EXISTS multiverse_roots_namespace_root_idx ON multiverse_roots(namespace_root);
CREATE INDEX IF NOT EXISTS multiverse_roots_proof_type_idx ON multiverse_roots(proof_type);

CREATE INDEX IF NOT EXISTS multiverse_leaves_multiverse_root_id_idx ON multiverse_leaves(multiverse_root_id);

CREATE INDEX IF NOT EXISTS multiverse_leaves_asset_id_idx ON multiverse_leaves(asset_id);
CREATE INDEX IF NOT EXISTS multiverse_leaves_group_key_idx ON multiverse_leaves(group_key);
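These indexes cover the lookups the multiverse queries perform: by namespace root, proof type, multiverse root ID, asset ID, and group key. One way to confirm an index is actually picked up, sketched here under the assumption of a SQLite backend at a hypothetical path and the `modernc.org/sqlite` driver:

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "modernc.org/sqlite"
)

func main() {
	// Open the SQLite DB; the path here is hypothetical.
	db, err := sql.Open("sqlite", "tapd.db")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// With the migration applied, EXPLAIN QUERY PLAN should report a
	// search using multiverse_leaves_asset_id_idx rather than a scan.
	rows, err := db.Query(`EXPLAIN QUERY PLAN
		SELECT * FROM multiverse_leaves WHERE asset_id = ?`,
		make([]byte, 32))
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	// EXPLAIN QUERY PLAN yields (id, parent, notused, detail) rows.
	for rows.Next() {
		var id, parent, notUsed int
		var detail string
		if err := rows.Scan(&id, &parent, &notUsed, &detail); err != nil {
			log.Fatal(err)
		}
		fmt.Println(detail)
	}
}
```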