Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

do not rely on explicit healtchecks/heartbeats #17

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -375,3 +375,5 @@ by adding account management and it is one of the easiest way to start.
- [ ] UDP support
- [ ] Use quic-go tracer, instead of ping (and duration estimation)
- [ ] Stateless reset key for the server
- [ ] Name access restrictions for clients
- [ ] Optimize global IP restrictions - check earlier
38 changes: 0 additions & 38 deletions client/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,6 @@ func (d *Destination) runDestinationErr(ctx context.Context, stream quic.Stream)
switch {
case req.Connect != nil:
return d.runConnect(ctx, stream)
case req.Heartbeat != nil:
return d.heartbeat(ctx, stream, req.Heartbeat)
default:
err := pb.NewError(pb.Error_RequestUnknown, "unknown request: %v", req)
if err := pb.Write(stream, &pbc.Response{Error: err}); err != nil {
Expand Down Expand Up @@ -187,42 +185,6 @@ func (d *Destination) runConnect(ctx context.Context, stream quic.Stream) error
return nil
}

func (d *Destination) heartbeat(ctx context.Context, stream quic.Stream, hbt *pbc.Heartbeat) error {
if err := pb.Write(stream, &pbc.Response{Heartbeat: hbt}); err != nil {
return err
}

g, ctx := errgroup.WithContext(ctx)

g.Go(func() error {
<-ctx.Done()
stream.CancelRead(0)
return nil
})

g.Go(func() error {
for {
req, err := pbc.ReadRequest(stream)
if err != nil {
return err
}
if req.Heartbeat == nil {
respErr := pb.NewError(pb.Error_RequestUnknown, "unexpected request")
if err := pb.Write(stream, &pbc.Response{Error: respErr}); err != nil {
return kleverr.Ret(err)
}
return respErr
}

if err := pb.Write(stream, &pbc.Response{Heartbeat: req.Heartbeat}); err != nil {
return err
}
}
})

return g.Wait()
}

func (d *Destination) RunControl(ctx context.Context, conn quic.Connection) error {
return (&peerControl{
local: d.peer,
Expand Down
122 changes: 27 additions & 95 deletions client/peer_direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,9 @@ import (
"github.com/connet-dev/connet/netc"
"github.com/connet-dev/connet/notify"
"github.com/connet-dev/connet/pb"
"github.com/connet-dev/connet/pbc"
"github.com/connet-dev/connet/pbs"
"github.com/klev-dev/kleverr"
"github.com/quic-go/quic-go"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/timestamppb"
)

type directPeer struct {
Expand Down Expand Up @@ -149,7 +146,7 @@ func newDirectPeerIncoming(ctx context.Context, parent *directPeer, clientCert *
func (p *directPeerIncoming) run(ctx context.Context) {
boff := netc.MinBackoff
for {
conn, stream, err := p.connect(ctx)
conn, err := p.connect(ctx)
if err != nil {
p.parent.logger.Debug("could not connect incoming", "err", err)
switch {
Expand All @@ -171,7 +168,7 @@ func (p *directPeerIncoming) run(ctx context.Context) {
}
boff = netc.MinBackoff

if err := p.keepalive(ctx, conn, stream); err != nil {
if err := p.keepalive(ctx, conn); err != nil {
p.parent.logger.Debug("incoming keepalive failed", "err", err)
switch {
case errors.Is(err, context.Canceled):
Expand All @@ -183,70 +180,34 @@ func (p *directPeerIncoming) run(ctx context.Context) {
}
}

func (p *directPeerIncoming) connect(ctx context.Context) (quic.Connection, quic.Stream, error) {
func (p *directPeerIncoming) connect(ctx context.Context) (quic.Connection, error) {
ch, cancel := p.parent.local.direct.expect(p.parent.local.serverCert, p.clientCert)
select {
case <-ctx.Done():
cancel()
return nil, nil, ctx.Err()
return nil, ctx.Err()
case <-p.closer:
cancel()
return nil, nil, errClosed
return nil, errClosed
case conn := <-ch:
stream, err := conn.AcceptStream(ctx)
if err != nil {
return nil, nil, err
}
if err := p.heartbeat(stream); err != nil {
return nil, nil, err
}
return conn, stream, nil
return conn, nil
}
}

func (p *directPeerIncoming) keepalive(ctx context.Context, conn quic.Connection, stream quic.Stream) error {
func (p *directPeerIncoming) keepalive(ctx context.Context, conn quic.Connection) error {
defer conn.CloseWithError(quic.ApplicationErrorCode(pb.Error_DirectKeepaliveClosed), "keepalive closed")
defer stream.Close()

p.parent.local.addActiveConn(p.parent.remoteID, peerIncoming, "", conn)
defer p.parent.local.removeActiveConn(p.parent.remoteID, peerIncoming, "")

g, ctx := errgroup.WithContext(ctx)

g.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
case <-p.closer:
return errClosed
}
})

g.Go(func() error {
for {
if err := p.heartbeat(stream); err != nil {
return err
}
}
})

return g.Wait()
}

func (p *directPeerIncoming) heartbeat(stream quic.Stream) error {
req, err := pbc.ReadRequest(stream)
switch {
case err != nil:
return err
case req.Heartbeat == nil:
respErr := pb.NewError(pb.Error_RequestUnknown, "unexpected request")
if err := pb.Write(stream, &pbc.Response{Error: respErr}); err != nil {
return kleverr.Ret(err)
}
return respErr
select {
case <-ctx.Done():
return ctx.Err()
case <-conn.Context().Done():
return context.Cause(conn.Context())
case <-p.closer:
return errClosed
}

return pb.Write(stream, &pbc.Response{Heartbeat: req.Heartbeat})
}

type directPeerOutgoing struct {
Expand All @@ -270,7 +231,7 @@ func newDirectPeerOutgoing(ctx context.Context, parent *directPeer, serverConfg
func (p *directPeerOutgoing) run(ctx context.Context) {
boff := netc.MinBackoff
for {
conn, stream, err := p.connect(ctx)
conn, err := p.connect(ctx)
if err != nil {
p.parent.logger.Debug("could not connect direct", "err", err)
if errors.Is(err, context.Canceled) {
Expand All @@ -289,7 +250,7 @@ func (p *directPeerOutgoing) run(ctx context.Context) {
}
boff = netc.MinBackoff

if err := p.keepalive(ctx, conn, stream); err != nil {
if err := p.keepalive(ctx, conn); err != nil {
p.parent.logger.Debug("disonnected peer", "err", err)
switch {
case errors.Is(err, context.Canceled):
Expand All @@ -301,7 +262,7 @@ func (p *directPeerOutgoing) run(ctx context.Context) {
}
}

func (p *directPeerOutgoing) connect(ctx context.Context) (quic.Connection, quic.Stream, error) {
func (p *directPeerOutgoing) connect(ctx context.Context) (quic.Connection, error) {
var errs []error
for paddr := range p.addrs {
addr := net.UDPAddrFromAddrPort(paddr)
Expand All @@ -320,53 +281,24 @@ func (p *directPeerOutgoing) connect(ctx context.Context) (quic.Connection, quic
continue
}

stream, err := conn.OpenStreamSync(ctx)
if err != nil {
errs = append(errs, err)
continue
}
if err := p.heartbeat(ctx, stream); err != nil {
errs = append(errs, err)
continue
}
return conn, stream, nil
return conn, nil
}
return nil, nil, errors.Join(errs...)
return nil, errors.Join(errs...)
}

func (p *directPeerOutgoing) keepalive(ctx context.Context, conn quic.Connection, stream quic.Stream) error {
func (p *directPeerOutgoing) keepalive(ctx context.Context, conn quic.Connection) error {
defer conn.CloseWithError(quic.ApplicationErrorCode(pb.Error_DirectKeepaliveClosed), "keepalive closed")
defer stream.Close()

p.parent.local.addActiveConn(p.parent.remoteID, peerOutgoing, "", conn)
defer p.parent.local.removeActiveConn(p.parent.remoteID, peerOutgoing, "")

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-p.closer:
return errClosed
case <-time.After(10 * time.Second):
}
if err := p.heartbeat(ctx, stream); err != nil {
return err
}
}
}

func (p *directPeerOutgoing) heartbeat(_ context.Context, stream quic.Stream) error {
// TODO setDeadline as additional assurance we are not blocked
req := &pbc.Heartbeat{Time: timestamppb.Now()}
if err := pb.Write(stream, &pbc.Request{Heartbeat: req}); err != nil {
return err
}
if resp, err := pbc.ReadResponse(stream); err != nil {
return err
} else {
dur := time.Since(resp.Heartbeat.Time.AsTime())
p.parent.logger.Debug("direct heartbeat", "dur", dur)
return nil
select {
case <-ctx.Done():
return ctx.Err()
case <-conn.Context().Done():
return context.Cause(conn.Context())
case <-p.closer:
return errClosed
}
}

Expand Down
38 changes: 2 additions & 36 deletions client/peer_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,8 @@ import (

"github.com/connet-dev/connet/model"
"github.com/connet-dev/connet/netc"
"github.com/connet-dev/connet/pb"
"github.com/connet-dev/connet/pbc"
"github.com/quic-go/quic-go"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/timestamppb"
)

type relayPeer struct {
Expand Down Expand Up @@ -99,42 +96,11 @@ func (r *relayPeer) connect(ctx context.Context) (quic.Connection, error) {
}

func (r *relayPeer) keepalive(ctx context.Context, conn quic.Connection) error {
stream, err := conn.OpenStreamSync(ctx)
if err != nil {
return err
}
if err := r.heartbeat(ctx, stream); err != nil {
return err
}

r.local.addRelayConn(r.serverHostport, conn)
defer r.local.removeRelayConn(r.serverHostport)

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(10 * time.Second):
}
if err := r.heartbeat(ctx, stream); err != nil {
return err
}
}
}

func (r *relayPeer) heartbeat(_ context.Context, stream quic.Stream) error {
// TODO setDeadline as additional assurance we are not blocked
req := &pbc.Heartbeat{Time: timestamppb.Now()}
if err := pb.Write(stream, &pbc.Request{Heartbeat: req}); err != nil {
return err
}
if resp, err := pbc.ReadResponse(stream); err != nil {
return err
} else {
dur := time.Since(resp.Heartbeat.Time.AsTime())
r.logger.Debug("relay heartbeat", "dur", dur)
return nil
}
<-conn.Context().Done()
return context.Cause(conn.Context())
}

func (r *relayPeer) stop() {
Expand Down
Loading
Loading