refactor: refactor the remote state merging code #22

Merged
merged 1 commit into from
Sep 16, 2024
14 changes: 12 additions & 2 deletions README.md
@@ -3,8 +3,7 @@
[![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/Tochemey/gokv/build.yml)](https://github.com/Tochemey/gokv/actions/workflows/build.yml)

Simple distributed in-memory key/value store. GoKV provides high availability and fault tolerance, which makes it suitable for large-scale application systems without sacrificing performance and reliability.
With GoKV, you can instantly create a fast, scalable, distributed system across a cluster of computers. Go-KV uses the push-pull anti-entropy method to replicate nodes' state across the cluster. This
approach makes Go-KV eventually consistent.
With GoKV, you can instantly create a fast, scalable, distributed system across a cluster of computers.

## Installation

@@ -14,6 +13,8 @@ go get github.com/tochemey/gokv

## Features

- Go-KV uses the push-pull anti-entropy method to replicate nodes' state across the cluster. This approach makes Go-KV eventually consistent.
Set [`stateSyncInterval`](./cluster/config.go) to a low value for more frequent state synchronisation, at the cost of extra network traffic.
- Discovery API for implementing custom node discovery providers. See: [Discovery API](./discovery/provider.go)
- Comes bundled with some discovery providers that can help you hit the ground running:
- [kubernetes](https://kubernetes.io/docs/home/) [api integration](./discovery/kubernetes) is fully functional
@@ -24,6 +25,15 @@ go get github.com/tochemey/gokv
- `Put`: creates a key/value pair that is eventually distributed in the cluster of nodes. The `key` is a string and the `value` is a byte array.
- `Get`: retrieves the value of a given `key` from the cluster of nodes.
- `Exists`: checks the existence of a given `key` in the cluster.
- `Delete`: deletes a given `key` from the cluster. At the moment the `key` is only marked as `archived`.

## Use Cases

- Distributed cache

## Example

There is an example of how to use it with NATS [here](./example/example.go)

## Builtin Discovery

47 changes: 37 additions & 10 deletions cluster/delegate.go
@@ -25,11 +25,12 @@
package cluster

import (
"maps"
"sync"
"time"

"github.com/hashicorp/memberlist"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/tochemey/gokv/internal/internalpb"
"github.com/tochemey/gokv/internal/lib"
@@ -118,22 +119,46 @@ func (d *Delegate) MergeRemoteState(buf []byte, join bool) {
// iterate all the entries coming from the remote node
// 1. if there is corresponding node ID in the node local state, combine the local state entries for that nodeID with the remote node entries
// 2. if there is no corresponding node ID in the node local state, set the entries with the remote entries
for _, nodeState := range remoteFSM.GetNodeStates() {
localEntries, ok := entries[nodeState.GetNodeId()]
for _, remoteNodeState := range remoteFSM.GetNodeStates() {
localEntries, ok := entries[remoteNodeState.GetNodeId()]
if !ok {
entries[nodeState.GetNodeId()] = nodeState.GetEntries()
entries[remoteNodeState.GetNodeId()] = remoteNodeState.GetEntries()
continue
}

// create entries when no entries are defined
if len(localEntries) == 0 {
localEntries = make(map[string]*internalpb.Entry)
}

maps.Copy(localEntries, nodeState.GetEntries())
entries[nodeState.GetNodeId()] = localEntries
/*******************************************************************************
small algorithm to merge incoming remote state entries with the local state entries
********************************************************************************/
// 1. iterate the incoming state entries
for key, remoteEntry := range remoteNodeState.GetEntries() {
// 2. check whether an incoming key already exists
localEntry, ok := localEntries[key]
// 3. if the key does not exist then add it as part of the existing entries
if !ok {
localEntries[key] = remoteEntry
continue
}

// 4. if the key entry exists then check its timestamp against the incoming entry
// 5. if the existing key entry is newer compared to the incoming entry ignore the incoming entry
if localEntry.GetTimestamp().AsTime().Unix() > remoteEntry.GetTimestamp().AsTime().Unix() {
continue
}

// 6. otherwise (the existing key entry is older or has the same timestamp),
// replace it with the incoming entry
localEntries[key] = remoteEntry
}

entries[remoteNodeState.GetNodeId()] = localEntries
}

// iterate the entries and build the new nodeState list
// iterate the merged entries and build the new nodeStates list
nodeStates := make([]*internalpb.NodeState, 0, len(entries))
for k, v := range entries {
nodeStates = append(nodeStates, &internalpb.NodeState{
@@ -142,7 +167,7 @@ func (d *Delegate) MergeRemoteState(buf []byte, join bool) {
})
}

// set the local node state with the new nodeState list
// set the local node state with the new nodeStates list
d.fsm.NodeStates = nodeStates
}

@@ -159,8 +184,9 @@ func (d *Delegate) Put(key string, value []byte) {
for k := range nodeState.GetEntries() {
if k == key {
nodeState.Entries[k] = &internalpb.Entry{
Value: value,
Archived: nil,
Value: value,
Archived: nil,
Timestamp: timestamppb.New(time.Now().UTC()),
}
keyExists = true
break
@@ -222,6 +248,7 @@ func (d *Delegate) Delete(key string) {
for k := range nodeState.GetEntries() {
if k == key && nodeState.GetNodeId() == d.me {
nodeState.Entries[key].Archived = lib.Ptr(true)
nodeState.Entries[key].Timestamp = timestamppb.New(time.Now().UTC())
d.fsm.NodeStates[index] = nodeState
return
}
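The merge rule that `MergeRemoteState` now applies per key (keep the local entry only when it is strictly newer, otherwise take the remote one) can be illustrated in isolation. The following is a minimal sketch, not the code from this PR: the `entry` struct is a hypothetical stand-in for `internalpb.Entry`, and `mergeEntries` and `at` are illustrative helpers. It also compares full `time.Time` values with `After`, whereas the diff compares `Unix()` seconds; both let the remote entry win on a tie.

```go
package main

import (
	"fmt"
	"time"
)

// entry is a hypothetical stand-in for internalpb.Entry (assumption for this sketch).
type entry struct {
	Value     []byte
	Archived  bool
	Timestamp time.Time
}

// at builds a UTC timestamp from Unix seconds (illustrative helper).
func at(sec int64) time.Time { return time.Unix(sec, 0).UTC() }

// mergeEntries folds remote into local using last-write-wins:
// a remote entry replaces the local one unless the local one is strictly newer.
func mergeEntries(local, remote map[string]entry) map[string]entry {
	if local == nil {
		local = make(map[string]entry, len(remote))
	}
	for key, remoteEntry := range remote {
		localEntry, ok := local[key]
		if ok && localEntry.Timestamp.After(remoteEntry.Timestamp) {
			// the local entry is newer: ignore the incoming one
			continue
		}
		// the key is new, or the incoming entry is at least as recent
		local[key] = remoteEntry
	}
	return local
}

func main() {
	local := map[string]entry{
		"a": {Value: []byte("local-new"), Timestamp: at(20)},
		"b": {Value: []byte("local-old"), Timestamp: at(5)},
	}
	remote := map[string]entry{
		"a": {Value: []byte("remote-old"), Timestamp: at(10)},
		"b": {Archived: true, Timestamp: at(15)}, // a newer delete marker
		"c": {Value: []byte("remote-only"), Timestamp: at(1)},
	}

	merged := mergeEntries(local, remote)
	fmt.Println(string(merged["a"].Value), merged["b"].Archived, string(merged["c"].Value))
	// prints "local-new true remote-only"
}
```

Because `Delete` now stamps the archived entry with a fresh timestamp, a delete marker propagates under the same rule as any other write, which is what the re-enabled assertions in `cluster/node_test.go` depend on.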
14 changes: 7 additions & 7 deletions cluster/node_test.go
@@ -84,15 +84,15 @@ func TestNodes(t *testing.T) {
require.NotEmpty(t, actual)
require.Equal(t, value, actual)

// // let us remove the key
// require.NoError(t, node2.Client().Delete(ctx, key))
// let us remove the key
require.NoError(t, node2.Client().Delete(ctx, key))

// // wait a bit for consistency
// lib.Pause(2 * time.Second)
// wait a bit for consistency
lib.Pause(time.Second)

// exists, err = node3.Client().Exists(ctx, key)
// require.NoError(t, err)
// require.False(t, exists)
exists, err = node3.Client().Exists(ctx, key)
require.NoError(t, err)
require.False(t, exists)

lib.Pause(time.Second)
