scale allocator member ItemLimit to the relative share of ItemSlots, and refine suspension eligibility #416

Merged · 2 commits · Jan 21, 2025
64 changes: 51 additions & 13 deletions allocator/allocator.go
@@ -68,22 +68,27 @@ func Allocate(args AllocateArgs) error {
// Do we need to re-solve for a maximum assignment?
if state.NetworkHash != lastNetworkHash {
var startTime = time.Now()
desired = solveDesiredAssignments(state, desired[:0])
desired = SolveDesiredAssignments(state, desired[:0])
var dur = time.Since(startTime)
allocatorMaxFlowRuntimeSeconds.Observe(dur.Seconds())

var added, removed, unchanged = ChangeSummary(state.Assignments, desired)

log.WithFields(log.Fields{
"dur": dur,
"hash": state.NetworkHash,
"itemSlots": state.ItemSlots,
"items": len(state.Items),
"lastHash": lastNetworkHash,
"memberSlots": state.MemberSlots,
"members": len(state.Members),
"nextAssignments": len(desired),
"prevAssignments": len(state.Assignments),
"rev": state.KS.Header.Revision,
"root": state.KS.Root,
"asn.last": len(state.Assignments),
"asn.next": len(desired),
"asn.next.add": added,
"asn.next.rem": removed,
"asn.next.same": unchanged,
"dur": dur,
"hash.last": lastNetworkHash,
"hash.next": state.NetworkHash,
"item.slots": state.ItemSlots,
"item.total": len(state.Items),
"mem.slots": state.MemberSlots,
"mem.total": len(state.Members),
"rev": state.KS.Header.Revision,
"root": state.KS.Root,
}).Info("solved for maximum assignment")

if len(desired) < state.ItemSlots {
@@ -205,7 +210,7 @@ func removeDeadAssignments(txn checkpointTxn, ks *keyspace.KeySpace, asn keyspac
return nil
}

func solveDesiredAssignments(s *State, desired []Assignment) []Assignment {
func SolveDesiredAssignments(s *State, desired []Assignment) []Assignment {
// Number of items to lump into each invocation of push/relabel.
// This is an arbitrary number which is empirically fast to solve,
// but is large enough that we're unlikely to see further improvements
@@ -230,6 +235,39 @@ func solveDesiredAssignments(s *State, desired []Assignment) []Assignment {
return desired
}

// ChangeSummary returns the number of added, removed, and unchanged assignments
// when `current` assignments are shifted to `desired`.
func ChangeSummary(current keyspace.KeyValues, desired []Assignment) (added, removed, unchanged int) {
for lhs, rhs := current, desired; len(lhs) != 0 || len(rhs) != 0; {
var cmp int

if len(lhs) == 0 {
cmp = 1
} else if len(rhs) == 0 {
cmp = -1
} else if lh, rh := lhs[0].Decoded.(Assignment), rhs[0]; lh.ItemID != rh.ItemID {
cmp = strings.Compare(lh.ItemID, rh.ItemID)
} else if lh.MemberZone != rh.MemberZone {
cmp = strings.Compare(lh.MemberZone, rh.MemberZone)
} else {
cmp = strings.Compare(lh.MemberSuffix, rh.MemberSuffix)
}

switch cmp {
case -1:
removed += 1
lhs = lhs[1:]
case 1:
added += 1
rhs = rhs[1:]
case 0:
unchanged += 1
lhs, rhs = lhs[1:], rhs[1:]
}
}
return
}

// modRevisionUnchanged returns a Cmp which verifies the key has not changed
// from the current KeyValue.
func modRevisionUnchanged(kv keyspace.KeyValue) clientv3.Cmp {
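For context on the new ChangeSummary helper: it is a merge-join over two sorted assignment sequences, counting entries present only in `current` (removed), only in `desired` (added), or in both (unchanged). A minimal standalone sketch of the same pattern, using plain sorted string keys in place of keyspace.KeyValues and decoded Assignments:

```go
package main

import (
	"fmt"
	"strings"
)

// changeSummary is a simplified analogue of allocator.ChangeSummary: both
// inputs must be sorted, and each step consumes the smaller head element.
func changeSummary(current, desired []string) (added, removed, unchanged int) {
	for len(current) != 0 || len(desired) != 0 {
		var cmp int
		if len(current) == 0 {
			cmp = 1
		} else if len(desired) == 0 {
			cmp = -1
		} else {
			cmp = strings.Compare(current[0], desired[0])
		}

		switch {
		case cmp < 0:
			removed++ // Present only in `current`.
			current = current[1:]
		case cmp > 0:
			added++ // Present only in `desired`.
			desired = desired[1:]
		default:
			unchanged++ // Present in both.
			current, desired = current[1:], desired[1:]
		}
	}
	return
}

func main() {
	// "b" is removed, "d" is added, while "a" and "c" are unchanged.
	fmt.Println(changeSummary([]string{"a", "b", "c"}, []string{"a", "c", "d"})) // 1 1 2
}
```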
118 changes: 118 additions & 0 deletions allocator/benchmark_test.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"math/rand/v2"
"testing"

"github.com/prometheus/client_golang/prometheus"
@@ -28,6 +29,7 @@ func (s *BenchmarkHealthSuite) TestBenchmarkHealth(c *gc.C) {
var fakeB = testing.B{N: 1}

benchmarkSimulatedDeploy(&fakeB)
BenchmarkChangingReplication(&fakeB)
}

var _ = gc.Suite(&BenchmarkHealthSuite{})
@@ -149,6 +151,122 @@ func benchmarkSimulatedDeploy(b *testing.B) {
}).Info("final metrics")
}

func BenchmarkChangingReplication(b *testing.B) {
var client = etcdtest.TestClient()
defer etcdtest.Cleanup()

var ctx = context.Background()
var ks = NewAllocatorKeySpace("/root", testAllocDecoder{})
var state = NewObservedState(ks, MemberKey(ks, "zone", "leader"), isConsistent)

var NMembers = 10
var NItems = 323 // Or: 32303
var NMaxRun = 200
var rng = rand.NewPCG(8675, 309)

b.Logf("Benchmarking with %d items, %d members", NItems, NMembers)

// fill inserts (if `asInsert`) or updates the keys/values produced by `kvcb` over the range [begin, end).
var fill = func(begin, end int, asInsert bool, kvcb func(i int) (string, string)) {
var kv = make([]string, 0, 2*(end-begin))

for i := begin; i != end; i++ {
var k, v = kvcb(i)
kv = append(kv, k)
kv = append(kv, v)
}
if asInsert {
require.NoError(b, insert(ctx, client, kv...))
} else {
require.NoError(b, update(ctx, client, kv...))
}
}

// Insert a Member key which will act as the leader.
require.NoError(b, insert(ctx, client, state.LocalKey, `{"R": 1}`))

// Announce all Members.
fill(0, NMembers, true, func(i int) (string, string) {
return MemberKey(ks, "zone-a", fmt.Sprintf("m%05d", i)), `{"R": 10000}`
})
// Announce all Items with full replication.
fill(0, NItems, true, func(i int) (string, string) {
return ItemKey(ks, fmt.Sprintf("i%05d", i)), `{"R":2}`
})

var testState = struct {
step int
total int
}{step: 0, total: 0}

var testHook = func(round int, idle bool) {
if !idle {
return
} else if err := markAllConsistent(ctx, client, ks, ""); err == nil {
return
} else if err == io.ErrNoProgress {
// Continue to the next test step below.
} else {
log.WithField("err", err).Warn("failed to mark all consistent (will retry)")
return
}

// Pick a run of contiguous items, and update each to a random replication.
// This strategy is designed to exercise imbalances in the number of
// replication slots across allocation sub-problems.
var r = rng.Uint64() % 3
var run = 1 + int(rng.Uint64()%(uint64(NMaxRun)-1))
var start = int(rng.Uint64() % uint64((NItems - run)))

if r == 2 {
r = 3 // All items begin with R=2.
}

log.WithFields(log.Fields{
"r": r,
"run": run,
"start": start,
"step": testState.step,
}).Info("next test step")

if testState.step == b.N {
// Begin a graceful exit.
update(ctx, client, state.LocalKey, `{"R": 0}`)
return
}
testState.step += 1
testState.total += run

var value = fmt.Sprintf(`{"R":%d}`, r)
fill(start, start+run, false, func(i int) (string, string) {
return ItemKey(ks, fmt.Sprintf("i%05d", i)), value
})
}

require.NoError(b, ks.Load(ctx, client, 0))
go ks.Watch(ctx, client)

require.NoError(b, Allocate(AllocateArgs{
Context: ctx,
Etcd: client,
State: state,
TestHook: testHook,
}))

var adds = counterVal(allocatorAssignmentAddedTotal)
var packs = counterVal(allocatorAssignmentPackedTotal)
var removes = counterVal(allocatorAssignmentRemovedTotal)
var ratio = (float64(adds-2*float64(NItems)) + float64(removes)) / float64(testState.total)

log.WithFields(log.Fields{
"adds": adds,
"packs": packs,
"removes": removes,
"run.ratio": ratio,
"run.total": testState.total,
}).Info("final metrics")
}

func benchMemberKey(ks *keyspace.KeySpace, i int) string {
var zone string

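As I read the benchmark's closing metrics: every item starts at R=2, so the initial solve contributes roughly 2·NItems adds, and `ratio` then approximates assignment churn per item whose replication was updated. A hedged restatement of that computation, with illustrative numbers (none of these names or values appear in the diff):

```go
package main

import "fmt"

// churnRatio restates the benchmark's final `ratio` under the assumption that
// the adds counter includes the ~2*nItems assignments of the initial solution,
// which are subtracted out before normalizing by the count of updated items.
func churnRatio(adds, removes float64, nItems, totalUpdated int) float64 {
	var extraAdds = adds - 2*float64(nItems)
	return (extraAdds + removes) / float64(totalUpdated)
}

func main() {
	// E.g.: 323 items start with 646 assignments; across steps touching 5000
	// item updates, the allocator added 7646 and removed 6900 assignments.
	fmt.Println(churnRatio(7646, 6900, 323, 5000)) // 2.78 changes per updated item
}
```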
12 changes: 9 additions & 3 deletions allocator/sparse_flow_network.go
@@ -61,7 +61,8 @@ import (
// relaxed until a maximal assignment is achieved.
type sparseFlowNetwork struct {
*State
myItems keyspace.KeyValues // Slice of State.Items included in this network.
myItems keyspace.KeyValues // Slice of State.Items included in this network.
myItemSlots int // Sum of replication slots attributable just to myItems.

firstItemNodeID pr.NodeID // First Item NodeID in the graph.
firstZoneItemNodeID pr.NodeID // First Zone-Item NodeID in the graph.
@@ -141,6 +142,7 @@ func newSparseFlowNetwork(s *State, myItems keyspace.KeyValues) *sparseFlowNetwo
// Accelerate our left-join by skipping to the first assignment of `myItems` via binary search.
var pivot, _ = s.Assignments.Search(ItemAssignmentsPrefix(s.KS, itemAt(myItems, 0).ID))
var myAssignments = s.Assignments[pivot:]
var myItemSlots int

var it = LeftJoin{
LenL: len(myItems),
@@ -152,6 +154,7 @@ func newSparseFlowNetwork(s *State, myItems keyspace.KeyValues) *sparseFlowNetwo
for cur, ok := it.Next(); ok; cur, ok = it.Next() {
var item = cur.Left
var assignments = myAssignments[cur.RightBegin:cur.RightEnd]
myItemSlots += myItems[item].Decoded.(Item).DesiredReplication()

// Left-join zones with |assignments| of this |item|.
var it2 = LeftJoin{
@@ -196,6 +199,7 @@ func newSparseFlowNetwork(s *State, myItems keyspace.KeyValues) *sparseFlowNetwo
var fs = &sparseFlowNetwork{
State: s,
myItems: myItems,
myItemSlots: myItemSlots,
firstItemNodeID: firstItemNodeID,
firstZoneItemNodeID: firstZoneItemNodeID,
firstMemberNodeID: firstMemberNodeID,
@@ -348,8 +352,10 @@ func (fs *sparseFlowNetwork) buildCurrentItemArcs(item int, bound int) []pr.Arc
// buildMemberArc from member `member` to the sink.
func (fs *sparseFlowNetwork) buildMemberArc(mf *pr.MaxFlow, id pr.NodeID, member int) []pr.Arc {
var c = memberAt(fs.Members, member).ItemLimit()
// Constrain to the scaled ItemLimit for our portion of the global assignment problem.
c = scaleAndRound(c, len(fs.myItems), len(fs.Items))

// Scale ItemLimit by the relative share of ItemSlots within
// our subset of the global assignment problem.
c = scaleAndRound(c, fs.myItemSlots, fs.ItemSlots)

if mf.RelativeHeight(id) < memberOverflowThreshold {
// Further scale to our relative "fair share" items.
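To illustrate the intent of the buildMemberArc change: scaling ItemLimit by the sub-problem's share of replication slots, rather than its share of the item count, grants more per-member capacity when the sub-problem's items are more heavily replicated than average. A numeric sketch under the assumption that scaleAndRound (not shown in this diff) computes round(c * num / den):

```go
package main

import (
	"fmt"
	"math"
)

// scaleAndRound is assumed, for illustration only, to be proportional scaling
// with rounding; the real implementation lives outside this diff.
func scaleAndRound(c, num, den int) int {
	return int(math.Round(float64(c) * float64(num) / float64(den)))
}

func main() {
	// A member with ItemLimit 100, in a network of 1000 items carrying 3000
	// replication slots. Suppose this sub-problem holds 200 of the items but
	// 900 of the slots, because its items are replicated more heavily.
	const itemLimit, myItems, items, myItemSlots, itemSlots = 100, 200, 1000, 900, 3000

	fmt.Println(scaleAndRound(itemLimit, myItems, items))         // previous scaling: 20
	fmt.Println(scaleAndRound(itemLimit, myItemSlots, itemSlots)) // new scaling: 30
}
```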
20 changes: 19 additions & 1 deletion broker/append_fsm.go
@@ -6,6 +6,7 @@ import (
"fmt"
"hash"
"io"
"time"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
@@ -435,9 +436,26 @@ func (b *appendFSM) onValidatePreconditions() {
// reflected in our index, if a commit wasn't accepted by all peers.
// Such writes are reported as failed to the client and are retried
// (this failure mode is what makes journals at-least-once).
var indexMin, indexMax, indexDirty = b.resolved.replica.index.OffsetRange()
var indexMin, indexMax, indexModTime = b.resolved.replica.index.Summary()
var suspend = b.resolved.journalSpec.Suspend

// The index is "clean" if all fragments have been remote for the journal's
// flush interval, where the flush interval is interpreted as an upper-bound
// expectation of the period between appends if the journal remains "in use".
// Thus, if a journal doesn't receive an append for more than its interval,
// it's presumed to be idle and is eligible for suspension.
//
// To see why, consider a group of journals which are appended to at midnight,
// configured with a 24h flush interval. These journals will not auto-suspend
// ordinarily. If we instead used a more aggressive policy, they might trigger
// storms of suspensions and re-activations which could impact other journals
// due to assignment churn.
var flushInterval = int64(b.resolved.journalSpec.Fragment.FlushInterval.Seconds())
if flushInterval == 0 {
flushInterval = 24 * 60 * 60 // Default to 24h.
}
var indexDirty = indexModTime == 0 || indexModTime > time.Now().Unix()-flushInterval

var maxOffset = b.pln.spool.End
if indexMax > maxOffset {
maxOffset = indexMax
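A minimal sketch of the refined suspension-eligibility check in onValidatePreconditions, with hypothetical timestamps (in the real code the interval comes from JournalSpec.Fragment.FlushInterval and the ModTime from the fragment index's Summary):

```go
package main

import (
	"fmt"
	"time"
)

// indexDirty reports whether the fragment index should be treated as "in use":
// either the newest fragment is still local-only (ModTime == 0), or it was
// persisted within the journal's flush interval.
func indexDirty(indexModTime int64, flushInterval time.Duration, now time.Time) bool {
	var interval = int64(flushInterval.Seconds())
	if interval == 0 {
		interval = 24 * 60 * 60 // Default to 24h, as above.
	}
	return indexModTime == 0 || indexModTime > now.Unix()-interval
}

func main() {
	var now = time.Now()

	// Persisted 2h ago with a 1h flush interval: idle, eligible for suspension.
	fmt.Println(indexDirty(now.Add(-2*time.Hour).Unix(), time.Hour, now)) // false

	// Persisted 30m ago with a 1h flush interval: still considered in use.
	fmt.Println(indexDirty(now.Add(-30*time.Minute).Unix(), time.Hour, now)) // true

	// Newest fragment hasn't been persisted yet: always dirty.
	fmt.Println(indexDirty(0, time.Hour, now)) // true
}
```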
12 changes: 8 additions & 4 deletions broker/fragment/index.go
@@ -125,13 +125,17 @@ func (fi *Index) Query(ctx context.Context, req *pb.ReadRequest) (*pb.ReadRespon
}
}

// OffsetRange returns the [Begin, End) offset range of all Fragments in the index,
// and a boolean indicating if the index has local-only fragments.
func (fi *Index) OffsetRange() (int64, int64, bool) {
// Summary returns the [Begin, End) offset range of all Fragments in the index,
// and the persisted ModTime of the last Fragment (or zero, if it's local).
func (fi *Index) Summary() (int64, int64, int64) {
defer fi.mu.RUnlock()
fi.mu.RLock()

return fi.set.BeginOffset(), fi.set.EndOffset(), len(fi.local) != 0
if l := len(fi.set); l == 0 {
return 0, 0, 0
} else {
return fi.set[0].Begin, fi.set[l-1].End, fi.set[l-1].ModTime
}
}

// SpoolCommit adds local Spool Fragment |frag| to the index.
4 changes: 2 additions & 2 deletions broker/fragment/index_test.go
@@ -288,7 +288,7 @@ func (s *IndexSuite) TestWalkStoresAndURLSigning(c *gc.C) {
<-ind.FirstRefreshCh()

c.Check(ind.set, gc.HasLen, 3)
var bo, eo, _ = ind.OffsetRange()
var bo, eo, _ = ind.Summary()
c.Check(bo, gc.Equals, int64(0x0))
c.Check(eo, gc.Equals, int64(0x255))

@@ -306,7 +306,7 @@ func (s *IndexSuite) TestWalkStoresAndURLSigning(c *gc.C) {
ind.ReplaceRemote(set)

c.Check(ind.set, gc.HasLen, 4) // Combined Fragments are reflected.
bo, eo, _ = ind.OffsetRange()
bo, eo, _ = ind.Summary()
c.Check(bo, gc.Equals, int64(0x0))
c.Check(eo, gc.Equals, int64(0x555))

4 changes: 2 additions & 2 deletions broker/replicate_api_test.go
@@ -40,7 +40,7 @@ func TestReplicateStreamAndCommit(t *testing.T) {
require.NoError(t, stream.Send(&pb.ReplicateRequest{Content: []byte("bazbing"), ContentDelta: 6}))

// Precondition: content not observable in the Fragment index.
var _, eo, _ = broker.replica("a/journal").index.OffsetRange()
var _, eo, _ = broker.replica("a/journal").index.Summary()
require.Equal(t, int64(0), eo)

// Commit.
Expand All @@ -58,7 +58,7 @@ func TestReplicateStreamAndCommit(t *testing.T) {
expectReplResponse(t, stream, &pb.ReplicateResponse{Status: pb.Status_OK})

// Post-condition: content is now observable.
_, eo, _ = broker.replica("a/journal").index.OffsetRange()
_, eo, _ = broker.replica("a/journal").index.Summary()
require.Equal(t, int64(13), eo)

// Send EOF and expect it's returned.