From 692345c33f1f4734b7ad8af81cc399e7f6c8817b Mon Sep 17 00:00:00 2001 From: Henrique Dias Date: Mon, 20 Nov 2023 14:09:26 +0100 Subject: [PATCH] feat: proxy all delegated routing --- go.mod | 2 +- server.go | 236 ++----------------------------- server_dht.go | 80 +++++++++++ server_routers.go | 349 ++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 442 insertions(+), 225 deletions(-) create mode 100644 server_dht.go create mode 100644 server_routers.go diff --git a/go.mod b/go.mod index a859958..15382d7 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,6 @@ require ( github.com/libp2p/go-libp2p v0.32.0 github.com/libp2p/go-libp2p-kad-dht v0.23.0 github.com/libp2p/go-libp2p-record v0.2.0 - github.com/libp2p/go-libp2p-routing-helpers v0.7.0 github.com/multiformats/go-multibase v0.2.0 github.com/urfave/cli/v2 v2.25.7 ) @@ -57,6 +56,7 @@ require ( github.com/libp2p/go-flow-metrics v0.1.0 // indirect github.com/libp2p/go-libp2p-asn-util v0.3.0 // indirect github.com/libp2p/go-libp2p-kbucket v0.5.0 // indirect + github.com/libp2p/go-libp2p-routing-helpers v0.7.0 // indirect github.com/libp2p/go-libp2p-xor v0.1.0 // indirect github.com/libp2p/go-msgio v0.3.0 // indirect github.com/libp2p/go-nat v0.2.0 // indirect diff --git a/server.go b/server.go index 0363452..a839717 100644 --- a/server.go +++ b/server.go @@ -5,22 +5,12 @@ import ( "log" "net/http" "strconv" - "time" - "github.com/ipfs/boxo/ipns" "github.com/ipfs/boxo/routing/http/client" - "github.com/ipfs/boxo/routing/http/contentrouter" "github.com/ipfs/boxo/routing/http/server" - "github.com/ipfs/boxo/routing/http/types" - "github.com/ipfs/boxo/routing/http/types/iter" - "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p" dht "github.com/libp2p/go-libp2p-kad-dht" - "github.com/libp2p/go-libp2p-kad-dht/fullrt" - record "github.com/libp2p/go-libp2p-record" - routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" ) @@ -33,7 +23,7 @@ func start(ctx context.Context, port int, runAcceleratedDHTClient bool, contentE var dhtRouting routing.Routing if runAcceleratedDHTClient { - wrappedDHT, err := newWrappedStandardAndAcceleratedDHTClient(ctx, h) + wrappedDHT, err := newBundledDHT(ctx, h) if err != nil { return err } @@ -61,16 +51,14 @@ func start(ctx context.Context, port int, runAcceleratedDHTClient bool, contentE return err } - proxy := &delegatedRoutingProxy{ - cr: crRouters, - pr: prRouters, - vs: ipnsRouters, - } - log.Printf("Listening on http://0.0.0.0:%d", port) log.Printf("Delegated Routing API on http://127.0.0.1:%d/routing/v1", port) - http.Handle("/", server.Handler(proxy)) + http.Handle("/", server.Handler(&composableRouter{ + providers: crRouters, + peers: prRouters, + ipns: ipnsRouters, + })) return http.ListenAndServe(":"+strconv.Itoa(port), nil) } @@ -108,222 +96,22 @@ func newHost(highOutboundLimits bool) (host.Host, error) { return h, nil } -type wrappedStandardAndAcceleratedDHTClient struct { - standard *dht.IpfsDHT - accelerated *fullrt.FullRT -} - -func newWrappedStandardAndAcceleratedDHTClient(ctx context.Context, h host.Host) (routing.Routing, error) { - standardDHT, err := dht.New(ctx, h, dht.Mode(dht.ModeClient), dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...)) - if err != nil { - return nil, err - } - - acceleratedDHT, err := fullrt.NewFullRT(h, "/ipfs", - fullrt.DHTOption( - dht.BucketSize(20), - dht.Validator(record.NamespacedValidator{ - "pk": record.PublicKeyValidator{}, - "ipns": ipns.Validator{}, - }), - dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...), - dht.Mode(dht.ModeClient), - )) - if err != nil { - return nil, err - } - - return &wrappedStandardAndAcceleratedDHTClient{ - standard: standardDHT, - accelerated: acceleratedDHT, - }, nil -} - -func (w *wrappedStandardAndAcceleratedDHTClient) Provide(ctx context.Context, c cid.Cid, b bool) error { - if w.accelerated.Ready() { - return w.accelerated.Provide(ctx, c, b) - } - return w.standard.Provide(ctx, c, b) -} - -func (w *wrappedStandardAndAcceleratedDHTClient) FindProvidersAsync(ctx context.Context, c cid.Cid, i int) <-chan peer.AddrInfo { - if w.accelerated.Ready() { - return w.accelerated.FindProvidersAsync(ctx, c, i) - } - return w.standard.FindProvidersAsync(ctx, c, i) -} - -func (w *wrappedStandardAndAcceleratedDHTClient) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) { - if w.accelerated.Ready() { - return w.accelerated.FindPeer(ctx, p) - } - return w.standard.FindPeer(ctx, p) -} - -func (w *wrappedStandardAndAcceleratedDHTClient) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) error { - if w.accelerated.Ready() { - return w.accelerated.PutValue(ctx, key, value, opts...) - } - return w.standard.PutValue(ctx, key, value, opts...) -} - -func (w *wrappedStandardAndAcceleratedDHTClient) GetValue(ctx context.Context, s string, opts ...routing.Option) ([]byte, error) { - if w.accelerated.Ready() { - return w.accelerated.GetValue(ctx, s, opts...) - } - return w.standard.GetValue(ctx, s, opts...) -} - -func (w *wrappedStandardAndAcceleratedDHTClient) SearchValue(ctx context.Context, s string, opts ...routing.Option) (<-chan []byte, error) { - if w.accelerated.Ready() { - return w.accelerated.SearchValue(ctx, s, opts...) - } - return w.standard.SearchValue(ctx, s, opts...) -} - -func (w *wrappedStandardAndAcceleratedDHTClient) Bootstrap(ctx context.Context) error { - return w.standard.Bootstrap(ctx) -} - -func getCombinedRouting(endpoints []string, dht routing.Routing) (routing.Routing, error) { +func getCombinedRouting(endpoints []string, dht routing.Routing) (server.ContentRouter, error) { if len(endpoints) == 0 { - return dht, nil + return dhtRouter{dht: dht}, nil } - var routers []routing.Routing + var routers []server.ContentRouter for _, endpoint := range endpoints { drclient, err := client.New(endpoint) if err != nil { return nil, err } - routers = append(routers, newWrappedDelegatedRouting(drclient)) + routers = append(routers, wrappedClient{Client: drclient}) } - return routinghelpers.Parallel{ - Routers: append(routers, dht), + return parallelRouter{ + routers: append(routers, dhtRouter{dht: dht}), }, nil } - -type wrappedDelegatedRouting struct { - routing.ValueStore - routing.PeerRouting - routing.ContentRouting -} - -func newWrappedDelegatedRouting(drc *client.Client) routing.Routing { - v := contentrouter.NewContentRoutingClient(drc) - - return &wrappedDelegatedRouting{ - ValueStore: v, - PeerRouting: v, - ContentRouting: v, - } -} - -func (c *wrappedDelegatedRouting) Bootstrap(ctx context.Context) error { - return routing.ErrNotSupported -} - -type delegatedRoutingProxy struct { - cr routing.ContentRouting - pr routing.PeerRouting - vs routing.ValueStore -} - -func (d *delegatedRoutingProxy) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) { - ctx, cancel := context.WithCancel(ctx) - ch := d.cr.FindProvidersAsync(ctx, key, limit) - return iter.ToResultIter[types.Record](&peerChanIter{ - ch: ch, - cancel: cancel, - }), nil -} - -//lint:ignore SA1019 // ignore staticcheck -func (d *delegatedRoutingProxy) ProvideBitswap(ctx context.Context, req *server.BitswapWriteProvideRequest) (time.Duration, error) { - return 0, routing.ErrNotSupported -} - -func (d *delegatedRoutingProxy) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - addr, err := d.pr.FindPeer(ctx, pid) - if err != nil { - return nil, err - } - - rec := &types.PeerRecord{ - Schema: types.SchemaPeer, - ID: &addr.ID, - } - - for _, addr := range addr.Addrs { - rec.Addrs = append(rec.Addrs, types.Multiaddr{Multiaddr: addr}) - } - - return iter.ToResultIter[*types.PeerRecord](iter.FromSlice[*types.PeerRecord]([]*types.PeerRecord{rec})), nil -} - -func (d *delegatedRoutingProxy) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - raw, err := d.vs.GetValue(ctx, string(name.RoutingKey())) - if err != nil { - return nil, err - } - - return ipns.UnmarshalRecord(raw) -} - -func (d *delegatedRoutingProxy) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - raw, err := ipns.MarshalRecord(record) - if err != nil { - return err - } - - return d.vs.PutValue(ctx, string(name.RoutingKey()), raw) -} - -type peerChanIter struct { - ch <-chan peer.AddrInfo - cancel context.CancelFunc - next *peer.AddrInfo -} - -func (it *peerChanIter) Next() bool { - addr, ok := <-it.ch - if ok { - it.next = &addr - return true - } - it.next = nil - return false -} - -func (it *peerChanIter) Val() types.Record { - if it.next == nil { - return nil - } - - rec := &types.PeerRecord{ - Schema: types.SchemaPeer, - ID: &it.next.ID, - } - - for _, addr := range it.next.Addrs { - rec.Addrs = append(rec.Addrs, types.Multiaddr{Multiaddr: addr}) - } - - return rec -} - -func (it *peerChanIter) Close() error { - it.cancel() - return nil -} diff --git a/server_dht.go b/server_dht.go new file mode 100644 index 0000000..70cf291 --- /dev/null +++ b/server_dht.go @@ -0,0 +1,80 @@ +package main + +import ( + "context" + + "github.com/ipfs/boxo/ipns" + "github.com/ipfs/go-cid" + dht "github.com/libp2p/go-libp2p-kad-dht" + "github.com/libp2p/go-libp2p-kad-dht/fullrt" + record "github.com/libp2p/go-libp2p-record" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/routing" +) + +type bundledDHT struct { + standard *dht.IpfsDHT + fullRT *fullrt.FullRT +} + +func newBundledDHT(ctx context.Context, h host.Host) (routing.Routing, error) { + standardDHT, err := dht.New(ctx, h, dht.Mode(dht.ModeClient), dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...)) + if err != nil { + return nil, err + } + + fullRT, err := fullrt.NewFullRT(h, "/ipfs", + fullrt.DHTOption( + dht.BucketSize(20), + dht.Validator(record.NamespacedValidator{ + "pk": record.PublicKeyValidator{}, + "ipns": ipns.Validator{}, + }), + dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...), + dht.Mode(dht.ModeClient), + )) + if err != nil { + return nil, err + } + + return &bundledDHT{ + standard: standardDHT, + fullRT: fullRT, + }, nil +} + +func (b *bundledDHT) getDHT() routing.Routing { + if b.fullRT.Ready() { + return b.fullRT + } + return b.standard +} + +func (b *bundledDHT) Provide(ctx context.Context, c cid.Cid, brdcst bool) error { + return b.getDHT().Provide(ctx, c, brdcst) +} + +func (b *bundledDHT) FindProvidersAsync(ctx context.Context, c cid.Cid, i int) <-chan peer.AddrInfo { + return b.getDHT().FindProvidersAsync(ctx, c, i) +} + +func (b *bundledDHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) { + return b.getDHT().FindPeer(ctx, id) +} + +func (b *bundledDHT) PutValue(ctx context.Context, k string, v []byte, option ...routing.Option) error { + return b.getDHT().PutValue(ctx, k, v, option...) +} + +func (b *bundledDHT) GetValue(ctx context.Context, s string, option ...routing.Option) ([]byte, error) { + return b.getDHT().GetValue(ctx, s, option...) +} + +func (b *bundledDHT) SearchValue(ctx context.Context, s string, option ...routing.Option) (<-chan []byte, error) { + return b.getDHT().SearchValue(ctx, s, option...) +} + +func (b *bundledDHT) Bootstrap(ctx context.Context) error { + return b.standard.Bootstrap(ctx) +} diff --git a/server_routers.go b/server_routers.go new file mode 100644 index 0000000..9285063 --- /dev/null +++ b/server_routers.go @@ -0,0 +1,349 @@ +package main + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/ipfs/boxo/ipns" + "github.com/ipfs/boxo/routing/http/client" + "github.com/ipfs/boxo/routing/http/server" + "github.com/ipfs/boxo/routing/http/types" + "github.com/ipfs/boxo/routing/http/types/iter" + "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/routing" +) + +var _ server.ContentRouter = composableRouter{} + +type composableRouter struct { + providers server.ContentRouter + peers server.ContentRouter + ipns server.ContentRouter +} + +func (r composableRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) { + return r.providers.FindProviders(ctx, key, limit) +} + +//lint:ignore SA1019 // ignore staticcheck +func (r composableRouter) ProvideBitswap(ctx context.Context, req *server.BitswapWriteProvideRequest) (time.Duration, error) { + return 0, routing.ErrNotSupported +} + +func (r composableRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) { + return r.providers.FindPeers(ctx, pid, limit) +} + +func (r composableRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { + return r.ipns.GetIPNS(ctx, name) +} + +func (r composableRouter) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error { + return r.ipns.PutIPNS(ctx, name, record) +} + +var _ server.ContentRouter = parallelRouter{} + +type parallelRouter struct { + routers []server.ContentRouter +} + +func (r parallelRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) { + switch len(r.routers) { + case 0: + return iter.ToResultIter(iter.FromSlice([]types.Record{})), nil + case 1: + return r.routers[0].FindProviders(ctx, key, limit) + } + + its := make([]iter.ResultIter[types.Record], len(r.routers)) + for i, ri := range r.routers { + var err error + its[i], err = ri.FindProviders(ctx, key, limit) + if err != nil { + for _, it := range its { + if it != nil { + _ = it.Close() + } + } + return nil, err + } + } + return &manyIter[types.Record]{it: its}, nil +} + +type manyIter[T any] struct { + it []iter.ResultIter[T] + next int +} + +func (mi *manyIter[T]) Next() bool { + for i, it := range mi.it { + if it.Next() { + mi.next = i + return true + } + } + + mi.next = -1 + return false +} + +func (mi *manyIter[T]) Val() iter.Result[T] { + if mi.next == -1 { + return iter.Result[T]{Err: errors.New("no next value")} + } + return mi.it[mi.next].Val() +} + +func (mi *manyIter[T]) Close() error { + var err error + for _, it := range mi.it { + err = errors.Join(err, it.Close()) + } + return err +} + +//lint:ignore SA1019 // ignore staticcheck +func (r parallelRouter) ProvideBitswap(ctx context.Context, req *server.BitswapWriteProvideRequest) (time.Duration, error) { + return 0, routing.ErrNotSupported +} + +func (r parallelRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) { + switch len(r.routers) { + case 0: + return iter.ToResultIter(iter.FromSlice([]*types.PeerRecord{})), nil + case 1: + return r.routers[0].FindPeers(ctx, pid, limit) + } + + its := make([]iter.ResultIter[*types.PeerRecord], len(r.routers)) + for i, ri := range r.routers { + var err error + its[i], err = ri.FindPeers(ctx, pid, limit) + if err != nil { + for _, it := range its { + if it != nil { + _ = it.Close() + } + } + return nil, err + } + } + return &manyIter[*types.PeerRecord]{it: its}, nil +} + +func (r parallelRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { + switch len(r.routers) { + case 0: + return nil, routing.ErrNotFound + case 1: + return r.routers[0].GetIPNS(ctx, name) + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + results := make(chan struct { + val *ipns.Record + err error + }) + for _, ri := range r.routers { + go func(ri server.ContentRouter) { + value, err := ri.GetIPNS(ctx, name) + select { + case results <- struct { + val *ipns.Record + err error + }{ + val: value, + err: err, + }: + case <-ctx.Done(): + } + }(ri) + } + + var errs error + + for range r.routers { + select { + case res := <-results: + switch res.err { + case nil: + return res.val, nil + case routing.ErrNotFound, routing.ErrNotSupported: + continue + } + // If the context has expired, just return that error + // and ignore the other errors. + if ctx.Err() != nil { + return nil, ctx.Err() + } + + errs = errors.Join(errs, res.err) + case <-ctx.Done(): + return nil, ctx.Err() + } + } + + if errs == nil { + return nil, routing.ErrNotFound + } + + return nil, errs +} + +func (r parallelRouter) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error { + switch len(r.routers) { + case 0: + return nil + case 1: + return r.routers[0].PutIPNS(ctx, name, record) + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var wg sync.WaitGroup + results := make([]error, len(r.routers)) + wg.Add(len(r.routers)) + for i, ri := range r.routers { + go func(ri server.ContentRouter, i int) { + results[i] = ri.PutIPNS(ctx, name, record) + wg.Done() + }(ri, i) + } + wg.Wait() + + var errs error + for _, err := range results { + errs = errors.Join(errs, err) + } + return errs +} + +var _ server.ContentRouter = dhtRouter{} + +type dhtRouter struct { + dht routing.Routing +} + +func (d dhtRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) { + ctx, cancel := context.WithCancel(ctx) + ch := d.dht.FindProvidersAsync(ctx, key, limit) + return iter.ToResultIter[types.Record](&peerChanIter{ + ch: ch, + cancel: cancel, + }), nil +} + +func (d dhtRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + addr, err := d.dht.FindPeer(ctx, pid) + if err != nil { + return nil, err + } + + rec := &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &addr.ID, + } + + for _, addr := range addr.Addrs { + rec.Addrs = append(rec.Addrs, types.Multiaddr{Multiaddr: addr}) + } + + return iter.ToResultIter[*types.PeerRecord](iter.FromSlice[*types.PeerRecord]([]*types.PeerRecord{rec})), nil +} + +func (d dhtRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + raw, err := d.dht.GetValue(ctx, string(name.RoutingKey())) + if err != nil { + return nil, err + } + + return ipns.UnmarshalRecord(raw) +} + +func (d dhtRouter) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + raw, err := ipns.MarshalRecord(record) + if err != nil { + return err + } + + return d.dht.PutValue(ctx, string(name.RoutingKey()), raw) +} + +//lint:ignore SA1019 // ignore staticcheck +func (d dhtRouter) ProvideBitswap(ctx context.Context, req *server.BitswapWriteProvideRequest) (time.Duration, error) { + return 0, routing.ErrNotSupported +} + +type peerChanIter struct { + ch <-chan peer.AddrInfo + cancel context.CancelFunc + next *peer.AddrInfo +} + +func (it *peerChanIter) Next() bool { + addr, ok := <-it.ch + if ok { + it.next = &addr + return true + } + it.next = nil + return false +} + +func (it *peerChanIter) Val() types.Record { + if it.next == nil { + return nil + } + + rec := &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &it.next.ID, + } + + for _, addr := range it.next.Addrs { + rec.Addrs = append(rec.Addrs, types.Multiaddr{Multiaddr: addr}) + } + + return rec +} + +func (it *peerChanIter) Close() error { + it.cancel() + return nil +} + +var _ server.ContentRouter = wrappedClient{} + +type wrappedClient struct { + *client.Client +} + +func (d wrappedClient) FindProviders(ctx context.Context, cid cid.Cid, limit int) (iter.ResultIter[types.Record], error) { + return d.Client.FindProviders(ctx, cid) +} + +func (d wrappedClient) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) { + return d.Client.FindPeers(ctx, pid) +} + +//lint:ignore SA1019 // ignore staticcheck +func (d wrappedClient) ProvideBitswap(ctx context.Context, req *server.BitswapWriteProvideRequest) (time.Duration, error) { + return 0, routing.ErrNotSupported +}