
Commit

Merge remote-tracking branch 'upstream/master' into merge-major-100
wayblink committed Jun 6, 2024
2 parents 1be0672 + 27cc9f2 commit d83a840
Showing 69 changed files with 2,183 additions and 2,374 deletions.
12 changes: 12 additions & 0 deletions internal/core/src/index/IndexFactory.cpp
@@ -209,6 +209,10 @@ IndexFactory::CreateVectorIndex(
return std::make_unique<VectorDiskAnnIndex<bfloat16>>(
index_type, metric_type, version, file_manager_context);
}
+ case DataType::VECTOR_BINARY: {
+ return std::make_unique<VectorDiskAnnIndex<bin1>>(
+ index_type, metric_type, version, file_manager_context);
+ }
default:
throw SegcoreError(
DataTypeInvalid,
@@ -316,6 +320,14 @@ IndexFactory::CreateVectorIndex(
space,
file_manager_context);
}
+ case DataType::VECTOR_BINARY: {
+ return std::make_unique<VectorDiskAnnIndex<bin1>>(
+ index_type,
+ metric_type,
+ version,
+ space,
+ file_manager_context);
+ }
default:
throw SegcoreError(
DataTypeInvalid,
1 change: 1 addition & 0 deletions internal/core/src/index/VectorDiskIndex.cpp
@@ -513,5 +513,6 @@ VectorDiskAnnIndex<T>::update_load_json(const Config& config) {
template class VectorDiskAnnIndex<float>;
template class VectorDiskAnnIndex<float16>;
template class VectorDiskAnnIndex<bfloat16>;
+ template class VectorDiskAnnIndex<bin1>;

} // namespace milvus::index
7 changes: 0 additions & 7 deletions internal/core/src/storage/DataCodec.cpp
@@ -103,7 +103,6 @@ DeserializeFileData(const std::shared_ptr<uint8_t[]> input_data,
int64_t length) {
auto binlog_reader = std::make_shared<BinlogReader>(input_data, length);
auto medium_type = ReadMediumType(binlog_reader);
- auto start_deserialize = std::chrono::system_clock::now();
std::unique_ptr<DataCodec> res;
switch (medium_type) {
case StorageType::Remote: {
@@ -118,12 +117,6 @@ DeserializeFileData(const std::shared_ptr<uint8_t[]> input_data,
PanicInfo(DataFormatBroken,
fmt::format("unsupported medium type {}", medium_type));
}
- auto deserialize_duration =
- std::chrono::system_clock::now() - start_deserialize;
- LOG_INFO("DeserializeFileData_deserialize_duration_ms:{}",
- std::chrono::duration_cast<std::chrono::milliseconds>(
- deserialize_duration)
- .count());
return res;
}

6 changes: 6 additions & 0 deletions internal/core/src/storage/DiskFileManagerImpl.cpp
@@ -838,6 +838,9 @@ template std::string
DiskFileManagerImpl::CacheRawDataToDisk<bfloat16>(
std::vector<std::string> remote_files);
+ template std::string
+ DiskFileManagerImpl::CacheRawDataToDisk<bin1>(
+ std::vector<std::string> remote_files);
template std::string
DiskFileManagerImpl::CacheRawDataToDisk<float>(
std::shared_ptr<milvus_storage::Space> space);
template std::string
@@ -846,5 +849,8 @@ DiskFileManagerImpl::CacheRawDataToDisk<float16>(
template std::string
DiskFileManagerImpl::CacheRawDataToDisk<bfloat16>(
std::shared_ptr<milvus_storage::Space> space);
+ template std::string
+ DiskFileManagerImpl::CacheRawDataToDisk<bin1>(
+ std::shared_ptr<milvus_storage::Space> space);

} // namespace milvus::storage
2 changes: 1 addition & 1 deletion internal/core/thirdparty/knowhere/CMakeLists.txt
@@ -12,7 +12,7 @@
#-------------------------------------------------------------------------------

# Update KNOWHERE_VERSION for the first occurrence
- set( KNOWHERE_VERSION 74997917 )
+ set( KNOWHERE_VERSION d2047097 )
set( GIT_REPOSITORY "https://github.com/zilliztech/knowhere.git")
message(STATUS "Knowhere repo: ${GIT_REPOSITORY}")
message(STATUS "Knowhere version: ${KNOWHERE_VERSION}")
22 changes: 16 additions & 6 deletions internal/datacoord/compaction_task_l0.go
@@ -288,17 +288,21 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err
func (t *l0CompactionTask) processMetaSaved() bool {
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
if err == nil {
- t.resetSegmentCompacting()
- UpdateCompactionSegmentSizeMetrics(t.result.GetSegments())
- log.Info("handleCompactionResult: success to handle l0 compaction result")
+ return t.processCompleted()
}
- return err == nil
+ return false
}

func (t *l0CompactionTask) processCompleted() bool {
- for _, segmentBinlogs := range t.GetPlan().GetSegmentBinlogs() {
- t.meta.SetSegmentCompacting(segmentBinlogs.GetSegmentID(), false)
+ if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
+ PlanID: t.GetPlanID(),
+ }); err != nil {
+ return false
+ }

+ t.resetSegmentCompacting()
+ UpdateCompactionSegmentSizeMetrics(t.result.GetSegments())
+ log.Info("handleCompactionResult: success to handle l0 compaction result")
return true
}

Expand All @@ -314,6 +318,12 @@ func (t *l0CompactionTask) processTimeout() bool {
}

func (t *l0CompactionTask) processFailed() bool {
+ if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
+ PlanID: t.GetPlanID(),
+ }); err != nil {
+ return false
+ }
+
t.resetSegmentCompacting()
return true
}
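For context on the l0 task change above: completion and failure now both issue DropCompactionPlan to the datanode before the segments' compacting flag is released, and a failed RPC returns false so the task stays in its current state and the transition is retried. The mix task below applies the same ordering. A minimal self-contained sketch of that flow, using hypothetical stand-in types (planDropper, task, release) rather than the real datacoord interfaces:

package main

import (
    "errors"
    "fmt"
)

// planDropper stands in for the datanode session manager.
type planDropper interface {
    DropCompactionPlan(nodeID, planID int64) error
}

type task struct {
    nodeID, planID int64
    sessions       planDropper
    release        func() // clears the segments' "compacting" flag
}

// processCompleted mirrors the new ordering: ask the datanode to drop
// the plan first, and release segments only once that succeeds, so the
// state machine retries the drop on the next tick instead of leaking it.
func (t *task) processCompleted() bool {
    if err := t.sessions.DropCompactionPlan(t.nodeID, t.planID); err != nil {
        return false // stay in the current state; the transition is retried
    }
    t.release()
    return true
}

type flakyDropper struct{ fail bool }

func (d *flakyDropper) DropCompactionPlan(nodeID, planID int64) error {
    if d.fail {
        return errors.New("rpc failed")
    }
    return nil
}

func main() {
    d := &flakyDropper{fail: true}
    t := &task{nodeID: 1, planID: 100, sessions: d,
        release: func() { fmt.Println("segments released") }}
    fmt.Println("first try:", t.processCompleted()) // false; nothing released yet
    d.fail = false
    fmt.Println("second try:", t.processCompleted()) // true; segments released
}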
34 changes: 23 additions & 11 deletions internal/datacoord/compaction_task_mix.go
@@ -49,11 +49,9 @@ func (t *mixCompactionTask) processPipelining() bool {
func (t *mixCompactionTask) processMetaSaved() bool {
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
if err == nil {
- t.resetSegmentCompacting()
- UpdateCompactionSegmentSizeMetrics(t.result.GetSegments())
- log.Info("handleCompactionResult: success to handle merge compaction result")
+ return t.processCompleted()
}
- return err == nil
+ return false
}

func (t *mixCompactionTask) processExecuting() bool {
@@ -76,11 +74,13 @@ func (t *mixCompactionTask) processExecuting() bool {
return false
case datapb.CompactionTaskState_completed:
t.result = result
- result := t.result
if len(result.GetSegments()) == 0 || len(result.GetSegments()) > 1 {
log.Info("illegal compaction results")
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
- return err == nil
+ if err != nil {
+ return false
+ }
+ return t.processFailed()
}
saveSuccess := t.saveSegmentMeta()
if !saveSuccess {
@@ -165,10 +165,16 @@ func (t *mixCompactionTask) NeedReAssignNodeID() bool {
}

func (t *mixCompactionTask) processCompleted() bool {
- for _, segmentBinlogs := range t.GetPlan().GetSegmentBinlogs() {
- t.meta.SetSegmentCompacting(segmentBinlogs.GetSegmentID(), false)
+ err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
+ PlanID: t.GetPlanID(),
+ })
+ if err == nil {
+ t.resetSegmentCompacting()
+ UpdateCompactionSegmentSizeMetrics(t.result.GetSegments())
+ log.Info("handleCompactionResult: success to handle merge compaction result")
}
- return true
+
+ return err == nil
}

func (t *mixCompactionTask) resetSegmentCompacting() {
@@ -211,8 +217,14 @@ func (t *mixCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.Compa
}

func (t *mixCompactionTask) processFailed() bool {
- t.resetSegmentCompacting()
- return true
+ err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
+ PlanID: t.GetPlanID(),
+ })
+ if err == nil {
+ t.resetSegmentCompacting()
+ }
+
+ return err == nil
}

func (t *mixCompactionTask) checkTimeout() bool {
3 changes: 3 additions & 0 deletions internal/datacoord/compaction_test.go
@@ -569,6 +569,8 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() {
Segments: []*datapb.CompactionSegment{{PlanID: 6}},
}, nil).Once()

+ s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
+
inTasks := map[int64]CompactionTask{
1: &mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
@@ -774,6 +776,7 @@ func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() {
}

s.mockSessMgr.EXPECT().GetCompactionPlanResult(UniqueID(111), int64(1)).Return(&compactionResult, nil).Once()
+ s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)

s.handler.submitTask(task)
s.handler.doSchedule()
3 changes: 2 additions & 1 deletion internal/datacoord/garbage_collector.go
@@ -619,7 +619,8 @@ func (gc *garbageCollector) recycleUnusedIndexFiles(ctx context.Context) {
logger.Warn("garbageCollector recycleUnusedIndexFiles parseIndexFileKey", zap.Error(err))
return true
}
- log.Info("garbageCollector will recycle index files", zap.Int64("buildID", buildID))
+ logger = logger.With(zap.Int64("buildID", buildID))
+ logger.Info("garbageCollector will recycle index files")
canRecycle, segIdx := gc.meta.indexMeta.CheckCleanSegmentIndex(buildID)
if !canRecycle {
// Even if the index is marked as deleted, the index file will not be recycled, wait for the next gc,
43 changes: 43 additions & 0 deletions internal/datacoord/mock_session_manager.go

Generated mock; diff not rendered.

4 changes: 4 additions & 0 deletions internal/datacoord/mock_test.go
@@ -323,6 +323,10 @@ func (c *mockDataNodeClient) QuerySlot(ctx context.Context, req *datapb.QuerySlo
return &datapb.QuerySlotResponse{Status: merr.Success()}, nil
}

+ func (c *mockDataNodeClient) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
+ return merr.Success(), nil
+ }
+
func (c *mockDataNodeClient) Stop() error {
c.state = commonpb.StateCode_Abnormal
return nil
39 changes: 39 additions & 0 deletions internal/datacoord/session_manager.go
@@ -21,6 +21,7 @@ import (
"fmt"
"time"

+ "github.com/cockroachdb/errors"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

@@ -70,6 +71,7 @@ type SessionManager interface {
DropImport(nodeID int64, in *datapb.DropImportRequest) error
CheckHealth(ctx context.Context) error
QuerySlot(nodeID int64) (*datapb.QuerySlotResponse, error)
+ DropCompactionPlan(nodeID int64, req *datapb.DropCompactionPlanRequest) error
Close()
}

@@ -547,6 +549,43 @@ func (c *SessionManagerImpl) QuerySlot(nodeID int64) (*datapb.QuerySlotResponse,
return resp, nil
}

+ func (c *SessionManagerImpl) DropCompactionPlan(nodeID int64, req *datapb.DropCompactionPlanRequest) error {
+ log := log.With(
+ zap.Int64("nodeID", nodeID),
+ zap.Int64("planID", req.GetPlanID()),
+ )
+ ctx, cancel := context.WithTimeout(context.Background(), Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second))
+ defer cancel()
+ cli, err := c.getClient(ctx, nodeID)
+ if err != nil {
+ if errors.Is(err, merr.ErrNodeNotFound) {
+ log.Info("node not found, skip dropping compaction plan")
+ return nil
+ }
+ log.Warn("failed to get client", zap.Error(err))
+ return err
+ }
+
+ err = retry.Do(context.Background(), func() error {
+ ctx, cancel := context.WithTimeout(context.Background(), Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second))
+ defer cancel()
+
+ resp, err := cli.DropCompactionPlan(ctx, req)
+ if err := VerifyResponse(resp, err); err != nil {
+ log.Warn("failed to drop compaction plan", zap.Error(err))
+ return err
+ }
+ return nil
+ })
+ if err != nil {
+ log.Warn("failed to drop compaction plan after retry", zap.Error(err))
+ return err
+ }
+
+ log.Info("success to drop compaction plan")
+ return nil
+ }
+
// Close release sessions
func (c *SessionManagerImpl) Close() {
c.sessions.Lock()
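Two details of the new SessionManagerImpl.DropCompactionPlan above are worth noting: a node that can no longer be found is treated as success, since the plan cannot outlive its node, and each retry attempt runs under its own RPC timeout rather than sharing one deadline. A minimal sketch of those two idioms under simplified assumptions; errNodeNotFound, retryWithTimeout and dropPlan are hypothetical stand-ins, not Milvus APIs:

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

var errNodeNotFound = errors.New("node not found")

// retryWithTimeout mirrors the retry.Do usage above: each attempt gets
// a fresh deadline, so one slow call cannot eat the whole retry budget.
func retryWithTimeout(attempts int, timeout time.Duration, rpc func(context.Context) error) error {
    var err error
    for i := 0; i < attempts; i++ {
        ctx, cancel := context.WithTimeout(context.Background(), timeout)
        err = rpc(ctx)
        cancel()
        if err == nil {
            return nil
        }
    }
    return err
}

// dropPlan treats a vanished node as success, matching the
// merr.ErrNodeNotFound branch above.
func dropPlan(lookupErr error, rpc func(context.Context) error) error {
    if lookupErr != nil {
        if errors.Is(lookupErr, errNodeNotFound) {
            return nil // node is gone; there is nothing left to drop
        }
        return lookupErr
    }
    return retryWithTimeout(3, 10*time.Second, rpc)
}

func main() {
    fmt.Println(dropPlan(errNodeNotFound, nil)) // <nil>: skipped, not an error
    fmt.Println(dropPlan(nil, func(context.Context) error { return nil }))
}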
2 changes: 1 addition & 1 deletion internal/datacoord/sync_segments_scheduler.go
@@ -115,7 +115,7 @@ func (sss *SyncSegmentsScheduler) SyncSegments(collectionID, partitionID int64,
log := log.With(zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID),
zap.String("channelName", channelName), zap.Int64("nodeID", nodeID))
segments := sss.meta.SelectSegments(WithChannel(channelName), SegmentFilterFunc(func(info *SegmentInfo) bool {
- return info.GetPartitionID() == partitionID && isSegmentHealthy(info)
+ return info.GetPartitionID() == partitionID && isSegmentHealthy(info) && info.GetLevel() != datapb.SegmentLevel_L0
}))
req := &datapb.SyncSegmentsRequest{
ChannelName: channelName,
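The one-line change above narrows the SyncSegments selection: L0 segments are now skipped along with unhealthy segments and those from other partitions. A small sketch of the updated predicate, using simplified hypothetical types (segment, segmentLevel, keep) rather than the real datacoord metadata:

package main

import "fmt"

type segmentLevel int

const (
    levelL0 segmentLevel = iota
    levelL1
)

type segment struct {
    partitionID int64
    healthy     bool
    level       segmentLevel
}

// keep mirrors the updated filter: same partition, healthy, and not L0.
func keep(s segment, partitionID int64) bool {
    return s.partitionID == partitionID && s.healthy && s.level != levelL0
}

func main() {
    segs := []segment{
        {partitionID: 1, healthy: true, level: levelL1},  // kept
        {partitionID: 1, healthy: true, level: levelL0},  // skipped: L0
        {partitionID: 1, healthy: false, level: levelL1}, // skipped: unhealthy
        {partitionID: 2, healthy: true, level: levelL1},  // skipped: other partition
    }
    for _, s := range segs {
        fmt.Printf("%+v kept=%v\n", s, keep(s, 1))
    }
}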