From d74d77087e7aea119dd2dc1be9679f3022f1a1e8 Mon Sep 17 00:00:00 2001 From: Arsene Date: Mon, 16 Sep 2024 14:59:32 +0100 Subject: [PATCH] feat: add additional utility methods to client code (#26) --- README.md | 38 +++++++++++++++++------------- cluster/client.go | 60 +++++++++++++++++++++++++++++++++++++++++++++-- cluster/codec.go | 33 ++++++++++++++++++++++++++ cluster/node.go | 2 +- 4 files changed, 114 insertions(+), 19 deletions(-) create mode 100644 cluster/codec.go diff --git a/README.md b/README.md index 0180fc0..393930a 100644 --- a/README.md +++ b/README.md @@ -1,45 +1,51 @@ -# Go-KV +## Go-KV [![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/Tochemey/gokv/build.yml)]((https://github.com/Tochemey/gokv/actions/workflows/build.yml)) Simple Distributed in-memory key/value store. GoKV provides high availability and fault tolerance which makes it suitable large-scale applications system without sacrificing performance and reliability. With GoKV, you can instantly create a fast, scalable, distributed system across a cluster of computers. -## Installation +### Installation ```bash go get github.com/tochemey/gokv ``` -## Features +### Features - Go-KV uses the push-pull anti-entropy method to replicate nodes' state across the cluster. This approach makes Go-KV eventually consistent. One can set the [`stateSyncInterval`](./cluster/config.go) value to low for frequent state synchronisation at a network cost. - Discovery API to implement custom nodes discovery provider. See: [Discovery API](./discovery/provider.go) - Comes bundled with some discovery providers that can help you hit the ground running: - [kubernetes](https://kubernetes.io/docs/home/) [api integration](./discovery/kubernetes) is fully functional - - [NATS](https://nats.io/) [integration](./discovery/nats) is fully functional - - [Static](./discovery/static) is fully functional and for demo purpose - - [DNS](./discovery/dnssd) is fully functional -- Built-in [Client](./cluster/client.go) to interact with the distributed store via the following apis: + - [nats](https://nats.io/) [integration](./discovery/nats) is fully functional + - [static](./discovery/static) is fully functional and for demo purpose + - [dns](./discovery/dnssd) is fully functional +- Built-in [client](./cluster/client.go) to interact with the distributed store via the following apis: - `Put`: create key/value pair that is eventually distributed in the cluster of nodes. The `key` is a string and the `value` is a byte array. + - `PutProto`: to create a key/value pair where the value is a protocol buffer message + - `PutString`: to create a key/value pair where the value is a string + - `PutAny`: to create a key/value pair with a given [`Codec`](./cluster/codec.go) to encode the value type. - `Get`: retrieves the value of a given `key` from the cluster of nodes. + - `GetProto`: retrieves a protocol buffer message for a given `key`. This requires `PutProto` or `Put` to be used to set the value. + - `GetString`: retrieves a string value for a given `key`. This requires `PutString` or `Put` to be used to set the value. + - `GetAny`: retrieves any value type for a given `key`. This requires `PutAny` to be used to set the value. - `Exists`: check the existence of a given `key` in the cluster. - `Delete`: delete a given `key` from the cluster. At the moment the `key` is marked to be `archived`. -## Use Cases +### Use Cases - Distributed cache -## Example +### Example There is an example on how to use it with NATs [here](./example/example.go) -## Builtin Discovery +### Builtin Discovery -### NATS +#### nats -To use the NATS discovery provider one needs to provide the following: +To use the [nats](https://nats.io/) discovery provider one needs to provide the following: - `Server`: the NATS Server address - `Subject`: the NATS subject to use @@ -49,7 +55,7 @@ To use the NATS discovery provider one needs to provide the following: - `DiscoveryPort`: the discovery port of the running node - `Host`: the host address of the running node -### DNS +#### dns This provider performs nodes discovery based upon the domain name provided. This is very useful when doing local development using docker. @@ -59,14 +65,14 @@ To use the DNS discovery provider one needs to provide the following: - `DomainName`: the domain name - `IPv6`: it states whether to lookup for IPv6 addresses. -### Static +#### static This provider performs nodes discovery based upon the list of static hosts addresses. The address of each host is the form of `host:port` where `port` is the discovery port. -### Kubernetes +#### kubernetes -To get the kubernetes discovery working as expected, the following need to be set in the manifest files: +To get the [kubernetes](https://kubernetes.io/docs/home/) discovery working as expected, the following need to be set in the manifest files: - `Namespace`: the kubernetes namespace - `DiscoveryPortName`: the discovery port name diff --git a/cluster/client.go b/cluster/client.go index 7c24924..c021828 100644 --- a/cluster/client.go +++ b/cluster/client.go @@ -31,6 +31,7 @@ import ( "connectrpc.com/connect" "go.uber.org/atomic" + "google.golang.org/protobuf/proto" "github.com/tochemey/gokv/internal/http" "github.com/tochemey/gokv/internal/internalpb" @@ -61,6 +62,30 @@ func (client *Client) Put(ctx context.Context, key string, value []byte, expirat return err } +// PutProto creates a key/value pair where the value is a proto message and distributes in the cluster +func (client *Client) PutProto(ctx context.Context, key string, value proto.Message, expiration time.Duration) error { + bytea, err := proto.Marshal(value) + if err != nil { + return err + } + return client.Put(ctx, key, bytea, expiration) +} + +// PutString creates a key/value pair where the value is a string and distributes in the cluster +func (client *Client) PutString(ctx context.Context, key string, value string, expiration time.Duration) error { + return client.Put(ctx, key, []byte(value), expiration) +} + +// PutAny distributes the key/value pair in the cluster. +// A binary encoder is required to properly encode the value. +func (client *Client) PutAny(ctx context.Context, key string, value any, expiration time.Duration, codec Codec) error { + bytea, err := codec.Encode(value) + if err != nil { + return err + } + return client.Put(ctx, key, bytea, expiration) +} + // Get retrieves the value of the given key from the cluster func (client *Client) Get(ctx context.Context, key string) ([]byte, error) { if !client.connected.Load() { @@ -82,6 +107,37 @@ func (client *Client) Get(ctx context.Context, key string) ([]byte, error) { return response.Msg.GetValue(), nil } +// GetProto retrieves the value of the given from the cluster as protocol buffer message +// Prior to calling this method one must set a proto message as the value of the key +func (client *Client) GetProto(ctx context.Context, key string, dst proto.Message) error { + bytea, err := client.Get(ctx, key) + if err != nil { + return err + } + return proto.Unmarshal(bytea, dst) +} + +// GetString retrieves the value of the given from the cluster as a string +// Prior to calling this method one must set a string as the value of the key +func (client *Client) GetString(ctx context.Context, key string, dst string) error { + bytea, err := client.Get(ctx, key) + if err != nil { + return err + } + dst = string(bytea) + return nil +} + +// GetAny retrieves the value of the given from the cluster +// Prior to calling this method one must set a string as the value of the key +func (client *Client) GetAny(ctx context.Context, key string, codec Codec) (any, error) { + bytea, err := client.Get(ctx, key) + if err != nil { + return nil, err + } + return codec.Decode(bytea) +} + // Delete deletes a given key from the cluster // nolint func (client *Client) Delete(ctx context.Context, key string) error { @@ -114,8 +170,8 @@ func (client *Client) Exists(ctx context.Context, key string) (bool, error) { return response.Msg.GetExists(), nil } -// Close closes the client connection to the cluster -func (client *Client) Close() error { +// close closes the client connection to the cluster +func (client *Client) close() error { // no-op when the client is not connected if !client.connected.Load() { return nil diff --git a/cluster/codec.go b/cluster/codec.go new file mode 100644 index 0000000..9e79c67 --- /dev/null +++ b/cluster/codec.go @@ -0,0 +1,33 @@ +/* + * MIT License + * + * Copyright (c) 2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package cluster + +// Codec will be implemented to encode and decode message +type Codec interface { + // Encode encodes the receiver into a binary form and returns the result. + Encode(any) ([]byte, error) + // Decode decodes a binary message + Decode([]byte) (any, error) +} diff --git a/cluster/node.go b/cluster/node.go index 02ee1df..c381c5d 100644 --- a/cluster/node.go +++ b/cluster/node.go @@ -167,7 +167,7 @@ func (node *Node) Stop(ctx context.Context) error { if err := errorschain. New(errorschain.ReturnFirst()). - AddError(node.clusterClient.Close()). + AddError(node.clusterClient.close()). AddError(node.memberlist.Leave(node.config.shutdownTimeout)). AddError(node.config.provider.Deregister()). AddError(node.config.provider.Close()).