From 88eca35f054ffa4a8fbe538aaca020633eb667cf Mon Sep 17 00:00:00 2001 From: zhuwenxing Date: Tue, 9 Dec 2025 17:35:57 +0800 Subject: [PATCH] test: add ST_ISVALID geometry function test cases Signed-off-by: zhuwenxing --- .../test_milvus_client_geometry.py | 276 +++++++++++++++++- 1 file changed, 275 insertions(+), 1 deletion(-) diff --git a/tests/python_client/milvus_client/test_milvus_client_geometry.py b/tests/python_client/milvus_client/test_milvus_client_geometry.py index 8fb05132670bd..a0f51d2ea2592 100644 --- a/tests/python_client/milvus_client/test_milvus_client_geometry.py +++ b/tests/python_client/milvus_client/test_milvus_client_geometry.py @@ -3096,6 +3096,193 @@ def test_st_dwithin_edge_cases(self, with_geo_index): "Larger distances should return same or more results" ) + @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.parametrize("with_geo_index", [True, False]) + def test_st_isvalid_with_invalid_geometries(self, with_geo_index): + """ + target: test ST_ISVALID operator can detect invalid geometries + method: insert both valid and invalid geometries (self-intersecting polygons), use ST_ISVALID to filter + expected: ST_ISVALID returns only valid geometries, NOT ST_ISVALID returns invalid geometries + """ + client = self._client() + collection_name = cf.gen_collection_name_by_testcase_name() + + # Create collection with geometry field + schema, _ = self.create_schema(client, auto_id=False) + schema.add_field("id", DataType.INT64, is_primary=True) + schema.add_field("vector", DataType.FLOAT_VECTOR, dim=default_dim) + schema.add_field("geo", DataType.GEOMETRY) + + self.create_collection(client, collection_name, schema=schema) + + # Valid geometries + valid_geometries = [ + ("POINT (10 20)", True), + ("LINESTRING (0 0, 10 10, 20 20)", True), + ("POLYGON ((0 0, 10 0, 10 10, 0 10, 0 0))", True), # Simple valid polygon + ] + + # Invalid geometries - self-intersecting polygons (bowtie/figure-8 shape) + invalid_geometries = [ + ("POLYGON ((0 0, 10 10, 10 0, 0 10, 0 0))", False), # Self-intersecting (bowtie) + ("POLYGON ((0 0, 20 10, 10 0, 20 0, 0 10, 0 0))", False), # Another self-intersecting + ] + + all_geometries = valid_geometries + invalid_geometries + + data = [] + for i, (geo_wkt, is_valid) in enumerate(all_geometries): + data.append({ + "id": i, + "vector": [random.random() for _ in range(default_dim)], + "geo": geo_wkt + }) + + self.insert(client, collection_name, data) + self.flush(client, collection_name) + + # Create indexes + index_params, _ = self.prepare_index_params(client) + index_params.add_index( + field_name="vector", index_type="IVF_FLAT", metric_type="L2", nlist=128 + ) + if with_geo_index: + index_params.add_index(field_name="geo", index_type="RTREE") + + self.create_index(client, collection_name, index_params=index_params) + self.load_collection(client, collection_name) + + # Test ST_ISVALID - should return only valid geometries + results_valid, _ = self.query( + client, + collection_name=collection_name, + filter="ST_ISVALID(geo)", + output_fields=["id", "geo"], + ) + + valid_ids = {i for i, (_, is_valid) in enumerate(all_geometries) if is_valid} + result_ids = {r["id"] for r in results_valid} + assert result_ids == valid_ids, ( + f"ST_ISVALID should return valid geometry IDs {valid_ids}, " + f"got {result_ids} (index={with_geo_index})" + ) + + # Test NOT ST_ISVALID - should return only invalid geometries + results_invalid, _ = self.query( + client, + collection_name=collection_name, + filter="not ST_ISVALID(geo)", + output_fields=["id", "geo"], + ) + + invalid_ids = {i for i, (_, is_valid) in enumerate(all_geometries) if not is_valid} + result_invalid_ids = {r["id"] for r in results_invalid} + assert result_invalid_ids == invalid_ids, ( + f"NOT ST_ISVALID should return invalid geometry IDs {invalid_ids}, " + f"got {result_invalid_ids} (index={with_geo_index})" + ) + + @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.parametrize("with_geo_index", [True, False]) + def test_st_isvalid_basic(self, with_geo_index): + """ + target: test ST_ISVALID operator basic functionality + method: insert valid WKT geometries, query using ST_ISVALID, test NOT ST_ISVALID and combined conditions + expected: ST_ISVALID returns true for all valid geometries, correct filtering with logical operators + """ + client = self._client() + collection_name = cf.gen_collection_name_by_testcase_name() + + # Create collection with geometry field and category + schema, _ = self.create_schema(client, auto_id=False) + schema.add_field("id", DataType.INT64, is_primary=True) + schema.add_field("vector", DataType.FLOAT_VECTOR, dim=default_dim) + schema.add_field("geo", DataType.GEOMETRY) + schema.add_field("category", DataType.INT64) + + self.create_collection(client, collection_name, schema=schema) + + # Insert valid geometry data of various types + valid_geometries = [ + "POINT (10.5 20.3)", + "LINESTRING (0 0, 10 10, 20 25)", + "POLYGON ((0 0, 10 0, 10 10, 0 10, 0 0))", + "MULTIPOINT ((0 0), (10 10), (20 20))", + "MULTILINESTRING ((0 0, 10 10), (20 20, 30 30))", + "MULTIPOLYGON (((0 0, 5 0, 5 5, 0 5, 0 0)), ((10 10, 15 10, 15 15, 10 15, 10 10)))", + "GEOMETRYCOLLECTION (POINT(10 10), LINESTRING(0 0, 5 5))", + ] + + data = [] + for i, geo_wkt in enumerate(valid_geometries): + data.append({ + "id": i, + "vector": [random.random() for _ in range(default_dim)], + "geo": geo_wkt, + "category": i % 2 + }) + + self.insert(client, collection_name, data) + self.flush(client, collection_name) + + # Create indexes + index_params, _ = self.prepare_index_params(client) + index_params.add_index( + field_name="vector", index_type="IVF_FLAT", metric_type="L2", nlist=128 + ) + if with_geo_index: + index_params.add_index(field_name="geo", index_type="RTREE") + + self.create_index(client, collection_name, index_params=index_params) + self.load_collection(client, collection_name) + + # Test 1: ST_ISVALID returns all valid geometries + results, _ = self.query( + client, + collection_name=collection_name, + filter="ST_ISVALID(geo)", + output_fields=["id", "geo"], + ) + assert len(results) == len(valid_geometries), ( + f"ST_ISVALID should return all {len(valid_geometries)} valid geometries, " + f"got {len(results)} (index={with_geo_index})" + ) + + # Test 2: NOT ST_ISVALID returns empty for all valid geometries + results_not, _ = self.query( + client, + collection_name=collection_name, + filter="not ST_ISVALID(geo)", + output_fields=["id", "geo"], + ) + assert len(results_not) == 0, ( + f"NOT ST_ISVALID should return 0 results, got {len(results_not)} (index={with_geo_index})" + ) + + # Test 3: ST_ISVALID combined with other condition + results_and, _ = self.query( + client, + collection_name=collection_name, + filter="ST_ISVALID(geo) and category == 1", + output_fields=["id", "geo", "category"], + ) + expected_category_1 = {i for i in range(len(valid_geometries)) if i % 2 == 1} + result_ids = {r["id"] for r in results_and} + assert result_ids == expected_category_1, ( + f"ST_ISVALID AND category==1 should return IDs {expected_category_1}, " + f"got {result_ids} (index={with_geo_index})" + ) + + # Test 4: Case insensitive (lowercase) + results_lower, _ = self.query( + client, + collection_name=collection_name, + filter="st_isvalid(geo)", + output_fields=["id"], + ) + assert len(results_lower) == len(valid_geometries), ( + f"st_isvalid (lowercase) should return all records (index={with_geo_index})" + ) @@ -3714,4 +3901,91 @@ def test_st_dwithin_invalid_base_data_coordinates(self): [invalid_data], check_task=CheckTasks.err_res, check_items=error_invalid_insert, - ) \ No newline at end of file + ) + + @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.parametrize("with_geo_index", [True, False]) + def test_st_isvalid_invalid_usage(self, with_geo_index): + """ + target: test ST_ISVALID error handling for invalid usage + method: test ST_ISVALID on non-geometry fields, with wrong parameters, and nonexistent fields + expected: should raise appropriate errors + """ + client = self._client() + collection_name = cf.gen_collection_name_by_testcase_name() + + # Create collection with geometry and non-geometry fields + schema, _ = self.create_schema(client, auto_id=False) + schema.add_field("id", DataType.INT64, is_primary=True) + schema.add_field("vector", DataType.FLOAT_VECTOR, dim=default_dim) + schema.add_field("geo", DataType.GEOMETRY) + schema.add_field("int_field", DataType.INT64) + + self.create_collection(client, collection_name, schema=schema) + + # Insert test data + data = [{ + "id": 0, + "vector": [random.random() for _ in range(default_dim)], + "geo": "POINT (10 20)", + "int_field": 100 + }] + + self.insert(client, collection_name, data) + self.flush(client, collection_name) + + # Create indexes + index_params, _ = self.prepare_index_params(client) + index_params.add_index( + field_name="vector", index_type="IVF_FLAT", metric_type="L2", nlist=128 + ) + if with_geo_index: + index_params.add_index(field_name="geo", index_type="RTREE") + + self.create_index(client, collection_name, index_params=index_params) + self.load_collection(client, collection_name) + + error = { + ct.err_code: 1100, + ct.err_msg: "failed to create query plan: cannot parse expression", + } + + # Test 1: ST_ISVALID on non-geometry field + self.query( + client, + collection_name=collection_name, + filter="ST_ISVALID(int_field)", + output_fields=["id"], + check_task=CheckTasks.err_res, + check_items=error, + ) + + # Test 2: ST_ISVALID with no parameters + self.query( + client, + collection_name=collection_name, + filter="ST_ISVALID()", + output_fields=["id", "geo"], + check_task=CheckTasks.err_res, + check_items=error, + ) + + # Test 3: ST_ISVALID with too many parameters + self.query( + client, + collection_name=collection_name, + filter="ST_ISVALID(geo, 1)", + output_fields=["id", "geo"], + check_task=CheckTasks.err_res, + check_items=error, + ) + + # Test 4: ST_ISVALID with non-existent field + self.query( + client, + collection_name=collection_name, + filter="ST_ISVALID(nonexistent_geo_field)", + output_fields=["id", "geo"], + check_task=CheckTasks.err_res, + check_items=error, + ) \ No newline at end of file