[feat]: Packed reader multi files test #161

Open · wants to merge 3 commits into main
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
@@ -13,7 +13,7 @@ on:
 jobs:
   test:
     name: Test
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-22.04
 
     steps:
       - name: Checkout code
2 changes: 1 addition & 1 deletion .github/workflows/cpp-ci.yml
@@ -11,7 +11,7 @@ on:
       - '!go/**'
 jobs:
   unittest:
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-22.04
     steps:
       - uses: actions/checkout@v3
16 changes: 9 additions & 7 deletions cpp/include/milvus-storage/common/macro.h
@@ -25,16 +25,18 @@ namespace milvus_storage {
 #undef RETURN_NOT_OK
 #define RETURN_NOT_OK(status) \
   do {                        \
-    if (!(status).ok()) {     \
-      return status;          \
+    auto _s = (status);       \
+    if (!_s.ok()) {           \
+      return _s;              \
     }                         \
   } while (false)
 
-#define RETURN_ARROW_NOT_OK(status)                   \
-  do {                                                \
-    if (!(status).ok()) {                             \
-      return Status::ArrowError((status).ToString()); \
-    }                                                 \
+#define RETURN_ARROW_NOT_OK(status)                 \
+  do {                                              \
+    auto _s = (status);                             \
+    if (!_s.ok()) {                                 \
+      return Status::ArrowError((_s).ToString());   \
+    }                                               \
   } while (false)
 
 #define RETURN_ARROW_NOT_OK_WITH_PREFIX(msg, staus) \
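For context on the macro change above: the old expansion evaluated its status argument twice (once in the ok() check, once in the return), which double-executes any side effects when the macro is invoked on a call expression; binding auto _s = (status); evaluates it exactly once. A minimal, self-contained C++ sketch of the difference, using hypothetical stand-in types rather than the milvus-storage ones:

#include <iostream>

// Hypothetical stand-ins, not the library's types.
struct Status {
  bool ok_;
  bool ok() const { return ok_; }
};

static int calls = 0;
Status NextChunk() {
  ++calls;  // side effect: count how often the expression runs
  return Status{false};
}

// Old pattern: the argument expression appears twice, so it runs twice.
#define RETURN_NOT_OK_OLD(status) \
  do {                            \
    if (!(status).ok()) {         \
      return (status);            \
    }                             \
  } while (false)

// New pattern: the argument is evaluated exactly once.
#define RETURN_NOT_OK_NEW(status) \
  do {                            \
    auto _s = (status);           \
    if (!_s.ok()) {               \
      return _s;                  \
    }                             \
  } while (false)

Status DemoOld() { RETURN_NOT_OK_OLD(NextChunk()); return Status{true}; }
Status DemoNew() { RETURN_NOT_OK_NEW(NextChunk()); return Status{true}; }

int main() {
  DemoOld();
  std::cout << "old macro: NextChunk() ran " << calls << " times\n";  // prints 2
  calls = 0;
  DemoNew();
  std::cout << "new macro: NextChunk() ran " << calls << " times\n";  // prints 1
}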
6 changes: 6 additions & 0 deletions cpp/include/milvus-storage/packed/reader.h
@@ -53,6 +53,12 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
   arrow::Status Close() override;
 
   private:
+  void initialize(arrow::fs::FileSystem& fs,
+                  const std::string& file_path,
+                  const std::shared_ptr<arrow::Schema> schema,
+                  const std::set<int>& needed_columns,
+                  const int64_t buffer_size);
+
   Status initializeColumnOffsets(arrow::fs::FileSystem& fs, const std::set<int>& needed_columns, size_t num_fields);
   // Advance buffer to fill the expected buffer size
   arrow::Status advanceBuffer();
23 changes: 21 additions & 2 deletions cpp/include/milvus-storage/packed/reader_c.h
@@ -24,15 +24,34 @@ typedef void* CPackedReader;
 typedef void* CArrowArray;
 typedef void* CArrowSchema;
 
-int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, struct ArrowArrayStream* out);
-
+/**
+ * @brief Open a packed reader to read the needed columns under the specified path.
+ *
+ * @param path The root path of the packed files to read.
+ * @param schema The original schema of the data.
+ * @param buffer_size The max buffer size of the packed reader.
+ * @param c_packed_reader The output pointer of the packed reader.
+ */
 int NewPackedReader(const char* path,
                     struct ArrowSchema* schema,
                     const int64_t buffer_size,
                     CPackedReader* c_packed_reader);
 
+/**
+ * @brief Read the next record batch from the packed reader.
+ * By default, a returned batch contains at most 1024 rows.
+ *
+ * @param c_packed_reader The packed reader to read from.
+ * @param out_array The output pointer of the Arrow array.
+ * @param out_schema The output pointer of the Arrow schema.
+ */
 int ReadNext(CPackedReader c_packed_reader, CArrowArray* out_array, CArrowSchema* out_schema);
 
+/**
+ * @brief Close the packed reader and release its resources.
+ *
+ * @param c_packed_reader The packed reader to close.
+ */
 int CloseReader(CPackedReader c_packed_reader);
 
 #ifdef __cplusplus
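Taken together, the documented C API forms an open/iterate/close lifecycle. A usage sketch from a C++ caller; the zero-means-success return convention and the null-array end-of-stream check are assumptions inferred from the Go binding further down, which treats a nil record as end of file:

#include <cstdint>
#include "milvus-storage/packed/reader_c.h"  // assumed include path

// Read every batch under `path`. Assumes 0 means success and that
// ReadNext leaves *out_array null when there are no more batches.
int ReadAll(const char* path, struct ArrowSchema* schema) {
  CPackedReader reader = nullptr;
  int rc = NewPackedReader(path, schema, /*buffer_size=*/10 * 1024 * 1024, &reader);
  if (rc != 0) {
    return rc;
  }
  for (;;) {
    CArrowArray out_array = nullptr;
    CArrowSchema out_schema = nullptr;
    rc = ReadNext(reader, &out_array, &out_schema);
    if (rc != 0 || out_array == nullptr) {
      break;  // error, or end of stream
    }
    // ... import via the Arrow C data interface and consume the batch ...
  }
  int close_rc = CloseReader(reader);
  return rc != 0 ? rc : close_rc;
}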
2 changes: 0 additions & 2 deletions cpp/src/packed/column_group.cpp
@@ -17,8 +17,6 @@
 #include <arrow/table.h>
 #include "common/status.h"
 
-using namespace std;
-
 namespace milvus_storage {
 
 ColumnGroup::ColumnGroup(GroupId group_id, const std::vector<int>& origin_column_indices)
8 changes: 8 additions & 0 deletions cpp/src/packed/reader.cpp
@@ -40,6 +40,14 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
       row_limit_(0),
       absolute_row_position_(0),
       read_count_(0) {
+  initialize(fs, file_path_, schema_, needed_columns, buffer_size);
+}
+
+void PackedRecordBatchReader::initialize(arrow::fs::FileSystem& fs,
+                                         const std::string& file_path,
+                                         const std::shared_ptr<arrow::Schema> schema,
+                                         const std::set<int>& needed_columns,
+                                         const int64_t buffer_size) {
   auto status = initializeColumnOffsets(fs, needed_columns, schema->num_fields());
   if (!status.ok()) {
     throw std::runtime_error(status.ToString());
26 changes: 0 additions & 26 deletions cpp/src/packed/reader_c.cpp
@@ -23,32 +23,6 @@
 #include <arrow/status.h>
 #include <memory>
 
-int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, struct ArrowArrayStream* out) {
-  auto truePath = std::string(path);
-  auto factory = std::make_shared<milvus_storage::FileSystemFactory>();
-  auto conf = milvus_storage::StorageConfig();
-  conf.uri = "file:///tmp/";
-  auto r = factory->BuildFileSystem(conf, &truePath);
-  if (!r.ok()) {
-    LOG_STORAGE_ERROR_ << "Error building filesystem: " << path;
-    return -2;
-  }
-  auto trueFs = r.value();
-  auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
-  std::set<int> needed_columns;
-  for (int i = 0; i < trueSchema->num_fields(); i++) {
-    needed_columns.emplace(i);
-  }
-  auto reader =
-      std::make_shared<milvus_storage::PackedRecordBatchReader>(*trueFs, path, trueSchema, needed_columns, buffer_size);
-  auto status = ExportRecordBatchReader(reader, out);
-  if (!status.ok()) {
-    LOG_STORAGE_ERROR_ << "Error exporting record batch reader" << status.ToString();
-    return static_cast<int>(status.code());
-  }
-  return 0;
-}
-
 int NewPackedReader(const char* path,
                     struct ArrowSchema* schema,
                     const int64_t buffer_size,
4 changes: 2 additions & 2 deletions cpp/src/packed/writer.cpp
@@ -97,13 +97,13 @@ Status PackedRecordBatchWriter::writeWithSplitIndex(const std::shared_ptr<arrow:
   // Flush column groups until there's enough room for the new column groups
   // to ensure that memory usage stays strictly below the limit
   while (current_memory_usage_ + next_batch_size >= memory_limit_ && !max_heap_.empty()) {
-    LOG_STORAGE_DEBUG_ << "Current memory usage: " << current_memory_usage_
+    LOG_STORAGE_DEBUG_ << "Current memory usage: " << current_memory_usage_ / 1024 / 1024 << " MB, "
                        << ", flushing column group: " << max_heap_.top().first;
     auto max_group = max_heap_.top();
+    max_heap_.pop();
     current_memory_usage_ -= max_group.second;
 
     ColumnGroupWriter* writer = group_writers_[max_group.first].get();
-    max_heap_.pop();
     RETURN_NOT_OK(writer->Flush());
   }
 
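The eviction loop above always flushes the largest buffered column group first, popping the heap exactly once per flushed group. A standalone sketch of that max-heap pattern with hypothetical group sizes, not the writer's actual bookkeeping:

#include <cstddef>
#include <iostream>
#include <queue>
#include <utility>
#include <vector>

int main() {
  // {group id, buffered bytes}; the comparator puts the largest group on top.
  using Entry = std::pair<int, std::size_t>;
  auto by_size = [](const Entry& a, const Entry& b) { return a.second < b.second; };
  std::priority_queue<Entry, std::vector<Entry>, decltype(by_size)> max_heap(by_size);

  max_heap.push({0, 800});
  max_heap.push({1, 300});
  max_heap.push({2, 500});

  std::size_t current_memory_usage = 1600;
  const std::size_t next_batch_size = 700;
  const std::size_t memory_limit = 2000;

  // Flush the largest groups until the incoming batch fits under the limit.
  while (current_memory_usage + next_batch_size >= memory_limit && !max_heap.empty()) {
    Entry max_group = max_heap.top();
    max_heap.pop();  // pop exactly once per flushed group
    current_memory_usage -= max_group.second;
    std::cout << "flush group " << max_group.first
              << ", usage now " << current_memory_usage << " bytes\n";
  }
}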
4 changes: 0 additions & 4 deletions cpp/test/packed/packed_integration_test.cpp
@@ -27,8 +27,6 @@ TEST_F(PackedIntegrationTest, TestOneFile) {
   }
   EXPECT_TRUE(writer.Close().ok());
 
-  std::vector<std::string> paths = {file_path_ + "/0"};
-
   std::set<int> needed_columns = {0, 1, 2};
 
   PackedRecordBatchReader pr(*fs_, file_path_, schema_, needed_columns, reader_memory_);
@@ -47,8 +45,6 @@ TEST_F(PackedIntegrationTest, TestSplitColumnGroup) {
   }
   EXPECT_TRUE(writer.Close().ok());
 
-  std::vector<std::string> paths = {file_path_ + "/0", file_path_ + "/1"};
-
   std::set<int> needed_columns = {0, 1, 2};
 
   PackedRecordBatchReader pr(*fs_, file_path_, schema_, needed_columns, reader_memory_);
2 changes: 1 addition & 1 deletion go/Makefile
@@ -22,7 +22,7 @@ test:
 	LD_LIBRARY_PATH=$(MILVUS_STORAGE_LD_DIR):$$LD_LIBRARY_PATH \
 	CGO_CFLAGS="$(CPPFLAGS)" \
 	CGO_LDFLAGS="$(LDFLAGS) -lmilvus-storage" \
-	go test -count=1 -timeout 30s ./...
+	go test -count=1 -timeout 30s ./... -gcflags "all=-N -l" -o gdb/
 
 proto:
 	mkdir -p proto/manifest_proto
6 changes: 3 additions & 3 deletions go/packed/packed_reader.go
@@ -31,7 +31,7 @@ import (
 	"github.com/apache/arrow/go/v12/arrow/cdata"
 )
 
-func newPackedReader(path string, schema *arrow.Schema, bufferSize int) (*PackedReader, error) {
+func NewPackedReader(path string, schema *arrow.Schema, bufferSize int) (*PackedReader, error) {
 	var cas cdata.CArrowSchema
 	cdata.ExportArrowSchema(schema, &cas)
 	cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas))
@@ -49,7 +49,7 @@ func newPackedReader(path string, schema *arrow.Schema, bufferSize int) (*PackedReader, error) {
 	return &PackedReader{cPackedReader: cPackedReader, schema: schema}, nil
 }
 
-func (pr *PackedReader) readNext() (arrow.Record, error) {
+func (pr *PackedReader) ReadNext() (arrow.Record, error) {
 	var cArr C.CArrowArray
 	var cSchema C.CArrowSchema
 	status := C.ReadNext(pr.cPackedReader, &cArr, &cSchema)
@@ -73,7 +73,7 @@ func (pr *PackedReader) readNext() (arrow.Record, error) {
 	return recordBatch, nil
 }
 
-func (pr *PackedReader) close() error {
+func (pr *PackedReader) Close() error {
 	status := C.CloseReader(pr.cPackedReader)
 	if status != 0 {
 		return errors.New("PackedReader: failed to close file")
87 changes: 81 additions & 6 deletions go/packed/packed_test.go
@@ -21,9 +21,10 @@ import (
 	"github.com/apache/arrow/go/v12/arrow/array"
 	"github.com/apache/arrow/go/v12/arrow/memory"
 	"github.com/stretchr/testify/assert"
+	"golang.org/x/exp/rand"
 )
 
-func TestPacked(t *testing.T) {
+func TestPackedOneFile(t *testing.T) {
 	batches := 100
 	schema := arrow.NewSchema([]arrow.Field{
 		{Name: "a", Type: arrow.PrimitiveTypes.Int32},
@@ -53,19 +54,93 @@ func TestPacked(t *testing.T) {
 	defer rec.Release()
 	path := "/tmp"
 	bufferSize := 10 * 1024 * 1024 // 10MB
-	pw, err := newPackedWriter(path, schema, bufferSize)
+	pw, err := NewPackedWriter(path, schema, bufferSize)
 	assert.NoError(t, err)
 	for i := 0; i < batches; i++ {
-		err = pw.writeRecordBatch(rec)
+		err = pw.WriteRecordBatch(rec)
 		assert.NoError(t, err)
 	}
-	err = pw.close()
+	err = pw.Close()
 	assert.NoError(t, err)
 
-	reader, err := newPackedReader(path, schema, bufferSize)
+	reader, err := NewPackedReader(path, schema, bufferSize)
 	assert.NoError(t, err)
-	rr, err := reader.readNext()
+	rr, err := reader.ReadNext()
 	assert.NoError(t, err)
 	defer rr.Release()
 	assert.Equal(t, int64(3*batches), rr.NumRows())
 }
+
+func TestPackedMultiFiles(t *testing.T) {
+	batches := 1000
+	schema := arrow.NewSchema([]arrow.Field{
+		{Name: "a", Type: arrow.PrimitiveTypes.Int32},
+		{Name: "b", Type: arrow.PrimitiveTypes.Int64},
+		{Name: "c", Type: arrow.BinaryTypes.String},
+	}, nil)
+
+	b := array.NewRecordBuilder(memory.DefaultAllocator, schema)
+	strLen := 1000
+	arrLen := 30
+	defer b.Release()
+	for idx := range schema.Fields() {
+		switch idx {
+		case 0:
+			values := make([]int32, arrLen)
+			for i := 0; i < arrLen; i++ {
+				values[i] = int32(i + 1)
+			}
+			b.Field(idx).(*array.Int32Builder).AppendValues(values, nil)
+		case 1:
+			values := make([]int64, arrLen)
+			for i := 0; i < arrLen; i++ {
+				values[i] = int64(i + 1)
+			}
+			b.Field(idx).(*array.Int64Builder).AppendValues(values, nil)
+		case 2:
+			values := make([]string, arrLen)
+			for i := 0; i < arrLen; i++ {
+				values[i] = randomString(strLen)
+			}
+			b.Field(idx).(*array.StringBuilder).AppendValues(values, nil)
+		}
+	}
+	rec := b.NewRecord()
+	defer rec.Release()
+	path := "/tmp"
+	bufferSize := 10 * 1024 * 1024 // 10MB
+	pw, err := NewPackedWriter(path, schema, bufferSize)
+	assert.NoError(t, err)
+	for i := 0; i < batches; i++ {
+		err = pw.WriteRecordBatch(rec)
+		assert.NoError(t, err)
+	}
+	err = pw.Close()
+	assert.NoError(t, err)
+
+	reader, err := NewPackedReader(path, schema, bufferSize)
+	assert.NoError(t, err)
+	var rows int64 = 0
+	var rr arrow.Record
+	for {
+		rr, err = reader.ReadNext()
+		assert.NoError(t, err)
+		if rr == nil {
+			// end of file
+			break
+		}
+
+		rows += rr.NumRows()
+	}
+
+	assert.Equal(t, int64(arrLen*batches), rows)
+}
+
+func randomString(length int) string {
+	const charset = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
+	result := make([]byte, length)
+	for i := range result {
+		result[i] = charset[rand.Intn(len(charset))]
+	}
+	return string(result)
+}
6 changes: 3 additions & 3 deletions go/packed/packed_writer.go
@@ -31,7 +31,7 @@ import (
 	"github.com/apache/arrow/go/v12/arrow/cdata"
 )
 
-func newPackedWriter(path string, schema *arrow.Schema, bufferSize int) (*PackedWriter, error) {
+func NewPackedWriter(path string, schema *arrow.Schema, bufferSize int) (*PackedWriter, error) {
 	var cas cdata.CArrowSchema
 	cdata.ExportArrowSchema(schema, &cas)
 	cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas))
@@ -49,7 +49,7 @@ func newPackedWriter(path string, schema *arrow.Schema, bufferSize int) (*PackedWriter, error) {
 	return &PackedWriter{cPackedWriter: cPackedWriter}, nil
 }
 
-func (pw *PackedWriter) writeRecordBatch(recordBatch arrow.Record) error {
+func (pw *PackedWriter) WriteRecordBatch(recordBatch arrow.Record) error {
 	var caa cdata.CArrowArray
 	var cas cdata.CArrowSchema
 
@@ -66,7 +66,7 @@ func (pw *PackedWriter) writeRecordBatch(recordBatch arrow.Record) error {
 	return nil
 }
 
-func (pw *PackedWriter) close() error {
+func (pw *PackedWriter) Close() error {
 	status := C.CloseWriter(pw.cPackedWriter)
 	if status != 0 {
 		return errors.New("PackedWriter: failed to close file")