Skip to content

Commit

Permalink
refactor: reimplement Put, Get and Exists (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Sep 15, 2024
1 parent 19b1135 commit caf02dd
Show file tree
Hide file tree
Showing 10 changed files with 604 additions and 452 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Go-KV
# Go-KV

[![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/Tochemey/gokv/build.yml)]((https://github.com/Tochemey/gokv/actions/workflows/build.yml))

Expand All @@ -23,7 +23,6 @@ go get github.com/tochemey/gokv
- Built-in [Client](./cluster/client.go) to interact with the distributed store via the following apis:
- `Put`: create key/value pair that is eventually distributed in the cluster of nodes. The `key` is a string and the `value` is a byte array.
- `Get`: retrieves the value of a given `key` from the cluster of nodes.
- `Delete`: delete a given `key` from the cluster. At the moment the `key` is marked to be `deleted`.
- `Exists`: check the existence of a given `key` in the cluster.

## Builtin Discovery
Expand Down
39 changes: 19 additions & 20 deletions cluster/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,7 @@ type Client struct {
// http client
httpClient *nethttp.Client
// host defines the host discoveryAddress
host string
// port defines the gRCP port for client connections
port int
kvService internalpbconnect.KVServiceClient

connected *atomic.Bool
}

Expand All @@ -54,10 +50,11 @@ func (client *Client) Put(ctx context.Context, key string, value []byte) error {
if !client.connected.Load() {
return ErrClientNotConnected
}
_, err := client.kvService.Put(ctx, connect.NewRequest(&internalpb.PutRequest{
Key: key,
Value: value,
}))
_, err := client.kvService.Put(ctx, connect.NewRequest(
&internalpb.PutRequest{
Key: key,
Value: value,
}))
return err
}

Expand All @@ -66,9 +63,10 @@ func (client *Client) Get(ctx context.Context, key string) ([]byte, error) {
if !client.connected.Load() {
return nil, ErrClientNotConnected
}
response, err := client.kvService.Get(ctx, connect.NewRequest(&internalpb.GetRequest{
Key: key,
}))
response, err := client.kvService.Get(ctx, connect.NewRequest(
&internalpb.GetRequest{
Key: key,
}))

if err != nil {
code := connect.CodeOf(err)
Expand All @@ -78,17 +76,19 @@ func (client *Client) Get(ctx context.Context, key string) ([]byte, error) {
return nil, err
}

return response.Msg.GetKv().GetValue(), nil
return response.Msg.GetValue(), nil
}

// Delete deletes a given key from the cluster
// nolint
func (client *Client) Delete(ctx context.Context, key string) error {
if !client.connected.Load() {
return ErrClientNotConnected
}
_, err := client.kvService.Delete(ctx, connect.NewRequest(&internalpb.DeleteRequest{
Key: key,
}))
_, err := client.kvService.Delete(ctx, connect.NewRequest(
&internalpb.DeleteRequest{
Key: key,
}))

return err
}
Expand All @@ -99,9 +99,10 @@ func (client *Client) Exists(ctx context.Context, key string) (bool, error) {
return false, ErrClientNotConnected
}

response, err := client.kvService.KeyExists(ctx, connect.NewRequest(&internalpb.KeyExistsRequest{
Key: key,
}))
response, err := client.kvService.KeyExists(ctx, connect.NewRequest(
&internalpb.KeyExistsRequest{
Key: key,
}))

if err != nil {
return false, err
Expand Down Expand Up @@ -132,8 +133,6 @@ func newClient(host string, port int) *Client {
return &Client{
httpClient: httpClient,
kvService: kvService,
host: host,
port: port,
connected: atomic.NewBool(true),
}
}
8 changes: 4 additions & 4 deletions cluster/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ type Config struct {
// specifies the host
// This is the ip address of the running node.
host string
// specifies the state sync interval
// This is the interval between complete state syncs.
// Complete state syncs are done with a single node over TCP and are
// specifies the delegate sync interval
// This is the interval between complete delegate syncs.
// Complete delegate syncs are done with a single node over TCP and are
// quite expensive relative to standard gossiped messages.
// Setting this interval lower (more frequent) will increase convergence
// speeds across larger clusters at the expense of increased bandwidth usage.
Expand Down Expand Up @@ -127,7 +127,7 @@ func (config *Config) WithHost(host string) *Config {
return config
}

// WithStateSyncInterval sets the state sync interval
// WithStateSyncInterval sets the delegate sync interval
func (config *Config) WithStateSyncInterval(interval time.Duration) *Config {
config.stateSyncInterval = interval
return config
Expand Down
262 changes: 262 additions & 0 deletions cluster/delegate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
/*
* MIT License
*
* Copyright (c) 2024 Tochemey
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package cluster

import (
"maps"
"sync"

"github.com/hashicorp/memberlist"
"google.golang.org/protobuf/proto"

"github.com/tochemey/gokv/internal/internalpb"
"github.com/tochemey/gokv/internal/lib"
)

// Delegate defines a node delegate
type Delegate struct {
*sync.RWMutex
// node metadata shared in the cluster
// for instance the IP discoveryAddress of the node, the name of the node
// relevant information that can be known by the other peers in the cluster
nodeMeta *internalpb.NodeMeta
// holds the peers state
fsm *internalpb.FSM

me string
}

// enforce compilation error
var _ memberlist.Delegate = (*Delegate)(nil)

// NodeMeta is used to retrieve meta-data about the current node
// when broadcasting an alive message. It's length is limited to
// the given byte size. This metadata is available in the Node structure.
// nolint
func (d *Delegate) NodeMeta(limit int) []byte {
d.Lock()
// no need to check the error
bytea, _ := proto.Marshal(d.nodeMeta)
d.Unlock()
return bytea
}

// NotifyMsg is called when a user-data message is received.
// Care should be taken that this method does not block, since doing
// so would block the entire UDP packet receive loop. Additionally, the byte
// slice may be modified after the call returns, so it should be copied if needed
// nolint
func (d *Delegate) NotifyMsg(bytes []byte) {
// push/pull sync all the way
}

// GetBroadcasts is called when user data messages can be broadcast.
// It can return a list of buffers to send. Each buffer should assume an
// overhead as provided with a limit on the total byte size allowed.
// The total byte size of the resulting data to send must not exceed
// the limit. Care should be taken that this method does not block,
// since doing so would block the entire UDP packet receive loop.
// nolint
func (d *Delegate) GetBroadcasts(overhead, limit int) [][]byte {
// nothing to broadcast
return nil
}

// LocalState is used for a TCP Push/Pull. This is sent to
// the remote side in addition to the membership information. Any
// data can be sent here. See MergeRemoteState as well. The `join`
// boolean indicates this is for a join instead of a push/pull.
// nolint
func (d *Delegate) LocalState(join bool) []byte {
d.Lock()
bytea, _ := proto.Marshal(d.fsm)
d.Unlock()
return bytea
}

// MergeRemoteState is invoked after a TCP Push/Pull. This is the
// delegate received from the remote side and is the result of the
// remote side's LocalState call. The 'join'
// boolean indicates this is for a join instead of a push/pull.
// nolint
func (d *Delegate) MergeRemoteState(buf []byte, join bool) {
d.Lock()
defer d.Unlock()

remoteFSM := new(internalpb.FSM)
_ = proto.Unmarshal(buf, remoteFSM)
localFSM := d.fsm

// build the map of all entries in the node local state
entries := make(map[string]map[string]*internalpb.Entry)
for _, nodeState := range localFSM.GetNodeStates() {
entries[nodeState.GetNodeId()] = nodeState.GetEntries()
}

// iterate all the entries coming from the remote node
// 1. if there is corresponding node ID in the node local state, combine the local state entries for that nodeID with the remote node entries
// 2. if there is no corresponding node ID in the node local state, set the entries with the remote entries
for _, nodeState := range remoteFSM.GetNodeStates() {
localEntries, ok := entries[nodeState.GetNodeId()]
if !ok {
entries[nodeState.GetNodeId()] = nodeState.GetEntries()
continue
}

if len(localEntries) == 0 {
localEntries = make(map[string]*internalpb.Entry)
}

maps.Copy(localEntries, nodeState.GetEntries())
entries[nodeState.GetNodeId()] = localEntries
}

// iterate the entries and build the new nodeState list
nodeStates := make([]*internalpb.NodeState, 0, len(entries))
for k, v := range entries {
nodeStates = append(nodeStates, &internalpb.NodeState{
NodeId: k,
Entries: v,
})
}

// set the local node state with the new nodeState list
d.fsm.NodeStates = nodeStates
}

// Put adds the key/value to the store
func (d *Delegate) Put(key string, value []byte) {
d.Lock()
defer d.Unlock()

localState := d.fsm
keyExists := false

// first check the key existence and overwrite when found
for _, nodeState := range localState.GetNodeStates() {
for k := range nodeState.GetEntries() {
if k == key {
nodeState.Entries[k] = &internalpb.Entry{
Value: value,
Archived: nil,
}
keyExists = true
break
}
}
}

// key has been found which means it has been overwritten
// just return
if keyExists {
return
}

// key does not exist which means the given node is adding it as
// part of its key/value pair
for _, nodeState := range localState.GetNodeStates() {
if nodeState.GetNodeId() == d.me {
if len(nodeState.GetEntries()) == 0 {
nodeState.Entries = map[string]*internalpb.Entry{
key: {
Value: value,
Archived: nil,
},
}
return
}

nodeState.GetEntries()[key] = &internalpb.Entry{
Value: value,
Archived: nil,
}
return
}
}
}

// Get returns the value of the given key
func (d *Delegate) Get(key string) []byte {
d.RLock()
defer d.RUnlock()
localState := d.fsm
for _, nodeState := range localState.GetNodeStates() {
for k, entry := range nodeState.GetEntries() {
if k == key {
return entry.GetValue()
}
}
}
return nil
}

// Delete deletes the given key from the cluster
// One can only delete a key if the given node is the owner
func (d *Delegate) Delete(key string) {
d.Lock()
defer d.Unlock()

for index, nodeState := range d.fsm.GetNodeStates() {
for k := range nodeState.GetEntries() {
if k == key && nodeState.GetNodeId() == d.me {
nodeState.Entries[key].Archived = lib.Ptr(true)
d.fsm.NodeStates[index] = nodeState
return
}
}
}
}

// Exists checks whether a given exists
func (d *Delegate) Exists(key string) bool {
d.RLock()
defer d.RUnlock()
localState := d.fsm
for _, nodeState := range localState.GetNodeStates() {
for k, entry := range nodeState.GetEntries() {
if k == key {
return !entry.GetArchived() && len(entry.GetValue()) > 0
}
}
}
return false
}

// newDelegate creates a new Delegate
func newDelegate(name string, meta *internalpb.NodeMeta) *Delegate {
return &Delegate{
RWMutex: &sync.RWMutex{},
nodeMeta: meta,
me: name,
fsm: &internalpb.FSM{
NodeStates: []*internalpb.NodeState{
{
NodeId: name,
Entries: make(map[string]*internalpb.Entry, 10),
},
},
},
}
}
Loading

0 comments on commit caf02dd

Please sign in to comment.