Skip to content

Commit

Permalink
fix: upload status response check and smoke/load should reuse batches …
Browse files Browse the repository at this point in the history
…based on label (#421)
  • Loading branch information
istae authored Oct 21, 2024
1 parent 330a70c commit 7e17fa5
Show file tree
Hide file tree
Showing 18 changed files with 63 additions and 105 deletions.
11 changes: 8 additions & 3 deletions pkg/bee/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,15 +236,20 @@ func (c *Client) requestWithHeader(ctx context.Context, method, path string, hea
return err
}

if err = responseErrorHandler(r); err != nil {
return err
}

if v != nil && strings.Contains(r.Header.Get("Content-Type"), "application/json") {
_ = json.NewDecoder(r.Body).Decode(&v)
if err := json.NewDecoder(r.Body).Decode(&v); err != nil {
return err
}
for _, parser := range headerParser {
parser(r.Header)
}
return err
}

return err
return nil
}

// drain discards all of the remaining data from the reader and closes it,
Expand Down
7 changes: 5 additions & 2 deletions pkg/bee/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func (c *Client) CreatePostageBatch(ctx context.Context, amount int64, depth uin
return id, nil
}

func (c *Client) GetOrCreateBatch(ctx context.Context, amount int64, depth uint64, label string) (string, error) {
func (c *Client) GetOrCreateMutableBatch(ctx context.Context, amount int64, depth uint64, label string) (string, error) {
batches, err := c.PostageBatches(ctx)
if err != nil {
return "", err
Expand All @@ -445,8 +445,11 @@ func (c *Client) GetOrCreateBatch(ctx context.Context, amount int64, depth uint6
if b.ImmutableFlag { // skip immutable batches
continue
}
if b.Label != label {
continue
}

if b.Usable && (b.BatchTTL == -1 || b.BatchTTL > 0) {
if b.Usable && (b.BatchTTL == -1 || b.BatchTTL > 0) && b.Utilization < (1<<(b.Depth-b.BucketDepth)) {
return b.BatchID, nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/check/act/act.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int

file := bee.NewRandomFile(rnds[0], fileName, o.FileSize)

batchID, err := upClient.GetOrCreateBatch(ctx, o.PostageAmount, o.PostageDepth, postagelabel)
batchID, err := upClient.GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, postagelabel)
if err != nil {
return fmt.Errorf("created batched id %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/check/manifest/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int

client := clients[node]

batchID, err := client.GetOrCreateBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
batchID, err := client.GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
if err != nil {
return fmt.Errorf("node %s: batch id %w", node, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/check/networkavailability/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ iteration:
var chunks []swarm.Chunk
for _, n := range neighborhoods(int(storageRadius)) {

batch, err := uploadClient.GetOrCreateBatch(ctx, o.PostageAmount, o.PostageDepth, "net-avail-check")
batch, err := uploadClient.GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, "net-avail-check")
if err != nil {
c.logger.Errorf("create batch failed failed")
continue iteration
Expand Down
2 changes: 1 addition & 1 deletion pkg/check/pss/pss.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (c *Check) testPss(nodeAName, nodeBName string, clients map[string]*bee.Cli
return err
}

batchID, err := nodeA.GetOrCreateBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
batchID, err := nodeA.GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
if err != nil {
cancel()
return fmt.Errorf("node %s: batched id %w", nodeAName, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/check/pushsync/check_chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func checkChunks(ctx context.Context, c orchestration.Cluster, o Options, l logg

uploader := clients[nodeName]

batchID, err := uploader.GetOrCreateBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
batchID, err := uploader.GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
if err != nil {
return fmt.Errorf("node %s: batch id %w", nodeName, err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/check/pushsync/check_lightnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func checkLightChunks(ctx context.Context, cluster orchestration.Cluster, o Opti
// prepare postage batches
for i := 0; i < len(lightNodes); i++ {
nodeName := lightNodes[i]
batchID, err := clients[nodeName].GetOrCreateBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
batchID, err := clients[nodeName].GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
if err != nil {
return fmt.Errorf("node %s: batch id %w", nodeName, err)
}
Expand All @@ -46,7 +46,7 @@ func checkLightChunks(ctx context.Context, cluster orchestration.Cluster, o Opti
nodeName := lightNodes[i]

uploader := clients[nodeName]
batchID, err := uploader.GetOrCreateBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
batchID, err := uploader.GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
if err != nil {
return fmt.Errorf("node %s: batch id %w", nodeName, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/check/pushsync/pushsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (c *Check) defaultCheck(ctx context.Context, cluster orchestration.Cluster,
nodeName := sortedNodes[i]
client := clients[nodeName]

batchID, err := client.GetOrCreateBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
batchID, err := client.GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
if err != nil {
return fmt.Errorf("node %s: batch id %w", nodeName, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/check/redundancy/redundancy.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, o interf
}
c.logger.Infof("root hash: %s, chunks: %d", root.String(), len(chunks))

batchID, err := uploadClient.GetOrCreateBatch(ctx, opts.PostageAmount, opts.PostageDepth, "ci-redundancy")
batchID, err := uploadClient.GetOrCreateMutableBatch(ctx, opts.PostageAmount, opts.PostageDepth, "ci-redundancy")
if err != nil {
return fmt.Errorf("get or create batch: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/check/settlements/settlements.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int
client := clients[uNode]

c.logger.Info("node", uNode)
batchID, err := client.GetOrCreateBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
batchID, err := client.GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
if err != nil {
return fmt.Errorf("node %s: batch id %w", uNode, err)
}
Expand Down
46 changes: 0 additions & 46 deletions pkg/check/smoke/batchstore.go

This file was deleted.

74 changes: 35 additions & 39 deletions pkg/check/smoke/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ var _ beekeeper.Action = (*LoadCheck)(nil)
// Check instance
type LoadCheck struct {
metrics metrics
log logging.Logger
logger logging.Logger
}

// NewCheck returns new check
func NewLoadCheck(log logging.Logger) beekeeper.Action {
return &LoadCheck{
metrics: newMetrics("check_load"),
log: log,
logger: log,
}
}

Expand All @@ -52,11 +52,11 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts
return errors.New("max storage radius is not set")
}

c.log.Infof("random seed: %v", o.RndSeed)
c.log.Infof("content size: %v", o.ContentSize)
c.log.Infof("max batch lifespan: %v", o.MaxUseBatch)
c.log.Infof("max storage radius: %v", o.MaxStorageRadius)
c.log.Infof("storage radius check wait time: %v", o.StorageRadiusCheckWait)
c.logger.Infof("random seed: %v", o.RndSeed)
c.logger.Infof("content size: %v", o.ContentSize)
c.logger.Infof("max batch lifespan: %v", o.MaxUseBatch)
c.logger.Infof("max storage radius: %v", o.MaxStorageRadius)
c.logger.Infof("storage radius check wait time: %v", o.StorageRadiusCheckWait)

clients, err := cluster.NodesClients(ctx)
if err != nil {
Expand All @@ -66,20 +66,18 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts
ctx, cancel := context.WithTimeout(ctx, o.Duration)
defer cancel()

test := &test{clients: clients, logger: c.log}
test := &test{clients: clients, logger: c.logger}

uploaders := selectNames(cluster, o.UploadGroups...)
downloaders := selectNames(cluster, o.DownloadGroups...)

batches := NewStore(o.MaxUseBatch)

for i := 0; true; i++ {
select {
case <-ctx.Done():
c.log.Info("we are done")
c.logger.Info("we are done")
return nil
default:
c.log.Infof("starting iteration: #%d", i)
c.logger.Infof("starting iteration: #%d", i)
}

var (
Expand All @@ -90,13 +88,13 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts

txData = make([]byte, o.ContentSize)
if _, err := crand.Read(txData); err != nil {
c.log.Infof("unable to create random content: %v", err)
c.logger.Infof("unable to create random content: %v", err)
continue
}

txNames := pickRandom(o.UploaderCount, uploaders)

c.log.Infof("uploader: %s", txNames)
c.logger.Infof("uploader: %s", txNames)

var (
upload sync.WaitGroup
Expand All @@ -115,7 +113,7 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts
for retries := 10; txDuration == 0 && retries > 0; retries-- {
select {
case <-ctx.Done():
c.log.Info("we are done")
c.logger.Info("we are done")
return
default:
}
Expand All @@ -126,23 +124,21 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts

c.metrics.UploadAttempts.Inc()
var duration time.Duration
c.log.Infof("uploading to: %s", txName)

batchID := batches.Get(txName)
if batchID == "" {
batchID, err = clients[txName].CreatePostageBatch(ctx, o.PostageAmount, o.PostageDepth, "load-test", true)
if err != nil {
c.log.Errorf("create new batch: %v", err)
return
}
batches.Store(txName, batchID)
c.logger.Infof("uploading to: %s", txName)

batchID, err := clients[txName].GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, "load-test")
if err != nil {
c.logger.Errorf("create new batch: %v", err)
return
}

c.logger.Info("using batch", "batch_id", batchID)

address, duration, err = test.upload(ctx, txName, txData, batchID)
if err != nil {
c.metrics.UploadErrors.Inc()
c.log.Infof("upload failed: %v", err)
c.log.Infof("retrying in: %v", o.TxOnErrWait)
c.logger.Infof("upload failed: %v", err)
c.logger.Infof("retrying in: %v", o.TxOnErrWait)
time.Sleep(o.TxOnErrWait)
return
}
Expand All @@ -157,12 +153,12 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts
continue
}

c.log.Infof("sleeping for: %v seconds", o.NodesSyncWait.Seconds())
c.logger.Infof("sleeping for: %v seconds", o.NodesSyncWait.Seconds())
time.Sleep(o.NodesSyncWait) // Wait for nodes to sync.

// pick a batch of downloaders
rxNames := pickRandom(o.DownloaderCount, downloaders)
c.log.Infof("downloaders: %s", rxNames)
c.logger.Infof("downloaders: %s", rxNames)

var wg sync.WaitGroup

Expand All @@ -180,7 +176,7 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts
for retries := 10; rxDuration == 0 && retries > 0; retries-- {
select {
case <-ctx.Done():
c.log.Infof("context done in retry: %v", retries)
c.logger.Infof("context done in retry: %v", retries)
return
default:
}
Expand All @@ -190,8 +186,8 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts
rxData, rxDuration, err = test.download(ctx, rxName, address)
if err != nil {
c.metrics.DownloadErrors.Inc()
c.log.Infof("download failed: %v", err)
c.log.Infof("retrying in: %v", o.RxOnErrWait)
c.logger.Infof("download failed: %v", err)
c.logger.Infof("retrying in: %v", o.RxOnErrWait)
time.Sleep(o.RxOnErrWait)
}
}
Expand All @@ -202,15 +198,15 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts
}

if !bytes.Equal(rxData, txData) {
c.log.Info("uploaded data does not match downloaded data")
c.logger.Info("uploaded data does not match downloaded data")

c.metrics.DownloadMismatch.Inc()

rxLen, txLen := len(rxData), len(txData)
if rxLen != txLen {
c.log.Infof("length mismatch: download length %d; upload length %d", rxLen, txLen)
c.logger.Infof("length mismatch: download length %d; upload length %d", rxLen, txLen)
if txLen < rxLen {
c.log.Info("length mismatch: rx length is bigger then tx length")
c.logger.Info("length mismatch: rx length is bigger then tx length")
}
return
}
Expand All @@ -221,7 +217,7 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts
diff++
}
}
c.log.Infof("data mismatch: found %d different bytes, ~%.2f%%", diff, float64(diff)/float64(txLen)*100)
c.logger.Infof("data mismatch: found %d different bytes, ~%.2f%%", diff, float64(diff)/float64(txLen)*100)
return
}

Expand All @@ -242,17 +238,17 @@ func (c *LoadCheck) checkStorageRadius(ctx context.Context, client *bee.Client,
for {
rs, err := client.ReserveState(ctx)
if err != nil {
c.log.Infof("error getting state: %v", err)
c.logger.Infof("error getting state: %v", err)
return false
}
if rs.StorageRadius < maxRadius {
return true
}
c.log.Infof("waiting %v for StorageRadius to decrease. Current: %d, Max: %d", wait, rs.StorageRadius, maxRadius)
c.logger.Infof("waiting %v for StorageRadius to decrease. Current: %d, Max: %d", wait, rs.StorageRadius, maxRadius)

select {
case <-ctx.Done():
c.log.Infof("context done in StorageRadius check: %v", ctx.Err())
c.logger.Infof("context done in StorageRadius check: %v", ctx.Err())
return false
case <-time.After(wait):
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/check/smoke/smoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int

c.metrics.BatchCreateAttempts.Inc()

batchID, err = clients[txName].GetOrCreateBatch(txCtx, o.PostageAmount, o.PostageDepth, "smoke-test")
batchID, err = clients[txName].GetOrCreateMutableBatch(txCtx, o.PostageAmount, o.PostageDepth, "smoke-test")
if err != nil {
c.logger.Errorf("create new batch: %v", err)
c.metrics.BatchCreateErrors.Inc()
Expand Down
Loading

0 comments on commit 7e17fa5

Please sign in to comment.