From 582e4827aed6ee301ceade9bc4484187bca06187 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Sun, 15 Aug 2021 15:51:34 +0200 Subject: [PATCH] channeldb: node channels cache [poc] --- channeldb/channel_cache.go | 53 +++++++++++++++++++++++++++++++------- channeldb/graph.go | 47 +++++++++++++++++++++++++++++++-- 2 files changed, 88 insertions(+), 12 deletions(-) diff --git a/channeldb/channel_cache.go b/channeldb/channel_cache.go index 2f26c185fb..0e6ce44c87 100644 --- a/channeldb/channel_cache.go +++ b/channeldb/channel_cache.go @@ -1,11 +1,15 @@ package channeldb +import "github.com/lightningnetwork/lnd/routing/route" + // channelCache is an in-memory cache used to improve the performance of // ChanUpdatesInHorizon. It caches the chan info and edge policies for a // particular channel. type channelCache struct { n int channels map[uint64]ChannelEdge + + nodeChannels map[route.Vertex][]ChannelEdge } // newChannelCache creates a new channelCache with maximum capacity of n @@ -14,6 +18,8 @@ func newChannelCache(n int) *channelCache { return &channelCache{ n: n, channels: make(map[uint64]ChannelEdge), + + nodeChannels: make(map[route.Vertex][]ChannelEdge), } } @@ -23,28 +29,55 @@ func (c *channelCache) get(chanid uint64) (ChannelEdge, bool) { return channel, ok } +func (c *channelCache) getNodeChannels(node route.Vertex) ([]ChannelEdge, bool) { + channels, ok := c.nodeChannels[node] + return channels, ok +} + +func (c *channelCache) insertNodeChannels(node route.Vertex, channels []ChannelEdge) { + // TODO: Manage cache capacity. + + c.nodeChannels[node] = channels +} + // insert adds the entry to the channel cache. If an entry for chanid already // exists, it will be replaced with the new entry. If the entry doesn't exist, // it will be inserted to the cache, performing a random eviction if the cache // is at capacity. func (c *channelCache) insert(chanid uint64, channel ChannelEdge) { - // If entry exists, replace it. - if _, ok := c.channels[chanid]; ok { - c.channels[chanid] = channel - return - } + if _, ok := c.channels[chanid]; !ok { + if len(c.channels) == c.n { + for id := range c.channels { + channel := c.channels[id] - // Otherwise, evict an entry at random and insert. - if len(c.channels) == c.n { - for id := range c.channels { - delete(c.channels, id) - break + delete(c.channels, id) + + // Invalidate node channels cache. + delete(c.nodeChannels, channel.Info.NodeKey1Bytes) + delete(c.nodeChannels, channel.Info.NodeKey2Bytes) + + break + } } } + c.channels[chanid] = channel + + // Invalidate node channels cache. + delete(c.nodeChannels, channel.Info.NodeKey1Bytes) + delete(c.nodeChannels, channel.Info.NodeKey2Bytes) } // remove deletes an edge for chanid from the cache, if it exists. func (c *channelCache) remove(chanid uint64) { + channel, ok := c.channels[chanid] + if !ok { + return + } + delete(c.channels, chanid) + + // Invalidate node channels cache. + delete(c.nodeChannels, channel.Info.NodeKey1Bytes) + delete(c.nodeChannels, channel.Info.NodeKey2Bytes) } diff --git a/channeldb/graph.go b/channeldb/graph.go index 678b7ac06c..cc60c7477a 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -284,9 +284,48 @@ func (c *ChannelGraph) ForEachNodeChannel(tx kvdb.RTx, nodePub []byte, cb func(kvdb.RTx, *ChannelEdgeInfo, *ChannelEdgePolicy, *ChannelEdgePolicy) error) error { - db := c.db + var channels []ChannelEdge - return nodeTraversal(tx, nodePub, db, cb) + node, err := route.NewVertexFromBytes(nodePub) + if err != nil { + return err + } + + // Retrieve channels from cache if possible. + var ok bool + channels, ok = c.chanCache.getNodeChannels(node) + ok = false + if !ok { + // Cache miss, retrieve from database. + db := c.db + + add := func(_ kvdb.RTx, info *ChannelEdgeInfo, + policy1 *ChannelEdgePolicy, policy2 *ChannelEdgePolicy) error { + + channels = append(channels, ChannelEdge{ + Info: info, + Policy1: policy1, + Policy2: policy2, + }) + + return nil + } + if err := nodeTraversal(tx, nodePub, db, add); err != nil { + return err + } + + // Store in cache. + c.chanCache.insertNodeChannels(node, channels) + } + + // Execute callback. + for _, channel := range channels { + err := cb(tx, channel.Info, channel.Policy1, channel.Policy2) + if err != nil { + return err + } + } + return nil } // DisabledChannelIDs returns the channel ids of disabled channels. @@ -2132,6 +2171,10 @@ func (c *ChannelGraph) updateEdgeCache(e *ChannelEdgePolicy, isUpdate1 bool) { } c.chanCache.insert(e.ChannelID, channel) } + + // Short-cut: invalidate all cached node channels. This needs to be + // replaced by more selective invalidation or an update. + c.chanCache.nodeChannels = make(map[route.Vertex][]ChannelEdge) } // updateEdgePolicy attempts to update an edge's policy within the relevant