-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
br/storage: enable async prefetch data #48587
Conversation
Signed-off-by: lance6716 <[email protected]>
Signed-off-by: lance6716 <[email protected]>
Signed-off-by: lance6716 <[email protected]>
Signed-off-by: lance6716 <[email protected]>
This reverts commit 49a8440.
Hi @lance6716. Thanks for your PR. PRs from untrusted users cannot be marked as trusted with I understand the commands that are listed here. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
Codecov Report
Additional details and impacted files@@ Coverage Diff @@
## master #48587 +/- ##
================================================
+ Coverage 71.4085% 73.1794% +1.7708%
================================================
Files 1404 1438 +34
Lines 407209 418451 +11242
================================================
+ Hits 290782 306220 +15438
+ Misses 96471 93351 -3120
+ Partials 19956 18880 -1076
Flags with carried forward coverage won't be shown. Click here to find out more.
|
Signed-off-by: lance6716 <[email protected]>
Signed-off-by: lance6716 <[email protected]>
Signed-off-by: lance6716 <[email protected]>
Signed-off-by: lance6716 <[email protected]>
Signed-off-by: lance6716 <[email protected]>
Signed-off-by: lance6716 <[email protected]>
Signed-off-by: lance6716 <[email protected]>
Signed-off-by: lance6716 <[email protected]>
Signed-off-by: lance6716 <[email protected]>
Signed-off-by: lance6716 <[email protected]>
Signed-off-by: lance6716 <[email protected]>
Signed-off-by: lance6716 <[email protected]>
Signed-off-by: lance6716 <[email protected]>
/cc @ywqzzy |
pkg/util/prefetch/reader.go
Outdated
buf := r.buf[r.bufIdx] | ||
n, err := r.r.Read(buf) | ||
buf = buf[:n] | ||
r.bufCh <- buf |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
might block forever if we call close before read
and we should wait this routine exist on close
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rest lgtm
Signed-off-by: lance6716 <[email protected]>
pkg/util/prefetch/reader.go
Outdated
return r.r.Close() | ||
ret := r.r.Close() | ||
r.closeOnce.Do(func() { | ||
close(r.closed) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't close reader twice, seems no need this closeOnce
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I simply checked s3.go. In some cases s3ObjectReader will be called by line 893 and line 911
Lines 872 to 905 in 3543275
func (r *s3ObjectReader) Read(p []byte) (n int, err error) { | |
maxCnt := r.rangeInfo.End + 1 - r.pos | |
if maxCnt > int64(len(p)) { | |
maxCnt = int64(len(p)) | |
} | |
n, err = r.reader.Read(p[:maxCnt]) | |
// TODO: maybe we should use !errors.Is(err, io.EOF) here to avoid error lint, but currently, pingcap/errors | |
// doesn't implement this method yet. | |
if err != nil && errors.Cause(err) != io.EOF && r.retryCnt < maxErrorRetries { //nolint:errorlint | |
// if can retry, reopen a new reader and try read again | |
end := r.rangeInfo.End + 1 | |
if end == r.rangeInfo.Size { | |
end = 0 | |
} | |
_ = r.reader.Close() | |
newReader, _, err1 := r.storage.open(r.ctx, r.name, r.pos, end) | |
if err1 != nil { | |
log.Warn("open new s3 reader failed", zap.String("file", r.name), zap.Error(err1)) | |
return | |
} | |
r.reader = newReader | |
r.retryCnt++ | |
n, err = r.reader.Read(p[:maxCnt]) | |
} | |
r.pos += int64(n) | |
return | |
} | |
// Close implement the io.Closer interface. | |
func (r *s3ObjectReader) Close() error { | |
return r.reader.Close() | |
} |
So I think we should tolerate the caller call Close() for multiple times. But all reader methods are not thread safe, I will change it to a bool rather than sync.Once
@@ -88,7 +100,12 @@ func (r *Reader) Read(data []byte) (int, error) { | |||
} | |||
} | |||
|
|||
// Close implements io.Closer. | |||
// Close implements io.Closer. Close should not be called concurrently with Read. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems most io.Reader is thread unsafe
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
io.Reader interface does not limit the invocation, for example, the reader of io.Pipe is thread safe. So I want to notice the caller about safety.
func (r *Reader) Close() error { | ||
return r.r.Close() | ||
ret := r.r.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure if the undering reader is safe to call close/read
concurrently
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can't assume it so I added the comment
require.EqualValues(t, 11, n) | ||
_, err = r.Read(buf) | ||
require.ErrorIs(t, err, io.EOF) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a case that len(read buf) > len(source []byte)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Signed-off-by: lance6716 <[email protected]>
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: 3pointer, D3Hunter The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
/retest |
@lance6716: Cannot trigger testing until a trusted user reviews the PR and leaves an In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
/retest |
@lance6716: Cannot trigger testing until a trusted user reviews the PR and leaves an In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
What problem does this PR solve?
Issue Number: close #48781
Problem Summary:
What is changed and how it works?
offload the network reading from the main goroutine of merge iterator
Still need to improve the performance in future.
Check List
Tests
use this command to run the specific "benchmark" unit test
On master with ks3
This PR with ks3
This PR with memstore
And I checked the heap usage, I have set 64MB in the unit test, the heap does not exceed it.
Side effects
Documentation
Release note
Please refer to Release Notes Language Style Guide to write a quality release note.