-
Notifications
You must be signed in to change notification settings - Fork 36
/
Copy pathp2pclient.go
131 lines (107 loc) · 3.19 KB
/
p2pclient.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package p2pclient
import (
"errors"
"sync"
"github.com/libp2p/go-libp2p/core/peer"
ggio "github.com/gogo/protobuf/io"
logging "github.com/ipfs/go-log"
pb "github.com/libp2p/go-libp2p-daemon/pb"
multiaddr "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
// log is the package-level logger, created with the name "p2pclient".
var log = logging.Logger("p2pclient")
// MessageSizeMax is the size limit handed to the delimited protobuf readers
// that this client creates on its control connections. The value is cribbed
// from github.com/libp2p/go-libp2p-net.
const MessageSizeMax = 1 << 22 // 4 MB
// Client is the struct that manages a connection to a libp2p daemon.
type Client struct {
// controlMaddr is the daemon control address; newControlConn dials it each
// time a request is issued.
controlMaddr multiaddr.Multiaddr
// listenMaddr and listener are not set by NewClient's struct literal;
// presumably the listen method fills them in — TODO confirm.
listenMaddr multiaddr.Multiaddr
listener manet.Listener
// mhandlers presumably guards handlers — verify against the code that
// registers and looks up stream handlers.
mhandlers sync.Mutex
// handlers maps a string key to a StreamHandlerFunc; NewClient creates it
// empty. NOTE(review): the key looks like a protocol name — confirm.
handlers map[string]StreamHandlerFunc
}
// NewClient builds a libp2p daemon client for the daemon whose control
// multi-address is controlMaddr. The handler table starts out empty, and
// listenMaddr is handed to the client's listen method to set up the inbound
// side. If listen reports an error, that error is returned and no client is
// produced.
func NewClient(controlMaddr, listenMaddr multiaddr.Multiaddr) (*Client, error) {
	c := &Client{
		handlers:     make(map[string]StreamHandlerFunc),
		controlMaddr: controlMaddr,
	}

	err := c.listen(listenMaddr)
	if err != nil {
		return nil, err
	}

	return c, nil
}
// newControlConn dials the daemon's control multi-address and returns the
// resulting connection, or the dial error. The caller owns the connection and
// is responsible for closing it.
func (c *Client) newControlConn() (manet.Conn, error) {
	conn, err := manet.Dial(c.controlMaddr)
	return conn, err
}
// Identify queries the daemon for its peer ID and listen addresses.
//
// It dials a fresh control connection, writes a single IDENTIFY request and
// reads a single response. An error is returned when the dial, write or read
// fails, when the daemon answers with an error response, when the response
// carries no identify payload, or when the peer ID cannot be decoded.
// Addresses that fail to parse are logged and skipped instead of failing the
// whole call, so the returned slice may be shorter than what the daemon sent.
func (c *Client) Identify() (peer.ID, []multiaddr.Multiaddr, error) {
	control, err := c.newControlConn()
	if err != nil {
		return peer.ID(""), nil, err
	}
	defer control.Close()

	r := ggio.NewDelimitedReader(control, MessageSizeMax)
	w := ggio.NewDelimitedWriter(control)

	req := &pb.Request{Type: pb.Request_IDENTIFY.Enum()}
	if err = w.WriteMsg(req); err != nil {
		return peer.ID(""), nil, err
	}

	res := &pb.Response{}
	if err = r.ReadMsg(res); err != nil {
		return peer.ID(""), nil, err
	}

	if reserr := res.GetError(); reserr != nil {
		return peer.ID(""), nil, errors.New(reserr.GetMsg())
	}

	// The getter returns nil when the field is unset; guard against it so a
	// malformed reply yields an error instead of a nil-pointer panic below.
	idres := res.GetIdentify()
	if idres == nil {
		return peer.ID(""), nil, errors.New("daemon response is missing the identify payload")
	}

	id, err := peer.IDFromBytes(idres.Id)
	if err != nil {
		return peer.ID(""), nil, err
	}

	addrs := make([]multiaddr.Multiaddr, 0, len(idres.Addrs))
	for i, addrbytes := range idres.Addrs {
		addr, err := multiaddr.NewMultiaddrBytes(addrbytes)
		if err != nil {
			// Best effort: one bad address should not hide the valid ones.
			log.Errorf("failed to parse multiaddr in position %d in response to identify request: %s", i, err)
			continue
		}
		addrs = append(addrs, addr)
	}

	return id, addrs, nil
}
// Connect asks the daemon to connect to peer p, sending addrs along with the
// request as the addresses known for that peer.
//
// A fresh control connection is dialed for the exchange: one CONNECT request
// is written and one response is read. The returned error is the dial, write
// or read failure, or the message of the daemon's error response; nil means
// the daemon replied without an error.
func (c *Client) Connect(p peer.ID, addrs []multiaddr.Multiaddr) error {
	conn, err := c.newControlConn()
	if err != nil {
		return err
	}
	defer conn.Close()

	reader := ggio.NewDelimitedReader(conn, MessageSizeMax)
	writer := ggio.NewDelimitedWriter(conn)

	// Serialize every multiaddr to its binary form for the wire.
	encoded := make([][]byte, 0, len(addrs))
	for _, a := range addrs {
		encoded = append(encoded, a.Bytes())
	}

	connectReq := &pb.ConnectRequest{
		Peer:  []byte(p),
		Addrs: encoded,
	}
	req := &pb.Request{
		Type:    pb.Request_CONNECT.Enum(),
		Connect: connectReq,
	}

	if err = writer.WriteMsg(req); err != nil {
		return err
	}

	res := new(pb.Response)
	if err = reader.ReadMsg(res); err != nil {
		return err
	}

	daemonErr := res.GetError()
	if daemonErr == nil {
		return nil
	}
	return errors.New(daemonErr.GetMsg())
}