|
84 | 84 | import static org.openmetadata.service.util.TestUtils.assertResponseContains; |
85 | 85 | import static org.openmetadata.service.util.TestUtils.validateEntityReference; |
86 | 86 |
|
87 | | -import com.fasterxml.jackson.databind.JsonNode; |
88 | | -import com.github.fge.jsonpatch.diff.JsonDiff; |
89 | 87 | import com.google.common.collect.Lists; |
90 | 88 | import es.org.elasticsearch.client.Request; |
91 | 89 | import es.org.elasticsearch.client.Response; |
92 | 90 | import es.org.elasticsearch.client.RestClient; |
93 | 91 | import jakarta.ws.rs.client.WebTarget; |
94 | | -import jakarta.ws.rs.core.MediaType; |
95 | 92 | import jakarta.ws.rs.core.Response.Status; |
96 | 93 | import java.io.IOException; |
97 | 94 | import java.net.URLEncoder; |
|
109 | 106 | import java.util.Optional; |
110 | 107 | import java.util.Set; |
111 | 108 | import java.util.UUID; |
112 | | -import java.util.concurrent.CountDownLatch; |
113 | | -import java.util.concurrent.TimeUnit; |
114 | | -import java.util.concurrent.atomic.AtomicReference; |
115 | 109 | import java.util.function.Predicate; |
116 | 110 | import java.util.stream.Collectors; |
117 | 111 | import lombok.SneakyThrows; |
|
223 | 217 | import org.openmetadata.service.resources.tags.TagResourceTest; |
224 | 218 | import org.openmetadata.service.resources.teams.TeamResourceTest; |
225 | 219 | import org.openmetadata.service.resources.teams.UserResourceTest; |
226 | | -import org.openmetadata.service.security.SecurityUtil; |
227 | | -import org.openmetadata.service.util.EntityETag; |
228 | 220 | import org.openmetadata.service.util.EntityUtil; |
229 | 221 | import org.openmetadata.service.util.EntityUtil.Fields; |
230 | 222 | import org.openmetadata.service.util.FullyQualifiedName; |
@@ -4552,233 +4544,6 @@ void test_updateColumn_dataStewardCanUpdateDescriptionAndTags(TestInfo test) thr |
4552 | 4544 | assertEquals("PersonalData.Personal", updatedColumn.getTags().get(0).getTagFQN()); |
4553 | 4545 | } |
4554 | 4546 |
|
4555 | | - @Test |
4556 | | - @Execution(ExecutionMode.CONCURRENT) |
4557 | | - void test_concurrentColumnUpdates_reproduceDataLoss(TestInfo test) throws Exception { |
4558 | | - // This test verifies that ETag-based optimistic locking prevents concurrent updates |
4559 | | - // from silently overwriting each other's changes. Two different users try to update |
4560 | | - // the same entity simultaneously using the SAME stale ETag: |
4561 | | - // - One request succeeds (whoever gets to the server first) |
4562 | | - // - The other request fails with 412 Precondition Failed due to stale ETag |
4563 | | - // This prevents the "lost update" problem where one user's changes silently overwrite |
4564 | | - // another's. |
4565 | | - |
4566 | | - Table table = createAndCheckEntity(createRequest(test), ADMIN_AUTH_HEADERS); |
4567 | | - |
4568 | | - // Get the table with ETag - both threads will use this SAME ETag (simulating stale reads) |
4569 | | - WebTarget getTarget = getResource(table.getId()).queryParam("fields", "columns,tags"); |
4570 | | - jakarta.ws.rs.core.Response getResponse = |
4571 | | - SecurityUtil.addHeaders(getTarget, ADMIN_AUTH_HEADERS).get(); |
4572 | | - String baseETag = getResponse.getHeaderString(EntityETag.ETAG_HEADER); |
4573 | | - Table baseTableState = getResponse.readEntity(Table.class); |
4574 | | - String baseTableJson = JsonUtils.pojoToJson(baseTableState); |
4575 | | - Double baseVersion = baseTableState.getVersion(); |
4576 | | - LOG.info("Base table version: {}, ETag: {}", baseVersion, baseETag); |
4577 | | - |
4578 | | - // Set up for truly concurrent updates from different users |
4579 | | - CountDownLatch startLatch = new CountDownLatch(1); |
4580 | | - CountDownLatch completionLatch = new CountDownLatch(2); |
4581 | | - AtomicReference<Integer> statusA = new AtomicReference<>(); |
4582 | | - AtomicReference<Integer> statusB = new AtomicReference<>(); |
4583 | | - AtomicReference<Table> resultA = new AtomicReference<>(); |
4584 | | - AtomicReference<Table> resultB = new AtomicReference<>(); |
4585 | | - AtomicReference<Exception> errorRef = new AtomicReference<>(); |
4586 | | - |
4587 | | - // Prepare auth headers for two different users |
4588 | | - Map<String, String> user1Headers = authHeaders(USER1.getName()); |
4589 | | - Map<String, String> user2Headers = authHeaders(USER2.getName()); |
4590 | | - |
4591 | | - // Request A (User1): Add description to a column |
4592 | | - Thread threadA = |
4593 | | - new Thread( |
4594 | | - () -> { |
4595 | | - try { |
4596 | | - startLatch.await(); |
4597 | | - |
4598 | | - Table tableForA = JsonUtils.readValue(baseTableJson, Table.class); |
4599 | | - |
4600 | | - // Add description to column at index 2 |
4601 | | - if (tableForA.getColumns() != null && tableForA.getColumns().size() > 2) { |
4602 | | - Column eventIdColumn = tableForA.getColumns().get(2); |
4603 | | - eventIdColumn.setDescription( |
4604 | | - "Unique identifier for the event, used to capture and track changes affecting the customer-address relationship."); |
4605 | | - } |
4606 | | - |
4607 | | - // Compute JSON patch |
4608 | | - String updatedJson = JsonUtils.pojoToJson(tableForA); |
4609 | | - JsonNode patch = |
4610 | | - JsonDiff.asJson( |
4611 | | - JsonUtils.getObjectMapper().readTree(baseTableJson), |
4612 | | - JsonUtils.getObjectMapper().readTree(updatedJson)); |
4613 | | - |
4614 | | - // PATCH with ETag using User1's credentials |
4615 | | - WebTarget patchTarget = getResource(tableForA.getId()); |
4616 | | - Map<String, String> headers = new HashMap<>(user1Headers); |
4617 | | - headers.put(EntityETag.IF_MATCH_HEADER, baseETag); |
4618 | | - |
4619 | | - jakarta.ws.rs.core.Response patchResponse = |
4620 | | - SecurityUtil.addHeaders(patchTarget, headers) |
4621 | | - .method( |
4622 | | - "PATCH", |
4623 | | - jakarta.ws.rs.client.Entity.entity( |
4624 | | - patch.toString(), MediaType.APPLICATION_JSON_PATCH_JSON_TYPE)); |
4625 | | - |
4626 | | - statusA.set(patchResponse.getStatus()); |
4627 | | - if (patchResponse.getStatus() == OK.getStatusCode()) { |
4628 | | - resultA.set(patchResponse.readEntity(Table.class)); |
4629 | | - LOG.info( |
4630 | | - "Request A (User1) succeeded with status: {}", patchResponse.getStatus()); |
4631 | | - } else { |
4632 | | - LOG.info( |
4633 | | - "Request A (User1) failed with status: {} (expected if B won the race)", |
4634 | | - patchResponse.getStatus()); |
4635 | | - } |
4636 | | - |
4637 | | - } catch (Exception e) { |
4638 | | - LOG.error("Request A failed with exception", e); |
4639 | | - errorRef.compareAndSet(null, e); |
4640 | | - } finally { |
4641 | | - completionLatch.countDown(); |
4642 | | - } |
4643 | | - }); |
4644 | | - |
4645 | | - // Request B (User2): Add tags to columns - using the SAME stale ETag |
4646 | | - Thread threadB = |
4647 | | - new Thread( |
4648 | | - () -> { |
4649 | | - try { |
4650 | | - startLatch.await(); |
4651 | | - |
4652 | | - Table tableForB = JsonUtils.readValue(baseTableJson, Table.class); |
4653 | | - |
4654 | | - // Add tags to table |
4655 | | - List<TagLabel> tableTags = new ArrayList<>(); |
4656 | | - tableTags.add(TIER2_TAG_LABEL); |
4657 | | - tableForB.setTags(tableTags); |
4658 | | - |
4659 | | - // Add tags to columns |
4660 | | - if (tableForB.getColumns() != null && tableForB.getColumns().size() >= 2) { |
4661 | | - Column col0 = tableForB.getColumns().get(0); |
4662 | | - List<TagLabel> col0Tags = new ArrayList<>(); |
4663 | | - col0Tags.add( |
4664 | | - new TagLabel() |
4665 | | - .withTagFQN("PersonalData.Personal") |
4666 | | - .withSource(TagLabel.TagSource.CLASSIFICATION)); |
4667 | | - col0Tags.add( |
4668 | | - new TagLabel() |
4669 | | - .withTagFQN("PII.Sensitive") |
4670 | | - .withSource(TagLabel.TagSource.CLASSIFICATION)); |
4671 | | - col0.setTags(col0Tags); |
4672 | | - |
4673 | | - Column col1 = tableForB.getColumns().get(1); |
4674 | | - List<TagLabel> col1Tags = new ArrayList<>(); |
4675 | | - col1Tags.add( |
4676 | | - new TagLabel() |
4677 | | - .withTagFQN("PII.Sensitive") |
4678 | | - .withSource(TagLabel.TagSource.CLASSIFICATION)); |
4679 | | - col1.setTags(col1Tags); |
4680 | | - } |
4681 | | - |
4682 | | - // Compute JSON patch from the SAME base state (stale) |
4683 | | - String updatedJson = JsonUtils.pojoToJson(tableForB); |
4684 | | - JsonNode patch = |
4685 | | - JsonDiff.asJson( |
4686 | | - JsonUtils.getObjectMapper().readTree(baseTableJson), |
4687 | | - JsonUtils.getObjectMapper().readTree(updatedJson)); |
4688 | | - |
4689 | | - // PATCH with the SAME stale ETag using User2's credentials |
4690 | | - WebTarget patchTarget = getResource(tableForB.getId()); |
4691 | | - Map<String, String> headers = new HashMap<>(user2Headers); |
4692 | | - headers.put(EntityETag.IF_MATCH_HEADER, baseETag); |
4693 | | - |
4694 | | - jakarta.ws.rs.core.Response patchResponse = |
4695 | | - SecurityUtil.addHeaders(patchTarget, headers) |
4696 | | - .method( |
4697 | | - "PATCH", |
4698 | | - jakarta.ws.rs.client.Entity.entity( |
4699 | | - patch.toString(), MediaType.APPLICATION_JSON_PATCH_JSON_TYPE)); |
4700 | | - |
4701 | | - statusB.set(patchResponse.getStatus()); |
4702 | | - if (patchResponse.getStatus() == OK.getStatusCode()) { |
4703 | | - resultB.set(patchResponse.readEntity(Table.class)); |
4704 | | - LOG.info( |
4705 | | - "Request B (User2) succeeded with status: {}", patchResponse.getStatus()); |
4706 | | - } else { |
4707 | | - LOG.info( |
4708 | | - "Request B (User2) failed with status: {} (expected if A won the race)", |
4709 | | - patchResponse.getStatus()); |
4710 | | - } |
4711 | | - |
4712 | | - } catch (Exception e) { |
4713 | | - LOG.error("Request B failed with exception", e); |
4714 | | - errorRef.compareAndSet(null, e); |
4715 | | - } finally { |
4716 | | - completionLatch.countDown(); |
4717 | | - } |
4718 | | - }); |
4719 | | - |
4720 | | - // Start both threads |
4721 | | - threadA.start(); |
4722 | | - threadB.start(); |
4723 | | - |
4724 | | - // Release both threads simultaneously to create true concurrency |
4725 | | - startLatch.countDown(); |
4726 | | - |
4727 | | - // Wait for completion |
4728 | | - assertTrue( |
4729 | | - completionLatch.await(30, TimeUnit.SECONDS), "Requests should complete within timeout"); |
4730 | | - |
4731 | | - // Check for unexpected exceptions |
4732 | | - if (errorRef.get() != null) { |
4733 | | - throw new AssertionError("Request execution failed with exception", errorRef.get()); |
4734 | | - } |
4735 | | - |
4736 | | - // Log the results |
4737 | | - LOG.info("Request A status: {}, Request B status: {}", statusA.get(), statusB.get()); |
4738 | | - |
4739 | | - // KEY ASSERTION: With ETags, exactly one request should succeed (200 OK) |
4740 | | - // and the other should fail with 412 Precondition Failed |
4741 | | - int successCount = 0; |
4742 | | - int preconditionFailedCount = 0; |
4743 | | - |
4744 | | - if (statusA.get() == OK.getStatusCode()) successCount++; |
4745 | | - if (statusB.get() == OK.getStatusCode()) successCount++; |
4746 | | - if (statusA.get() == 412) preconditionFailedCount++; |
4747 | | - if (statusB.get() == 412) preconditionFailedCount++; |
4748 | | - |
4749 | | - assertEquals( |
4750 | | - 1, successCount, "Exactly one request should succeed when both use the same stale ETag"); |
4751 | | - assertEquals( |
4752 | | - 1, preconditionFailedCount, "Exactly one request should fail with 412 Precondition Failed"); |
4753 | | - |
4754 | | - // Verify the winning request's changes are in the final state |
4755 | | - Table finalTable = getEntity(table.getId(), "columns,tags", ADMIN_AUTH_HEADERS); |
4756 | | - assertNotNull(finalTable.getColumns()); |
4757 | | - assertTrue(finalTable.getColumns().size() > 2); |
4758 | | - |
4759 | | - // Check which request won and verify its changes are present |
4760 | | - if (statusA.get() == OK.getStatusCode()) { |
4761 | | - // Request A won - verify column description is updated |
4762 | | - Column eventIdColumn = finalTable.getColumns().get(2); |
4763 | | - assertEquals( |
4764 | | - "Unique identifier for the event, used to capture and track changes affecting the customer-address relationship.", |
4765 | | - eventIdColumn.getDescription(), |
4766 | | - "Column description from winning Request A should be present"); |
4767 | | - LOG.info("Request A (User1) won the race - column description updated"); |
4768 | | - } else { |
4769 | | - // Request B won - verify tags are present |
4770 | | - assertNotNull(finalTable.getTags()); |
4771 | | - assertTrue( |
4772 | | - finalTable.getTags().stream() |
4773 | | - .anyMatch(tag -> tag.getTagFQN().equals(TIER2_TAG_LABEL.getTagFQN())), |
4774 | | - "Table should have Tier2 tag from winning Request B"); |
4775 | | - LOG.info("Request B (User2) won the race - tags updated"); |
4776 | | - } |
4777 | | - |
4778 | | - assertTrue(finalTable.getVersion() > table.getVersion(), "Version should be incremented"); |
4779 | | - LOG.info("Final table version: {}", finalTable.getVersion()); |
4780 | | - } |
4781 | | - |
4782 | 4547 | @Test |
4783 | 4548 | void test_updateColumn_dataConsumerCannotUpdateColumns(TestInfo test) throws IOException { |
4784 | 4549 | // Temporarily remove Organization's default roles to ensure USER3 has no permissions |
|
0 commit comments