dynamic host volumes: unique volume name per node (#24748)
a node can have only one volume with a given name.

the scheduler prevents duplicates, but it can only
do so after the server knows about the volume, so
this change adds a client-side check to stop
multiple concurrent creates, issued faster than the
fingerprint/heartbeat interval, from producing
duplicate names on the same node.

users may still modify an existing volume, but only
by setting the `id` in the volume spec and
re-issuing `nomad volume create`.

if a *static* vol is added to config with a name
already in use by a dynamic volume, the dynamic
volume takes precedence and a warning is logged.
gulducat authored Jan 6, 2025
1 parent 4594539 commit a9ee66a
Showing 6 changed files with 151 additions and 20 deletions.
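Before the per-file diffs, a minimal, self-contained sketch of the idea: the uniqueness guard is an atomic map from volume name to the ID that claimed it. This is an illustration only; the committed implementation is the volLocker added to client/hostvolumemanager/host_volumes.go below.

package main

import (
	"errors"
	"fmt"
	"sync"
)

var errVolumeNameExists = errors.New("volume name already exists on this node")

// nameLocker is a simplified stand-in for the commit's volLocker: it maps a
// volume name to the ID that claimed it, using LoadOrStore so two concurrent
// creates for the same name cannot both win.
type nameLocker struct {
	locks sync.Map // volume name -> volume ID
}

func (l *nameLocker) lock(name, id string) error {
	current, exists := l.locks.LoadOrStore(name, id)
	if exists && current.(string) != id {
		// same name claimed by a different ID: reject this create
		return errVolumeNameExists
	}
	// either we claimed the name, or the same ID is re-creating (an update)
	return nil
}

func (l *nameLocker) release(name string) {
	l.locks.Delete(name)
}

func main() {
	l := &nameLocker{}
	fmt.Println(l.lock("data", "vol-1")) // <nil>: name claimed
	fmt.Println(l.lock("data", "vol-2")) // error: duplicate name on this node
	fmt.Println(l.lock("data", "vol-1")) // <nil>: same ID may update the volume
	l.release("data")                    // delete frees the name for reuse
	fmt.Println(l.lock("data", "vol-2")) // <nil>: name is free again
}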
1 change: 1 addition & 0 deletions client/client.go
@@ -408,6 +408,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
}

c.batchNodeUpdates = newBatchNodeUpdates(
c.logger,
c.updateNodeFromDriver,
c.updateNodeFromDevices,
c.updateNodeFromCSI,
57 changes: 56 additions & 1 deletion client/hostvolumemanager/host_volumes.go
@@ -19,6 +19,7 @@ import (
var (
ErrPluginNotExists = errors.New("no such plugin")
ErrPluginNotExecutable = errors.New("plugin not executable")
ErrVolumeNameExists = errors.New("volume name already exists on this node")
)

// HostVolumeStateManager manages the lifecycle of volumes in client state.
@@ -53,6 +54,7 @@ type HostVolumeManager struct {
stateMgr HostVolumeStateManager
updateNodeVols HostVolumeNodeUpdater
builtIns map[string]HostVolumePlugin
locker *volLocker
log hclog.Logger
}

@@ -71,7 +73,8 @@ func NewHostVolumeManager(logger hclog.Logger, config Config) *HostVolumeManager
log: logger.With("plugin_id", HostVolumePluginMkdirID),
},
},
log: logger,
locker: &volLocker{},
log: logger,
}
}

@@ -85,8 +88,14 @@ func (hvm *HostVolumeManager) Create(ctx context.Context,
return nil, err
}

// can't have two of the same volume name w/ different IDs per client node
if err := hvm.locker.lock(req.Name, req.ID); err != nil {
return nil, err
}

pluginResp, err := plug.Create(ctx, req)
if err != nil {
hvm.locker.release(req.Name)
return nil, err
}

@@ -109,6 +118,8 @@ func (hvm *HostVolumeManager) Create(ctx context.Context,
hvm.log.Warn("error deleting volume after state store failure", "volume_id", req.ID, "error", delErr)
err = multierror.Append(err, delErr)
}
// free up the volume name whether delete succeeded or not.
hvm.locker.release(req.Name)
return nil, helper.FlattenMultierror(err)
}

@@ -144,6 +155,9 @@ func (hvm *HostVolumeManager) Delete(ctx context.Context,
return nil, err // bail so a user may retry
}

// free up volume name for reuse
hvm.locker.release(req.Name)

hvm.updateNodeVols(req.Name, nil)

resp := &cstructs.ClientHostVolumeDeleteResponse{
@@ -191,6 +205,21 @@ func (hvm *HostVolumeManager) restoreFromState(ctx context.Context) (VolumeMap,
return err
}

// lock the name so future creates can't produce duplicates.
err = hvm.locker.lock(vol.CreateReq.Name, vol.CreateReq.ID)
// state should never have duplicate vol names, and restore happens
// prior to node registration, so new creates shouldn't come in
// concurrently, but check for error just in case.
if err != nil {
hvm.log.Error("error during restore",
"volume_name", vol.CreateReq.Name,
"volume_id", vol.CreateReq.ID,
"error", err)
// don't stop the world if it does happen, because an admin
// couldn't do anything about it short of wiping client state.
return nil
}

resp, err := plug.Create(ctx, vol.CreateReq)
if err != nil {
// plugin execution errors are only logged
@@ -221,3 +250,29 @@ func genVolConfig(req *cstructs.ClientHostVolumeCreateRequest, resp *HostVolumeP
ReadOnly: false,
}
}

// volLocker is used to ensure that volumes on each node are unique by name.
// The volume scheduler will prevent this too, but only after node fingerprint,
// so we need to protect against concurrent duplicate creates.
type volLocker struct {
locks sync.Map
}

// lock the provided name, error if it was already locked with a different ID
func (l *volLocker) lock(name, id string) error {
current, exists := l.locks.LoadOrStore(name, id)
if exists && id != current.(string) {
return ErrVolumeNameExists
}
return nil
}

func (l *volLocker) release(name string) {
l.locks.Delete(name)
}

// only used in tests to assert lock state
func (l *volLocker) isLocked(name string) bool {
_, locked := l.locks.Load(name)
return locked
}
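For orientation, a hedged, in-package sketch of how the Create path above uses the locker; it compresses the hunks shown and uses a hypothetical lookupPlugin helper in place of the real plugin lookup, which is not shown in this diff.

// Illustrative only: the name is claimed before the plugin runs, and released
// on every failure path, so a failed create never leaves the name reserved.
func (hvm *HostVolumeManager) createSketch(ctx context.Context,
	req *cstructs.ClientHostVolumeCreateRequest) error {

	plug, err := hvm.lookupPlugin(req.PluginID) // hypothetical helper
	if err != nil {
		return err
	}
	// reject a second volume with the same name but a different ID on this node
	if err := hvm.locker.lock(req.Name, req.ID); err != nil {
		return err
	}
	if _, err := plug.Create(ctx, req); err != nil {
		hvm.locker.release(req.Name) // plugin failed: free the name
		return err
	}
	// ... persist to client state; on failure, delete the volume and release
	// the name, as in the Create hunk above ...
	return nil
}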
62 changes: 53 additions & 9 deletions client/hostvolumemanager/host_volumes_test.go
@@ -27,7 +27,7 @@ func TestHostVolumeManager(t *testing.T) {
tmp := t.TempDir()
errDB := &cstate.ErrDB{}
memDB := cstate.NewMemDB(log)
node := newFakeNode()
node := newFakeNode(t)

hvm := NewHostVolumeManager(log, Config{
PluginDir: "./test_fixtures",
@@ -43,9 +43,10 @@ func TestHostVolumeManager(t *testing.T) {

t.Run("create", func(t *testing.T) {
// plugin doesn't exist
name := "vol-name"
req := &cstructs.ClientHostVolumeCreateRequest{
Name: name,
ID: "vol-id",
Name: "vol-name",
PluginID: "nope",

RequestedCapacityMinBytes: 5,
@@ -58,62 +59,90 @@ func TestHostVolumeManager(t *testing.T) {
plug.createErr = errors.New("sad create")
_, err = hvm.Create(ctx, req)
must.ErrorIs(t, err, plug.createErr)
assertNotLocked(t, hvm, name)
plug.reset()

// error saving state, then error from cleanup attempt
plug.deleteErr = errors.New("sad delete")
_, err = hvm.Create(ctx, req)
must.ErrorIs(t, err, cstate.ErrDBError)
must.ErrorIs(t, err, plug.deleteErr)
assertNotLocked(t, hvm, name)
plug.reset()

// error saving state, successful cleanup
_, err = hvm.Create(ctx, req)
must.ErrorIs(t, err, cstate.ErrDBError)
must.Eq(t, "vol-id", plug.deleted)
assertNotLocked(t, hvm, name)
plug.reset()

// happy path
hvm.stateMgr = memDB
resp, err := hvm.Create(ctx, req)
must.NoError(t, err)
must.Eq(t, &cstructs.ClientHostVolumeCreateResponse{
expectResp := &cstructs.ClientHostVolumeCreateResponse{
VolumeName: "vol-name",
VolumeID: "vol-id",
HostPath: tmp,
CapacityBytes: 5,
}, resp)
}
must.Eq(t, expectResp, resp)
stateDBs, err := memDB.GetDynamicHostVolumes()
must.NoError(t, err)
// should be saved to state
must.Len(t, 1, stateDBs)
must.Eq(t, "vol-id", stateDBs[0].ID)
must.Eq(t, "vol-id", stateDBs[0].CreateReq.ID)
// should be registered with node
must.MapContainsKey(t, node.vols, "vol-name", must.Sprintf("no vol-name in %+v", node.vols))
must.MapContainsKey(t, node.vols, name, must.Sprintf("no %q in %+v", name, node.vols))
assertLocked(t, hvm, name)

// repeat create with same ID but different size may update the volume
req.RequestedCapacityMinBytes = 10
expectResp.CapacityBytes = 10
resp, err = hvm.Create(ctx, req)
must.NoError(t, err)
must.Eq(t, expectResp, resp)

// duplicate create with the same vol name but different ID should fail
_, err = hvm.Create(ctx, &cstructs.ClientHostVolumeCreateRequest{
Name: name,
ID: "different-vol-id",
PluginID: "test-plugin",
})
must.ErrorIs(t, err, ErrVolumeNameExists)
})

// despite being a subtest, this needs to run after "create"
t.Run("delete", func(t *testing.T) {
name := "vol-name"
// should be locked from "create" above
assertLocked(t, hvm, name)

// plugin doesn't exist
req := &cstructs.ClientHostVolumeDeleteRequest{
Name: name,
ID: "vol-id",
Name: "vol-name",
PluginID: "nope",
}
_, err := hvm.Delete(ctx, req)
must.ErrorIs(t, err, ErrPluginNotExists)
assertLocked(t, hvm, name)

// error from plugin
req.PluginID = "test-plugin"
plug.deleteErr = errors.New("sad delete")
_, err = hvm.Delete(ctx, req)
must.ErrorIs(t, err, plug.deleteErr)
assertLocked(t, hvm, name)
plug.reset()

// error saving state
hvm.stateMgr = errDB
_, err = hvm.Delete(ctx, req)
must.ErrorIs(t, err, cstate.ErrDBError)
assertLocked(t, hvm, name)

// happy path
// add stuff that should be deleted
Expand All @@ -136,6 +165,7 @@ func TestHostVolumeManager(t *testing.T) {
stateVols, err := memDB.GetDynamicHostVolumes()
must.NoError(t, err)
must.Nil(t, stateVols, must.Sprint("vols should be deleted from state"))
assertNotLocked(t, hvm, name)
})
}

@@ -181,6 +211,16 @@ func (p *fakePlugin) Delete(_ context.Context, req *cstructs.ClientHostVolumeDel
return nil
}

func assertLocked(t *testing.T, hvm *HostVolumeManager, name string) {
t.Helper()
must.True(t, hvm.locker.isLocked(name), must.Sprintf("vol name %q should be locked", name))
}

func assertNotLocked(t *testing.T, hvm *HostVolumeManager, name string) {
t.Helper()
must.False(t, hvm.locker.isLocked(name), must.Sprintf("vol name %q should not be locked", name))
}

func TestHostVolumeManager_restoreFromState(t *testing.T) {
log := testlog.HCLogger(t)
vol := &cstructs.HostVolumeState{
@@ -191,7 +231,7 @@ func TestHostVolumeManager_restoreFromState(t *testing.T) {
PluginID: "mkdir",
},
}
node := newFakeNode()
node := newFakeNode(t)

t.Run("no vols", func(t *testing.T) {
state := cstate.NewMemDB(log)
@@ -235,6 +275,8 @@ func TestHostVolumeManager_restoreFromState(t *testing.T) {
must.Eq(t, expect, vols)

must.DirExists(t, volPath)

assertLocked(t, hvm, "test-vol-name")
})

t.Run("state error", func(t *testing.T) {
Expand Down Expand Up @@ -290,15 +332,17 @@ func TestHostVolumeManager_restoreFromState(t *testing.T) {

type fakeNode struct {
vols VolumeMap
log hclog.Logger
}

func (n *fakeNode) updateVol(name string, volume *structs.ClientHostVolumeConfig) {
UpdateVolumeMap(n.vols, name, volume)
UpdateVolumeMap(n.log, n.vols, name, volume)
}

func newFakeNode() *fakeNode {
func newFakeNode(t *testing.T) *fakeNode {
return &fakeNode{
vols: make(VolumeMap),
log: testlog.HCLogger(t),
}
}

9 changes: 8 additions & 1 deletion client/hostvolumemanager/volume_fingerprint.go
@@ -6,6 +6,7 @@ package hostvolumemanager
import (
"context"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/nomad/structs"
)

@@ -24,14 +25,20 @@ type VolumeMap map[string]*structs.ClientHostVolumeConfig
//
// Since it may mutate the map, the caller should make a copy
// or acquire a lock as appropriate for their context.
func UpdateVolumeMap(volumes VolumeMap, name string, vol *structs.ClientHostVolumeConfig) (changed bool) {
func UpdateVolumeMap(log hclog.Logger, volumes VolumeMap, name string, vol *structs.ClientHostVolumeConfig) (changed bool) {
current, exists := volumes[name]
if vol == nil {
if exists {
delete(volumes, name)
changed = true
}
} else {
// if the volume already exists with no ID, it will be because it was
// added to client agent config after having been previously created
// as a dynamic vol. dynamic takes precedence, but log a warning.
if exists && current.ID == "" {
log.Warn("overriding static host volume with dynamic", "name", name, "id", vol.ID)
}
if !exists || !vol.Equal(current) {
volumes[name] = vol
changed = true
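A short, hedged illustration of the precedence rule in UpdateVolumeMap above: a static host volume from client agent config has no ID, so a dynamic volume that reuses its name replaces it and the function logs a warning. Field values here are hypothetical.

// Illustrative only: a static entry (empty ID, from agent config) is replaced
// by a dynamic volume of the same name; UpdateVolumeMap logs a warning.
func exampleStaticOverride(log hclog.Logger) {
	vols := VolumeMap{
		// static host volumes come from client agent config and have no ID
		"shared": {Name: "shared", Path: "/srv/shared"},
	}
	dynamic := &structs.ClientHostVolumeConfig{
		ID:   "hypothetical-vol-id",
		Name: "shared",
		Path: "/hypothetical/mount/dir/hypothetical-vol-id",
	}
	changed := UpdateVolumeMap(log, vols, "shared", dynamic)
	_ = changed // true: vols["shared"] now holds the dynamic volume
}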
28 changes: 22 additions & 6 deletions client/hostvolumemanager/volume_fingerprint_test.go
@@ -23,6 +23,8 @@ func TestUpdateVolumeMap(t *testing.T) {

expectMap VolumeMap
expectChange bool

expectLog string
}{
{
name: "delete absent",
@@ -57,20 +59,33 @@ func TestUpdateVolumeMap(t *testing.T) {
expectChange: false,
},
{
// this should not happen, but test anyway
// this should not happen with dynamic vols, but test anyway
name: "change present",
vols: VolumeMap{"changeme": {Path: "before"}},
vols: VolumeMap{"changeme": {ID: "before"}},
volName: "changeme",
vol: &structs.ClientHostVolumeConfig{Path: "after"},
expectMap: VolumeMap{"changeme": {Path: "after"}},
vol: &structs.ClientHostVolumeConfig{ID: "after"},
expectMap: VolumeMap{"changeme": {ID: "after"}},
expectChange: true,
},
{
// this should only happen during agent start, if a static vol has
// been added to config after a previous dynamic vol was created
// with the same name.
name: "override static",
vols: VolumeMap{"overrideme": {ID: ""}}, // static vols have no ID
volName: "overrideme",
vol: &structs.ClientHostVolumeConfig{ID: "dynamic-vol-id"},
expectMap: VolumeMap{"overrideme": {ID: "dynamic-vol-id"}},
expectChange: true,
expectLog: "overriding static host volume with dynamic: name=overrideme id=dynamic-vol-id",
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
log, getLogs := logRecorder(t)

changed := UpdateVolumeMap(tc.vols, tc.volName, tc.vol)
changed := UpdateVolumeMap(log, tc.vols, tc.volName, tc.vol)
must.Eq(t, tc.expectMap, tc.vols)

if tc.expectChange {
@@ -79,6 +94,7 @@ func TestUpdateVolumeMap(t *testing.T) {
must.False(t, changed, must.Sprint("expect volume not to have been changed"))
}

must.StrContains(t, getLogs(), tc.expectLog)
})
}
}
@@ -87,7 +103,7 @@ func TestWaitForFirstFingerprint(t *testing.T) {
log := testlog.HCLogger(t)
tmp := t.TempDir()
memDB := state.NewMemDB(log)
node := newFakeNode()
node := newFakeNode(t)
hvm := NewHostVolumeManager(log, Config{
PluginDir: "",
SharedMountDir: tmp,