From a9ee66a6ef358097783d1bf745051124cc0f14f2 Mon Sep 17 00:00:00 2001 From: Daniel Bennett Date: Mon, 6 Jan 2025 15:37:20 -0600 Subject: [PATCH] dynamic host volumes: unique volume name per node (#24748) a node can have only one volume with a given name. the scheduler prevents duplicates, but can only do so after the server knows about the volume. this prevents multiple concurrent creates being called faster than the fingerprint/heartbeat interval. users may still modify an existing volume only if they set the `id` in the volume spec and re-issue `nomad volume create` if a *static* vol is added to config with a name already being used by a dynamic volume, the dynamic takes precedence, but log a warning. --- client/client.go | 1 + client/hostvolumemanager/host_volumes.go | 57 ++++++++++++++++- client/hostvolumemanager/host_volumes_test.go | 62 ++++++++++++++++--- .../hostvolumemanager/volume_fingerprint.go | 9 ++- .../volume_fingerprint_test.go | 28 +++++++-- client/node_updater.go | 14 ++++- 6 files changed, 151 insertions(+), 20 deletions(-) diff --git a/client/client.go b/client/client.go index 40453f1ab77..11d066b924f 100644 --- a/client/client.go +++ b/client/client.go @@ -408,6 +408,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie } c.batchNodeUpdates = newBatchNodeUpdates( + c.logger, c.updateNodeFromDriver, c.updateNodeFromDevices, c.updateNodeFromCSI, diff --git a/client/hostvolumemanager/host_volumes.go b/client/hostvolumemanager/host_volumes.go index aea84ee89c8..0516e5c1c10 100644 --- a/client/hostvolumemanager/host_volumes.go +++ b/client/hostvolumemanager/host_volumes.go @@ -19,6 +19,7 @@ import ( var ( ErrPluginNotExists = errors.New("no such plugin") ErrPluginNotExecutable = errors.New("plugin not executable") + ErrVolumeNameExists = errors.New("volume name already exists on this node") ) // HostVolumeStateManager manages the lifecycle of volumes in client state. @@ -53,6 +54,7 @@ type HostVolumeManager struct { stateMgr HostVolumeStateManager updateNodeVols HostVolumeNodeUpdater builtIns map[string]HostVolumePlugin + locker *volLocker log hclog.Logger } @@ -71,7 +73,8 @@ func NewHostVolumeManager(logger hclog.Logger, config Config) *HostVolumeManager log: logger.With("plugin_id", HostVolumePluginMkdirID), }, }, - log: logger, + locker: &volLocker{}, + log: logger, } } @@ -85,8 +88,14 @@ func (hvm *HostVolumeManager) Create(ctx context.Context, return nil, err } + // can't have two of the same volume name w/ different IDs per client node + if err := hvm.locker.lock(req.Name, req.ID); err != nil { + return nil, err + } + pluginResp, err := plug.Create(ctx, req) if err != nil { + hvm.locker.release(req.Name) return nil, err } @@ -109,6 +118,8 @@ func (hvm *HostVolumeManager) Create(ctx context.Context, hvm.log.Warn("error deleting volume after state store failure", "volume_id", req.ID, "error", delErr) err = multierror.Append(err, delErr) } + // free up the volume name whether delete succeeded or not. + hvm.locker.release(req.Name) return nil, helper.FlattenMultierror(err) } @@ -144,6 +155,9 @@ func (hvm *HostVolumeManager) Delete(ctx context.Context, return nil, err // bail so a user may retry } + // free up volume name for reuse + hvm.locker.release(req.Name) + hvm.updateNodeVols(req.Name, nil) resp := &cstructs.ClientHostVolumeDeleteResponse{ @@ -191,6 +205,21 @@ func (hvm *HostVolumeManager) restoreFromState(ctx context.Context) (VolumeMap, return err } + // lock the name so future creates can't produce duplicates. + err = hvm.locker.lock(vol.CreateReq.Name, vol.CreateReq.ID) + // state should never have duplicate vol names, and restore happens + // prior to node registration, so new creates shouldn't come in + // concurrently, but check for error just in case. + if err != nil { + hvm.log.Error("error during restore", + "volume_name", vol.CreateReq.Name, + "volume_id", vol.CreateReq.ID, + "error", err) + // don't stop the world if it does happen, because an admin + // couldn't do anything about it short of wiping client state. + return nil + } + resp, err := plug.Create(ctx, vol.CreateReq) if err != nil { // plugin execution errors are only logged @@ -221,3 +250,29 @@ func genVolConfig(req *cstructs.ClientHostVolumeCreateRequest, resp *HostVolumeP ReadOnly: false, } } + +// volLocker is used to ensure that volumes on each node are unique by name. +// The volume scheduler will prevent this too, but only after node fingerprint, +// so we need to protect against concurrent duplicate creates. +type volLocker struct { + locks sync.Map +} + +// lock the provided name, error if it was already locked with a different ID +func (l *volLocker) lock(name, id string) error { + current, exists := l.locks.LoadOrStore(name, id) + if exists && id != current.(string) { + return ErrVolumeNameExists + } + return nil +} + +func (l *volLocker) release(name string) { + l.locks.Delete(name) +} + +// only used in tests to assert lock state +func (l *volLocker) isLocked(name string) bool { + _, locked := l.locks.Load(name) + return locked +} diff --git a/client/hostvolumemanager/host_volumes_test.go b/client/hostvolumemanager/host_volumes_test.go index 1b76be152ca..fa4f57af5b3 100644 --- a/client/hostvolumemanager/host_volumes_test.go +++ b/client/hostvolumemanager/host_volumes_test.go @@ -27,7 +27,7 @@ func TestHostVolumeManager(t *testing.T) { tmp := t.TempDir() errDB := &cstate.ErrDB{} memDB := cstate.NewMemDB(log) - node := newFakeNode() + node := newFakeNode(t) hvm := NewHostVolumeManager(log, Config{ PluginDir: "./test_fixtures", @@ -43,9 +43,10 @@ func TestHostVolumeManager(t *testing.T) { t.Run("create", func(t *testing.T) { // plugin doesn't exist + name := "vol-name" req := &cstructs.ClientHostVolumeCreateRequest{ + Name: name, ID: "vol-id", - Name: "vol-name", PluginID: "nope", RequestedCapacityMinBytes: 5, @@ -58,6 +59,7 @@ func TestHostVolumeManager(t *testing.T) { plug.createErr = errors.New("sad create") _, err = hvm.Create(ctx, req) must.ErrorIs(t, err, plug.createErr) + assertNotLocked(t, hvm, name) plug.reset() // error saving state, then error from cleanup attempt @@ -65,24 +67,27 @@ func TestHostVolumeManager(t *testing.T) { _, err = hvm.Create(ctx, req) must.ErrorIs(t, err, cstate.ErrDBError) must.ErrorIs(t, err, plug.deleteErr) + assertNotLocked(t, hvm, name) plug.reset() // error saving state, successful cleanup _, err = hvm.Create(ctx, req) must.ErrorIs(t, err, cstate.ErrDBError) must.Eq(t, "vol-id", plug.deleted) + assertNotLocked(t, hvm, name) plug.reset() // happy path hvm.stateMgr = memDB resp, err := hvm.Create(ctx, req) must.NoError(t, err) - must.Eq(t, &cstructs.ClientHostVolumeCreateResponse{ + expectResp := &cstructs.ClientHostVolumeCreateResponse{ VolumeName: "vol-name", VolumeID: "vol-id", HostPath: tmp, CapacityBytes: 5, - }, resp) + } + must.Eq(t, expectResp, resp) stateDBs, err := memDB.GetDynamicHostVolumes() must.NoError(t, err) // should be saved to state @@ -90,30 +95,54 @@ func TestHostVolumeManager(t *testing.T) { must.Eq(t, "vol-id", stateDBs[0].ID) must.Eq(t, "vol-id", stateDBs[0].CreateReq.ID) // should be registered with node - must.MapContainsKey(t, node.vols, "vol-name", must.Sprintf("no vol-name in %+v", node.vols)) + must.MapContainsKey(t, node.vols, name, must.Sprintf("no %q in %+v", name, node.vols)) + assertLocked(t, hvm, name) + + // repeat create with same ID but different size may update the volume + req.RequestedCapacityMinBytes = 10 + expectResp.CapacityBytes = 10 + resp, err = hvm.Create(ctx, req) + must.NoError(t, err) + must.Eq(t, expectResp, resp) + + // duplicate create with the same vol name but different ID should fail + _, err = hvm.Create(ctx, &cstructs.ClientHostVolumeCreateRequest{ + Name: name, + ID: "different-vol-id", + PluginID: "test-plugin", + }) + must.ErrorIs(t, err, ErrVolumeNameExists) }) + // despite being a subtest, this needs to run after "create" t.Run("delete", func(t *testing.T) { + name := "vol-name" + // should be locked from "create" above + assertLocked(t, hvm, name) + // plugin doesn't exist req := &cstructs.ClientHostVolumeDeleteRequest{ + Name: name, ID: "vol-id", - Name: "vol-name", PluginID: "nope", } _, err := hvm.Delete(ctx, req) must.ErrorIs(t, err, ErrPluginNotExists) + assertLocked(t, hvm, name) // error from plugin req.PluginID = "test-plugin" plug.deleteErr = errors.New("sad delete") _, err = hvm.Delete(ctx, req) must.ErrorIs(t, err, plug.deleteErr) + assertLocked(t, hvm, name) plug.reset() // error saving state hvm.stateMgr = errDB _, err = hvm.Delete(ctx, req) must.ErrorIs(t, err, cstate.ErrDBError) + assertLocked(t, hvm, name) // happy path // add stuff that should be deleted @@ -136,6 +165,7 @@ func TestHostVolumeManager(t *testing.T) { stateVols, err := memDB.GetDynamicHostVolumes() must.NoError(t, err) must.Nil(t, stateVols, must.Sprint("vols should be deleted from state")) + assertNotLocked(t, hvm, name) }) } @@ -181,6 +211,16 @@ func (p *fakePlugin) Delete(_ context.Context, req *cstructs.ClientHostVolumeDel return nil } +func assertLocked(t *testing.T, hvm *HostVolumeManager, name string) { + t.Helper() + must.True(t, hvm.locker.isLocked(name), must.Sprintf("vol name %q should be locked", name)) +} + +func assertNotLocked(t *testing.T, hvm *HostVolumeManager, name string) { + t.Helper() + must.False(t, hvm.locker.isLocked(name), must.Sprintf("vol name %q should not be locked", name)) +} + func TestHostVolumeManager_restoreFromState(t *testing.T) { log := testlog.HCLogger(t) vol := &cstructs.HostVolumeState{ @@ -191,7 +231,7 @@ func TestHostVolumeManager_restoreFromState(t *testing.T) { PluginID: "mkdir", }, } - node := newFakeNode() + node := newFakeNode(t) t.Run("no vols", func(t *testing.T) { state := cstate.NewMemDB(log) @@ -235,6 +275,8 @@ func TestHostVolumeManager_restoreFromState(t *testing.T) { must.Eq(t, expect, vols) must.DirExists(t, volPath) + + assertLocked(t, hvm, "test-vol-name") }) t.Run("state error", func(t *testing.T) { @@ -290,15 +332,17 @@ func TestHostVolumeManager_restoreFromState(t *testing.T) { type fakeNode struct { vols VolumeMap + log hclog.Logger } func (n *fakeNode) updateVol(name string, volume *structs.ClientHostVolumeConfig) { - UpdateVolumeMap(n.vols, name, volume) + UpdateVolumeMap(n.log, n.vols, name, volume) } -func newFakeNode() *fakeNode { +func newFakeNode(t *testing.T) *fakeNode { return &fakeNode{ vols: make(VolumeMap), + log: testlog.HCLogger(t), } } diff --git a/client/hostvolumemanager/volume_fingerprint.go b/client/hostvolumemanager/volume_fingerprint.go index a4e1713686e..dff87a77749 100644 --- a/client/hostvolumemanager/volume_fingerprint.go +++ b/client/hostvolumemanager/volume_fingerprint.go @@ -6,6 +6,7 @@ package hostvolumemanager import ( "context" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/nomad/structs" ) @@ -24,7 +25,7 @@ type VolumeMap map[string]*structs.ClientHostVolumeConfig // // Since it may mutate the map, the caller should make a copy // or acquire a lock as appropriate for their context. -func UpdateVolumeMap(volumes VolumeMap, name string, vol *structs.ClientHostVolumeConfig) (changed bool) { +func UpdateVolumeMap(log hclog.Logger, volumes VolumeMap, name string, vol *structs.ClientHostVolumeConfig) (changed bool) { current, exists := volumes[name] if vol == nil { if exists { @@ -32,6 +33,12 @@ func UpdateVolumeMap(volumes VolumeMap, name string, vol *structs.ClientHostVolu changed = true } } else { + // if the volume already exists with no ID, it will be because it was + // added to client agent config after having been previously created + // as a dynamic vol. dynamic takes precedence, but log a warning. + if exists && current.ID == "" { + log.Warn("overriding static host volume with dynamic", "name", name, "id", vol.ID) + } if !exists || !vol.Equal(current) { volumes[name] = vol changed = true diff --git a/client/hostvolumemanager/volume_fingerprint_test.go b/client/hostvolumemanager/volume_fingerprint_test.go index 3cda6a54091..ce9ea429555 100644 --- a/client/hostvolumemanager/volume_fingerprint_test.go +++ b/client/hostvolumemanager/volume_fingerprint_test.go @@ -23,6 +23,8 @@ func TestUpdateVolumeMap(t *testing.T) { expectMap VolumeMap expectChange bool + + expectLog string }{ { name: "delete absent", @@ -57,20 +59,33 @@ func TestUpdateVolumeMap(t *testing.T) { expectChange: false, }, { - // this should not happen, but test anyway + // this should not happen with dynamic vols, but test anyway name: "change present", - vols: VolumeMap{"changeme": {Path: "before"}}, + vols: VolumeMap{"changeme": {ID: "before"}}, volName: "changeme", - vol: &structs.ClientHostVolumeConfig{Path: "after"}, - expectMap: VolumeMap{"changeme": {Path: "after"}}, + vol: &structs.ClientHostVolumeConfig{ID: "after"}, + expectMap: VolumeMap{"changeme": {ID: "after"}}, + expectChange: true, + }, + { + // this should only happen during agent start, if a static vol has + // been added to config after a previous dynamic vol was created + // with the same name. + name: "override static", + vols: VolumeMap{"overrideme": {ID: ""}}, // static vols have no ID + volName: "overrideme", + vol: &structs.ClientHostVolumeConfig{ID: "dynamic-vol-id"}, + expectMap: VolumeMap{"overrideme": {ID: "dynamic-vol-id"}}, expectChange: true, + expectLog: "overriding static host volume with dynamic: name=overrideme id=dynamic-vol-id", }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { + log, getLogs := logRecorder(t) - changed := UpdateVolumeMap(tc.vols, tc.volName, tc.vol) + changed := UpdateVolumeMap(log, tc.vols, tc.volName, tc.vol) must.Eq(t, tc.expectMap, tc.vols) if tc.expectChange { @@ -79,6 +94,7 @@ func TestUpdateVolumeMap(t *testing.T) { must.False(t, changed, must.Sprint("expect volume not to have been changed")) } + must.StrContains(t, getLogs(), tc.expectLog) }) } } @@ -87,7 +103,7 @@ func TestWaitForFirstFingerprint(t *testing.T) { log := testlog.HCLogger(t) tmp := t.TempDir() memDB := state.NewMemDB(log) - node := newFakeNode() + node := newFakeNode(t) hvm := NewHostVolumeManager(log, Config{ PluginDir: "", SharedMountDir: tmp, diff --git a/client/node_updater.go b/client/node_updater.go index c02a2dd9950..36de5663a94 100644 --- a/client/node_updater.go +++ b/client/node_updater.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/devicemanager" hvm "github.com/hashicorp/nomad/client/hostvolumemanager" "github.com/hashicorp/nomad/client/pluginmanager/csimanager" @@ -50,7 +51,8 @@ SEND_BATCH: // host volume updates var hostVolChanged bool c.batchNodeUpdates.batchHostVolumeUpdates(func(name string, vol *structs.ClientHostVolumeConfig) { - hostVolChanged = hvm.UpdateVolumeMap(newConfig.Node.HostVolumes, name, vol) + hostVolChanged = hvm.UpdateVolumeMap(c.logger.Named("node_updater").With("method", "batchFirstFingerprint"), + newConfig.Node.HostVolumes, name, vol) }) // csi updates @@ -140,7 +142,8 @@ func (c *Client) updateNodeFromHostVol(name string, vol *structs.ClientHostVolum newConfig.Node.HostVolumes = make(map[string]*structs.ClientHostVolumeConfig) } - changed := hvm.UpdateVolumeMap(newConfig.Node.HostVolumes, name, vol) + changed := hvm.UpdateVolumeMap(c.logger.Named("node_updater").With("method", "updateNodeFromHostVol"), + newConfig.Node.HostVolumes, name, vol) if changed { c.config = newConfig c.updateNode() @@ -342,6 +345,8 @@ func (c *Client) updateNodeFromDevicesLocked(devices []*structs.NodeDeviceResour // Once ready, the batches can be flushed and toggled to stop batching and forward // all updates to a configured callback to be performed incrementally type batchNodeUpdates struct { + logger hclog.Logger + // access to driver fields must hold driversMu lock drivers map[string]*structs.DriverInfo driversBatched bool @@ -368,12 +373,14 @@ type batchNodeUpdates struct { } func newBatchNodeUpdates( + logger hclog.Logger, driverCB drivermanager.UpdateNodeDriverInfoFn, devicesCB devicemanager.UpdateNodeDevicesFn, csiCB csimanager.UpdateNodeCSIInfoFunc, hostVolumeCB hvm.HostVolumeNodeUpdater) *batchNodeUpdates { return &batchNodeUpdates{ + logger: logger, drivers: make(map[string]*structs.DriverInfo), driverCB: driverCB, devices: []*structs.NodeDeviceResource{}, @@ -394,7 +401,8 @@ func (b *batchNodeUpdates) updateNodeFromHostVolume(name string, vol *structs.Cl b.hostVolumeCB(name, vol) // => Client.updateNodeFromHostVol() return } - hvm.UpdateVolumeMap(b.hostVolumes, name, vol) + hvm.UpdateVolumeMap(b.logger.Named("node_updater").With("method", "updateNodeFromHostVolume"), + b.hostVolumes, name, vol) } // this one runs on client start