br/storage: enable async prefetch data #48587

Merged · 21 commits · Nov 24, 2023
85 changes: 42 additions & 43 deletions br/pkg/lightning/backend/external/bench_test.go
@@ -37,10 +37,8 @@ func openTestingStorage(t *testing.T) storage.ExternalStorage {
if *testingStorageURI == "" {
t.Skip("testingStorageURI is not set")
}
b, err := storage.ParseBackend(*testingStorageURI, nil)
intest.Assert(err == nil)
s, err := storage.New(context.Background(), b, nil)
intest.Assert(err == nil)
s, err := storage.NewFromURL(context.Background(), *testingStorageURI, nil)
intest.AssertNoError(err)
return s
}

@@ -111,7 +109,7 @@ func writePlainFile(s *writeTestSuite) {
offset := 0
flush := func(w storage.ExternalFileWriter) {
n, err := w.Write(ctx, buf[:offset])
intest.Assert(err == nil)
intest.AssertNoError(err)
intest.Assert(offset == n)
offset = 0
}
@@ -120,7 +118,7 @@ func writePlainFile(s *writeTestSuite) {
s.beforeCreateWriter()
}
writer, err := s.store.Create(ctx, "test/plain_file", nil)
intest.Assert(err == nil)
intest.AssertNoError(err)
key, val, _ := s.source.next()
for key != nil {
if offset+len(key)+len(val) > len(buf) {
@@ -135,7 +133,7 @@ func writePlainFile(s *writeTestSuite) {
s.beforeWriterClose()
}
err = writer.Close(ctx)
intest.Assert(err == nil)
intest.AssertNoError(err)
if s.afterWriterClose != nil {
s.afterWriterClose()
}
@@ -153,14 +151,14 @@ func writeExternalFile(s *writeTestSuite) {
key, val, h := s.source.next()
for key != nil {
err := writer.WriteRow(ctx, key, val, h)
intest.Assert(err == nil)
intest.AssertNoError(err)
key, val, h = s.source.next()
}
if s.beforeWriterClose != nil {
s.beforeWriterClose()
}
err := writer.Close(ctx)
intest.Assert(err == nil)
intest.AssertNoError(err)
if s.afterWriterClose != nil {
s.afterWriterClose()
}
@@ -180,17 +178,17 @@ func TestCompareWriter(t *testing.T) {
beforeTest := func() {
fileIdx++
file, err = os.Create(fmt.Sprintf("cpu-profile-%d.prof", fileIdx))
intest.Assert(err == nil)
intest.AssertNoError(err)
err = pprof.StartCPUProfile(file)
intest.Assert(err == nil)
intest.AssertNoError(err)
now = time.Now()
}
beforeClose := func() {
file, err = os.Create(fmt.Sprintf("heap-profile-%d.prof", fileIdx))
intest.Assert(err == nil)
intest.AssertNoError(err)
// check heap profile to see the memory usage is expected
err = pprof.WriteHeapProfile(file)
intest.Assert(err == nil)
intest.AssertNoError(err)
}
afterClose := func() {
elapsed = time.Since(now)
@@ -228,16 +226,16 @@ type readTestSuite struct {

func readFileSequential(s *readTestSuite) {
ctx := context.Background()
files, _, err := GetAllFileNames(ctx, s.store, "evenly_distributed")
intest.Assert(err == nil)
files, _, err := GetAllFileNames(ctx, s.store, "/evenly_distributed")
intest.AssertNoError(err)

buf := make([]byte, s.memoryLimit)
if s.beforeCreateReader != nil {
s.beforeCreateReader()
}
for i, file := range files {
reader, err := s.store.Open(ctx, file, nil)
intest.Assert(err == nil)
intest.AssertNoError(err)
_, err = reader.Read(buf)
for err == nil {
_, err = reader.Read(buf)
@@ -249,7 +247,7 @@ func readFileSequential(s *readTestSuite) {
}
}
err = reader.Close()
intest.Assert(err == nil)
intest.AssertNoError(err)
}
if s.afterReaderClose != nil {
s.afterReaderClose()
@@ -258,8 +256,8 @@ func readFileSequential(s *readTestSuite) {

func readFileConcurrently(s *readTestSuite) {
ctx := context.Background()
files, _, err := GetAllFileNames(ctx, s.store, "evenly_distributed")
intest.Assert(err == nil)
files, _, err := GetAllFileNames(ctx, s.store, "/evenly_distributed")
intest.AssertNoError(err)

conc := min(s.concurrency, len(files))
var eg errgroup.Group
@@ -273,7 +271,7 @@ func readFileConcurrently(s *readTestSuite) {
eg.Go(func() error {
buf := make([]byte, s.memoryLimit/conc)
reader, err := s.store.Open(ctx, file, nil)
intest.Assert(err == nil)
intest.AssertNoError(err)
_, err = reader.Read(buf)
for err == nil {
_, err = reader.Read(buf)
@@ -285,12 +283,12 @@ func readFileConcurrently(s *readTestSuite) {
}
})
err = reader.Close()
intest.Assert(err == nil)
intest.AssertNoError(err)
return nil
})
}
err = eg.Wait()
intest.Assert(err == nil)
intest.AssertNoError(err)
if s.afterReaderClose != nil {
s.afterReaderClose()
}
@@ -303,12 +301,12 @@ func createEvenlyDistributedFiles(
store := openTestingStorage(t)
ctx := context.Background()

files, statFiles, err := GetAllFileNames(ctx, store, "evenly_distributed")
intest.Assert(err == nil)
files, statFiles, err := GetAllFileNames(ctx, store, "/evenly_distributed")
intest.AssertNoError(err)
err = store.DeleteFiles(ctx, files)
intest.Assert(err == nil)
intest.AssertNoError(err)
err = store.DeleteFiles(ctx, statFiles)
intest.Assert(err == nil)
intest.AssertNoError(err)

value := make([]byte, 100)
kvCnt := 0
@@ -317,7 +315,7 @@ func createEvenlyDistributedFiles(
SetMemorySizeLimit(uint64(float64(fileSize) * 1.1))
writer := builder.Build(
store,
"evenly_distributed",
"/evenly_distributed",
fmt.Sprintf("%d", i),
)

@@ -326,21 +324,21 @@ func createEvenlyDistributedFiles(
for totalSize < fileSize {
key := fmt.Sprintf("key_%09d", keyIdx)
err := writer.WriteRow(ctx, []byte(key), value, nil)
intest.Assert(err == nil)
intest.AssertNoError(err)
keyIdx += fileCount
totalSize += len(key) + len(value)
kvCnt++
}
err := writer.Close(ctx)
intest.Assert(err == nil)
intest.AssertNoError(err)
}
return store, kvCnt
}

func readMergeIter(s *readTestSuite) {
ctx := context.Background()
files, _, err := GetAllFileNames(ctx, s.store, "evenly_distributed")
intest.Assert(err == nil)
files, _, err := GetAllFileNames(ctx, s.store, "/evenly_distributed")
intest.AssertNoError(err)

if s.beforeCreateReader != nil {
s.beforeCreateReader()
@@ -349,7 +347,7 @@ func readMergeIter(s *readTestSuite) {
readBufSize := s.memoryLimit / len(files)
zeroOffsets := make([]uint64, len(files))
iter, err := NewMergeKVIter(ctx, files, zeroOffsets, s.store, readBufSize, false)
intest.Assert(err == nil)
intest.AssertNoError(err)

kvCnt := 0
for iter.Next() {
@@ -362,7 +360,7 @@ func readMergeIter(s *readTestSuite) {
}
intest.Assert(kvCnt == s.totalKVCnt)
err = iter.Close()
intest.Assert(err == nil)
intest.AssertNoError(err)
if s.afterReaderClose != nil {
s.afterReaderClose()
}
@@ -383,17 +381,17 @@ func TestCompareReader(t *testing.T) {
beforeTest := func() {
fileIdx++
file, err = os.Create(fmt.Sprintf("cpu-profile-%d.prof", fileIdx))
intest.Assert(err == nil)
intest.AssertNoError(err)
err = pprof.StartCPUProfile(file)
intest.Assert(err == nil)
intest.AssertNoError(err)
now = time.Now()
}
beforeClose := func() {
file, err = os.Create(fmt.Sprintf("heap-profile-%d.prof", fileIdx))
intest.Assert(err == nil)
intest.AssertNoError(err)
// check heap profile to see the memory usage is expected
err = pprof.WriteHeapProfile(file)
intest.Assert(err == nil)
intest.AssertNoError(err)
}
afterClose := func() {
elapsed = time.Since(now)
@@ -409,23 +407,24 @@ func TestCompareReader(t *testing.T) {
beforeReaderClose: beforeClose,
afterReaderClose: afterClose,
}
readFileSequential(suite)

readMergeIter(suite)
t.Logf(
"sequential read speed for %d bytes: %.2f MB/s",
"merge iter read speed for %d bytes: %.2f MB/s",
fileSize*fileCnt,
float64(fileSize*fileCnt)/elapsed.Seconds()/1024/1024,
)

readFileConcurrently(suite)
readFileSequential(suite)
t.Logf(
"concurrent read speed for %d bytes: %.2f MB/s",
"sequential read speed for %d bytes: %.2f MB/s",
fileSize*fileCnt,
float64(fileSize*fileCnt)/elapsed.Seconds()/1024/1024,
)

readMergeIter(suite)
readFileConcurrently(suite)
t.Logf(
"merge iter read speed for %d bytes: %.2f MB/s",
"concurrent read speed for %d bytes: %.2f MB/s",
fileSize*fileCnt,
float64(fileSize*fileCnt)/elapsed.Seconds()/1024/1024,
)
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/external/byte_reader.go
@@ -68,8 +68,9 @@ func openStoreReaderAndSeek(
store storage.ExternalStorage,
name string,
initFileOffset uint64,
prefetchSize int,
) (storage.ExternalFileReader, error) {
storageReader, err := store.Open(ctx, name, nil)
storageReader, err := store.Open(ctx, name, &storage.ReaderOption{PrefetchSize: prefetchSize})
if err != nil {
return nil, err
}
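The external reader now forwards a prefetch size down to the storage layer; the read-ahead itself is implemented in pkg/util/prefetch (added to the BUILD.bazel dependencies further down) and is not part of this diff. As a rough sketch of the general idea only, with hypothetical names and no claim to match the real package, an async read-ahead wrapper can look like this: a background goroutine keeps fetching fixed-size chunks from the source and hands them over a buffered channel, so the next storage fetch overlaps with the consumer's processing.

package prefetchsketch

import "io"

// asyncReader is a hypothetical illustration; cancellation and goroutine-leak
// handling are omitted for brevity.
type asyncReader struct {
	chunks <-chan []byte // filled by the background goroutine
	errc   <-chan error  // terminal error (io.EOF or a real failure)
	cur    []byte        // unread remainder of the current chunk
	err    error         // remembered terminal error after chunks is drained
	src    io.ReadCloser
}

func newAsyncReader(src io.ReadCloser, chunkSize, depth int) *asyncReader {
	chunks := make(chan []byte, depth)
	errc := make(chan error, 1)
	go func() {
		defer close(chunks)
		for {
			buf := make([]byte, chunkSize)
			n, err := src.Read(buf)
			if n > 0 {
				chunks <- buf[:n]
			}
			if err != nil {
				errc <- err // io.EOF or a real failure
				return
			}
		}
	}()
	return &asyncReader{chunks: chunks, errc: errc, src: src}
}

func (r *asyncReader) Read(p []byte) (int, error) {
	for len(r.cur) == 0 {
		c, ok := <-r.chunks
		if !ok {
			if r.err == nil {
				r.err = <-r.errc
			}
			return 0, r.err
		}
		r.cur = c
	}
	n := copy(p, r.cur)
	r.cur = r.cur[n:]
	return n, nil
}

func (r *asyncReader) Close() error { return r.src.Close() }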
5 changes: 3 additions & 2 deletions br/pkg/lightning/backend/external/kv_reader.go
@@ -36,11 +36,12 @@ func newKVReader(
initFileOffset uint64,
bufSize int,
) (*kvReader, error) {
sr, err := openStoreReaderAndSeek(ctx, store, name, initFileOffset)
oneThird := bufSize / 3
sr, err := openStoreReaderAndSeek(ctx, store, name, initFileOffset, oneThird*2)
if err != nil {
return nil, err
}
br, err := newByteReader(ctx, sr, bufSize)
br, err := newByteReader(ctx, sr, oneThird)
if err != nil {
return nil, err
}
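newKVReader used to hand the whole bufSize to newByteReader; it now splits the same budget so that roughly one third stays with the byteReader and two thirds become the prefetch size, keeping total memory per reader close to the original bufSize. A tiny worked sketch of the split, with a hypothetical 96 KiB budget:

// Hypothetical restatement of the split done in newKVReader.
func splitBudget(bufSize int) (byteReaderBuf, prefetchSize int) {
	oneThird := bufSize / 3
	return oneThird, oneThird * 2 // e.g. 96 KiB -> 32 KiB buffer + 64 KiB prefetch
}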
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/external/stat_reader.go
@@ -26,7 +26,7 @@ type statsReader struct {
}

func newStatsReader(ctx context.Context, store storage.ExternalStorage, name string, bufSize int) (*statsReader, error) {
sr, err := openStoreReaderAndSeek(ctx, store, name, 0)
sr, err := openStoreReaderAndSeek(ctx, store, name, 0, 0)
if err != nil {
return nil, err
}
1 change: 1 addition & 0 deletions br/pkg/storage/BUILD.bazel
@@ -28,6 +28,7 @@ go_library(
"//br/pkg/logutil",
"//pkg/sessionctx/variable",
"//pkg/util/intest",
"//pkg/util/prefetch",
"@com_github_aliyun_alibaba_cloud_sdk_go//sdk/auth/credentials",
"@com_github_aliyun_alibaba_cloud_sdk_go//sdk/auth/credentials/providers",
"@com_github_aws_aws_sdk_go//aws",
7 changes: 7 additions & 0 deletions br/pkg/storage/s3.go
@@ -34,6 +34,7 @@ import (
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/pkg/util/prefetch"
"github.com/spf13/pflag"
"go.uber.org/zap"
)
@@ -735,6 +736,9 @@ func (rs *S3Storage) Open(ctx context.Context, path string, o *ReaderOption) (ExternalFileReader, error) {
if err != nil {
return nil, errors.Trace(err)
}
if o != nil && o.PrefetchSize > 0 {
reader = prefetch.NewReader(reader, o.PrefetchSize)
}
return &s3ObjectReader{
storage: rs,
name: path,
@@ -874,6 +878,9 @@ func (r *s3ObjectReader) Read(p []byte) (n int, err error) {
if maxCnt > int64(len(p)) {
maxCnt = int64(len(p))
}
if maxCnt == 0 {
return 0, io.EOF
}
n, err = r.reader.Read(p[:maxCnt])
// TODO: maybe we should use !errors.Is(err, io.EOF) here to avoid error lint, but currently, pingcap/errors
// doesn't implement this method yet.
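The new maxCnt == 0 guard presumably covers the moment the reader reaches the end of the requested range: p[:maxCnt] would then be an empty slice, and the io.Reader contract allows a zero-length read to return (0, nil) instead of io.EOF, so a caller looping until an error would never terminate. A tiny standalone illustration of that contract (not code from this PR):

package main

import (
	"bytes"
	"fmt"
)

func main() {
	r := bytes.NewReader([]byte("data"))
	n, err := r.Read(nil) // zero-length read: no progress and no error
	fmt.Println(n, err)   // prints "0 <nil>"
}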
21 changes: 21 additions & 0 deletions br/pkg/storage/storage.go
@@ -97,6 +97,8 @@ type ReaderOption struct {
StartOffset *int64
// EndOffset is exclusive. And it's incompatible with Seek.
EndOffset *int64
// PrefetchSize will switch to NewPrefetchReader if value is positive.
PrefetchSize int
}

// ExternalStorage represents a kind of file system storage.
@@ -195,6 +197,25 @@ func NewWithDefaultOpt(ctx context.Context, backend *backuppb.StorageBackend) (ExternalStorage, error) {
return New(ctx, backend, &opts)
}

// NewFromURL creates an ExternalStorage from URL.
func NewFromURL(ctx context.Context, uri string, opts *ExternalStorageOptions) (ExternalStorage, error) {
if len(uri) == 0 {
return nil, errors.Annotate(berrors.ErrStorageInvalidConfig, "empty store is not allowed")
}
u, err := ParseRawURL(uri)
if err != nil {
return nil, errors.Trace(err)
}
if u.Scheme == "memstore" {
return NewMemStorage(), nil
}
b, err := parseBackend(u, uri, nil)
if err != nil {
return nil, errors.Trace(err)
}
return New(ctx, b, opts)
}

// New creates an ExternalStorage with options.
func New(ctx context.Context, backend *backuppb.StorageBackend, opts *ExternalStorageOptions) (ExternalStorage, error) {
if opts == nil {
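Putting the two additions above together, a minimal usage sketch (the URI, object name, and prefetch size are placeholders):

package storageexample

import (
	"context"

	"github.com/pingcap/tidb/br/pkg/storage"
)

// openWithPrefetch builds an ExternalStorage straight from a URI and opens a
// file with the async prefetch path enabled via ReaderOption.PrefetchSize.
func openWithPrefetch(ctx context.Context) (storage.ExternalFileReader, error) {
	store, err := storage.NewFromURL(ctx, "s3://bucket/prefix", nil)
	if err != nil {
		return nil, err
	}
	return store.Open(ctx, "some/file", &storage.ReaderOption{
		PrefetchSize: 4 * 1024 * 1024, // read ahead up to 4 MiB of this object
	})
}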
8 changes: 8 additions & 0 deletions br/pkg/storage/storage_test.go
@@ -3,6 +3,7 @@
package storage_test

import (
"context"
"net/http"
"testing"

@@ -24,3 +25,10 @@ func TestDefaultHttpClient(t *testing.T) {
require.Equal(t, int(concurrency), transport.MaxIdleConnsPerHost)
require.Equal(t, int(concurrency), transport.MaxIdleConns)
}

func TestNewMemStorage(t *testing.T) {
url := "memstore://"
s, err := storage.NewFromURL(context.Background(), url, nil)
require.NoError(t, err)
require.IsType(t, (*storage.MemStorage)(nil), s)
}