diff --git a/README.md b/README.md index 9f17e19..cd93ecc 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Go-KV +# Go-KV [![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/Tochemey/gokv/build.yml)]((https://github.com/Tochemey/gokv/actions/workflows/build.yml)) @@ -23,7 +23,6 @@ go get github.com/tochemey/gokv - Built-in [Client](./cluster/client.go) to interact with the distributed store via the following apis: - `Put`: create key/value pair that is eventually distributed in the cluster of nodes. The `key` is a string and the `value` is a byte array. - `Get`: retrieves the value of a given `key` from the cluster of nodes. - - `Delete`: delete a given `key` from the cluster. At the moment the `key` is marked to be `deleted`. - `Exists`: check the existence of a given `key` in the cluster. ## Builtin Discovery diff --git a/cluster/client.go b/cluster/client.go index 4cd3033..7542e54 100644 --- a/cluster/client.go +++ b/cluster/client.go @@ -41,11 +41,7 @@ type Client struct { // http client httpClient *nethttp.Client // host defines the host discoveryAddress - host string - // port defines the gRCP port for client connections - port int kvService internalpbconnect.KVServiceClient - connected *atomic.Bool } @@ -54,10 +50,11 @@ func (client *Client) Put(ctx context.Context, key string, value []byte) error { if !client.connected.Load() { return ErrClientNotConnected } - _, err := client.kvService.Put(ctx, connect.NewRequest(&internalpb.PutRequest{ - Key: key, - Value: value, - })) + _, err := client.kvService.Put(ctx, connect.NewRequest( + &internalpb.PutRequest{ + Key: key, + Value: value, + })) return err } @@ -66,9 +63,10 @@ func (client *Client) Get(ctx context.Context, key string) ([]byte, error) { if !client.connected.Load() { return nil, ErrClientNotConnected } - response, err := client.kvService.Get(ctx, connect.NewRequest(&internalpb.GetRequest{ - Key: key, - })) + response, err := client.kvService.Get(ctx, connect.NewRequest( + &internalpb.GetRequest{ + Key: key, + })) if err != nil { code := connect.CodeOf(err) @@ -78,17 +76,19 @@ func (client *Client) Get(ctx context.Context, key string) ([]byte, error) { return nil, err } - return response.Msg.GetKv().GetValue(), nil + return response.Msg.GetValue(), nil } // Delete deletes a given key from the cluster +// nolint func (client *Client) Delete(ctx context.Context, key string) error { if !client.connected.Load() { return ErrClientNotConnected } - _, err := client.kvService.Delete(ctx, connect.NewRequest(&internalpb.DeleteRequest{ - Key: key, - })) + _, err := client.kvService.Delete(ctx, connect.NewRequest( + &internalpb.DeleteRequest{ + Key: key, + })) return err } @@ -99,9 +99,10 @@ func (client *Client) Exists(ctx context.Context, key string) (bool, error) { return false, ErrClientNotConnected } - response, err := client.kvService.KeyExists(ctx, connect.NewRequest(&internalpb.KeyExistsRequest{ - Key: key, - })) + response, err := client.kvService.KeyExists(ctx, connect.NewRequest( + &internalpb.KeyExistsRequest{ + Key: key, + })) if err != nil { return false, err @@ -132,8 +133,6 @@ func newClient(host string, port int) *Client { return &Client{ httpClient: httpClient, kvService: kvService, - host: host, - port: port, connected: atomic.NewBool(true), } } diff --git a/cluster/config.go b/cluster/config.go index 29d4670..db40bf1 100644 --- a/cluster/config.go +++ b/cluster/config.go @@ -54,9 +54,9 @@ type Config struct { // specifies the host // This is the ip address of the running node. host string - // specifies the state sync interval - // This is the interval between complete state syncs. - // Complete state syncs are done with a single node over TCP and are + // specifies the delegate sync interval + // This is the interval between complete delegate syncs. + // Complete delegate syncs are done with a single node over TCP and are // quite expensive relative to standard gossiped messages. // Setting this interval lower (more frequent) will increase convergence // speeds across larger clusters at the expense of increased bandwidth usage. @@ -127,7 +127,7 @@ func (config *Config) WithHost(host string) *Config { return config } -// WithStateSyncInterval sets the state sync interval +// WithStateSyncInterval sets the delegate sync interval func (config *Config) WithStateSyncInterval(interval time.Duration) *Config { config.stateSyncInterval = interval return config diff --git a/cluster/delegate.go b/cluster/delegate.go new file mode 100644 index 0000000..bd19be7 --- /dev/null +++ b/cluster/delegate.go @@ -0,0 +1,262 @@ +/* + * MIT License + * + * Copyright (c) 2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package cluster + +import ( + "maps" + "sync" + + "github.com/hashicorp/memberlist" + "google.golang.org/protobuf/proto" + + "github.com/tochemey/gokv/internal/internalpb" + "github.com/tochemey/gokv/internal/lib" +) + +// Delegate defines a node delegate +type Delegate struct { + *sync.RWMutex + // node metadata shared in the cluster + // for instance the IP discoveryAddress of the node, the name of the node + // relevant information that can be known by the other peers in the cluster + nodeMeta *internalpb.NodeMeta + // holds the peers state + fsm *internalpb.FSM + + me string +} + +// enforce compilation error +var _ memberlist.Delegate = (*Delegate)(nil) + +// NodeMeta is used to retrieve meta-data about the current node +// when broadcasting an alive message. It's length is limited to +// the given byte size. This metadata is available in the Node structure. +// nolint +func (d *Delegate) NodeMeta(limit int) []byte { + d.Lock() + // no need to check the error + bytea, _ := proto.Marshal(d.nodeMeta) + d.Unlock() + return bytea +} + +// NotifyMsg is called when a user-data message is received. +// Care should be taken that this method does not block, since doing +// so would block the entire UDP packet receive loop. Additionally, the byte +// slice may be modified after the call returns, so it should be copied if needed +// nolint +func (d *Delegate) NotifyMsg(bytes []byte) { + // push/pull sync all the way +} + +// GetBroadcasts is called when user data messages can be broadcast. +// It can return a list of buffers to send. Each buffer should assume an +// overhead as provided with a limit on the total byte size allowed. +// The total byte size of the resulting data to send must not exceed +// the limit. Care should be taken that this method does not block, +// since doing so would block the entire UDP packet receive loop. +// nolint +func (d *Delegate) GetBroadcasts(overhead, limit int) [][]byte { + // nothing to broadcast + return nil +} + +// LocalState is used for a TCP Push/Pull. This is sent to +// the remote side in addition to the membership information. Any +// data can be sent here. See MergeRemoteState as well. The `join` +// boolean indicates this is for a join instead of a push/pull. +// nolint +func (d *Delegate) LocalState(join bool) []byte { + d.Lock() + bytea, _ := proto.Marshal(d.fsm) + d.Unlock() + return bytea +} + +// MergeRemoteState is invoked after a TCP Push/Pull. This is the +// delegate received from the remote side and is the result of the +// remote side's LocalState call. The 'join' +// boolean indicates this is for a join instead of a push/pull. +// nolint +func (d *Delegate) MergeRemoteState(buf []byte, join bool) { + d.Lock() + defer d.Unlock() + + remoteFSM := new(internalpb.FSM) + _ = proto.Unmarshal(buf, remoteFSM) + localFSM := d.fsm + + // build the map of all entries in the node local state + entries := make(map[string]map[string]*internalpb.Entry) + for _, nodeState := range localFSM.GetNodeStates() { + entries[nodeState.GetNodeId()] = nodeState.GetEntries() + } + + // iterate all the entries coming from the remote node + // 1. if there is corresponding node ID in the node local state, combine the local state entries for that nodeID with the remote node entries + // 2. if there is no corresponding node ID in the node local state, set the entries with the remote entries + for _, nodeState := range remoteFSM.GetNodeStates() { + localEntries, ok := entries[nodeState.GetNodeId()] + if !ok { + entries[nodeState.GetNodeId()] = nodeState.GetEntries() + continue + } + + if len(localEntries) == 0 { + localEntries = make(map[string]*internalpb.Entry) + } + + maps.Copy(localEntries, nodeState.GetEntries()) + entries[nodeState.GetNodeId()] = localEntries + } + + // iterate the entries and build the new nodeState list + nodeStates := make([]*internalpb.NodeState, 0, len(entries)) + for k, v := range entries { + nodeStates = append(nodeStates, &internalpb.NodeState{ + NodeId: k, + Entries: v, + }) + } + + // set the local node state with the new nodeState list + d.fsm.NodeStates = nodeStates +} + +// Put adds the key/value to the store +func (d *Delegate) Put(key string, value []byte) { + d.Lock() + defer d.Unlock() + + localState := d.fsm + keyExists := false + + // first check the key existence and overwrite when found + for _, nodeState := range localState.GetNodeStates() { + for k := range nodeState.GetEntries() { + if k == key { + nodeState.Entries[k] = &internalpb.Entry{ + Value: value, + Archived: nil, + } + keyExists = true + break + } + } + } + + // key has been found which means it has been overwritten + // just return + if keyExists { + return + } + + // key does not exist which means the given node is adding it as + // part of its key/value pair + for _, nodeState := range localState.GetNodeStates() { + if nodeState.GetNodeId() == d.me { + if len(nodeState.GetEntries()) == 0 { + nodeState.Entries = map[string]*internalpb.Entry{ + key: { + Value: value, + Archived: nil, + }, + } + return + } + + nodeState.GetEntries()[key] = &internalpb.Entry{ + Value: value, + Archived: nil, + } + return + } + } +} + +// Get returns the value of the given key +func (d *Delegate) Get(key string) []byte { + d.RLock() + defer d.RUnlock() + localState := d.fsm + for _, nodeState := range localState.GetNodeStates() { + for k, entry := range nodeState.GetEntries() { + if k == key { + return entry.GetValue() + } + } + } + return nil +} + +// Delete deletes the given key from the cluster +// One can only delete a key if the given node is the owner +func (d *Delegate) Delete(key string) { + d.Lock() + defer d.Unlock() + + for index, nodeState := range d.fsm.GetNodeStates() { + for k := range nodeState.GetEntries() { + if k == key && nodeState.GetNodeId() == d.me { + nodeState.Entries[key].Archived = lib.Ptr(true) + d.fsm.NodeStates[index] = nodeState + return + } + } + } +} + +// Exists checks whether a given exists +func (d *Delegate) Exists(key string) bool { + d.RLock() + defer d.RUnlock() + localState := d.fsm + for _, nodeState := range localState.GetNodeStates() { + for k, entry := range nodeState.GetEntries() { + if k == key { + return !entry.GetArchived() && len(entry.GetValue()) > 0 + } + } + } + return false +} + +// newDelegate creates a new Delegate +func newDelegate(name string, meta *internalpb.NodeMeta) *Delegate { + return &Delegate{ + RWMutex: &sync.RWMutex{}, + nodeMeta: meta, + me: name, + fsm: &internalpb.FSM{ + NodeStates: []*internalpb.NodeState{ + { + NodeId: name, + Entries: make(map[string]*internalpb.Entry, 10), + }, + }, + }, + } +} diff --git a/cluster/node.go b/cluster/node.go index f9a1990..eb609c9 100644 --- a/cluster/node.go +++ b/cluster/node.go @@ -38,7 +38,6 @@ import ( "connectrpc.com/connect" "github.com/hashicorp/memberlist" "go.uber.org/atomic" - "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" "github.com/tochemey/gokv/internal/errorschain" @@ -55,9 +54,9 @@ type Node struct { config *Config - // state holds the node state - // through memberlist this state will be eventually gossiped to the rest of the cluster - state *State + // delegate holds the node delegate + // through memberlist this delegate will be eventually gossiped to the rest of the cluster + delegate *Delegate memberConfig *memberlist.Config memberlist *memberlist.Memberlist @@ -93,19 +92,21 @@ func NewNode(config *Config) *Node { DiscoveryPort: uint32(config.discoveryPort), CreationTime: timestamppb.New(time.Now().UTC()), } - state := newState(meta) - mconfig.Delegate = state + + discoveryAddr := lib.HostPort(config.host, int(config.discoveryPort)) + delegate := newDelegate(discoveryAddr, meta) + mconfig.Delegate = delegate return &Node{ mu: new(sync.Mutex), - state: state, + delegate: delegate, memberConfig: mconfig, started: atomic.NewBool(false), eventsChan: make(chan *Event, 1), stopEventsListener: make(chan struct{}, 1), eventsLock: new(sync.Mutex), config: config, - discoveryAddress: lib.HostPort(config.host, int(config.discoveryPort)), + discoveryAddress: discoveryAddr, } } @@ -177,7 +178,7 @@ func (node *Node) Stop(ctx context.Context) error { // Put is used to distribute a key/value pair across a cluster of nodes // nolint -func (node *Node) Put(ctx context.Context, request *connect.Request[internalpb.PutRequest]) (*connect.Response[internalpb.PutResponse], error) { +func (node *Node) Put(_ context.Context, request *connect.Request[internalpb.PutRequest]) (*connect.Response[internalpb.PutResponse], error) { node.mu.Lock() if !node.started.Load() { node.mu.Unlock() @@ -185,7 +186,7 @@ func (node *Node) Put(ctx context.Context, request *connect.Request[internalpb.P } req := request.Msg - node.state.Put(req.GetKey(), req.GetValue()) + node.delegate.Put(req.GetKey(), req.GetValue()) node.mu.Unlock() return connect.NewResponse(new(internalpb.PutResponse)), nil @@ -193,7 +194,7 @@ func (node *Node) Put(ctx context.Context, request *connect.Request[internalpb.P // Get is used to retrieve a key/value pair in a cluster of nodes // nolint -func (node *Node) Get(ctx context.Context, request *connect.Request[internalpb.GetRequest]) (*connect.Response[internalpb.GetResponse], error) { +func (node *Node) Get(_ context.Context, request *connect.Request[internalpb.GetRequest]) (*connect.Response[internalpb.GetResponse], error) { node.mu.Lock() if !node.started.Load() { node.mu.Unlock() @@ -201,21 +202,21 @@ func (node *Node) Get(ctx context.Context, request *connect.Request[internalpb.G } req := request.Msg - kv := node.state.Get(req.GetKey()) - if kv == nil || proto.Equal(kv, new(internalpb.KV)) { + entry := node.delegate.Get(req.GetKey()) + if len(entry) == 0 { node.mu.Unlock() return nil, connect.NewError(connect.CodeNotFound, ErrKeyNotFound) } node.mu.Unlock() return connect.NewResponse(&internalpb.GetResponse{ - Kv: kv, + Value: entry, }), nil } // Delete is used to remove a key/value pair from a cluster of nodes // nolint -func (node *Node) Delete(ctx context.Context, request *connect.Request[internalpb.DeleteRequest]) (*connect.Response[internalpb.DeleteResponse], error) { +func (node *Node) Delete(_ context.Context, request *connect.Request[internalpb.DeleteRequest]) (*connect.Response[internalpb.DeleteResponse], error) { node.mu.Lock() if !node.started.Load() { node.mu.Unlock() @@ -223,7 +224,7 @@ func (node *Node) Delete(ctx context.Context, request *connect.Request[internalp } req := request.Msg - node.state.Delete(req.GetKey()) + node.delegate.Delete(req.GetKey()) node.mu.Unlock() return connect.NewResponse(new(internalpb.DeleteResponse)), nil @@ -231,7 +232,7 @@ func (node *Node) Delete(ctx context.Context, request *connect.Request[internalp // KeyExists is used to check the existence of a given key in the cluster // nolint -func (node *Node) KeyExists(ctx context.Context, request *connect.Request[internalpb.KeyExistsRequest]) (*connect.Response[internalpb.KeyExistResponse], error) { +func (node *Node) KeyExists(_ context.Context, request *connect.Request[internalpb.KeyExistsRequest]) (*connect.Response[internalpb.KeyExistResponse], error) { node.mu.Lock() if !node.started.Load() { node.mu.Unlock() @@ -239,7 +240,7 @@ func (node *Node) KeyExists(ctx context.Context, request *connect.Request[intern } req := request.Msg - exists := node.state.Exists(req.GetKey()) + exists := node.delegate.Exists(req.GetKey()) node.mu.Unlock() return connect.NewResponse(&internalpb.KeyExistResponse{Exists: exists}), nil } diff --git a/cluster/node_test.go b/cluster/node_test.go index ae248db..0de7b93 100644 --- a/cluster/node_test.go +++ b/cluster/node_test.go @@ -84,15 +84,15 @@ func TestNodes(t *testing.T) { require.NotEmpty(t, actual) require.Equal(t, value, actual) - // let us remove the key - require.NoError(t, node1.Client().Delete(ctx, key)) + // // let us remove the key + // require.NoError(t, node2.Client().Delete(ctx, key)) - // wait a bit for consistency - lib.Pause(time.Second) + // // wait a bit for consistency + // lib.Pause(2 * time.Second) - exists, err = node3.Client().Exists(ctx, key) - require.NoError(t, err) - require.False(t, exists) + // exists, err = node3.Client().Exists(ctx, key) + // require.NoError(t, err) + // require.False(t, exists) lib.Pause(time.Second) diff --git a/cluster/state.go b/cluster/state.go deleted file mode 100644 index a0adab8..0000000 --- a/cluster/state.go +++ /dev/null @@ -1,173 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2024 Tochemey - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package cluster - -import ( - "sync" - "time" - - "github.com/hashicorp/memberlist" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/timestamppb" - - "github.com/tochemey/gokv/internal/internalpb" -) - -// State defines a node state -type State struct { - *sync.RWMutex - // node metadata shared in the cluster - // for instance the IP discoveryAddress of the node, the name of the node - // relevant information that can be known by the other peers in the cluster - nodeMeta *internalpb.NodeMeta - // node internal state - this is the actual config being gossiped - nodeState *internalpb.NodeState -} - -// enforce compilation error -var _ memberlist.Delegate = (*State)(nil) - -// NodeMeta is used to retrieve meta-data about the current node -// when broadcasting an alive message. It's length is limited to -// the given byte size. This metadata is available in the Node structure. -// nolint -func (state *State) NodeMeta(limit int) []byte { - state.Lock() - // no need to check the error - bytea, _ := proto.Marshal(state.nodeMeta) - state.Unlock() - return bytea -} - -// NotifyMsg is called when a user-data message is received. -// Care should be taken that this method does not block, since doing -// so would block the entire UDP packet receive loop. Additionally, the byte -// slice may be modified after the call returns, so it should be copied if needed -// nolint -func (state *State) NotifyMsg(bytes []byte) { - // push/pull sync all the way -} - -// GetBroadcasts is called when user data messages can be broadcast. -// It can return a list of buffers to send. Each buffer should assume an -// overhead as provided with a limit on the total byte size allowed. -// The total byte size of the resulting data to send must not exceed -// the limit. Care should be taken that this method does not block, -// since doing so would block the entire UDP packet receive loop. -// nolint -func (state *State) GetBroadcasts(overhead, limit int) [][]byte { - // nothing to broadcast - return nil -} - -// LocalState is used for a TCP Push/Pull. This is sent to -// the remote side in addition to the membership information. Any -// data can be sent here. See MergeRemoteState as well. The `join` -// boolean indicates this is for a join instead of a push/pull. -// nolint -func (state *State) LocalState(join bool) []byte { - state.Lock() - // no need to check the error - bytea, _ := proto.Marshal(state.nodeState) - state.Unlock() - return bytea -} - -// MergeRemoteState is invoked after a TCP Push/Pull. This is the -// state received from the remote side and is the result of the -// remote side's LocalState call. The 'join' -// boolean indicates this is for a join instead of a push/pull. -// nolint -func (state *State) MergeRemoteState(buf []byte, join bool) { - state.Lock() - remoteState := new(internalpb.NodeState) - // ignore the error for the meantime - // TODO: check it when necessary - _ = proto.Unmarshal(buf, remoteState) - - currentState := state.nodeState.GetState() - for key, value := range remoteState.GetState() { - if currentValue, ok := currentState[key]; ok { - if !currentValue.GetTombstone() && !proto.Equal(currentValue, value) { - state.nodeState.State[key] = value - } - continue - } - state.nodeState.State[key] = value - } - - state.Unlock() -} - -// Put adds the key/value to the store -func (state *State) Put(key string, value []byte) { - state.Lock() - state.nodeState.State[key] = &internalpb.KV{ - Key: key, - Value: value, - CreatedAt: timestamppb.New(time.Now().UTC()), - Tombstone: false, - } - state.Unlock() -} - -// Get returns the value of the given key -func (state *State) Get(key string) *internalpb.KV { - state.RLock() - kv := state.nodeState.GetState()[key] - state.RUnlock() - return kv -} - -// Delete deletes the given key from the cluster -func (state *State) Delete(key string) { - state.Lock() - kv, ok := state.nodeState.GetState()[key] - if !ok { - return - } - kv.Tombstone = true - state.nodeState.State[key] = kv - state.Unlock() -} - -// Exists checks whether a given exists -func (state *State) Exists(key string) bool { - state.RLock() - kv, ok := state.nodeState.GetState()[key] - state.RUnlock() - return ok && !kv.GetTombstone() -} - -// newState creates a new State -func newState(meta *internalpb.NodeMeta) *State { - return &State{ - RWMutex: &sync.RWMutex{}, - nodeMeta: meta, - nodeState: &internalpb.NodeState{ - State: make(map[string]*internalpb.KV), - }, - } -} diff --git a/internal/internalpb/gokv.pb.go b/internal/internalpb/gokv.pb.go index 630487b..c59e624 100644 --- a/internal/internalpb/gokv.pb.go +++ b/internal/internalpb/gokv.pb.go @@ -21,19 +21,20 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -// NodeState defines the node state -// This will be distributed in the cluster -type NodeState struct { +// Entry represents the key/value pair +type Entry struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - // Specifies the node state - State map[string]*KV `protobuf:"bytes,1,rep,name=state,proto3" json:"state,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + // Specifies the value + Value []byte `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"` + // States whether it is archived or not + Archived *bool `protobuf:"varint,3,opt,name=archived,proto3,oneof" json:"archived,omitempty"` } -func (x *NodeState) Reset() { - *x = NodeState{} +func (x *Entry) Reset() { + *x = Entry{} if protoimpl.UnsafeEnabled { mi := &file_internal_gokv_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -41,13 +42,13 @@ func (x *NodeState) Reset() { } } -func (x *NodeState) String() string { +func (x *Entry) String() string { return protoimpl.X.MessageStringOf(x) } -func (*NodeState) ProtoMessage() {} +func (*Entry) ProtoMessage() {} -func (x *NodeState) ProtoReflect() protoreflect.Message { +func (x *Entry) ProtoReflect() protoreflect.Message { mi := &file_internal_gokv_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -59,38 +60,40 @@ func (x *NodeState) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use NodeState.ProtoReflect.Descriptor instead. -func (*NodeState) Descriptor() ([]byte, []int) { +// Deprecated: Use Entry.ProtoReflect.Descriptor instead. +func (*Entry) Descriptor() ([]byte, []int) { return file_internal_gokv_proto_rawDescGZIP(), []int{0} } -func (x *NodeState) GetState() map[string]*KV { +func (x *Entry) GetValue() []byte { if x != nil { - return x.State + return x.Value } return nil } -// NodeMeta defines the node metadata -type NodeMeta struct { +func (x *Entry) GetArchived() bool { + if x != nil && x.Archived != nil { + return *x.Archived + } + return false +} + +// NodeState defines the node state +// This will be distributed in the cluster +type NodeState struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - // Specifies the node name - Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` - // Specifies the node host - Host string `protobuf:"bytes,2,opt,name=host,proto3" json:"host,omitempty"` - // Specifies the node port - Port uint32 `protobuf:"varint,3,opt,name=port,proto3" json:"port,omitempty"` - // Specifies the node discovery port - DiscoveryPort uint32 `protobuf:"varint,4,opt,name=discovery_port,json=discoveryPort,proto3" json:"discovery_port,omitempty"` - // Specifies the creation time - CreationTime *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=creation_time,json=creationTime,proto3" json:"creation_time,omitempty"` + // Specifies the nodeId + NodeId string `protobuf:"bytes,1,opt,name=node_id,json=nodeId,proto3" json:"node_id,omitempty"` + // Specifies the entries + Entries map[string]*Entry `protobuf:"bytes,2,rep,name=entries,proto3" json:"entries,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` } -func (x *NodeMeta) Reset() { - *x = NodeMeta{} +func (x *NodeState) Reset() { + *x = NodeState{} if protoimpl.UnsafeEnabled { mi := &file_internal_gokv_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -98,13 +101,13 @@ func (x *NodeMeta) Reset() { } } -func (x *NodeMeta) String() string { +func (x *NodeState) String() string { return protoimpl.X.MessageStringOf(x) } -func (*NodeMeta) ProtoMessage() {} +func (*NodeState) ProtoMessage() {} -func (x *NodeMeta) ProtoReflect() protoreflect.Message { +func (x *NodeState) ProtoReflect() protoreflect.Message { mi := &file_internal_gokv_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -116,79 +119,109 @@ func (x *NodeMeta) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use NodeMeta.ProtoReflect.Descriptor instead. -func (*NodeMeta) Descriptor() ([]byte, []int) { +// Deprecated: Use NodeState.ProtoReflect.Descriptor instead. +func (*NodeState) Descriptor() ([]byte, []int) { return file_internal_gokv_proto_rawDescGZIP(), []int{1} } -func (x *NodeMeta) GetName() string { +func (x *NodeState) GetNodeId() string { if x != nil { - return x.Name + return x.NodeId } return "" } -func (x *NodeMeta) GetHost() string { +func (x *NodeState) GetEntries() map[string]*Entry { if x != nil { - return x.Host + return x.Entries } - return "" + return nil } -func (x *NodeMeta) GetPort() uint32 { - if x != nil { - return x.Port +// FSM defines the delegate FSM +type FSM struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Specifies the list of node states + NodeStates []*NodeState `protobuf:"bytes,1,rep,name=node_states,json=nodeStates,proto3" json:"node_states,omitempty"` +} + +func (x *FSM) Reset() { + *x = FSM{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_gokv_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } - return 0 } -func (x *NodeMeta) GetDiscoveryPort() uint32 { - if x != nil { - return x.DiscoveryPort +func (x *FSM) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FSM) ProtoMessage() {} + +func (x *FSM) ProtoReflect() protoreflect.Message { + mi := &file_internal_gokv_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms } - return 0 + return mi.MessageOf(x) } -func (x *NodeMeta) GetCreationTime() *timestamppb.Timestamp { +// Deprecated: Use FSM.ProtoReflect.Descriptor instead. +func (*FSM) Descriptor() ([]byte, []int) { + return file_internal_gokv_proto_rawDescGZIP(), []int{2} +} + +func (x *FSM) GetNodeStates() []*NodeState { if x != nil { - return x.CreationTime + return x.NodeStates } return nil } -// KV defines the key/value pair -type KV struct { +// NodeMeta defines the node metadata +type NodeMeta struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - // Specifies the key - Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` - // Specifies the value - Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` - // Specifies the creation time timestamp - CreatedAt *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"` - // States that given is marked for tombstone - Tombstone bool `protobuf:"varint,4,opt,name=tombstone,proto3" json:"tombstone,omitempty"` + // Specifies the node name + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + // Specifies the node host + Host string `protobuf:"bytes,2,opt,name=host,proto3" json:"host,omitempty"` + // Specifies the node port + Port uint32 `protobuf:"varint,3,opt,name=port,proto3" json:"port,omitempty"` + // Specifies the node discovery port + DiscoveryPort uint32 `protobuf:"varint,4,opt,name=discovery_port,json=discoveryPort,proto3" json:"discovery_port,omitempty"` + // Specifies the creation time + CreationTime *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=creation_time,json=creationTime,proto3" json:"creation_time,omitempty"` } -func (x *KV) Reset() { - *x = KV{} +func (x *NodeMeta) Reset() { + *x = NodeMeta{} if protoimpl.UnsafeEnabled { - mi := &file_internal_gokv_proto_msgTypes[2] + mi := &file_internal_gokv_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } -func (x *KV) String() string { +func (x *NodeMeta) String() string { return protoimpl.X.MessageStringOf(x) } -func (*KV) ProtoMessage() {} +func (*NodeMeta) ProtoMessage() {} -func (x *KV) ProtoReflect() protoreflect.Message { - mi := &file_internal_gokv_proto_msgTypes[2] +func (x *NodeMeta) ProtoReflect() protoreflect.Message { + mi := &file_internal_gokv_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -199,37 +232,44 @@ func (x *KV) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use KV.ProtoReflect.Descriptor instead. -func (*KV) Descriptor() ([]byte, []int) { - return file_internal_gokv_proto_rawDescGZIP(), []int{2} +// Deprecated: Use NodeMeta.ProtoReflect.Descriptor instead. +func (*NodeMeta) Descriptor() ([]byte, []int) { + return file_internal_gokv_proto_rawDescGZIP(), []int{3} } -func (x *KV) GetKey() string { +func (x *NodeMeta) GetName() string { if x != nil { - return x.Key + return x.Name } return "" } -func (x *KV) GetValue() []byte { +func (x *NodeMeta) GetHost() string { if x != nil { - return x.Value + return x.Host } - return nil + return "" } -func (x *KV) GetCreatedAt() *timestamppb.Timestamp { +func (x *NodeMeta) GetPort() uint32 { if x != nil { - return x.CreatedAt + return x.Port } - return nil + return 0 } -func (x *KV) GetTombstone() bool { +func (x *NodeMeta) GetDiscoveryPort() uint32 { if x != nil { - return x.Tombstone + return x.DiscoveryPort } - return false + return 0 +} + +func (x *NodeMeta) GetCreationTime() *timestamppb.Timestamp { + if x != nil { + return x.CreationTime + } + return nil } // GetRequest is used to fetch the value of a given key @@ -245,7 +285,7 @@ type GetRequest struct { func (x *GetRequest) Reset() { *x = GetRequest{} if protoimpl.UnsafeEnabled { - mi := &file_internal_gokv_proto_msgTypes[3] + mi := &file_internal_gokv_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -258,7 +298,7 @@ func (x *GetRequest) String() string { func (*GetRequest) ProtoMessage() {} func (x *GetRequest) ProtoReflect() protoreflect.Message { - mi := &file_internal_gokv_proto_msgTypes[3] + mi := &file_internal_gokv_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -271,7 +311,7 @@ func (x *GetRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use GetRequest.ProtoReflect.Descriptor instead. func (*GetRequest) Descriptor() ([]byte, []int) { - return file_internal_gokv_proto_rawDescGZIP(), []int{3} + return file_internal_gokv_proto_rawDescGZIP(), []int{4} } func (x *GetRequest) GetKey() string { @@ -288,13 +328,13 @@ type GetResponse struct { unknownFields protoimpl.UnknownFields // Specifies the KV - Kv *KV `protobuf:"bytes,1,opt,name=kv,proto3" json:"kv,omitempty"` + Value []byte `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"` } func (x *GetResponse) Reset() { *x = GetResponse{} if protoimpl.UnsafeEnabled { - mi := &file_internal_gokv_proto_msgTypes[4] + mi := &file_internal_gokv_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -307,7 +347,7 @@ func (x *GetResponse) String() string { func (*GetResponse) ProtoMessage() {} func (x *GetResponse) ProtoReflect() protoreflect.Message { - mi := &file_internal_gokv_proto_msgTypes[4] + mi := &file_internal_gokv_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -320,12 +360,12 @@ func (x *GetResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use GetResponse.ProtoReflect.Descriptor instead. func (*GetResponse) Descriptor() ([]byte, []int) { - return file_internal_gokv_proto_rawDescGZIP(), []int{4} + return file_internal_gokv_proto_rawDescGZIP(), []int{5} } -func (x *GetResponse) GetKv() *KV { +func (x *GetResponse) GetValue() []byte { if x != nil { - return x.Kv + return x.Value } return nil } @@ -345,7 +385,7 @@ type PutRequest struct { func (x *PutRequest) Reset() { *x = PutRequest{} if protoimpl.UnsafeEnabled { - mi := &file_internal_gokv_proto_msgTypes[5] + mi := &file_internal_gokv_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -358,7 +398,7 @@ func (x *PutRequest) String() string { func (*PutRequest) ProtoMessage() {} func (x *PutRequest) ProtoReflect() protoreflect.Message { - mi := &file_internal_gokv_proto_msgTypes[5] + mi := &file_internal_gokv_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -371,7 +411,7 @@ func (x *PutRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use PutRequest.ProtoReflect.Descriptor instead. func (*PutRequest) Descriptor() ([]byte, []int) { - return file_internal_gokv_proto_rawDescGZIP(), []int{5} + return file_internal_gokv_proto_rawDescGZIP(), []int{6} } func (x *PutRequest) GetKey() string { @@ -398,7 +438,7 @@ type PutResponse struct { func (x *PutResponse) Reset() { *x = PutResponse{} if protoimpl.UnsafeEnabled { - mi := &file_internal_gokv_proto_msgTypes[6] + mi := &file_internal_gokv_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -411,7 +451,7 @@ func (x *PutResponse) String() string { func (*PutResponse) ProtoMessage() {} func (x *PutResponse) ProtoReflect() protoreflect.Message { - mi := &file_internal_gokv_proto_msgTypes[6] + mi := &file_internal_gokv_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -424,7 +464,7 @@ func (x *PutResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use PutResponse.ProtoReflect.Descriptor instead. func (*PutResponse) Descriptor() ([]byte, []int) { - return file_internal_gokv_proto_rawDescGZIP(), []int{6} + return file_internal_gokv_proto_rawDescGZIP(), []int{7} } // DeleteRequest is used to remove a distributed key from the cluster @@ -440,7 +480,7 @@ type DeleteRequest struct { func (x *DeleteRequest) Reset() { *x = DeleteRequest{} if protoimpl.UnsafeEnabled { - mi := &file_internal_gokv_proto_msgTypes[7] + mi := &file_internal_gokv_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -453,7 +493,7 @@ func (x *DeleteRequest) String() string { func (*DeleteRequest) ProtoMessage() {} func (x *DeleteRequest) ProtoReflect() protoreflect.Message { - mi := &file_internal_gokv_proto_msgTypes[7] + mi := &file_internal_gokv_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -466,7 +506,7 @@ func (x *DeleteRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use DeleteRequest.ProtoReflect.Descriptor instead. func (*DeleteRequest) Descriptor() ([]byte, []int) { - return file_internal_gokv_proto_rawDescGZIP(), []int{7} + return file_internal_gokv_proto_rawDescGZIP(), []int{8} } func (x *DeleteRequest) GetKey() string { @@ -486,7 +526,7 @@ type DeleteResponse struct { func (x *DeleteResponse) Reset() { *x = DeleteResponse{} if protoimpl.UnsafeEnabled { - mi := &file_internal_gokv_proto_msgTypes[8] + mi := &file_internal_gokv_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -499,7 +539,7 @@ func (x *DeleteResponse) String() string { func (*DeleteResponse) ProtoMessage() {} func (x *DeleteResponse) ProtoReflect() protoreflect.Message { - mi := &file_internal_gokv_proto_msgTypes[8] + mi := &file_internal_gokv_proto_msgTypes[9] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -512,7 +552,7 @@ func (x *DeleteResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use DeleteResponse.ProtoReflect.Descriptor instead. func (*DeleteResponse) Descriptor() ([]byte, []int) { - return file_internal_gokv_proto_rawDescGZIP(), []int{8} + return file_internal_gokv_proto_rawDescGZIP(), []int{9} } // KeyExistsRequest is used to check the existence of a given key @@ -529,7 +569,7 @@ type KeyExistsRequest struct { func (x *KeyExistsRequest) Reset() { *x = KeyExistsRequest{} if protoimpl.UnsafeEnabled { - mi := &file_internal_gokv_proto_msgTypes[9] + mi := &file_internal_gokv_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -542,7 +582,7 @@ func (x *KeyExistsRequest) String() string { func (*KeyExistsRequest) ProtoMessage() {} func (x *KeyExistsRequest) ProtoReflect() protoreflect.Message { - mi := &file_internal_gokv_proto_msgTypes[9] + mi := &file_internal_gokv_proto_msgTypes[10] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -555,7 +595,7 @@ func (x *KeyExistsRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use KeyExistsRequest.ProtoReflect.Descriptor instead. func (*KeyExistsRequest) Descriptor() ([]byte, []int) { - return file_internal_gokv_proto_rawDescGZIP(), []int{9} + return file_internal_gokv_proto_rawDescGZIP(), []int{10} } func (x *KeyExistsRequest) GetKey() string { @@ -577,7 +617,7 @@ type KeyExistResponse struct { func (x *KeyExistResponse) Reset() { *x = KeyExistResponse{} if protoimpl.UnsafeEnabled { - mi := &file_internal_gokv_proto_msgTypes[10] + mi := &file_internal_gokv_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -590,7 +630,7 @@ func (x *KeyExistResponse) String() string { func (*KeyExistResponse) ProtoMessage() {} func (x *KeyExistResponse) ProtoReflect() protoreflect.Message { - mi := &file_internal_gokv_proto_msgTypes[10] + mi := &file_internal_gokv_proto_msgTypes[11] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -603,7 +643,7 @@ func (x *KeyExistResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use KeyExistResponse.ProtoReflect.Descriptor instead. func (*KeyExistResponse) Descriptor() ([]byte, []int) { - return file_internal_gokv_proto_rawDescGZIP(), []int{10} + return file_internal_gokv_proto_rawDescGZIP(), []int{11} } func (x *KeyExistResponse) GetExists() bool { @@ -620,16 +660,27 @@ var file_internal_gokv_proto_rawDesc = []byte{ 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0a, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x22, 0x8d, 0x01, 0x0a, 0x09, 0x4e, 0x6f, 0x64, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, - 0x12, 0x36, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, - 0x20, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x2e, 0x4e, 0x6f, 0x64, - 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x65, 0x45, 0x6e, 0x74, 0x72, - 0x79, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x1a, 0x48, 0x0a, 0x0a, 0x53, 0x74, 0x61, 0x74, - 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x24, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, - 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, - 0x61, 0x6c, 0x70, 0x62, 0x2e, 0x4b, 0x56, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, - 0x38, 0x01, 0x22, 0xae, 0x01, 0x0a, 0x08, 0x4e, 0x6f, 0x64, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x12, + 0x74, 0x6f, 0x22, 0x4b, 0x0a, 0x05, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0x12, 0x1f, 0x0a, 0x08, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x64, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x08, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x64, 0x88, + 0x01, 0x01, 0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x64, 0x22, + 0xb1, 0x01, 0x0a, 0x09, 0x4e, 0x6f, 0x64, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x17, 0x0a, + 0x07, 0x6e, 0x6f, 0x64, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, + 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x12, 0x3c, 0x0a, 0x07, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x65, + 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, + 0x61, 0x6c, 0x70, 0x62, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x2e, 0x45, + 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x65, 0x6e, 0x74, + 0x72, 0x69, 0x65, 0x73, 0x1a, 0x4d, 0x0a, 0x0c, 0x45, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x45, + 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x27, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, + 0x70, 0x62, 0x2e, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, + 0x02, 0x38, 0x01, 0x22, 0x3d, 0x0a, 0x03, 0x46, 0x53, 0x4d, 0x12, 0x36, 0x0a, 0x0b, 0x6e, 0x6f, + 0x64, 0x65, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, + 0x15, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x2e, 0x4e, 0x6f, 0x64, + 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x0a, 0x6e, 0x6f, 0x64, 0x65, 0x53, 0x74, 0x61, 0x74, + 0x65, 0x73, 0x22, 0xae, 0x01, 0x0a, 0x08, 0x4e, 0x6f, 0x64, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x18, @@ -640,60 +691,51 @@ var file_internal_gokv_proto_rawDesc = []byte{ 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0c, 0x63, 0x72, 0x65, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, - 0x69, 0x6d, 0x65, 0x22, 0x85, 0x01, 0x0a, 0x02, 0x4b, 0x56, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, - 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, - 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, - 0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, - 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, - 0x6d, 0x70, 0x52, 0x09, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x12, 0x1c, 0x0a, - 0x09, 0x74, 0x6f, 0x6d, 0x62, 0x73, 0x74, 0x6f, 0x6e, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, - 0x52, 0x09, 0x74, 0x6f, 0x6d, 0x62, 0x73, 0x74, 0x6f, 0x6e, 0x65, 0x22, 0x1e, 0x0a, 0x0a, 0x47, - 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x22, 0x2d, 0x0a, 0x0b, 0x47, - 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1e, 0x0a, 0x02, 0x6b, 0x76, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, - 0x6c, 0x70, 0x62, 0x2e, 0x4b, 0x56, 0x52, 0x02, 0x6b, 0x76, 0x22, 0x34, 0x0a, 0x0a, 0x50, 0x75, - 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, - 0x22, 0x0d, 0x0a, 0x0b, 0x50, 0x75, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x21, 0x0a, 0x0d, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, - 0x65, 0x79, 0x22, 0x10, 0x0a, 0x0e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x24, 0x0a, 0x10, 0x4b, 0x65, 0x79, 0x45, 0x78, 0x69, 0x73, 0x74, - 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x22, 0x2a, 0x0a, 0x10, 0x4b, 0x65, - 0x79, 0x45, 0x78, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, - 0x0a, 0x06, 0x65, 0x78, 0x69, 0x73, 0x74, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, - 0x65, 0x78, 0x69, 0x73, 0x74, 0x73, 0x32, 0x85, 0x02, 0x0a, 0x09, 0x4b, 0x56, 0x53, 0x65, 0x72, - 0x76, 0x69, 0x63, 0x65, 0x12, 0x36, 0x0a, 0x03, 0x50, 0x75, 0x74, 0x12, 0x16, 0x2e, 0x69, 0x6e, - 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x74, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, - 0x2e, 0x50, 0x75, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x36, 0x0a, 0x03, - 0x47, 0x65, 0x74, 0x12, 0x16, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, - 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x69, 0x6e, - 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3f, 0x0a, 0x06, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x12, 0x19, - 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x2e, 0x44, 0x65, 0x6c, 0x65, - 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x69, 0x6e, 0x74, 0x65, - 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x47, 0x0a, 0x09, 0x4b, 0x65, 0x79, 0x45, 0x78, 0x69, 0x73, - 0x74, 0x73, 0x12, 0x1c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x2e, - 0x4b, 0x65, 0x79, 0x45, 0x78, 0x69, 0x73, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x1c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x2e, 0x4b, 0x65, - 0x79, 0x45, 0x78, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x9e, - 0x01, 0x0a, 0x0e, 0x63, 0x6f, 0x6d, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, - 0x62, 0x42, 0x09, 0x47, 0x6f, 0x6b, 0x76, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x48, 0x02, 0x50, 0x01, - 0x5a, 0x37, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x74, 0x6f, 0x63, - 0x68, 0x65, 0x6d, 0x65, 0x79, 0x2f, 0x67, 0x6f, 0x6b, 0x76, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, - 0x6e, 0x61, 0x6c, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x3b, 0x69, - 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0xa2, 0x02, 0x03, 0x49, 0x58, 0x58, 0xaa, - 0x02, 0x0a, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0xca, 0x02, 0x0a, 0x49, - 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0xe2, 0x02, 0x16, 0x49, 0x6e, 0x74, 0x65, - 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, - 0x74, 0x61, 0xea, 0x02, 0x0a, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x69, 0x6d, 0x65, 0x22, 0x1e, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, + 0x6b, 0x65, 0x79, 0x22, 0x23, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x34, 0x0a, 0x0a, 0x50, 0x75, 0x74, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x0d, + 0x0a, 0x0b, 0x50, 0x75, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x21, 0x0a, + 0x0d, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, + 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, + 0x22, 0x10, 0x0a, 0x0e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x24, 0x0a, 0x10, 0x4b, 0x65, 0x79, 0x45, 0x78, 0x69, 0x73, 0x74, 0x73, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x22, 0x2a, 0x0a, 0x10, 0x4b, 0x65, 0x79, 0x45, + 0x78, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, + 0x65, 0x78, 0x69, 0x73, 0x74, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x65, 0x78, + 0x69, 0x73, 0x74, 0x73, 0x32, 0x85, 0x02, 0x0a, 0x09, 0x4b, 0x56, 0x53, 0x65, 0x72, 0x76, 0x69, + 0x63, 0x65, 0x12, 0x36, 0x0a, 0x03, 0x50, 0x75, 0x74, 0x12, 0x16, 0x2e, 0x69, 0x6e, 0x74, 0x65, + 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x17, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x2e, 0x50, + 0x75, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x36, 0x0a, 0x03, 0x47, 0x65, + 0x74, 0x12, 0x16, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x2e, 0x47, + 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x69, 0x6e, 0x74, 0x65, + 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x3f, 0x0a, 0x06, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x12, 0x19, 0x2e, 0x69, + 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, + 0x61, 0x6c, 0x70, 0x62, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x47, 0x0a, 0x09, 0x4b, 0x65, 0x79, 0x45, 0x78, 0x69, 0x73, 0x74, 0x73, + 0x12, 0x1c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x2e, 0x4b, 0x65, + 0x79, 0x45, 0x78, 0x69, 0x73, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, + 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x2e, 0x4b, 0x65, 0x79, 0x45, + 0x78, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x9e, 0x01, 0x0a, + 0x0e, 0x63, 0x6f, 0x6d, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x42, + 0x09, 0x47, 0x6f, 0x6b, 0x76, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x48, 0x02, 0x50, 0x01, 0x5a, 0x37, + 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x74, 0x6f, 0x63, 0x68, 0x65, + 0x6d, 0x65, 0x79, 0x2f, 0x67, 0x6f, 0x6b, 0x76, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, + 0x6c, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x3b, 0x69, 0x6e, 0x74, + 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0xa2, 0x02, 0x03, 0x49, 0x58, 0x58, 0xaa, 0x02, 0x0a, + 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0xca, 0x02, 0x0a, 0x49, 0x6e, 0x74, + 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0xe2, 0x02, 0x16, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, + 0x61, 0x6c, 0x70, 0x62, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, + 0xea, 0x02, 0x0a, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -708,41 +750,41 @@ func file_internal_gokv_proto_rawDescGZIP() []byte { return file_internal_gokv_proto_rawDescData } -var file_internal_gokv_proto_msgTypes = make([]protoimpl.MessageInfo, 12) +var file_internal_gokv_proto_msgTypes = make([]protoimpl.MessageInfo, 13) var file_internal_gokv_proto_goTypes = []any{ - (*NodeState)(nil), // 0: internalpb.NodeState - (*NodeMeta)(nil), // 1: internalpb.NodeMeta - (*KV)(nil), // 2: internalpb.KV - (*GetRequest)(nil), // 3: internalpb.GetRequest - (*GetResponse)(nil), // 4: internalpb.GetResponse - (*PutRequest)(nil), // 5: internalpb.PutRequest - (*PutResponse)(nil), // 6: internalpb.PutResponse - (*DeleteRequest)(nil), // 7: internalpb.DeleteRequest - (*DeleteResponse)(nil), // 8: internalpb.DeleteResponse - (*KeyExistsRequest)(nil), // 9: internalpb.KeyExistsRequest - (*KeyExistResponse)(nil), // 10: internalpb.KeyExistResponse - nil, // 11: internalpb.NodeState.StateEntry - (*timestamppb.Timestamp)(nil), // 12: google.protobuf.Timestamp + (*Entry)(nil), // 0: internalpb.Entry + (*NodeState)(nil), // 1: internalpb.NodeState + (*FSM)(nil), // 2: internalpb.FSM + (*NodeMeta)(nil), // 3: internalpb.NodeMeta + (*GetRequest)(nil), // 4: internalpb.GetRequest + (*GetResponse)(nil), // 5: internalpb.GetResponse + (*PutRequest)(nil), // 6: internalpb.PutRequest + (*PutResponse)(nil), // 7: internalpb.PutResponse + (*DeleteRequest)(nil), // 8: internalpb.DeleteRequest + (*DeleteResponse)(nil), // 9: internalpb.DeleteResponse + (*KeyExistsRequest)(nil), // 10: internalpb.KeyExistsRequest + (*KeyExistResponse)(nil), // 11: internalpb.KeyExistResponse + nil, // 12: internalpb.NodeState.EntriesEntry + (*timestamppb.Timestamp)(nil), // 13: google.protobuf.Timestamp } var file_internal_gokv_proto_depIdxs = []int32{ - 11, // 0: internalpb.NodeState.state:type_name -> internalpb.NodeState.StateEntry - 12, // 1: internalpb.NodeMeta.creation_time:type_name -> google.protobuf.Timestamp - 12, // 2: internalpb.KV.created_at:type_name -> google.protobuf.Timestamp - 2, // 3: internalpb.GetResponse.kv:type_name -> internalpb.KV - 2, // 4: internalpb.NodeState.StateEntry.value:type_name -> internalpb.KV - 5, // 5: internalpb.KVService.Put:input_type -> internalpb.PutRequest - 3, // 6: internalpb.KVService.Get:input_type -> internalpb.GetRequest - 7, // 7: internalpb.KVService.Delete:input_type -> internalpb.DeleteRequest - 9, // 8: internalpb.KVService.KeyExists:input_type -> internalpb.KeyExistsRequest - 6, // 9: internalpb.KVService.Put:output_type -> internalpb.PutResponse - 4, // 10: internalpb.KVService.Get:output_type -> internalpb.GetResponse - 8, // 11: internalpb.KVService.Delete:output_type -> internalpb.DeleteResponse - 10, // 12: internalpb.KVService.KeyExists:output_type -> internalpb.KeyExistResponse - 9, // [9:13] is the sub-list for method output_type - 5, // [5:9] is the sub-list for method input_type - 5, // [5:5] is the sub-list for extension type_name - 5, // [5:5] is the sub-list for extension extendee - 0, // [0:5] is the sub-list for field type_name + 12, // 0: internalpb.NodeState.entries:type_name -> internalpb.NodeState.EntriesEntry + 1, // 1: internalpb.FSM.node_states:type_name -> internalpb.NodeState + 13, // 2: internalpb.NodeMeta.creation_time:type_name -> google.protobuf.Timestamp + 0, // 3: internalpb.NodeState.EntriesEntry.value:type_name -> internalpb.Entry + 6, // 4: internalpb.KVService.Put:input_type -> internalpb.PutRequest + 4, // 5: internalpb.KVService.Get:input_type -> internalpb.GetRequest + 8, // 6: internalpb.KVService.Delete:input_type -> internalpb.DeleteRequest + 10, // 7: internalpb.KVService.KeyExists:input_type -> internalpb.KeyExistsRequest + 7, // 8: internalpb.KVService.Put:output_type -> internalpb.PutResponse + 5, // 9: internalpb.KVService.Get:output_type -> internalpb.GetResponse + 9, // 10: internalpb.KVService.Delete:output_type -> internalpb.DeleteResponse + 11, // 11: internalpb.KVService.KeyExists:output_type -> internalpb.KeyExistResponse + 8, // [8:12] is the sub-list for method output_type + 4, // [4:8] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name } func init() { file_internal_gokv_proto_init() } @@ -752,7 +794,7 @@ func file_internal_gokv_proto_init() { } if !protoimpl.UnsafeEnabled { file_internal_gokv_proto_msgTypes[0].Exporter = func(v any, i int) any { - switch v := v.(*NodeState); i { + switch v := v.(*Entry); i { case 0: return &v.state case 1: @@ -764,7 +806,7 @@ func file_internal_gokv_proto_init() { } } file_internal_gokv_proto_msgTypes[1].Exporter = func(v any, i int) any { - switch v := v.(*NodeMeta); i { + switch v := v.(*NodeState); i { case 0: return &v.state case 1: @@ -776,7 +818,7 @@ func file_internal_gokv_proto_init() { } } file_internal_gokv_proto_msgTypes[2].Exporter = func(v any, i int) any { - switch v := v.(*KV); i { + switch v := v.(*FSM); i { case 0: return &v.state case 1: @@ -788,7 +830,7 @@ func file_internal_gokv_proto_init() { } } file_internal_gokv_proto_msgTypes[3].Exporter = func(v any, i int) any { - switch v := v.(*GetRequest); i { + switch v := v.(*NodeMeta); i { case 0: return &v.state case 1: @@ -800,7 +842,7 @@ func file_internal_gokv_proto_init() { } } file_internal_gokv_proto_msgTypes[4].Exporter = func(v any, i int) any { - switch v := v.(*GetResponse); i { + switch v := v.(*GetRequest); i { case 0: return &v.state case 1: @@ -812,7 +854,7 @@ func file_internal_gokv_proto_init() { } } file_internal_gokv_proto_msgTypes[5].Exporter = func(v any, i int) any { - switch v := v.(*PutRequest); i { + switch v := v.(*GetResponse); i { case 0: return &v.state case 1: @@ -824,7 +866,7 @@ func file_internal_gokv_proto_init() { } } file_internal_gokv_proto_msgTypes[6].Exporter = func(v any, i int) any { - switch v := v.(*PutResponse); i { + switch v := v.(*PutRequest); i { case 0: return &v.state case 1: @@ -836,7 +878,7 @@ func file_internal_gokv_proto_init() { } } file_internal_gokv_proto_msgTypes[7].Exporter = func(v any, i int) any { - switch v := v.(*DeleteRequest); i { + switch v := v.(*PutResponse); i { case 0: return &v.state case 1: @@ -848,7 +890,7 @@ func file_internal_gokv_proto_init() { } } file_internal_gokv_proto_msgTypes[8].Exporter = func(v any, i int) any { - switch v := v.(*DeleteResponse); i { + switch v := v.(*DeleteRequest); i { case 0: return &v.state case 1: @@ -860,7 +902,7 @@ func file_internal_gokv_proto_init() { } } file_internal_gokv_proto_msgTypes[9].Exporter = func(v any, i int) any { - switch v := v.(*KeyExistsRequest); i { + switch v := v.(*DeleteResponse); i { case 0: return &v.state case 1: @@ -872,6 +914,18 @@ func file_internal_gokv_proto_init() { } } file_internal_gokv_proto_msgTypes[10].Exporter = func(v any, i int) any { + switch v := v.(*KeyExistsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_gokv_proto_msgTypes[11].Exporter = func(v any, i int) any { switch v := v.(*KeyExistResponse); i { case 0: return &v.state @@ -884,13 +938,14 @@ func file_internal_gokv_proto_init() { } } } + file_internal_gokv_proto_msgTypes[0].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_internal_gokv_proto_rawDesc, NumEnums: 0, - NumMessages: 12, + NumMessages: 13, NumExtensions: 0, NumServices: 1, }, diff --git a/internal/lib/lib.go b/internal/lib/lib.go index 1c4474f..002f076 100644 --- a/internal/lib/lib.go +++ b/internal/lib/lib.go @@ -46,3 +46,8 @@ func Pause(duration time.Duration) { func HostPort(host string, port int) string { return net.JoinHostPort(host, strconv.Itoa(port)) } + +// Ptr creates a pointer of a primitive +func Ptr[T any](v T) *T { + return &v +} diff --git a/protos/internal/gokv.proto b/protos/internal/gokv.proto index c3adb2e..5aeb5c1 100644 --- a/protos/internal/gokv.proto +++ b/protos/internal/gokv.proto @@ -18,11 +18,27 @@ service KVService { rpc KeyExists(KeyExistsRequest) returns (KeyExistResponse); } +// Entry represents the key/value pair +message Entry { + // Specifies the value + bytes value = 1; + // States whether it is archived or not + optional bool archived = 3; +} + // NodeState defines the node state // This will be distributed in the cluster message NodeState { - // Specifies the node state - map state = 1; + // Specifies the nodeId + string node_id = 1; + // Specifies the entries + map entries = 2; +} + +// FSM defines the delegate FSM +message FSM { + // Specifies the list of node states + repeated NodeState node_states = 1; } // NodeMeta defines the node metadata @@ -39,18 +55,6 @@ message NodeMeta { google.protobuf.Timestamp creation_time = 5; } -// KV defines the key/value pair -message KV { - // Specifies the key - string key = 1; - // Specifies the value - bytes value = 2; - // Specifies the creation time timestamp - google.protobuf.Timestamp created_at = 3; - // States that given is marked for tombstone - bool tombstone = 4; -} - // GetRequest is used to fetch the value of a given key message GetRequest { // Specifies the key @@ -60,7 +64,7 @@ message GetRequest { // GetResponse is the response to GetRequest message GetResponse { // Specifies the KV - KV kv = 1; + bytes value = 1; } // PutRequest is used to distribute a given key/value