diff --git a/README.md b/README.md index 0ee5233..50f8ed9 100644 --- a/README.md +++ b/README.md @@ -15,8 +15,8 @@ go get github.com/tochemey/gokv ### Features - Go-KV uses the push-pull anti-entropy method to replicate nodes' state across the cluster. This approach makes Go-KV eventually consistent. - One can set the [`stateSyncInterval`](./cluster/config.go) value to low for frequent state synchronisation at a network cost. -- Discovery API to implement custom nodes discovery provider. See: [Discovery API](./discovery/provider.go) + One can set the [`stateSyncInterval`](./cluster/config.go) value to low for frequent state synchronisation at a network cost. +- Discovery API to implement custom nodes discovery provider. See: [discovery](./discovery/provider.go) - Comes bundled with some discovery providers that can help you hit the ground running: - [kubernetes](https://kubernetes.io/docs/home/) [api integration](./discovery/kubernetes) is fully functional - [nats](https://nats.io/) [integration](./discovery/nats) is fully functional diff --git a/cluster/client.go b/cluster/client.go index c89ab6f..18a5941 100644 --- a/cluster/client.go +++ b/cluster/client.go @@ -39,6 +39,8 @@ import ( ) // Client defines the cluster client +// This client can connect to any Go-KV cluster node and retrieve data from other +// of the cluster. type Client struct { // http client httpClient *nethttp.Client @@ -150,8 +152,8 @@ func (client *Client) Exists(ctx context.Context, key string) (bool, error) { return response.Msg.GetExists(), nil } -// close closes the client connection to the cluster -func (client *Client) close() error { +// Close closes the client connection to the cluster +func (client *Client) Close() error { // no-op when the client is not connected if !client.connected.Load() { return nil @@ -161,8 +163,9 @@ func (client *Client) close() error { return nil } -// newClient creates an instance of the cluster Client -func newClient(host string, port int) *Client { +// NewClient creates an instance of the cluster Client +// host and port are a Go-KV cluster node host and port +func NewClient(host string, port int) *Client { httpClient := http.NewClient() kvService := internalpbconnect.NewKVServiceClient( httpClient, diff --git a/cluster/node.go b/cluster/node.go index f3bd0c7..bfc5589 100644 --- a/cluster/node.go +++ b/cluster/node.go @@ -136,7 +136,7 @@ func (node *Node) Start(ctx context.Context) error { node.memberConfig.Events = &memberlist.ChannelEventDelegate{ Ch: eventsCh, } - node.clusterClient = newClient(node.config.host, int(node.config.port)) + node.clusterClient = NewClient(node.config.host, int(node.config.port)) node.started.Store(true) node.mu.Unlock() @@ -167,7 +167,7 @@ func (node *Node) Stop(ctx context.Context) error { if err := errorschain. New(errorschain.ReturnFirst()). - AddError(node.clusterClient.close()). + AddError(node.clusterClient.Close()). AddError(node.memberlist.Leave(node.config.shutdownTimeout)). AddError(node.config.provider.Deregister()). AddError(node.config.provider.Close()). @@ -266,8 +266,8 @@ func (node *Node) Events() <-chan *Event { return ch } -// DiscoveryAddress returns the node discoveryAddress -func (node *Node) DiscoveryAddress() string { +// HostPort returns the node host:port address +func (node *Node) HostPort() string { node.mu.Lock() address := node.discoveryAddress node.mu.Unlock() @@ -285,7 +285,7 @@ func (node *Node) Peers() ([]*Member, error) { if err != nil { return nil, err } - if member != nil && member.DiscoveryAddress() != node.DiscoveryAddress() { + if member != nil && member.DiscoveryAddress() != node.HostPort() { members = append(members, member) } } @@ -359,7 +359,7 @@ func (node *Node) eventsListener(eventsCh chan memberlist.NodeEvent) { // skip this node if event.Node == nil { addr := net.JoinHostPort(event.Node.Addr.String(), strconv.Itoa(int(event.Node.Port))) - if addr == node.DiscoveryAddress() { + if addr == node.HostPort() { continue } } diff --git a/cluster/node_test.go b/cluster/node_test.go index 0417236..6cecbb4 100644 --- a/cluster/node_test.go +++ b/cluster/node_test.go @@ -143,11 +143,11 @@ L: require.NotNil(t, event) require.True(t, event.Type == NodeJoined) actualAddr := event.Member.DiscoveryAddress() - require.Equal(t, node2.DiscoveryAddress(), actualAddr) + require.Equal(t, node2.HostPort(), actualAddr) peers, err := node1.Peers() require.NoError(t, err) require.Len(t, peers, 1) - require.Equal(t, node2.DiscoveryAddress(), peers[0].DiscoveryAddress()) + require.Equal(t, node2.HostPort(), peers[0].DiscoveryAddress()) // wait for some time lib.Pause(time.Second) @@ -179,7 +179,7 @@ L2: require.NotNil(t, event) require.True(t, event.Type == NodeLeft) actualAddr = event.Member.DiscoveryAddress() - require.Equal(t, node2.DiscoveryAddress(), actualAddr) + require.Equal(t, node2.HostPort(), actualAddr) t.Cleanup(func() { assert.NoError(t, node1.Stop(ctx))