[test] Add test for array data type #231

Merged: 1 commit, Dec 13, 2023
4 changes: 2 additions & 2 deletions .github/workflows/main.yaml
@@ -410,7 +410,7 @@ jobs:
      matrix:
        deploy_tools: [docker-compose]
        milvus_mode: [standalone]
-       case_tag: [L0, L1]
+       case_tag: [L0, L1, L2]
        exclude:
          - deploy_tools: helm
            milvus_mode: cluster
@@ -518,7 +518,7 @@ jobs:
        if: ${{ ! success() }}
        uses: actions/upload-artifact@v2
        with:
-         name: api-test-logs-${{ matrix.deploy_tools }}-${{ matrix.milvus_mode }}
+         name: api-test-logs-${{ matrix.deploy_tools }}-${{ matrix.milvus_mode }}-${{ matrix.case_tag }}
          path: |
            ./logs
            ./server.log
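For context: the L0/L1/L2 values are pytest tags (tests/requirements.txt pins git+https://github.com/Projectplace/pytest-tags), so adding L2 to the matrix gives the heavier new cases their own CI bucket. A minimal sketch of how a test opts into that bucket, assuming the workflow passes the matrix value to pytest's --tags option; the actual invocation is outside this diff:

import pytest

class CaseLabel:  # stand-in; the test suite defines its own CaseLabel constants
    L0, L1, L2 = "L0", "L1", "L2"

@pytest.mark.tags(CaseLabel.L2)
def test_runs_only_in_l2_bucket():
    # collected only when CI runs e.g. `pytest --tags L2` (assumed invocation)
    assert True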
61 changes: 48 additions & 13 deletions tests/base/client_base.py
@@ -1,4 +1,5 @@
import sys
import time
from pymilvus import DefaultConfig, DataType, db

sys.path.append("..")
@@ -338,37 +339,71 @@ def is_binary_by_schema(self, schema):
            if field.dtype == DataType.FLOAT_VECTOR:
                return False

-   def compare_collections(self, src_name, dist_name, output_fields=None):
+   def compare_collections(self, src_name, dist_name, output_fields=None, verify_by_query=False):
        if output_fields is None:
-           output_fields = [ct.default_int64_field_name, ct.default_json_field_name]
+           output_fields = ["*"]
        collection_src, _ = self.collection_wrap.init_collection(name=src_name)
        collection_dist, _ = self.collection_wrap.init_collection(name=dist_name)
-       assert collection_src.num_entities == collection_dist.num_entities, \
-           f"collection_src {src_name} num_entities: {collection_src.num_entities} != " \
-           f"collection_dist {dist_name} num_entities: {collection_dist.num_entities}"
        log.info(f"collection_src schema: {collection_src.schema}")
        log.info(f"collection_dist schema: {collection_dist.schema}")
        assert collection_src.schema == collection_dist.schema
        # get partitions
        partitions_src = collection_src.partitions
        partitions_dist = collection_dist.partitions
        log.info(f"partitions_src: {partitions_src}, partitions_dist: {partitions_dist}")
        assert len(partitions_src) == len(partitions_dist)

        # get num entities
        src_num = collection_src.num_entities
        dist_num = collection_dist.num_entities
        log.info(f"src_num: {src_num}, dist_num: {dist_num}")
        if not verify_by_query:
            assert src_num == dist_num
            return
        for coll in [collection_src, collection_dist]:
            is_binary = self.is_binary_by_schema(coll.schema)
-           if is_binary:
-               coll.create_index(ct.default_binary_vec_field_name, ct.default_bin_flat_index,
-                                 index_name=cf.gen_unique_str())
-           else:
-               coll.create_index(ct.default_float_vec_field_name, ct.default_index, index_name=cf.gen_unique_str())
+           try:
+               if is_binary:
+                   coll.create_index(ct.default_binary_vec_field_name, ct.default_bin_flat_index,
+                                     index_name=cf.gen_unique_str())
+               else:
+                   coll.create_index(ct.default_float_vec_field_name, ct.default_index, index_name=cf.gen_unique_str())
+           except Exception as e:
+               log.error(f"collection {coll.name} create index failed with error: {e}")
            coll.load()
            time.sleep(5)
        # get entities by count
        src_count = collection_src.query(
            expr="",
            output_fields=["count(*)"]
        )
        dist_count = collection_dist.query(
            expr="",
            output_fields=["count(*)"]
        )
        log.info(f"src count: {src_count}, dist count: {dist_count}")
        src_res = collection_src.query(expr=f'{ct.default_int64_field_name} >= 0',
                                       output_fields=output_fields)
        log.info(f"src res: {len(src_res)}")
        # log.info(f"src res: {len(src_res)}, src res: {src_res[-1]}")
        dist_res = collection_dist.query(expr=f'{ct.default_int64_field_name} >= 0',
                                         output_fields=output_fields)
        log.info(f"dist res: {len(dist_res)}")
        # log.info(f"dist res: {len(dist_res)}, dist res: {dist_res[-1]}")
        assert len(dist_res) == len(src_res)

        # sort by primary key and compare
        src_res = sorted(src_res, key=lambda x: x[ct.default_int64_field_name])
        dist_res = sorted(dist_res, key=lambda x: x[ct.default_int64_field_name])
        src_pk = [r[ct.default_int64_field_name] for r in src_res]
        dist_pk = [r[ct.default_int64_field_name] for r in dist_res]
        diff = list(set(src_pk).difference(set(dist_pk)))
        log.info(f"pk diff: {diff}")
        for i in range(len(src_res)):
            assert src_res[i] == dist_res[i]
        for coll in [collection_src, collection_dist]:
            try:
                coll.release()
            except Exception as e:
                log.error(f"collection {coll.name} release failed with error: {e}")

    def check_collection_binary(self, name):
        collection_w, _ = self.collection_wrap.init_collection(name=name)
        field_types = [field.dtype for field in collection_w.schema.fields]
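A sketch of how a restore test might drive the strengthened comparison; the collection names are illustrative, and both collections are assumed to already exist with matching schemas:

from base.client_base import TestcaseBase

class TestRestoreVerify(TestcaseBase):
    def test_verify_restored_collection(self):
        # names illustrative; typically src is the original and dist the restored copy
        src_name, dist_name = "demo_src", "demo_src_recover"
        # cheap check: schemas, partition counts, and num_entities only
        self.compare_collections(src_name, dist_name)
        # deep check: index + load both sides, count(*), then row-by-row equality
        # on a full `int64 >= 0` query sorted by primary key
        self.compare_collections(src_name, dist_name, verify_by_query=True)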
12 changes: 12 additions & 0 deletions tests/base/collection_wrapper.py
@@ -124,6 +124,18 @@ def insert(self, data, partition_name=None, check_task=None, check_items=None, *
                                       **kwargs).run()
        return res, check_result

    @trace()
    def upsert(self, data, partition_name=None, check_task=None, check_items=None, **kwargs):
        timeout = kwargs.get("timeout", TIMEOUT)
        kwargs.update({"timeout": timeout})

        func_name = sys._getframe().f_code.co_name
        res, check = api_request([self.collection.upsert, data, partition_name], **kwargs)
        check_result = ResponseChecker(res, func_name, check_task, check_items, check,
                                       dat=data, partition_name=partition_name,
                                       **kwargs).run()
        return res, check_result

    # @trace()
    # def flush(self, check_task=None, check_items=None, **kwargs):
    #     # TODO: currently, flush is not supported by sdk in milvus
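A sketch of calling the new wrapper from a test, mirroring the insert wrapper's conventions; the schema (an int64 primary key plus a 128-dim float vector) and the pre-created collection are assumptions for illustration:

import random
from base.client_base import TestcaseBase

class TestUpsertWrapper(TestcaseBase):
    def test_upsert_overwrites_rows(self):
        # schema assumed: int64 pk + 128-dim float vector, collection already created
        pks = [i for i in range(10)]
        vectors = [[random.random() for _ in range(128)] for _ in range(10)]
        # column-ordered data, same layout the insert wrapper accepts;
        # rows whose primary key already exists are overwritten
        res, check_result = self.collection_wrap.upsert(data=[pks, vectors])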
5 changes: 5 additions & 0 deletions tests/common/common_func.py
@@ -106,6 +106,11 @@ def gen_json_field(name=ct.default_json_field_name, is_primary=False, descriptio
                                                          description=description, is_primary=is_primary)
    return json_field


def gen_array_field(name=ct.default_array_field_name, is_primary=False, element_type=DataType.VARCHAR,
                    description=ct.default_desc):
    array_field, _ = ApiFieldSchemaWrapper().init_field_schema(name=name, dtype=DataType.ARRAY,
                                                               description=description, is_primary=is_primary,
                                                               element_type=element_type, max_capacity=2000,
                                                               max_length=1500)
    return array_field


def gen_float_vec_field(name=ct.default_float_vec_field_name, is_primary=False, dim=ct.default_dim,
                        description=ct.default_desc):
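A sketch of the new helper in use; gen_int64_field is assumed to follow the same gen_* pattern as the helpers visible above, and the schema assembly uses pymilvus directly:

from pymilvus import CollectionSchema
from common import common_func as cf

fields = [
    cf.gen_int64_field(is_primary=True),   # assumed sibling gen_* helper for the pk field
    cf.gen_array_field(),                  # new: ARRAY of VARCHAR, max_capacity=2000, max_length=1500
    cf.gen_float_vec_field(dim=128),
]
schema = CollectionSchema(fields=fields, description="demo schema with an array field")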
5 changes: 3 additions & 2 deletions tests/common/common_type.py
@@ -34,6 +34,7 @@
default_double_field_name = "double"
default_string_field_name = "varchar"
default_json_field_name = "json"
default_array_field_name = "array"
default_float_vec_field_name = "float_vector"
another_float_vec_field_name = "float_vector1"
default_binary_vec_field_name = "binary_vector"
@@ -73,8 +74,8 @@
err_msg = "err_msg"
in_cluster_env = "IN_CLUSTER"

default_flat_index = {"index_type": "FLAT", "params": {}, "metric_type": "L2"}
default_bin_flat_index = {"index_type": "BIN_FLAT", "params": {}, "metric_type": "JACCARD"}
default_flat_index = {"index_type": "IVF_SQ8", "metric_type": "COSINE", "params": {"nlist": 64}}
default_bin_flat_index = {"index_type": "BIN_IVF_FLAT", "params": {"nlist": 128}, "metric_type": "JACCARD"}

"""" List of parameters used to pass """
get_invalid_strs = [
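For reference, these defaults are ordinary pymilvus create_index parameter dicts; a minimal sketch of where they land, with the collection name illustrative. Note the variables keep their historical *_flat_* names even though the new defaults are IVF-family indexes:

from pymilvus import Collection
from common import common_type as ct

# assumes an active pymilvus connection and an existing collection with a float_vector field
c = Collection(name="demo_collection")
# IVF_SQ8/COSINE with nlist=64 replaces the old FLAT/L2 default
c.create_index(ct.default_float_vec_field_name, ct.default_flat_index)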
2 changes: 1 addition & 1 deletion tests/requirements.txt
@@ -17,7 +17,7 @@ pytest-print==0.2.1
pytest-level==0.1.1
pytest-xdist==2.5.0
pytest-loguru==0.2.0
-pymilvus==2.2.9.dev18
+pymilvus==2.3.2
pytest-rerunfailures==9.1.1
git+https://github.com/Projectplace/pytest-tags
ndg-httpsclient
85 changes: 82 additions & 3 deletions tests/testcases/test_create_backup.py
@@ -1,6 +1,6 @@
import time
import pytest

from pymilvus import Collection
from base.client_base import TestcaseBase
from common import common_func as cf
from common import common_type as ct
@@ -16,7 +16,7 @@

class TestCreateBackup(TestcaseBase):
    """ Test case of end to end"""
-   @pytest.mark.tags(CaseLabel.L1)
+   @pytest.mark.tags(CaseLabel.L0)
    @pytest.mark.parametrize("is_async", [True, False])
    @pytest.mark.parametrize("collection_need_to_backup", [1, 2, 3])
    @pytest.mark.parametrize("collection_type", ["binary", "float", "all"])
@@ -67,9 +67,88 @@ def test_milvus_create_backup(self, collection_type, collection_need_to_backup,
assert len(backup_collections) == collection_need_to_backup
assert set(names_to_backup).issubset(backup_collections)

@pytest.mark.parametrize("is_async", [False])
@pytest.mark.parametrize("collection_need_to_backup", ["all"])
@pytest.mark.parametrize("collection_type", ["binary", "float", "all"])
@pytest.mark.parametrize("collection_load_status", ["loaded", "not_loaded"])
def test_milvus_create_backup_with_indexed_and_loaded(self, collection_type, collection_need_to_backup, is_async, collection_load_status):
# prepare data
names_origin = []
back_up_name = cf.gen_unique_str(backup_prefix)
if collection_type == "all":
for is_binary in [True, False, False]:
names_origin.append(cf.gen_unique_str(prefix))
self.prepare_data(names_origin[-1], is_binary=is_binary, check_function=True)
if collection_type == "float":
for is_binary in [False, False, False]:
names_origin.append(cf.gen_unique_str(prefix))
self.prepare_data(names_origin[-1], is_binary=is_binary, check_function=True)
if collection_type == "binary":
for is_binary in [True, True, True]:
names_origin.append(cf.gen_unique_str(prefix))
self.prepare_data(names_origin[-1], is_binary=is_binary, check_function=True)
log.info(f"name_origin:{names_origin}, back_up_name: {back_up_name}")
if collection_load_status == "loaded":
for name in names_origin:
c = Collection(name=name)
c.load()
if collection_load_status == "not_loaded":
for name in names_origin:
c = Collection(name=name)
c.load()
c.release()
collection_info = {}
for name in names_origin:
d = {}
res, _ = self.utility_wrap.has_collection(name)
assert res is True
c = Collection(name=name)
index_info = [x.to_dict() for x in c.indexes]

loaded = "NotLoad"
try:
c.get_replicas()
loaded = "Loaded"
except Exception as e:
log.error(f"get replicas failed: {e}")
collection_info[name] = {
"index_info": index_info,
"load_state": loaded
}
log.info(f"collection_info: {collection_info}")


# create backup
names_to_backup = []
if collection_need_to_backup == "all":
names_to_backup = names_origin
payload = {"async": is_async, "backup_name": back_up_name}
else:
names_need_backup = names_origin[:collection_need_to_backup]
payload = {"async": is_async, "backup_name": back_up_name, "collection_names": names_need_backup}
res = client.create_backup(payload)
log.info(f"create backup response: {res}")
if is_async:
res = client.wait_create_backup_complete(back_up_name)
assert res is True
backup_info = res["data"]["collection_backups"]
# check load state and index info in backup
for backup in backup_info:
c_name = backup["collection_name"]
assert backup["load_state"] == collection_info[c_name]["load_state"]
assert len(backup["index_infos"]) == len(collection_info[c_name]["index_info"])
res = client.list_backup()
log.info(f"list backup response: {res}")
if "data" in res:
all_backup = [r["name"] for r in res["data"]]
else:
all_backup = []
assert back_up_name in all_backup
backup = client.get_backup(back_up_name)
assert backup["data"]["name"] == back_up_name
backup_collections = [backup["collection_name"]for backup in backup["data"]["collection_backups"]]
if isinstance(collection_need_to_backup, int):
assert len(backup_collections) == collection_need_to_backup
assert set(names_to_backup).issubset(backup_collections)



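The load-state bookkeeping above leans on get_replicas raising for a collection that is not loaded; distilled into a standalone probe, with the name illustrative and the Loaded/NotLoad strings mirroring the test and the backup's load_state field:

from pymilvus import Collection

def probe_load_state(name: str) -> str:
    # assumes an active pymilvus connection; get_replicas() succeeds only on a loaded collection
    c = Collection(name=name)
    try:
        c.get_replicas()
        return "Loaded"
    except Exception:
        return "NotLoad"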
2 changes: 1 addition & 1 deletion tests/testcases/test_get_backup.py
@@ -14,7 +14,7 @@

class TestGetBackup(TestcaseBase):
    """ Test case of end to end"""
-   @pytest.mark.tags(CaseLabel.L1)
+   @pytest.mark.tags(CaseLabel.L2)
    @pytest.mark.parametrize("is_async", [True, False])
    @pytest.mark.parametrize("backup_num", [1, 2, 3])
    def test_milvus_get_backup(self, backup_num, is_async):
2 changes: 1 addition & 1 deletion tests/testcases/test_get_restore.py
@@ -16,7 +16,7 @@

class TestGetRestore(TestcaseBase):
    """ Test case of end to end"""
-   @pytest.mark.tags(CaseLabel.L1)
+   @pytest.mark.tags(CaseLabel.L2)
    @pytest.mark.parametrize("is_async", [True, False])
    @pytest.mark.parametrize("restore_num", [1, 2, 3])
    def test_milvus_get_restore(self, restore_num, is_async):
2 changes: 1 addition & 1 deletion tests/testcases/test_list_backup.py
@@ -14,7 +14,7 @@

class TestListBackup(TestcaseBase):
    """ Test case of end to end"""
-   @pytest.mark.tags(CaseLabel.L1)
+   @pytest.mark.tags(CaseLabel.L2)
    @pytest.mark.parametrize("is_async", [True, False])
    @pytest.mark.parametrize("backup_num", [1, 2, 3])
    def test_milvus_list_backup(self, backup_num, is_async):