Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add kubernetes discovery provider #18

Merged
merged 1 commit into from
Sep 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
Simple Distributed in-memory key/value store. GoKV provides high availability and fault tolerance which makes it suitable large-scale applications system without sacrificing performance and reliability.
With GoKV, you can instantly create a fast, scalable, distributed system across a cluster of computers.


## Installation

```bash
Expand All @@ -17,21 +16,19 @@ go get github.com/tochemey/gokv
- Robust APIs to manipulate key/value pairs. See: [APIs](#apis)
- Discovery API to implement custom nodes discovery provider. See: [Discovery API](./discovery/provider.go)
- Comes bundled with some discovery providers that can help you hit the ground running:
- Kubernetes provider [Soon]
- [kubernetes](https://kubernetes.io/docs/home/) [api integration](./discovery/kubernetes) is fully functional
- [NATS](https://nats.io/) [integration](./discovery/nats) is fully functional
- [Static](./discovery/static) is fully functional and for demo purpose
- [DNS](./discovery/dnssd) is fully functional
- Built-in [client](./cluster/client.go) to interact with the distributed store via the following apis:
- `Put`: create key/value pair that is eventually distributed in the cluster of nodes. The `key` is a string and the `value` is a byte array.
- `Get`: retrieves the value of a given `key` from the cluster of nodes.
- `Delete`: delete a given `key` from the cluster. At the moment the `key` is marked to be `deleted`.
- `Exists`: check the existence of a given `key` in the cluster.

## APIs

- `Put`: create key/value pair that is eventually distributed in the cluster of nodes. The `key` is a string and the `value` is a byte array.
- `Get`: retrieves the value of a given `key` from the cluster of nodes.
- `Delete`: delete a given `key` from the cluster. At the moment the `key` is marked to be `deleted`.
- `Exists`: check the existence of a given `key` in the cluster.

## Discovery Providers
## Builtin Discovery Providers

### NATS Discovery Provider Setup
### NATS

To use the NATS discovery provider one needs to provide the following:

Expand All @@ -43,7 +40,7 @@ To use the NATS discovery provider one needs to provide the following:
- `DiscoveryPort`: the discovery port of the running node
- `Host`: the host address of the running node

#### DNS Provider Setup
### DNS

This provider performs nodes discovery based upon the domain name provided. This is very useful when doing local development
using docker.
Expand All @@ -53,7 +50,16 @@ To use the DNS discovery provider one needs to provide the following:
- `DomainName`: the domain name
- `IPv6`: it states whether to lookup for IPv6 addresses.

#### Static Provider Setup
### Static

This provider performs nodes discovery based upon the list of static hosts addresses.
The address of each host is the form of `host:port` where `port` is the discovery port.
The address of each host is the form of `host:port` where `port` is the discovery port.

### Kubernetes

To get the kubernetes discovery working as expected, the following need to be set in the manifest files:

- `Namespace`: the kubernetes namespace
- `DiscoveryPortName`: the discovery port name
- `PortName`: the client port name. This port is used by the built-in cluster client for the various operations on the key/value pair distributed store
- `PodLabels`: the POD labels
49 changes: 49 additions & 0 deletions discovery/kubernetes/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* MIT License
*
* Copyright (c) 2022-2024 Tochemey
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package kubernetes

import "github.com/tochemey/gokv/internal/validation"

// Config defines the kubernetes discovery configuration
type Config struct {
// Namespace specifies the kubernetes namespace
Namespace string
// DiscoveryPortName specifies the discovery port name
DiscoveryPortName string
// PortName specifies the client port name
PortName string
// PodLabels specifies the pod labels
PodLabels map[string]string
}

// Validate checks whether the given discovery configuration is valid
func (x Config) Validate() error {
return validation.New(validation.FailFast()).
AddValidator(validation.NewEmptyStringValidator("Namespace", x.Namespace)).
AddValidator(validation.NewEmptyStringValidator("DiscoveryPortName", x.DiscoveryPortName)).
AddValidator(validation.NewEmptyStringValidator("PortName", x.PortName)).
AddAssertion(len(x.PodLabels) > 0, "PodLabels are required").
Validate()
}
52 changes: 52 additions & 0 deletions discovery/kubernetes/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* MIT License
*
* Copyright (c) 2022-2024 Tochemey
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package kubernetes

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestConfig(t *testing.T) {
t.Run("With valid configuration", func(t *testing.T) {
config := &Config{
Namespace: "namespace",
DiscoveryPortName: "discoveryPortName",
PortName: "portName",
PodLabels: map[string]string{
"label1": "value1",
},
}
assert.NoError(t, config.Validate())
})
t.Run("With invalid configuration", func(t *testing.T) {
config := &Config{
Namespace: "namespace",
DiscoveryPortName: "",
}
assert.Error(t, config.Validate())
})
}
182 changes: 182 additions & 0 deletions discovery/kubernetes/discovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* MIT License
*
* Copyright (c) 2022-2024 Tochemey
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package kubernetes

import (
"context"
"fmt"
"net"
"strconv"
"sync"

goset "github.com/deckarep/golang-set/v2"
"go.uber.org/atomic"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/utils/strings/slices"

"github.com/tochemey/gokv/discovery"
)

// Discovery represents the kubernetes discovery
type Discovery struct {
config *Config
client kubernetes.Interface
mu sync.Mutex

stopChan chan struct{}
// states whether the actor system has started or not
initialized *atomic.Bool
}

// enforce compilation error
var _ discovery.Provider = &Discovery{}

// NewDiscovery returns an instance of the kubernetes discovery provider
func NewDiscovery(config *Config) *Discovery {
// create an instance of
discovery := &Discovery{
mu: sync.Mutex{},
stopChan: make(chan struct{}, 1),
initialized: atomic.NewBool(false),
config: config,
}

return discovery
}

// ID returns the discovery provider id
func (d *Discovery) ID() string {
return "kubernetes"
}

// Initialize initializes the plugin: registers some internal data structures, clients etc.
func (d *Discovery) Initialize() error {
d.mu.Lock()
defer d.mu.Unlock()

if d.initialized.Load() {
return discovery.ErrAlreadyInitialized
}

return d.config.Validate()
}

// Register registers this node to a service discovery directory.
func (d *Discovery) Register() error {
d.mu.Lock()
defer d.mu.Unlock()

if d.initialized.Load() {
return discovery.ErrAlreadyRegistered
}

config, err := rest.InClusterConfig()
if err != nil {
return fmt.Errorf("failed to get the in-cluster config of the kubernetes provider: %w", err)
}

client, err := kubernetes.NewForConfig(config)
if err != nil {
return fmt.Errorf("failed to create the kubernetes client api: %w", err)
}

d.client = client
d.initialized = atomic.NewBool(true)
return nil
}

// Deregister removes this node from a service discovery directory.
func (d *Discovery) Deregister() error {
d.mu.Lock()
defer d.mu.Unlock()

if !d.initialized.Load() {
return discovery.ErrNotInitialized
}
d.initialized = atomic.NewBool(false)
close(d.stopChan)
return nil
}

// DiscoverPeers returns a list of known nodes.
func (d *Discovery) DiscoverPeers() ([]string, error) {
if !d.initialized.Load() {
return nil, discovery.ErrNotInitialized
}

ctx := context.Background()

pods, err := d.client.CoreV1().Pods(d.config.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(d.config.PodLabels).String(),
})

if err != nil {
return nil, err
}

validPortNames := []string{d.config.DiscoveryPortName, d.config.PortName}

// define the addresses list
addresses := goset.NewSet[string]()

MainLoop:
for _, pod := range pods.Items {
pod := pod

if pod.Status.Phase != corev1.PodRunning {
continue MainLoop
}
// If there is a Ready condition available, we need that to be true.
// If no ready condition is set, then we accept this pod regardless.
for _, condition := range pod.Status.Conditions {
if condition.Type == corev1.PodReady && condition.Status != corev1.ConditionTrue {
continue MainLoop
}
}

// iterate the pod containers and find the named port
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if !slices.Contains(validPortNames, port.Name) {
continue
}

if port.Name == d.config.DiscoveryPortName {
addresses.Add(net.JoinHostPort(pod.Status.PodIP, strconv.Itoa(int(port.ContainerPort))))
}
}
}
}
return addresses.ToSlice(), nil
}

// Close closes the provider
func (d *Discovery) Close() error {
return nil
}
Loading