Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: upload status response check and smoke/load should reuse batches based on label #421

Merged
merged 4 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions pkg/bee/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,15 +236,20 @@ func (c *Client) requestWithHeader(ctx context.Context, method, path string, hea
return err
}

if err = responseErrorHandler(r); err != nil {
return err
}

if v != nil && strings.Contains(r.Header.Get("Content-Type"), "application/json") {
_ = json.NewDecoder(r.Body).Decode(&v)
if err := json.NewDecoder(r.Body).Decode(&v); err != nil {
return err
}
for _, parser := range headerParser {
parser(r.Header)
}
return err
}

return err
return nil
}

// drain discards all of the remaining data from the reader and closes it,
Expand Down
7 changes: 5 additions & 2 deletions pkg/bee/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func (c *Client) CreatePostageBatch(ctx context.Context, amount int64, depth uin
return id, nil
}

func (c *Client) GetOrCreateBatch(ctx context.Context, amount int64, depth uint64, label string) (string, error) {
func (c *Client) GetOrCreateMutableBatch(ctx context.Context, amount int64, depth uint64, label string) (string, error) {
batches, err := c.PostageBatches(ctx)
if err != nil {
return "", err
Expand All @@ -445,8 +445,11 @@ func (c *Client) GetOrCreateBatch(ctx context.Context, amount int64, depth uint6
if b.ImmutableFlag { // skip immutable batches
continue
}
if b.Label != label {
continue
}

if b.Usable && (b.BatchTTL == -1 || b.BatchTTL > 0) {
if b.Usable && (b.BatchTTL == -1 || b.BatchTTL > 0) && b.Utilization < (1<<(b.Depth-b.BucketDepth)) {
return b.BatchID, nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/check/act/act.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int

file := bee.NewRandomFile(rnds[0], fileName, o.FileSize)

batchID, err := upClient.GetOrCreateBatch(ctx, o.PostageAmount, o.PostageDepth, postagelabel)
batchID, err := upClient.GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, postagelabel)
if err != nil {
return fmt.Errorf("created batched id %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/check/manifest/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int

client := clients[node]

batchID, err := client.GetOrCreateBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
batchID, err := client.GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
if err != nil {
return fmt.Errorf("node %s: batch id %w", node, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/check/networkavailability/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ iteration:
var chunks []swarm.Chunk
for _, n := range neighborhoods(int(storageRadius)) {

batch, err := uploadClient.GetOrCreateBatch(ctx, o.PostageAmount, o.PostageDepth, "net-avail-check")
batch, err := uploadClient.GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, "net-avail-check")
if err != nil {
c.logger.Errorf("create batch failed failed")
continue iteration
Expand Down
2 changes: 1 addition & 1 deletion pkg/check/pss/pss.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (c *Check) testPss(nodeAName, nodeBName string, clients map[string]*bee.Cli
return err
}

batchID, err := nodeA.GetOrCreateBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
batchID, err := nodeA.GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
if err != nil {
cancel()
return fmt.Errorf("node %s: batched id %w", nodeAName, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/check/pushsync/check_chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func checkChunks(ctx context.Context, c orchestration.Cluster, o Options, l logg

uploader := clients[nodeName]

batchID, err := uploader.GetOrCreateBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
batchID, err := uploader.GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
if err != nil {
return fmt.Errorf("node %s: batch id %w", nodeName, err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/check/pushsync/check_lightnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func checkLightChunks(ctx context.Context, cluster orchestration.Cluster, o Opti
// prepare postage batches
for i := 0; i < len(lightNodes); i++ {
nodeName := lightNodes[i]
batchID, err := clients[nodeName].GetOrCreateBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
batchID, err := clients[nodeName].GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
if err != nil {
return fmt.Errorf("node %s: batch id %w", nodeName, err)
}
Expand All @@ -46,7 +46,7 @@ func checkLightChunks(ctx context.Context, cluster orchestration.Cluster, o Opti
nodeName := lightNodes[i]

uploader := clients[nodeName]
batchID, err := uploader.GetOrCreateBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
batchID, err := uploader.GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
if err != nil {
return fmt.Errorf("node %s: batch id %w", nodeName, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/check/pushsync/pushsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (c *Check) defaultCheck(ctx context.Context, cluster orchestration.Cluster,
nodeName := sortedNodes[i]
client := clients[nodeName]

batchID, err := client.GetOrCreateBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
batchID, err := client.GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
if err != nil {
return fmt.Errorf("node %s: batch id %w", nodeName, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/check/redundancy/redundancy.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, o interf
}
c.logger.Infof("root hash: %s, chunks: %d", root.String(), len(chunks))

batchID, err := uploadClient.GetOrCreateBatch(ctx, opts.PostageAmount, opts.PostageDepth, "ci-redundancy")
batchID, err := uploadClient.GetOrCreateMutableBatch(ctx, opts.PostageAmount, opts.PostageDepth, "ci-redundancy")
if err != nil {
return fmt.Errorf("get or create batch: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/check/settlements/settlements.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int
client := clients[uNode]

c.logger.Info("node", uNode)
batchID, err := client.GetOrCreateBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
batchID, err := client.GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel)
if err != nil {
return fmt.Errorf("node %s: batch id %w", uNode, err)
}
Expand Down
46 changes: 0 additions & 46 deletions pkg/check/smoke/batchstore.go

This file was deleted.

74 changes: 35 additions & 39 deletions pkg/check/smoke/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ var _ beekeeper.Action = (*LoadCheck)(nil)
// Check instance
type LoadCheck struct {
metrics metrics
log logging.Logger
logger logging.Logger
}

// NewCheck returns new check
func NewLoadCheck(log logging.Logger) beekeeper.Action {
return &LoadCheck{
metrics: newMetrics("check_load"),
log: log,
logger: log,
}
}

Expand All @@ -52,11 +52,11 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts
return errors.New("max storage radius is not set")
}

c.log.Infof("random seed: %v", o.RndSeed)
c.log.Infof("content size: %v", o.ContentSize)
c.log.Infof("max batch lifespan: %v", o.MaxUseBatch)
c.log.Infof("max storage radius: %v", o.MaxStorageRadius)
c.log.Infof("storage radius check wait time: %v", o.StorageRadiusCheckWait)
c.logger.Infof("random seed: %v", o.RndSeed)
c.logger.Infof("content size: %v", o.ContentSize)
c.logger.Infof("max batch lifespan: %v", o.MaxUseBatch)
c.logger.Infof("max storage radius: %v", o.MaxStorageRadius)
c.logger.Infof("storage radius check wait time: %v", o.StorageRadiusCheckWait)

clients, err := cluster.NodesClients(ctx)
if err != nil {
Expand All @@ -66,20 +66,18 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts
ctx, cancel := context.WithTimeout(ctx, o.Duration)
defer cancel()

test := &test{clients: clients, logger: c.log}
test := &test{clients: clients, logger: c.logger}

uploaders := selectNames(cluster, o.UploadGroups...)
downloaders := selectNames(cluster, o.DownloadGroups...)

batches := NewStore(o.MaxUseBatch)

for i := 0; true; i++ {
select {
case <-ctx.Done():
c.log.Info("we are done")
c.logger.Info("we are done")
return nil
default:
c.log.Infof("starting iteration: #%d", i)
c.logger.Infof("starting iteration: #%d", i)
}

var (
Expand All @@ -90,13 +88,13 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts

txData = make([]byte, o.ContentSize)
if _, err := crand.Read(txData); err != nil {
c.log.Infof("unable to create random content: %v", err)
c.logger.Infof("unable to create random content: %v", err)
continue
}

txNames := pickRandom(o.UploaderCount, uploaders)

c.log.Infof("uploader: %s", txNames)
c.logger.Infof("uploader: %s", txNames)

var (
upload sync.WaitGroup
Expand All @@ -115,7 +113,7 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts
for retries := 10; txDuration == 0 && retries > 0; retries-- {
select {
case <-ctx.Done():
c.log.Info("we are done")
c.logger.Info("we are done")
return
default:
}
Expand All @@ -126,23 +124,21 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts

c.metrics.UploadAttempts.Inc()
var duration time.Duration
c.log.Infof("uploading to: %s", txName)

batchID := batches.Get(txName)
if batchID == "" {
batchID, err = clients[txName].CreatePostageBatch(ctx, o.PostageAmount, o.PostageDepth, "load-test", true)
if err != nil {
c.log.Errorf("create new batch: %v", err)
return
}
batches.Store(txName, batchID)
c.logger.Infof("uploading to: %s", txName)

batchID, err := clients[txName].GetOrCreateMutableBatch(ctx, o.PostageAmount, o.PostageDepth, "load-test")
if err != nil {
c.logger.Errorf("create new batch: %v", err)
return
}

c.logger.Info("using batch", "batch_id", batchID)

address, duration, err = test.upload(ctx, txName, txData, batchID)
if err != nil {
c.metrics.UploadErrors.Inc()
c.log.Infof("upload failed: %v", err)
c.log.Infof("retrying in: %v", o.TxOnErrWait)
c.logger.Infof("upload failed: %v", err)
c.logger.Infof("retrying in: %v", o.TxOnErrWait)
time.Sleep(o.TxOnErrWait)
return
}
Expand All @@ -157,12 +153,12 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts
continue
}

c.log.Infof("sleeping for: %v seconds", o.NodesSyncWait.Seconds())
c.logger.Infof("sleeping for: %v seconds", o.NodesSyncWait.Seconds())
time.Sleep(o.NodesSyncWait) // Wait for nodes to sync.

// pick a batch of downloaders
rxNames := pickRandom(o.DownloaderCount, downloaders)
c.log.Infof("downloaders: %s", rxNames)
c.logger.Infof("downloaders: %s", rxNames)

var wg sync.WaitGroup

Expand All @@ -180,7 +176,7 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts
for retries := 10; rxDuration == 0 && retries > 0; retries-- {
select {
case <-ctx.Done():
c.log.Infof("context done in retry: %v", retries)
c.logger.Infof("context done in retry: %v", retries)
return
default:
}
Expand All @@ -190,8 +186,8 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts
rxData, rxDuration, err = test.download(ctx, rxName, address)
if err != nil {
c.metrics.DownloadErrors.Inc()
c.log.Infof("download failed: %v", err)
c.log.Infof("retrying in: %v", o.RxOnErrWait)
c.logger.Infof("download failed: %v", err)
c.logger.Infof("retrying in: %v", o.RxOnErrWait)
time.Sleep(o.RxOnErrWait)
}
}
Expand All @@ -202,15 +198,15 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts
}

if !bytes.Equal(rxData, txData) {
c.log.Info("uploaded data does not match downloaded data")
c.logger.Info("uploaded data does not match downloaded data")

c.metrics.DownloadMismatch.Inc()

rxLen, txLen := len(rxData), len(txData)
if rxLen != txLen {
c.log.Infof("length mismatch: download length %d; upload length %d", rxLen, txLen)
c.logger.Infof("length mismatch: download length %d; upload length %d", rxLen, txLen)
if txLen < rxLen {
c.log.Info("length mismatch: rx length is bigger then tx length")
c.logger.Info("length mismatch: rx length is bigger then tx length")
}
return
}
Expand All @@ -221,7 +217,7 @@ func (c *LoadCheck) Run(ctx context.Context, cluster orchestration.Cluster, opts
diff++
}
}
c.log.Infof("data mismatch: found %d different bytes, ~%.2f%%", diff, float64(diff)/float64(txLen)*100)
c.logger.Infof("data mismatch: found %d different bytes, ~%.2f%%", diff, float64(diff)/float64(txLen)*100)
return
}

Expand All @@ -242,17 +238,17 @@ func (c *LoadCheck) checkStorageRadius(ctx context.Context, client *bee.Client,
for {
rs, err := client.ReserveState(ctx)
if err != nil {
c.log.Infof("error getting state: %v", err)
c.logger.Infof("error getting state: %v", err)
return false
}
if rs.StorageRadius < maxRadius {
return true
}
c.log.Infof("waiting %v for StorageRadius to decrease. Current: %d, Max: %d", wait, rs.StorageRadius, maxRadius)
c.logger.Infof("waiting %v for StorageRadius to decrease. Current: %d, Max: %d", wait, rs.StorageRadius, maxRadius)

select {
case <-ctx.Done():
c.log.Infof("context done in StorageRadius check: %v", ctx.Err())
c.logger.Infof("context done in StorageRadius check: %v", ctx.Err())
return false
case <-time.After(wait):
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/check/smoke/smoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int

c.metrics.BatchCreateAttempts.Inc()

batchID, err = clients[txName].GetOrCreateBatch(txCtx, o.PostageAmount, o.PostageDepth, "smoke-test")
batchID, err = clients[txName].GetOrCreateMutableBatch(txCtx, o.PostageAmount, o.PostageDepth, "smoke-test")
if err != nil {
c.logger.Errorf("create new batch: %v", err)
c.metrics.BatchCreateErrors.Inc()
Expand Down
Loading