Skip to content

Commit

Permalink
feat: proxy all delegated routing
Browse files Browse the repository at this point in the history
  • Loading branch information
hacdias committed Nov 27, 2023
1 parent 0d72338 commit 692345c
Show file tree
Hide file tree
Showing 4 changed files with 442 additions and 225 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ require (
github.com/libp2p/go-libp2p v0.32.0
github.com/libp2p/go-libp2p-kad-dht v0.23.0
github.com/libp2p/go-libp2p-record v0.2.0
github.com/libp2p/go-libp2p-routing-helpers v0.7.0
github.com/multiformats/go-multibase v0.2.0
github.com/urfave/cli/v2 v2.25.7
)
Expand Down Expand Up @@ -57,6 +56,7 @@ require (
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.3.0 // indirect
github.com/libp2p/go-libp2p-kbucket v0.5.0 // indirect
github.com/libp2p/go-libp2p-routing-helpers v0.7.0 // indirect
github.com/libp2p/go-libp2p-xor v0.1.0 // indirect
github.com/libp2p/go-msgio v0.3.0 // indirect
github.com/libp2p/go-nat v0.2.0 // indirect
Expand Down
236 changes: 12 additions & 224 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,12 @@ import (
"log"
"net/http"
"strconv"
"time"

"github.com/ipfs/boxo/ipns"
"github.com/ipfs/boxo/routing/http/client"
"github.com/ipfs/boxo/routing/http/contentrouter"
"github.com/ipfs/boxo/routing/http/server"
"github.com/ipfs/boxo/routing/http/types"
"github.com/ipfs/boxo/routing/http/types/iter"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-kad-dht/fullrt"
record "github.com/libp2p/go-libp2p-record"
routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
)
Expand All @@ -33,7 +23,7 @@ func start(ctx context.Context, port int, runAcceleratedDHTClient bool, contentE

var dhtRouting routing.Routing
if runAcceleratedDHTClient {
wrappedDHT, err := newWrappedStandardAndAcceleratedDHTClient(ctx, h)
wrappedDHT, err := newBundledDHT(ctx, h)
if err != nil {
return err
}
Expand Down Expand Up @@ -61,16 +51,14 @@ func start(ctx context.Context, port int, runAcceleratedDHTClient bool, contentE
return err
}

proxy := &delegatedRoutingProxy{
cr: crRouters,
pr: prRouters,
vs: ipnsRouters,
}

log.Printf("Listening on http://0.0.0.0:%d", port)
log.Printf("Delegated Routing API on http://127.0.0.1:%d/routing/v1", port)

http.Handle("/", server.Handler(proxy))
http.Handle("/", server.Handler(&composableRouter{
providers: crRouters,
peers: prRouters,
ipns: ipnsRouters,
}))
return http.ListenAndServe(":"+strconv.Itoa(port), nil)
}

Expand Down Expand Up @@ -108,222 +96,22 @@ func newHost(highOutboundLimits bool) (host.Host, error) {
return h, nil
}

type wrappedStandardAndAcceleratedDHTClient struct {
standard *dht.IpfsDHT
accelerated *fullrt.FullRT
}

func newWrappedStandardAndAcceleratedDHTClient(ctx context.Context, h host.Host) (routing.Routing, error) {
standardDHT, err := dht.New(ctx, h, dht.Mode(dht.ModeClient), dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...))
if err != nil {
return nil, err
}

acceleratedDHT, err := fullrt.NewFullRT(h, "/ipfs",
fullrt.DHTOption(
dht.BucketSize(20),
dht.Validator(record.NamespacedValidator{
"pk": record.PublicKeyValidator{},
"ipns": ipns.Validator{},
}),
dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...),
dht.Mode(dht.ModeClient),
))
if err != nil {
return nil, err
}

return &wrappedStandardAndAcceleratedDHTClient{
standard: standardDHT,
accelerated: acceleratedDHT,
}, nil
}

func (w *wrappedStandardAndAcceleratedDHTClient) Provide(ctx context.Context, c cid.Cid, b bool) error {
if w.accelerated.Ready() {
return w.accelerated.Provide(ctx, c, b)
}
return w.standard.Provide(ctx, c, b)
}

func (w *wrappedStandardAndAcceleratedDHTClient) FindProvidersAsync(ctx context.Context, c cid.Cid, i int) <-chan peer.AddrInfo {
if w.accelerated.Ready() {
return w.accelerated.FindProvidersAsync(ctx, c, i)
}
return w.standard.FindProvidersAsync(ctx, c, i)
}

func (w *wrappedStandardAndAcceleratedDHTClient) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) {
if w.accelerated.Ready() {
return w.accelerated.FindPeer(ctx, p)
}
return w.standard.FindPeer(ctx, p)
}

func (w *wrappedStandardAndAcceleratedDHTClient) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) error {
if w.accelerated.Ready() {
return w.accelerated.PutValue(ctx, key, value, opts...)
}
return w.standard.PutValue(ctx, key, value, opts...)
}

func (w *wrappedStandardAndAcceleratedDHTClient) GetValue(ctx context.Context, s string, opts ...routing.Option) ([]byte, error) {
if w.accelerated.Ready() {
return w.accelerated.GetValue(ctx, s, opts...)
}
return w.standard.GetValue(ctx, s, opts...)
}

func (w *wrappedStandardAndAcceleratedDHTClient) SearchValue(ctx context.Context, s string, opts ...routing.Option) (<-chan []byte, error) {
if w.accelerated.Ready() {
return w.accelerated.SearchValue(ctx, s, opts...)
}
return w.standard.SearchValue(ctx, s, opts...)
}

func (w *wrappedStandardAndAcceleratedDHTClient) Bootstrap(ctx context.Context) error {
return w.standard.Bootstrap(ctx)
}

func getCombinedRouting(endpoints []string, dht routing.Routing) (routing.Routing, error) {
func getCombinedRouting(endpoints []string, dht routing.Routing) (server.ContentRouter, error) {
if len(endpoints) == 0 {
return dht, nil
return dhtRouter{dht: dht}, nil
}

var routers []routing.Routing
var routers []server.ContentRouter

for _, endpoint := range endpoints {
drclient, err := client.New(endpoint)
if err != nil {
return nil, err
}
routers = append(routers, newWrappedDelegatedRouting(drclient))
routers = append(routers, wrappedClient{Client: drclient})
}

return routinghelpers.Parallel{
Routers: append(routers, dht),
return parallelRouter{
routers: append(routers, dhtRouter{dht: dht}),
}, nil
}

type wrappedDelegatedRouting struct {
routing.ValueStore
routing.PeerRouting
routing.ContentRouting
}

func newWrappedDelegatedRouting(drc *client.Client) routing.Routing {
v := contentrouter.NewContentRoutingClient(drc)

return &wrappedDelegatedRouting{
ValueStore: v,
PeerRouting: v,
ContentRouting: v,
}
}

func (c *wrappedDelegatedRouting) Bootstrap(ctx context.Context) error {
return routing.ErrNotSupported
}

type delegatedRoutingProxy struct {
cr routing.ContentRouting
pr routing.PeerRouting
vs routing.ValueStore
}

func (d *delegatedRoutingProxy) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) {
ctx, cancel := context.WithCancel(ctx)
ch := d.cr.FindProvidersAsync(ctx, key, limit)
return iter.ToResultIter[types.Record](&peerChanIter{
ch: ch,
cancel: cancel,
}), nil
}

//lint:ignore SA1019 // ignore staticcheck
func (d *delegatedRoutingProxy) ProvideBitswap(ctx context.Context, req *server.BitswapWriteProvideRequest) (time.Duration, error) {
return 0, routing.ErrNotSupported
}

func (d *delegatedRoutingProxy) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

addr, err := d.pr.FindPeer(ctx, pid)
if err != nil {
return nil, err
}

rec := &types.PeerRecord{
Schema: types.SchemaPeer,
ID: &addr.ID,
}

for _, addr := range addr.Addrs {
rec.Addrs = append(rec.Addrs, types.Multiaddr{Multiaddr: addr})
}

return iter.ToResultIter[*types.PeerRecord](iter.FromSlice[*types.PeerRecord]([]*types.PeerRecord{rec})), nil
}

func (d *delegatedRoutingProxy) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

raw, err := d.vs.GetValue(ctx, string(name.RoutingKey()))
if err != nil {
return nil, err
}

return ipns.UnmarshalRecord(raw)
}

func (d *delegatedRoutingProxy) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

raw, err := ipns.MarshalRecord(record)
if err != nil {
return err
}

return d.vs.PutValue(ctx, string(name.RoutingKey()), raw)
}

type peerChanIter struct {
ch <-chan peer.AddrInfo
cancel context.CancelFunc
next *peer.AddrInfo
}

func (it *peerChanIter) Next() bool {
addr, ok := <-it.ch
if ok {
it.next = &addr
return true
}
it.next = nil
return false
}

func (it *peerChanIter) Val() types.Record {
if it.next == nil {
return nil
}

rec := &types.PeerRecord{
Schema: types.SchemaPeer,
ID: &it.next.ID,
}

for _, addr := range it.next.Addrs {
rec.Addrs = append(rec.Addrs, types.Multiaddr{Multiaddr: addr})
}

return rec
}

func (it *peerChanIter) Close() error {
it.cancel()
return nil
}
80 changes: 80 additions & 0 deletions server_dht.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package main

import (
"context"

"github.com/ipfs/boxo/ipns"
"github.com/ipfs/go-cid"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-kad-dht/fullrt"
record "github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
)

type bundledDHT struct {
standard *dht.IpfsDHT
fullRT *fullrt.FullRT
}

func newBundledDHT(ctx context.Context, h host.Host) (routing.Routing, error) {
standardDHT, err := dht.New(ctx, h, dht.Mode(dht.ModeClient), dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...))
if err != nil {
return nil, err
}

fullRT, err := fullrt.NewFullRT(h, "/ipfs",
fullrt.DHTOption(
dht.BucketSize(20),
dht.Validator(record.NamespacedValidator{
"pk": record.PublicKeyValidator{},
"ipns": ipns.Validator{},
}),
dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...),
dht.Mode(dht.ModeClient),
))
if err != nil {
return nil, err
}

return &bundledDHT{
standard: standardDHT,
fullRT: fullRT,
}, nil
}

func (b *bundledDHT) getDHT() routing.Routing {
if b.fullRT.Ready() {
return b.fullRT
}
return b.standard
}

func (b *bundledDHT) Provide(ctx context.Context, c cid.Cid, brdcst bool) error {
return b.getDHT().Provide(ctx, c, brdcst)
}

func (b *bundledDHT) FindProvidersAsync(ctx context.Context, c cid.Cid, i int) <-chan peer.AddrInfo {
return b.getDHT().FindProvidersAsync(ctx, c, i)
}

func (b *bundledDHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) {
return b.getDHT().FindPeer(ctx, id)
}

func (b *bundledDHT) PutValue(ctx context.Context, k string, v []byte, option ...routing.Option) error {
return b.getDHT().PutValue(ctx, k, v, option...)
}

func (b *bundledDHT) GetValue(ctx context.Context, s string, option ...routing.Option) ([]byte, error) {
return b.getDHT().GetValue(ctx, s, option...)
}

func (b *bundledDHT) SearchValue(ctx context.Context, s string, option ...routing.Option) (<-chan []byte, error) {
return b.getDHT().SearchValue(ctx, s, option...)
}

func (b *bundledDHT) Bootstrap(ctx context.Context) error {
return b.standard.Bootstrap(ctx)
}
Loading

0 comments on commit 692345c

Please sign in to comment.