Skip to content

Commit

Permalink
fix: Fix goroutine leak in queryrange downstreamer (#15665)
Browse files Browse the repository at this point in the history
  • Loading branch information
yincongcyincong authored Jan 9, 2025
1 parent e347eb7 commit 5f476a3
Showing 1 changed file with 13 additions and 7 deletions.
20 changes: 13 additions & 7 deletions pkg/querier/queryrange/downstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,18 +170,20 @@ func (in instance) For(
go func() {
err := concurrency.ForEachJob(ctx, len(queries), in.parallelism, func(ctx context.Context, i int) error {
res, err := fn(queries[i])
if err != nil {
return err
}
response := logql.Resp{
I: i,
Res: res,
Err: err,
}

// Feed the result into the channel unless the work has completed.
select {
case <-ctx.Done():
case ch <- response:
}
return err
return nil
})
if err != nil {
ch <- logql.Resp{
Expand All @@ -192,15 +194,19 @@ func (in instance) For(
close(ch)
}()

var err error
for resp := range ch {
if resp.Err != nil {
return nil, resp.Err
if err != nil {
continue
}
if err := acc.Accumulate(ctx, resp.Res, resp.I); err != nil {
return nil, err
if resp.Err != nil {
err = resp.Err
continue
}
err = acc.Accumulate(ctx, resp.Res, resp.I)
}
return acc.Result(), nil

return acc.Result(), err
}

// convert to matrix
Expand Down

0 comments on commit 5f476a3

Please sign in to comment.