Skip to content

Commit

Permalink
taprpc+universe: implement sparse universe sync
Browse files Browse the repository at this point in the history
  • Loading branch information
guggero committed Dec 18, 2023
1 parent f51712c commit 6fd49e4
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 29 deletions.
17 changes: 16 additions & 1 deletion taprpc/taprpc_utils.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package taprpc

import "google.golang.org/protobuf/encoding/protojson"
import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protojson"
)

var (
// ProtoJSONMarshalOpts is a struct that holds the default marshal
Expand Down Expand Up @@ -40,3 +44,14 @@ var (
UseHexForBytes: true,
}
)

// IsUnimplemented returns true if the error is a gRPC error with the code
// Unimplemented.
func IsUnimplemented(err error) bool {
s, ok := status.FromError(err)
if !ok {
return false
}

return s.Code() == codes.Unimplemented
}
184 changes: 156 additions & 28 deletions universe/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/lightninglabs/taproot-assets/fn"
"github.com/lightninglabs/taproot-assets/mssmt"
"github.com/lightninglabs/taproot-assets/proof"
"github.com/lightninglabs/taproot-assets/taprpc"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -113,6 +114,19 @@ func (s *SimpleSyncer) executeSync(ctx context.Context, diffEngine DiffEngine,
// only fetch roots for those universes. We won't filter out any
// Universes here as we assume that the caller has already done so.
case len(idsToSync) != 0:
// We attempt to bisect the set of IDs we really need to sync by
// using ephemeral multiverse trees and a bisect algorithm to
// find the diffs in the root nodes. This allows us to more
// efficiently find out which roots we need to sync compared to
// querying the remote server for each root individually.
idsToSync, err = s.bisectOutdatedRoots(
ctx, idsToSync, diffEngine,
)
if err != nil {
return nil, fmt.Errorf("unable to bisect outdated "+
"roots: %w", err)
}

targetRoots, err = fetchRootsForIDs(ctx, idsToSync, diffEngine)
if err != nil {
return nil, err
Expand All @@ -123,6 +137,10 @@ func (s *SimpleSyncer) executeSync(ctx context.Context, diffEngine DiffEngine,
case globalInsertEnabled:
log.Infof("Fetching all roots for remote Universe server...")

// Since we're also interested in learning about _new_ universes
// in this case, we can't use the bisect algorithm to find the
// diffs in the root nodes. Instead, we'll just fetch all the
// roots from the remote server.
targetRoots, err = s.fetchAllRoots(ctx, diffEngine)
if err != nil {
return nil, err
Expand All @@ -140,18 +158,26 @@ func (s *SimpleSyncer) executeSync(ctx context.Context, diffEngine DiffEngine,
// configs.
default:
var uniIDs []Identifier

for _, uniSyncCfg := range syncConfigs.UniSyncConfigs {
// Check with the filter to ensure that the universe is
// applicable for syncing. If not, we would have
// retrieved the corresponding root in vain.
if uniIdSyncFilter(uniSyncCfg.UniverseID) {
uniIDs = append(
uniIDs, uniSyncCfg.UniverseID,
)
uniIDs = append(uniIDs, uniSyncCfg.UniverseID)
}
}

// We attempt to bisect the set of IDs we really need to sync by
// using ephemeral multiverse trees and a bisect algorithm to
// find the diffs in the root nodes. This allows us to more
// efficiently find out which roots we need to sync compared to
// querying the remote server for each root individually.
uniIDs, err = s.bisectOutdatedRoots(ctx, uniIDs, diffEngine)
if err != nil {
return nil, fmt.Errorf("unable to bisect outdated "+
"roots: %w", err)
}

// Retrieve roots for the gathered set of universes.
targetRoots, err = fetchRootsForIDs(ctx, uniIDs, diffEngine)
if err != nil {
Expand Down Expand Up @@ -190,8 +216,7 @@ func fetchRootsForIDs(ctx context.Context, idsToSync []Identifier,
// as a series of parallel requests backed by a worker pool.
rootsToSync := make(chan Root, len(idsToSync))
err := fn.ParSlice(
ctx, idsToSync,
func(ctx context.Context, id Identifier) error {
ctx, idsToSync, func(ctx context.Context, id Identifier) error {
root, err := diffEngine.RootNode(ctx, id)
if err != nil {
return err
Expand All @@ -209,6 +234,113 @@ func fetchRootsForIDs(ctx context.Context, idsToSync []Identifier,
return fn.Collect(rootsToSync), nil
}

// bisectOutdatedRoots attempts to bisect the set of IDs we need to sync by
// using ephemeral multiverse trees and a bisect algorithm to find the diffs in
// the root nodes. This allows us to more efficiently find out which roots we
// need to sync compared to querying the remote server for each root
// individually. If the server doesn't yet implement the MultiverseRoot RPC, we
// simply return the original set of IDs and the "legacy" sync algorithm will be
// used.
func (s *SimpleSyncer) bisectOutdatedRoots(ctx context.Context,
idsToSync []Identifier, diffEngine DiffEngine) ([]Identifier, error) {

issuanceIDs := make([]Identifier, 0, len(idsToSync))
transferIDs := make([]Identifier, 0, len(idsToSync))
for _, id := range idsToSync {
switch id.ProofType {
case ProofTypeIssuance:
issuanceIDs = append(issuanceIDs, id)

case ProofTypeTransfer:
transferIDs = append(transferIDs, id)

case ProofTypeUnspecified:
issuanceID := id
issuanceID.ProofType = ProofTypeIssuance
issuanceIDs = append(issuanceIDs, issuanceID)

transferID := id
transferID.ProofType = ProofTypeTransfer
transferIDs = append(transferIDs, transferID)
}
}

targetIDs := make([]Identifier, 0, len(idsToSync))

// Compare the local and remote issuance trees.
if len(issuanceIDs) > 0 {
outdated, err := s.rootsOutdated(
ctx, ProofTypeIssuance, issuanceIDs, diffEngine,
)
if err != nil {
return nil, err
}

if outdated {
targetIDs = append(targetIDs, issuanceIDs...)
}
}

// Compare the local and remote transfer trees.
if len(transferIDs) > 0 {
outdated, err := s.rootsOutdated(
ctx, ProofTypeTransfer, transferIDs, diffEngine,
)
if err != nil {
return nil, err
}

if outdated {
targetIDs = append(targetIDs, transferIDs...)
}
}

return targetIDs, nil
}

// rootsOutdated returns true if the roots for the given IDs are outdated and
// need to be synced.
func (s *SimpleSyncer) rootsOutdated(ctx context.Context, proofType ProofType,
idsToSync []Identifier, diffEngine DiffEngine) (bool, error) {

var localRootNode, remoteRootNode mssmt.Node
localTree, err := s.cfg.LocalDiffEngine.MultiverseRoot(
ctx, proofType, idsToSync,
)
if err != nil {
return false, fmt.Errorf("unable to fetch local multiverse "+
"root: %w", err)
}
localTree.WhenSome(func(root MultiverseRoot) {
localRootNode = root.Node
})

remoteTree, err := diffEngine.MultiverseRoot(ctx, proofType, idsToSync)

// Special case for when the server doesn't yet implement the
// MultiverseRoot RPC. In this case, we simply return the original set
// of IDs and the "legacy" sync algorithm will be used.
if err != nil && taprpc.IsUnimplemented(err) {
return true, nil
} else if err != nil {
return false, fmt.Errorf("unable to fetch remote multiverse "+
"root: %w", err)
}

// Compare the local and remote transfer trees. If they differ,
// we need to sync all the transfer proofs.
remoteTree.WhenSome(func(root MultiverseRoot) {
remoteRootNode = root.Node
})

// TODO(guggero): Do an actual bi-sect here if there is no match.
// Do we need to return the left and right hashes of the tree to make
// this faster, so we can do a binary search? Then we would need to
// sort/split the IDs by their position in the tree though.

return !mssmt.IsEqualNode(localRootNode, remoteRootNode), nil
}

// syncRoot attempts to sync the local Universe with the remote diff engine for
// a specific base root.
func (s *SimpleSyncer) syncRoot(ctx context.Context, remoteRoot Root,
Expand Down Expand Up @@ -336,7 +468,7 @@ func (s *SimpleSyncer) syncRoot(ctx context.Context, remoteRoot Root,
return err
}

// If this is a tranfer tree, then we'll collect all the items as we
// If this is a transfer tree, then we'll collect all the items as we
// need to sort them to ensure we can validate them in dep order.
if !isIssuanceTree {
transferLeaves := fn.Collect(transferLeafProofs)
Expand All @@ -348,17 +480,13 @@ func (s *SimpleSyncer) syncRoot(ctx context.Context, remoteRoot Root,
iRecord := proof.BlockHeightRecord(&iBlockHeight)
jRecord := proof.BlockHeightRecord(&jBlockHeight)

_ = proof.SparseDecode(
//nolint:lll
bytes.NewReader(transferLeaves[i].Leaf.RawProof),
iRecord,
)
_ = proof.SparseDecode(bytes.NewReader(
transferLeaves[i].Leaf.RawProof,
), iRecord)

_ = proof.SparseDecode(
//nolint:lll
bytes.NewReader(transferLeaves[j].Leaf.RawProof),
jRecord,
)
_ = proof.SparseDecode(bytes.NewReader(
transferLeaves[j].Leaf.RawProof,
), jRecord)

return iBlockHeight < jBlockHeight
})
Expand Down Expand Up @@ -468,22 +596,22 @@ func (s *SimpleSyncer) SyncUniverse(ctx context.Context, host ServerAddr,
// fetchAllRoots fetches all the roots from the remote Universe. This function
// is used in order to isolate any logic related to the specifics of how we
// fetch the data from the universe server.
func (s *SimpleSyncer) fetchAllRoots(ctx context.Context, diffEngine DiffEngine) ([]Root, error) {
func (s *SimpleSyncer) fetchAllRoots(ctx context.Context,
diffEngine DiffEngine) ([]Root, error) {

offset := int32(0)
pageSize := defaultPageSize
roots := make([]Root, 0)

var roots []Root
for {
log.Debugf("Fetching roots in range: %v to %v", offset,
offset+pageSize)
tempRoots, err := diffEngine.RootNodes(
ctx, RootNodesQuery{
WithAmountsById: false,
SortDirection: SortAscending,
Offset: offset,
Limit: pageSize,
},
)
tempRoots, err := diffEngine.RootNodes(ctx, RootNodesQuery{
WithAmountsById: false,
SortDirection: SortAscending,
Offset: offset,
Limit: pageSize,
})

if err != nil {
return nil, err
Expand All @@ -509,8 +637,8 @@ func (s *SimpleSyncer) fetchAllLeafKeys(ctx context.Context,
// Initialize the offset to be used for the pages.
offset := int32(0)
pageSize := defaultPageSize
leafKeys := make([]LeafKey, 0)

var leafKeys []LeafKey
for {
tempRemoteKeys, err := diffEngine.UniverseLeafKeys(
ctx, UniverseLeafKeysQuery{
Expand Down

0 comments on commit 6fd49e4

Please sign in to comment.