Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 275 additions & 1 deletion tests/python_client/milvus_client/test_milvus_client_geometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -3096,6 +3096,193 @@ def test_st_dwithin_edge_cases(self, with_geo_index):
"Larger distances should return same or more results"
)

@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("with_geo_index", [True, False])
def test_st_isvalid_with_invalid_geometries(self, with_geo_index):
"""
target: test ST_ISVALID operator can detect invalid geometries
method: insert both valid and invalid geometries (self-intersecting polygons), use ST_ISVALID to filter
expected: ST_ISVALID returns only valid geometries, NOT ST_ISVALID returns invalid geometries
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()

# Create collection with geometry field
schema, _ = self.create_schema(client, auto_id=False)
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("vector", DataType.FLOAT_VECTOR, dim=default_dim)
schema.add_field("geo", DataType.GEOMETRY)

self.create_collection(client, collection_name, schema=schema)

# Valid geometries
valid_geometries = [
("POINT (10 20)", True),
("LINESTRING (0 0, 10 10, 20 20)", True),
("POLYGON ((0 0, 10 0, 10 10, 0 10, 0 0))", True), # Simple valid polygon
]

# Invalid geometries - self-intersecting polygons (bowtie/figure-8 shape)
invalid_geometries = [
("POLYGON ((0 0, 10 10, 10 0, 0 10, 0 0))", False), # Self-intersecting (bowtie)
("POLYGON ((0 0, 20 10, 10 0, 20 0, 0 10, 0 0))", False), # Another self-intersecting
]

all_geometries = valid_geometries + invalid_geometries

data = []
for i, (geo_wkt, is_valid) in enumerate(all_geometries):
data.append({
"id": i,
"vector": [random.random() for _ in range(default_dim)],
"geo": geo_wkt
})

self.insert(client, collection_name, data)
self.flush(client, collection_name)

# Create indexes
index_params, _ = self.prepare_index_params(client)
index_params.add_index(
field_name="vector", index_type="IVF_FLAT", metric_type="L2", nlist=128
)
if with_geo_index:
index_params.add_index(field_name="geo", index_type="RTREE")

self.create_index(client, collection_name, index_params=index_params)
self.load_collection(client, collection_name)

# Test ST_ISVALID - should return only valid geometries
results_valid, _ = self.query(
client,
collection_name=collection_name,
filter="ST_ISVALID(geo)",
output_fields=["id", "geo"],
)

valid_ids = {i for i, (_, is_valid) in enumerate(all_geometries) if is_valid}
result_ids = {r["id"] for r in results_valid}
assert result_ids == valid_ids, (
f"ST_ISVALID should return valid geometry IDs {valid_ids}, "
f"got {result_ids} (index={with_geo_index})"
)

# Test NOT ST_ISVALID - should return only invalid geometries
results_invalid, _ = self.query(
client,
collection_name=collection_name,
filter="not ST_ISVALID(geo)",
output_fields=["id", "geo"],
)

invalid_ids = {i for i, (_, is_valid) in enumerate(all_geometries) if not is_valid}
result_invalid_ids = {r["id"] for r in results_invalid}
assert result_invalid_ids == invalid_ids, (
f"NOT ST_ISVALID should return invalid geometry IDs {invalid_ids}, "
f"got {result_invalid_ids} (index={with_geo_index})"
)

@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("with_geo_index", [True, False])
def test_st_isvalid_basic(self, with_geo_index):
"""
target: test ST_ISVALID operator basic functionality
method: insert valid WKT geometries, query using ST_ISVALID, test NOT ST_ISVALID and combined conditions
expected: ST_ISVALID returns true for all valid geometries, correct filtering with logical operators
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()

# Create collection with geometry field and category
schema, _ = self.create_schema(client, auto_id=False)
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("vector", DataType.FLOAT_VECTOR, dim=default_dim)
schema.add_field("geo", DataType.GEOMETRY)
schema.add_field("category", DataType.INT64)

self.create_collection(client, collection_name, schema=schema)

# Insert valid geometry data of various types
valid_geometries = [
"POINT (10.5 20.3)",
"LINESTRING (0 0, 10 10, 20 25)",
"POLYGON ((0 0, 10 0, 10 10, 0 10, 0 0))",
"MULTIPOINT ((0 0), (10 10), (20 20))",
"MULTILINESTRING ((0 0, 10 10), (20 20, 30 30))",
"MULTIPOLYGON (((0 0, 5 0, 5 5, 0 5, 0 0)), ((10 10, 15 10, 15 15, 10 15, 10 10)))",
"GEOMETRYCOLLECTION (POINT(10 10), LINESTRING(0 0, 5 5))",
]

data = []
for i, geo_wkt in enumerate(valid_geometries):
data.append({
"id": i,
"vector": [random.random() for _ in range(default_dim)],
"geo": geo_wkt,
"category": i % 2
})

self.insert(client, collection_name, data)
self.flush(client, collection_name)

# Create indexes
index_params, _ = self.prepare_index_params(client)
index_params.add_index(
field_name="vector", index_type="IVF_FLAT", metric_type="L2", nlist=128
)
if with_geo_index:
index_params.add_index(field_name="geo", index_type="RTREE")

self.create_index(client, collection_name, index_params=index_params)
self.load_collection(client, collection_name)

# Test 1: ST_ISVALID returns all valid geometries
results, _ = self.query(
client,
collection_name=collection_name,
filter="ST_ISVALID(geo)",
output_fields=["id", "geo"],
)
assert len(results) == len(valid_geometries), (
f"ST_ISVALID should return all {len(valid_geometries)} valid geometries, "
f"got {len(results)} (index={with_geo_index})"
)

# Test 2: NOT ST_ISVALID returns empty for all valid geometries
results_not, _ = self.query(
client,
collection_name=collection_name,
filter="not ST_ISVALID(geo)",
output_fields=["id", "geo"],
)
assert len(results_not) == 0, (
f"NOT ST_ISVALID should return 0 results, got {len(results_not)} (index={with_geo_index})"
)

# Test 3: ST_ISVALID combined with other condition
results_and, _ = self.query(
client,
collection_name=collection_name,
filter="ST_ISVALID(geo) and category == 1",
output_fields=["id", "geo", "category"],
)
expected_category_1 = {i for i in range(len(valid_geometries)) if i % 2 == 1}
result_ids = {r["id"] for r in results_and}
assert result_ids == expected_category_1, (
f"ST_ISVALID AND category==1 should return IDs {expected_category_1}, "
f"got {result_ids} (index={with_geo_index})"
)

# Test 4: Case insensitive (lowercase)
results_lower, _ = self.query(
client,
collection_name=collection_name,
filter="st_isvalid(geo)",
output_fields=["id"],
)
assert len(results_lower) == len(valid_geometries), (
f"st_isvalid (lowercase) should return all records (index={with_geo_index})"
)



Expand Down Expand Up @@ -3714,4 +3901,91 @@ def test_st_dwithin_invalid_base_data_coordinates(self):
[invalid_data],
check_task=CheckTasks.err_res,
check_items=error_invalid_insert,
)
)

@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("with_geo_index", [True, False])
def test_st_isvalid_invalid_usage(self, with_geo_index):
"""
target: test ST_ISVALID error handling for invalid usage
method: test ST_ISVALID on non-geometry fields, with wrong parameters, and nonexistent fields
expected: should raise appropriate errors
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()

# Create collection with geometry and non-geometry fields
schema, _ = self.create_schema(client, auto_id=False)
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("vector", DataType.FLOAT_VECTOR, dim=default_dim)
schema.add_field("geo", DataType.GEOMETRY)
schema.add_field("int_field", DataType.INT64)

self.create_collection(client, collection_name, schema=schema)

# Insert test data
data = [{
"id": 0,
"vector": [random.random() for _ in range(default_dim)],
"geo": "POINT (10 20)",
"int_field": 100
}]

self.insert(client, collection_name, data)
self.flush(client, collection_name)

# Create indexes
index_params, _ = self.prepare_index_params(client)
index_params.add_index(
field_name="vector", index_type="IVF_FLAT", metric_type="L2", nlist=128
)
if with_geo_index:
index_params.add_index(field_name="geo", index_type="RTREE")

self.create_index(client, collection_name, index_params=index_params)
self.load_collection(client, collection_name)

error = {
ct.err_code: 1100,
ct.err_msg: "failed to create query plan: cannot parse expression",
}

# Test 1: ST_ISVALID on non-geometry field
self.query(
client,
collection_name=collection_name,
filter="ST_ISVALID(int_field)",
output_fields=["id"],
check_task=CheckTasks.err_res,
check_items=error,
)

# Test 2: ST_ISVALID with no parameters
self.query(
client,
collection_name=collection_name,
filter="ST_ISVALID()",
output_fields=["id", "geo"],
check_task=CheckTasks.err_res,
check_items=error,
)

# Test 3: ST_ISVALID with too many parameters
self.query(
client,
collection_name=collection_name,
filter="ST_ISVALID(geo, 1)",
output_fields=["id", "geo"],
check_task=CheckTasks.err_res,
check_items=error,
)

# Test 4: ST_ISVALID with non-existent field
self.query(
client,
collection_name=collection_name,
filter="ST_ISVALID(nonexistent_geo_field)",
output_fields=["id", "geo"],
check_task=CheckTasks.err_res,
check_items=error,
)